# ----------------------------------------------------------------------------
# Description : Update file format utilities
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2021)
# ----------------------------------------------------------------------------
# -- include -----------------------------------------------------------------
import configparser
import json
import os
import re
import tarfile
import tempfile
import zipfile
from collections.abc import Iterable, Mapping
from enum import Enum, auto
from typing import IO, Any, BinaryIO, Callable, Optional
from PySquashfsImage import SquashFsImage
from qblox_instruments.build import DeviceInfo
from qblox_instruments.cfg_man import log
from qblox_instruments.cfg_man.const import VERSION
from qblox_instruments.cfg_man.probe import ConnectionInfo
# ----------------------------------------------------------------------------
[docs]
class ArchiveType(Enum):
ZIP = auto()
TAR_GZ = auto()
TAR_XZ = auto()
TAR_BZ2 = auto()
TAR = auto()
SQUASHFS = auto()
[docs]
class ArchiveExtension(Enum):
ZIP = auto()
TAR_GZ = auto()
TAR_XZ = auto()
TAR_BZ2 = auto()
TAR = auto()
RAUC_BUNDLE = auto()
UPDATE_V1_ARCHIVE_TYPES = {
ArchiveType.TAR,
ArchiveType.TAR_BZ2,
ArchiveType.TAR_GZ,
ArchiveType.TAR_XZ,
ArchiveType.ZIP,
}
RAUC_MANIFEST_FILE = "manifest.raucm"
[docs]
class UpdateFile:
"""
Representation of a device update file.
"""
__slots__ = [
"_fname",
"_format",
"_metadata",
"_models",
"_tempdir",
"_update_fname",
]
# ------------------------------------------------------------------------
[docs]
def __init__(self, fname: str, check_version: bool = True) -> None:
"""
Loads an update file.
Parameters
----------
fname: str
The file to load.
check_version: bool
Whether to throw a NotImplementedError if the minimum
configuration management client version reported by the update
file is newer than our client version.
"""
super().__init__()
# Save filename.
self._fname = fname
# Be lenient: if the user downloaded a release file and forgot to
# extract it, extract it for them transparently.
self._update_fname = None
self._tempdir = None
# Type declarations
self._format: str
self._metadata: dict[str, Any]
self._models: Mapping[str, DeviceInfo]
def extract(fin: IO[bytes]) -> None:
log.debug(
'"%s" looks like a release file, extracting update.tar.gz from it...',
self._fname,
)
self._tempdir = tempfile.TemporaryDirectory()
self._update_fname = os.path.join(self._tempdir.__enter__(), "update.tar.gz")
with open(self._update_fname, "wb") as fout:
while True:
buf = fin.read(4096)
if not buf:
break
while buf:
buf = buf[fout.write(buf) :]
with open(self._fname, "rb") as f:
archive_type = _detect_archive_type_magic(f)
archive_extension = _detect_archive_extension(self._fname)
if archive_type in UPDATE_V1_ARCHIVE_TYPES:
try:
log.debug('Determining file type of "%s"...', self._fname)
with tarfile.TarFile.open(self._fname, "r:*") as tar_obj:
for name in tar_obj.getnames():
if name.endswith("update.tar.gz"):
with tar_obj.extractfile(name) as fin:
extract(fin)
break
else:
log.debug(
'"%s" looks like it might indeed be an update file.',
self._fname,
)
self._update_fname = self._fname
except tarfile.TarError as err:
log.debug(f"Error while trying to open {self._fname} as tar file.\n{err}")
try:
with zipfile.ZipFile(self._fname, "r") as zip_obj:
for name in zip_obj.namelist():
if name.endswith("update.tar.gz"):
with zip_obj.open(name, "r") as fin:
extract(fin)
break
except zipfile.BadZipFile as err:
log.debug(f"Error while trying to open {self._fname} as zip file.\n{err}")
pass
if self._update_fname is None:
raise ValueError("invalid update file")
# Read the tar file.
try:
log.debug('Scanning update tar file "%s"...', self._update_fname)
with tarfile.TarFile.open(self._update_fname, "r:gz") as tar:
fmts: set[str] = set()
meta_json = None
models: set[str] = set()
metadata: dict[str, Any] = {}
for info in tar:
if info is None:
break
name = info.name
log.debug(" %s", name)
if name.startswith("."):
name = name[1:]
if name.startswith("/") or name.startswith("\\"):
name = name[1:]
name, *tail = re.split(r"/|\\", name, maxsplit=1)
if name == "meta.json" and not tail:
fmts.add("multi")
meta_json = info
elif name.startswith("only_"):
name = name[5:]
if name not in models:
fmts.add("multi")
metadata[name] = {
"manufacturer": "qblox",
"model": name,
}
models.add(name)
elif name == "common":
fmts.add("multi")
elif _detect_archive_extension(info.name) == ArchiveExtension.RAUC_BUNDLE:
with tar.extractfile(info.name) as sqfs:
if _detect_archive_type_magic(sqfs) == ArchiveType.SQUASHFS:
imgbytes = sqfs.read()
manifest = _read_rauc_manifest_bytes(imgbytes)
if manifest:
parsedmanifest = self._parse_rauc_manifest(manifest)
else:
raise RuntimeError("No manifest in RAUC bundle")
models.add(parsedmanifest["update"]["compatible"])
fmts.add("raucb")
log.debug("Scan complete")
log.debug("")
if meta_json is not None:
with tar.extractfile(meta_json) as f:
metadata.update(json.loads(f.read()))
self._process_update_file(fmts, metadata, models)
except tarfile.TarError as err:
log.debug(f"Error while trying to open {self._fname} as tar file.\n{err}")
raise ValueError("invalid update file")
# Check client version.
if check_version and (
self._metadata.get("meta", {}).get("min_cfg_man_client", (0, 0, 0)) > VERSION
):
raise NotImplementedError(
"update file format is too new. Please update Qblox Instruments first"
)
elif (
archive_type == ArchiveType.SQUASHFS
and archive_extension == ArchiveExtension.RAUC_BUNDLE
):
log.info("Seems to be a RAUC bundle! (squashfs type and raucb extension)")
manifest = _read_rauc_manifest_path(self._fname)
if manifest:
parsedmanifest = self._parse_rauc_manifest(manifest)
else:
raise RuntimeError("No manifest in RAUC bundle")
self._process_update_file(
fmts={"raucb"}, metadata={}, models={parsedmanifest["update"]["compatible"]}
)
else:
raise ValueError("invalid update file")
def _process_update_file(
self, fmts: set[str], metadata: dict[str, Any], models: set[str]
) -> None:
if len(fmts) != 1:
raise ValueError("invalid update file")
self._format = next(iter(fmts))
if self._format != "raucb":
self._models = {
model: DeviceInfo.from_dict(metadata[model]) for model in sorted(models)
}
self._metadata = metadata.get("meta", {})
else:
self._metadata = {}
self._update_fname = self._fname
self._models = {model: {} for model in sorted(models)}
def _parse_rauc_manifest(self, content: str) -> configparser.ConfigParser:
config = configparser.ConfigParser()
config.read_string(content)
return config
# ------------------------------------------------------------------------
[docs]
def close(self) -> None:
"""
Cleans up any operating resources that we may have claimed.
"""
if hasattr(self, "_tempdir") and self._tempdir is not None:
self._tempdir.cleanup()
self._tempdir = None
# ------------------------------------------------------------------------
def __del__(self) -> None:
self.close()
# ------------------------------------------------------------------------
def __enter__(self) -> "UpdateFile":
return self
# ------------------------------------------------------------------------
def __exit__(self, exc_type, exc_value, traceback) -> Optional[bool]:
self.close()
# ------------------------------------------------------------------------
[docs]
def needs_confirmation(self) -> Optional[str]:
"""
Returns whether the update file requests the user to confirm something
before application, and if so, what message should be printed.
Returns
-------
Optional[str]
None if there is nothing exceptional about this file, otherwise
this is the confirmation message.
"""
return self._metadata.get("confirm", None)
# ------------------------------------------------------------------------
def __str__(self) -> str:
return self._fname
# ------------------------------------------------------------------------
def __repr__(self) -> str:
return repr(self._fname)
# ------------------------------------------------------------------------
[docs]
def summarize(self) -> str:
"""
Returns a summary of the update file format.
Returns
-------
str
Update file summary.
"""
return f"update file for {', '.join(self._models)}"
# ------------------------------------------------------------------------
[docs]
def pprint(self, output: Callable[[str], None] = log.info) -> None:
"""
Pretty-prints the update file metadata.
Parameters
----------
output: Callable[[str], None]
The function used for printing. Each call represents a line.
"""
min_client = self._metadata.get("min_cfg_man_client", None)
if min_client is not None:
min_client = ".".join(map(str, min_client))
query_message = self._metadata.get("confirm", "None")
output(f"Update file : {self._fname}")
output(f"File format : {self._format}")
output(f"Minimum client version : {min_client}")
output(f"Query message : {query_message}")
output(f"Contains updates for : {len(self._models)} product(s)")
for model, di in self._models.items():
output(f" Model : {model}")
for key, pretty in (
("sw", "Application"),
("fw", "FPGA firmware"),
("kmod", "Kernel module"),
("cfg_man", "Cfg. manager"),
):
if key in di:
output(f" {pretty + ' version':<21}: {di[key]}")
# ------------------------------------------------------------------------
[docs]
def load(
self,
ci: ConnectionInfo,
included_slots: Optional[Iterable[int]] = None,
excluded_slots: Optional[Iterable[int]] = None,
) -> BinaryIO:
"""
Loads an update file, checking whether the given update file is
compatible within the given connection context. Returns a file-like
object opened in binary read mode if compatible, or throws a
ValueError if there is a problem.
Parameters
----------
ci: ConnectionInfo
Connection information object retrieved from autoconf(), to verify
that the update file is compatible, or to make it compatible, if
possible.
included_slots: Optional[Iterable[int]]
list of included slot indices. Optional, by default None.
excluded_slots: Optional[Iterable[int]]
list of excluded slot indices. Optional, by default None.
Returns
-------
BinaryIO
Binary file-like object for the update file. Will at least be
opened for reading, and rewound to the start of the file. This may
effectively be ``open(fname, "rb")``, but could also be a
``tempfile.TemporaryFile`` to an update file specifically
converted to be compatible with the given environment. It is the
responsibility of the caller to close the file.
Raises
------
ValueError
If there is a problem with the given update file.
"""
# Check whether the update includes data for all the devices we need to
# support.
log.info(f"Models In Cluster : {sorted(ci.all_updatable_models)}")
log.info(f"Models In Update Package : {sorted(set(self._models.keys()))}")
incompatible_modules = set()
def check_update_compatibility(slot: int, model: str) -> None:
"""
Check if the given update file is compatible within the cluster.
Take into account included and excluded slots: a module is not considered
incompatible if it is also excluded from the update.
:param slot: slot number to check for
:param model: model name of module in slot
"""
if model.endswith("qdm"):
return
if (model not in self._models) and (
(included_slots is None and excluded_slots is None)
or (included_slots is not None and slot in included_slots)
or (excluded_slots is not None and slot not in excluded_slots)
):
incompatible_modules.add(model)
if ci.slot_index is not None:
# Single slot update
model = next(iter(ci.all_updatable_models))
slot_no = int(ci.slot_index)
log.info(f"Single-Slot Update in slot {slot_no}")
check_update_compatibility(slot_no, model)
elif ci.device.modules is not None:
# Multiple slots update
log.info("Multi-Slot Update")
for slot, module in ci.device.modules.items():
model = module.model
slot_no = int(slot)
check_update_compatibility(slot_no, model)
else:
raise RuntimeError("failed to determine update compatibility for update file")
incompatible_modules = list(sorted(incompatible_modules))
if incompatible_modules:
if len(incompatible_modules) == 1:
to_print = incompatible_modules[0]
else:
to_print = ", ".join(incompatible_modules[:-1]) + " and " + incompatible_modules[-1]
raise ValueError(f"update file is not compatible with {to_print} devices")
# No need to change the contents of the update file, so just open the
# file as-is.
return open(self._update_fname, "rb")
def _detect_archive_type_magic(file_obj: IO[bytes]) -> Optional[ArchiveType]:
if not file_obj.seekable():
return None
header = file_obj.read(264)
file_obj.seek(0)
"""
Detect archive/image type based on magic bytes.
Parameters
----------
filepath: str
The file to load.
Returns
-------
ArchiveType
An Enum with the different file types or None if not supported.
"""
# with open(filepath, "rb") as f:
# header = f.read(264)
magic_type: Optional[ArchiveType] = None
if header.startswith(b"\x50\x4b\x03\x04"):
magic_type = ArchiveType.ZIP
elif header.startswith(b"\x1f\x8b"):
magic_type = ArchiveType.TAR_GZ
elif header.startswith(b"\xfd\x37\x7a\x58\x5a\x00"):
magic_type = ArchiveType.TAR_XZ
elif header.startswith(b"\x42\x5a\x68"):
magic_type = ArchiveType.TAR_BZ2
elif header.startswith(b"hsqs"):
# squashfs of rauc bundles has 'hsqs' ie 0x73717368 magic number
magic_type = ArchiveType.SQUASHFS
elif len(header) >= 262 and (header[257:262] in [b"ustar", b"ustar\x00"]):
# Tar files do not have a fixed magic at the beginning,
# but the ustar signature is at byte offset 257
magic_type = ArchiveType.TAR
return magic_type
def _detect_archive_extension(filepath: str) -> Optional[ArchiveExtension]:
"""
Detects the archive type based on file extension.
Parameters
----------
filepath: str
The file to load.
Returns
-------
ArchiveExtension
An Enum file extension type, 'zip', 'tar.gz', 'tar.xz', 'tar.bz2', 'tar', 'raucb'
or None if not supported
"""
filename = os.path.basename(filepath).lower()
extension = None
if filename.endswith((".zip",)):
extension = ArchiveExtension.ZIP
elif filename.endswith((".tar.gz", ".tgz")):
extension = ArchiveExtension.TAR_GZ
elif filename.endswith((".tar.xz", ".txz")):
extension = ArchiveExtension.TAR_XZ
elif filename.endswith((".tar.bz2", ".tbz2")):
extension = ArchiveExtension.TAR_BZ2
elif filename.endswith((".tar",)):
extension = ArchiveExtension.TAR
elif filename.endswith((".raucb",)):
extension = ArchiveExtension.RAUC_BUNDLE
return extension
def _read_rauc_manifest(fobj: [str, bytes]) -> Optional[str]:
# Use SquashFsImage.from_file as default image object constructor
# Use SquashFsImage.from_bytes if it is applicable.
image_constructor = SquashFsImage.from_bytes if type(fobj) is bytes else SquashFsImage.from_file
with image_constructor(fobj) as image:
manifest = image.find("manifest.raucm")
if manifest is not None:
return manifest.read_bytes().decode("utf-8")
return None
def _read_rauc_manifest_path(fpath: str) -> Optional[str]:
with open(fpath, "rb") as f:
return _read_rauc_manifest(f)
def _read_rauc_manifest_bytes(fdata: bytes) -> Optional[str]:
return _read_rauc_manifest(fdata)