class UpdateFile:
    """
    Representation of a device update file.

    An instance may own a temporary directory (when the given file turned
    out to be a release archive wrapping ``update.tar.gz``); call
    :meth:`close` when done with it to release that directory.
    """

    __slots__ = [
        "_fname",
        "_update_fname",
        "_tempdir",
        "_format",
        "_models",
        "_metadata",
    ]

    # ------------------------------------------------------------------------
    def __init__(self, fname: str, check_version: bool = True):
        """
        Loads an update file.

        Parameters
        ----------
        fname: str
            The file to load.
        check_version: bool
            Whether to throw a NotImplementedError if the minimum
            configuration management client version reported by the update
            file is newer than our client version.

        Raises
        ------
        ValueError
            If the file is not a valid update file.
        NotImplementedError
            If ``check_version`` is set and the update file requires a newer
            client than this one.
        """
        super().__init__()

        # Save filename.
        self._fname = fname
        self._update_fname = None
        self._tempdir = None

        try:
            self._locate_update_archive()
            self._scan_update_archive()
            if check_version:
                self._check_client_version()
        except BaseException:
            # Construction failed, so the caller never gets an object to
            # call close() on; release the temporary directory ourselves.
            self.close()
            raise

    # ------------------------------------------------------------------------
    @staticmethod
    def _split_root(name: str):
        """
        Splits a tar member name into its root directory and the remainder.

        A single leading ``.`` and a single leading slash or backslash are
        stripped first, so ``./common/etc/foo`` yields
        ``("common", ["etc/foo"])``.

        Parameters
        ----------
        name: str
            The member name as stored in the tar file.

        Returns
        -------
        tuple
            ``(root, tail)``, where ``tail`` is a list holding the rest of
            the path, or an empty list if the name has only one component.
        """
        if name.startswith("."):
            name = name[1:]
        if name.startswith(("/", "\\")):
            name = name[1:]
        root, *tail = re.split(r"[\\/]", name, maxsplit=1)
        return root, tail

    # ------------------------------------------------------------------------
    def _extract_embedded(self, fin) -> None:
        """
        Copies an ``update.tar.gz`` embedded in a release archive into a
        fresh temporary directory and records its path.

        Parameters
        ----------
        fin
            Binary file-like object positioned at the start of the embedded
            update file.
        """
        log.debug(
            '"%s" looks like a release file, extracting update.tar.gz from it...',
            self._fname,
        )
        self._tempdir = tempfile.TemporaryDirectory()
        self._update_fname = os.path.join(self._tempdir.name, "update.tar.gz")
        with open(self._update_fname, "wb") as fout:
            for chunk in iter(lambda: fin.read(4096), b""):
                fout.write(chunk)

    # ------------------------------------------------------------------------
    def _locate_update_archive(self) -> None:
        """
        Determines which file actually holds the update and stores its path
        in ``_update_fname``.

        Be lenient: if the user downloaded a release file (tar or zip) and
        forgot to extract it, ``update.tar.gz`` is extracted from it
        transparently.

        Raises
        ------
        ValueError
            If no update archive could be located.
        """
        try:
            log.debug('Determining file type of "%s"...', self._fname)
            with tarfile.TarFile.open(self._fname, "r:*") as tar:
                for name in tar.getnames():
                    if name.endswith("update.tar.gz"):
                        with tar.extractfile(name) as fin:
                            self._extract_embedded(fin)
                        break
                else:
                    log.debug(
                        '"%s" looks like it might indeed be an update file.',
                        self._fname,
                    )
                    self._update_fname = self._fname
        except tarfile.TarError:
            # Not a tar file; it may still be a zipped release.
            try:
                with zipfile.ZipFile(self._fname, "r") as zip_file:
                    for name in zip_file.namelist():
                        if name.endswith("update.tar.gz"):
                            with zip_file.open(name, "r") as fin:
                                self._extract_embedded(fin)
                            break
            except zipfile.BadZipFile:
                # Neither tar nor zip; reported as invalid just below.
                pass
        if self._update_fname is None:
            raise ValueError("invalid update file")

    # ------------------------------------------------------------------------
    def _scan_update_archive(self) -> None:
        """
        Reads the update tar file and fills in ``_format``, ``_models`` and
        ``_metadata``.

        The format is derived from the root entries of the archive: a
        root-level ``meta.json``, ``common`` or ``only_<model>`` entry marks
        the ``multi`` format, any other root entry is taken to be a model
        directory in the ``legacy`` format.

        Raises
        ------
        ValueError
            If the archive cannot be read, or does not map to exactly one
            format.
        """
        try:
            log.debug('Scanning update tar file "%s"...', self._update_fname)
            with tarfile.TarFile.open(self._update_fname, "r:gz") as tar:
                fmts = set()
                meta_json = None
                models = set()
                metadata = {}
                for info in tar:
                    log.debug(" %s", info.name)
                    name, tail = self._split_root(info.name)
                    if name == "meta.json" and not tail:
                        fmts.add("multi")
                        meta_json = info
                    elif name.startswith("only_"):
                        name = name[5:]
                        if name not in models:
                            fmts.add("multi")
                            metadata[name] = {
                                "manufacturer": "qblox",
                                "model": name,
                            }
                            models.add(name)
                    elif name == "common":
                        fmts.add("multi")
                    else:
                        if name not in models:
                            fmts.add("legacy")
                            metadata[name] = {
                                "manufacturer": "qblox",
                                "model": name,
                            }
                            models.add(name)
                log.debug("Scan complete")
                log.debug("")

                # Entries from meta.json override the defaults set above.
                if meta_json is not None:
                    with tar.extractfile(meta_json) as f:
                        metadata.update(json.loads(f.read()))

                # Empty archives and archives mixing both layouts are
                # rejected.
                if len(fmts) != 1:
                    raise ValueError("invalid update file")
                self._format = next(iter(fmts))
                self._models = {
                    model: DeviceInfo.from_dict(metadata[model])
                    for model in sorted(models)
                }
                self._metadata = metadata.get("meta", {})
        except tarfile.TarError as err:
            raise ValueError("invalid update file") from err

    # ------------------------------------------------------------------------
    def _check_client_version(self) -> None:
        """
        Verifies that this client is recent enough for the update file.

        Raises
        ------
        NotImplementedError
            If the update file declares a minimum client version newer than
            ``VERSION``.
        """
        # _metadata already *is* the "meta" section, so the key is read from
        # it directly. JSON arrays decode to lists, which do not order
        # against tuples, hence both sides are normalized to tuples.
        min_client = tuple(self._metadata.get("min_cfg_man_client", (0, 0, 0)))
        if min_client > tuple(VERSION):
            raise NotImplementedError(
                "update file format is too new. Please update Qblox Instruments first"
            )

    # ------------------------------------------------------------------------
    def close(self):
        """
        Cleans up any operating resources that we may have claimed.

        Safe to call more than once, and on a partially constructed object.

        Parameters
        ----------

        Returns
        -------
        """
        if hasattr(self, "_tempdir") and self._tempdir is not None:
            self._tempdir.cleanup()
            self._tempdir = None

    # ------------------------------------------------------------------------
    def needs_confirmation(self) -> Optional[str]:
        """
        Returns whether the update file requests the user to confirm
        something before application, and if so, what message should be
        printed.

        Parameters
        ----------

        Returns
        -------
        Optional[str]
            None if there is nothing exceptional about this file, otherwise
            this is the confirmation message.
        """
        return self._metadata.get("confirm", None)

    # ------------------------------------------------------------------------
    def pprint(self, output: Callable[[str], None] = log.info) -> None:
        """
        Pretty-prints the update file metadata.

        Parameters
        ----------
        output: Callable[[str], None]
            The function used for printing. Each call represents a line.

        Returns
        -------
        """
        # A version declared by the file always wins; (0, 2, 0) is only the
        # fallback for non-legacy files that declare none.
        # NOTE(review): the fallback value is carried over from the previous
        # code, which applied it even when a version was declared - confirm
        # 0.2.0 is the intended minimum for the multi format.
        min_client = self._metadata.get("min_cfg_man_client", None)
        if min_client is None and self._format != "legacy":
            min_client = (0, 2, 0)
        if min_client is not None:
            min_client = ".".join(map(str, min_client))
        query_message = self._metadata.get("confirm", "None")

        # NOTE(review): column-alignment spaces inside these literals may
        # have been collapsed in the copy this was reviewed from - confirm
        # against the repository.
        output(f"Update file : {self._fname}")
        output(f"File format : {self._format}")
        output(f"Minimum client version : {min_client}")
        output(f"Query message : {query_message}")
        output(f"Contains updates for : {len(self._models)} product(s)")
        for model, di in self._models.items():
            output(f" Model : {model}")
            for key, pretty in (
                ("sw", "Application"),
                ("fw", "FPGA firmware"),
                ("kmod", "Kernel module"),
                ("cfg_man", "Cfg. manager"),
            ):
                # Components without version information are skipped.
                try:
                    output(f" {pretty+' version':<21}: {di[key]}")
                except KeyError:
                    continue

    # ------------------------------------------------------------------------
    def load(self, ci: ConnectionInfo) -> BinaryIO:
        """
        Loads an update file, checking whether the given update file is
        compatible within the given connection context. Returns a file-like
        object opened in binary read mode if compatible, or throws a
        ValueError if there is a problem.

        Parameters
        ----------
        ci: ConnectionInfo
            Connection information object retrieved from autoconf(), to
            verify that the update file is compatible, or to make it
            compatible, if possible.

        Returns
        -------
        BinaryIO
            Binary file-like object for the update file. Will at least be
            opened for reading, and rewound to the start of the file. This
            may effectively be ``open(fname, "rb")``, but could also be a
            ``tempfile.TemporaryFile`` to an update file specifically
            converted to be compatible with the given environment. It is the
            responsibility of the caller to close the file.

        Raises
        ------
        ValueError
            If there is a problem with the given update file.
        """
        # Check whether the update includes data for all the devices we need
        # to support.
        for model in ci.all_updatable_models:
            if model not in self._models:
                raise ValueError(f"update file is not compatible with {model} devices")

        # If we're connected to the server via the legacy update protocol, we
        # must also supply a legacy update file. So if this is not already in
        # the legacy format, we have to downconvert the file format.
        if ci.protocol == "legacy" and self._format != "legacy":
            if len(ci.all_updatable_models) != 1:
                raise ValueError(
                    "cannot update multiple devices at once with legacy configuration managers"
                )
            log.info(
                "Converting multi-device update to legacy update file for %s...",
                ci.device.model,
            )
            with tarfile.open(self._update_fname, "r:gz") as tar:
                common = {}
                specific = {}
                log.debug("Scanning input tar file...")
                for info in tar:
                    log.debug(" %s", info.name)

                    # Split filename into the name of the root directory of
                    # the tar file and the corresponding root path on the
                    # device.
                    tar_dir, tail = self._split_root(info.name)
                    root_path = "/" + tail[0] if tail else "/"

                    # Save the info blocks for the files relevant to us.
                    if tar_dir == "only_" + ci.device.model:
                        specific[root_path] = info
                    elif tar_dir == "common":
                        common[root_path] = info

                # Device-specific files override common files.
                files = common
                files.update(specific)

                # Create a new tar.gz file with the files for this device
                # specifically.
                log.debug("Recompressing in legacy format...")
                file_obj = tempfile.TemporaryFile("w+b")
                try:
                    try:
                        with tarfile.open(None, "w:gz", file_obj) as tar_out:
                            for idx, (path, info) in enumerate(sorted(files.items())):
                                log.progress(
                                    idx / len(files),
                                    "Recompressing update archive in legacy format...",
                                )

                                # Determine the path in the new tarfile.
                                out_info = copy.copy(info)
                                if path == "/":
                                    out_info.name = f"./{ci.device.model}"
                                else:
                                    out_info.name = f"./{ci.device.model}{path}"
                                log.debug(" %s", out_info.name)
                                tar_out.addfile(out_info, tar.extractfile(info))
                    finally:
                        log.clear_progress()
                    log.debug("Legacy update file complete")
                    log.debug("")

                    # Rewind back to the start of the file to comply with
                    # postconditions.
                    file_obj.seek(0)
                except BaseException:
                    # The caller never receives the file on failure, so it
                    # has to be closed here.
                    file_obj.close()
                    raise
                return file_obj

        # No need to change the contents of the update file, so just open the
        # file as-is.
        return open(self._update_fname, "rb")