Source code for qblox_instruments.cfg_man.update_file

# ----------------------------------------------------------------------------
# Description    : Update file format utilities
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2021)
# ----------------------------------------------------------------------------


# -- include -----------------------------------------------------------------
import configparser
import json
import os
import re
import tarfile
import zipfile
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from enum import Enum, auto
from io import BytesIO, FileIO
from typing import IO, Any, BinaryIO, Callable, Optional, Union

from PySquashfsImage import SquashFsImage

from qblox_instruments.build import DeviceInfo
from qblox_instruments.cfg_man import log
from qblox_instruments.cfg_man.const import VERSION
from qblox_instruments.cfg_man.probe import ConnectionInfo
from qblox_instruments.pnp import CMM_SLOT_INDEX
from qblox_instruments.types import TypeHandle


# ----------------------------------------------------------------------------
[docs] class ArchiveType(Enum): ZIP = auto() TAR_GZ = auto() TAR_XZ = auto() TAR_BZ2 = auto() TAR = auto() SQUASHFS = auto()
[docs] class ArchiveExtension(Enum): ZIP = auto() TAR_GZ = auto() TAR_XZ = auto() TAR_BZ2 = auto() TAR = auto() RAUC_BUNDLE = auto()
UPDATE_V1_ARCHIVE_TYPES = { ArchiveType.TAR, ArchiveType.TAR_BZ2, ArchiveType.TAR_GZ, ArchiveType.TAR_XZ, ArchiveType.ZIP, } RAUC_MANIFEST_FILE = "manifest.raucm"
[docs] class UpdateTarget(Enum): """ Enumeration of update firmware targets. """ QBLOX_OS = auto() """ Qblox-OS update file (raucb format). """ QBLOX_OS_MIGRATION = auto() """ Pulsar-OS to Qblox-OS migration file. """ PULSAR_OS = auto() """ Regular Pulsar-OS update file. """
def _is_pulsar_os( version: tuple[int, int, int], type_handle: Union[TypeHandle, None] = None ) -> bool: """ Determines if a version tuple represents pulsar-os. Parameters ---------- version: tuple[int, int, int] Version tuple (major, minor, patch). type_handle: TypeHandle Optional type handle, which can be passed if the OS can be learned from the module type. Returns ------- bool True if version <= 0.13 (pulsar-os), False if >= 1.0 (qblox-os) or the module is a QRC/QSM. """ if type_handle and (type_handle.is_qrc_type or type_handle.is_qsm_type): return False major, minor, patch = version return major == 0 and minor <= 13 def _is_qblox_os( version: tuple[int, int, int], type_handle: Union[TypeHandle, None] = None ) -> bool: """ Determines if a version tuple represents qblox-os. Parameters ---------- version: tuple[int, int, int] Version tuple (major, minor, patch). type_handle: TypeHandle Optional type handle, which can be passed if the OS can be learned from the module type. Returns ------- bool True if version >= 1.0 (qblox-os) or the module is a QRC/QSM, False if <= 0.13 (pulsar-os). """ if type_handle and (type_handle.is_qrc_type or type_handle.is_qsm_type): return True major, minor, patch = version return major >= 1
[docs] @dataclass class UpdateBatch: file: BinaryIO """ Binary file-like object for the update file. Will at least be opened for reading, and rewound to the start of the file. This may effectively be ``open(fname, "rb")``, but could also be a ``tempfile.TemporaryFile`` to an update file specifically converted to be compatible with the given environment. It is the responsibility of the caller to close the file. """ slots: list[int] """ Target slot(s) for this file. """ description: str """ Description for this batch. """
[docs] @dataclass class UpdateInfo: file: BinaryIO """ Binary file-like object for the update file. """ device: Optional[DeviceInfo] = None """ Target device for this file. """
[docs] class UpdateFile: """ Representation of a device update file. """ _file: FileIO _filename: str _update_archive: Any _update_file: BinaryIO _format: str _metadata: Mapping[str, Any] _models: Mapping[str, UpdateInfo] _has_migrate_folder: bool __slots__ = ( "_file", "_filename", "_format", "_has_migrate_folder", "_metadata", "_models", "_update_archive", "_update_file", ) # ------------------------------------------------------------------------
[docs] def __init__(self, file: Union[FileIO, str]) -> None: """ Load an update file. Parameters ---------- fname: Union[FileIO, str] If specified, file or filename to load. """ self._file = None self._filename = None self._update_archive = None self._update_file = None self._has_migrate_folder = False self.parse(file)
[docs] def parse(self, file: Union[FileIO, str]) -> None: """ Load update file. Parameters ---------- file: Optional[str] File or filename to load. """ # Save file and filename if isinstance(file, str): self._filename = file self._file = open(file, "rb") # noqa: SIM115 else: self._filename = file.name self._file = file # Determine file types and extensions archive_type = _detect_archive_type_magic(self._file) archive_extension = _detect_archive_extension(self._filename) if archive_type in UPDATE_V1_ARCHIVE_TYPES: self._parse_v1() elif ( archive_type == ArchiveType.SQUASHFS and archive_extension == ArchiveExtension.RAUC_BUNDLE ): self._parse_v2() else: raise ValueError(f"{self._filename}: unknown update file format")
def _parse_v1(self) -> None: self._parse_v1_nested() # Read the tar file. try: log.debug(f"{self._filename}: scanning update tar file...") self._update_archive = tarfile.TarFile.open(fileobj=self._update_file, mode="r:gz") formats: set[str] = set() meta_json = None models: dict[str, BinaryIO] = {} metadata: dict[str, Any] = {} for info in self._update_archive: if info is None: break name = info.name log.debug(" %s", name) if info.isdir(): path_parts = re.split(r"/|\\", name.lower()) if "migration" in path_parts or "migrate" in path_parts: self._has_migrate_folder = True formats.add("multi") if name.startswith("."): name = name[1:] if name.startswith("/") or name.startswith("\\"): name = name[1:] name, *tail = re.split(r"/|\\", name, maxsplit=1) if name == "meta.json" and not tail: formats.add("multi") meta_json = info elif name.startswith("only_"): name = name[5:] if name not in models: formats.add("multi") metadata[name] = { "manufacturer": "qblox", "model": name, } models[name] = self._update_file elif name == "common": formats.add("multi") elif _detect_archive_extension(info.name) == ArchiveExtension.RAUC_BUNDLE: sqfs = self._update_archive.extractfile(info.name) if _detect_archive_type_magic(sqfs) == ArchiveType.SQUASHFS: rauc_model = self._parse_rauc(sqfs) models[rauc_model] = sqfs formats.add("raucb") log.debug("Scan complete") if meta_json is not None: with self._update_archive.extractfile(meta_json) as f: metadata.update(json.loads(f.read())) self._parse_metadata(formats, metadata, models) except tarfile.TarError as err: log.debug(f"Error while trying to open {self._filename} as tar file.\n{err}") raise ValueError(f"{self._filename}: invalid update file: {err}") # Check client version. if self._metadata.get("meta", {}).get("min_cfg_man_client", (0, 0, 0)) > VERSION: raise NotImplementedError( "update file format is too new. Please update Qblox Instruments first" ) def _parse_v1_nested(self) -> None: log.debug(f"{self._filename}: determining file type...") try: tar_archive = tarfile.TarFile.open(fileobj=self._file, mode="r:*") for name in tar_archive.getnames(): if name == "update.tar.gz" or name.endswith("/update.tar.gz"): log.debug(f"{self._filename}: nested .tar file.") self._update_archive = tar_archive self._update_file = self._update_archive.extractfile(name) break else: log.debug(f"{self._filename}: real update file.") self._update_file = self._file self._update_file.seek(0) except tarfile.TarError as err: log.debug(f"{self._filename}: invalid .tar.file: {err}") self._file.seek(0) try: zip_archive = zipfile.ZipFile(self._file, "r") for name in zip_archive.namelist(): if name == "update.tar.gz" or name.endswith("/update.tar.gz"): log.debug(f"{self._filename}: nested .zip file.") self._update_archive = zip_archive self._update_file = self._update_archive.open(name) break except zipfile.BadZipFile as err: log.debug(f"{self._filename}: invalid .zip file: {err}") if self._update_file is None: raise ValueError(f"{self._filename}: invalid update file") def _parse_v2(self) -> None: model = self._parse_rauc(self._file) self._has_migrate_folder = False self._parse_metadata( formats={"raucb"}, metadata={}, models={model: self._file}, ) def _parse_rauc(self, file: BinaryIO) -> str: image = SquashFsImage(file) manifest_entry = image.find("manifest.raucm") if manifest_entry is None: raise ValueError("No manifest in RAUC bundle") manifest = _parse_rauc_manifest(BytesIO(manifest_entry.read_bytes())) file.seek(0) return manifest["update"]["compatible"] def _parse_metadata( self, formats: set[str], metadata: dict[str, Any], models: dict[str, BinaryIO] ) -> None: if len(formats) != 1: raise ValueError("invalid update file") self._format = next(iter(formats)) self._metadata = metadata.get("meta", {}) self._models = { model: UpdateInfo( file=file, device=DeviceInfo.from_dict(metadata[model]) if model in metadata else None, ) for model, file in sorted(models.items()) } # ------------------------------------------------------------------------
[docs] def close(self) -> None: """ Cleans up any operating resources that we may have claimed. """ if hasattr(self, "_tempdir") and self._tempdir is not None: self._tempdir.cleanup() self._tempdir = None
# ------------------------------------------------------------------------ def __del__(self) -> None: self.close() # ------------------------------------------------------------------------ def __enter__(self) -> "UpdateFile": return self # ------------------------------------------------------------------------ def __exit__(self, exc_type, exc_value, traceback) -> Optional[bool]: self.close() # ------------------------------------------------------------------------
[docs] def needs_confirmation(self) -> Optional[str]: """ Returns whether the update file requests the user to confirm something before application, and if so, what message should be printed. Returns ------- Optional[str] None if there is nothing exceptional about this file, otherwise this is the confirmation message. """ return self._metadata.get("confirm", None)
# ------------------------------------------------------------------------ def __str__(self) -> str: return self._filename # ------------------------------------------------------------------------ def __repr__(self) -> str: return repr(self._filename) # ------------------------------------------------------------------------
[docs] def summarize(self) -> str: """ Returns a summary of the update file format. Returns ------- str Update file summary. """ return f"update file for {', '.join(self._models)}"
# ------------------------------------------------------------------------
[docs] def pprint(self, output: Callable[[str], None] = log.info) -> None: """ Pretty-prints the update file metadata. Parameters ---------- output: Callable[[str], None] The function used for printing. Each call represents a line. """ min_client = self._metadata.get("min_cfg_man_client", None) if min_client is not None: min_client = ".".join(map(str, min_client)) query_message = self._metadata.get("confirm", "None") output(f"Update file : {self._filename}") output(f"File format : {self._format}") output(f"Minimum client version : {min_client}") output(f"Query message : {query_message}") output(f"Contains updates for : {len(self._models)} product(s)") for model, info in self._models.items(): output(f" Model : {model}") for key, pretty in ( ("sw", "Application"), ("fw", "FPGA firmware"), ("kmod", "Kernel module"), ("cfg_man", "Cfg. manager"), ): if info.device is not None and key in info.device: output(f" {pretty + ' version':<21}: {info.device[key]}")
# ------------------------------------------------------------------------
[docs] def load( self, ci: ConnectionInfo, included_slots: Optional[Iterable[int]] = None, excluded_slots: Optional[Iterable[int]] = None, ) -> list[UpdateBatch]: """ Loads an update file, checking whether the given update file is compatible within the given connection context. Returns a list of update batches to be run, each containing a file-like object and a target slot list, or throws a ValueError if there is a problem. Parameters ---------- ci: ConnectionInfo Connection information object retrieved from autoconf(), to verify that the update file is compatible, or to make it compatible, if possible. included_slots: Optional[Iterable[int]] list of included slot indices. Optional, by default None. excluded_slots: Optional[Iterable[int]] list of excluded slot indices. Optional, by default None. Returns ------- list[UpdateBatch] List of update batches to run, each containing Raises ------ ValueError If there is a problem with the given update file. """ # Check whether the update includes data for all the devices we need to # support. log.info(f"Models In Cluster : {sorted(ci.all_updatable_models)}") log.info(f"Models In Update Package : {sorted(set(self._models.keys()))}") if ci.slot_index is not None: # Single slot update log.info(f"Single-Slot Update in slot {ci.slot_index}") model = next(iter(ci.all_updatable_models)) slot_no = int(ci.slot_index) slot_models = {slot_no: model} else: # Multiple slots update log.info("Multi-Slot Update") slot_models = {int(slot): module.model for slot, module in ci.device.modules.items()} if CMM_SLOT_INDEX not in slot_models: slot_models[CMM_SLOT_INDEX] = "cluster_mm" # Handle in- and exclusions if excluded_slots is not None: slot_models = { slot: model for slot, model in slot_models.items() if slot not in excluded_slots } if included_slots is not None: slot_models = { slot: model for slot, model in slot_models.items() if slot in included_slots } # Check compatibility and build model list incompatible_models = set() models: dict[str, list[int]] = {} for slot, model in slot_models.items(): if model not in ci.all_updatable_models: continue if model not in self._models: incompatible_models.add(model) models.setdefault(model, []).append(slot) # FIXME: Skip QSM in the case that there is one in the cluster but no update file for it if "cluster_qsm" in incompatible_models: log.warn("QSM not present in update file, skipping...") incompatible_models.remove("cluster_qsm") # won't raise a ValueError below models.pop("cluster_qsm", None) # won't be included in update batches incompatible_models = list(sorted(incompatible_models)) if incompatible_models: if len(incompatible_models) == 1: to_print = incompatible_models[0] else: to_print = ", ".join(incompatible_models[:-1]) + " and " + incompatible_models[-1] raise ValueError(f"update file is not compatible with {to_print} devices") # Now build update batches! batches = {} for model, slots in models.items(): update_info = self._models[model] # If different models use the same update files, merge them if id(update_info.file) in batches: batch = batches[id(update_info.file)] else: batch = UpdateBatch(file=update_info.file, slots=[], description="") batch.slots.extend(slots) if batch.description: batch.description += ", " + model else: batch.description = model batches[id(update_info.file)] = batch # Update cluster_mm last return list(sorted(batches.values(), key=lambda b: "cluster_mm" in b.description))
[docs] def get_update_type(self) -> UpdateTarget: """ Determines the type of update file. Returns ------- UpdateTarget The type of update file: - QBLOX_OS (qblox-os update), - MIGRATION (pulsar-os to qblox-os migration), - PULSAR_OS (regular pulsar-os update). """ if self._format == "raucb": return UpdateTarget.QBLOX_OS if self._has_migrate_folder: return UpdateTarget.QBLOX_OS_MIGRATION return UpdateTarget.PULSAR_OS
def _detect_archive_type_magic(file_obj: IO[bytes]) -> Optional[ArchiveType]: """ Detect archive/image type based on magic bytes. Parameters ---------- filepath: str The file to load. Returns ------- ArchiveType An Enum with the different file types or None if not supported. """ if not file_obj.seekable(): return None pos = file_obj.tell() file_obj.seek(0) header = file_obj.read(264) file_obj.seek(pos) magic_type: Optional[ArchiveType] = None if header.startswith(b"\x50\x4b\x03\x04"): magic_type = ArchiveType.ZIP elif header.startswith(b"\x1f\x8b"): magic_type = ArchiveType.TAR_GZ elif header.startswith(b"\xfd\x37\x7a\x58\x5a\x00"): magic_type = ArchiveType.TAR_XZ elif header.startswith(b"\x42\x5a\x68"): magic_type = ArchiveType.TAR_BZ2 elif header.startswith(b"hsqs"): # squashfs of rauc bundles has 'hsqs' ie 0x73717368 magic number magic_type = ArchiveType.SQUASHFS elif len(header) >= 262 and (header[257:262] in [b"ustar", b"ustar\x00"]): # Tar files do not have a fixed magic at the beginning, # but the ustar signature is at byte offset 257 magic_type = ArchiveType.TAR return magic_type def _detect_archive_extension(filepath: str) -> Optional[ArchiveExtension]: """ Detects the archive type based on file extension. Parameters ---------- filepath: str The file to load. Returns ------- ArchiveExtension An Enum file extension type, 'zip', 'tar.gz', 'tar.xz', 'tar.bz2', 'tar', 'raucb' or None if not supported """ filename = os.path.basename(filepath).lower() extension = None if filename.endswith((".zip",)): extension = ArchiveExtension.ZIP elif filename.endswith((".tar.gz", ".tgz")): extension = ArchiveExtension.TAR_GZ elif filename.endswith((".tar.xz", ".txz")): extension = ArchiveExtension.TAR_XZ elif filename.endswith((".tar.bz2", ".tbz2")): extension = ArchiveExtension.TAR_BZ2 elif filename.endswith((".tar",)): extension = ArchiveExtension.TAR elif filename.endswith((".raucb",)): extension = ArchiveExtension.RAUC_BUNDLE return extension def _parse_rauc_manifest(fobj: BinaryIO) -> Mapping[str, Any]: config = configparser.ConfigParser() config.read_string(fobj.read().decode("utf-8")) return config