# ----------------------------------------------------------------------------
# Description : Update file format utilities
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2021)
# ----------------------------------------------------------------------------
# -- include -----------------------------------------------------------------
import tarfile
import zipfile
import json
import tempfile
import re
import os
import copy
from typing import Optional, BinaryIO, Callable
from qblox_instruments.cfg_man.probe import ConnectionInfo
from qblox_instruments.cfg_man.const import VERSION
from qblox_instruments.build import DeviceInfo
import qblox_instruments.cfg_man.log as log
# ----------------------------------------------------------------------------
[docs]
class UpdateFile:
"""
Representation of a device update file.
"""
__slots__ = [
"_fname",
"_update_fname",
"_tempdir",
"_format",
"_models",
"_metadata",
]
# ------------------------------------------------------------------------
[docs]
def __init__(self, fname: str, check_version: bool = True):
"""
Loads an update file.
Parameters
----------
fname: str
The file to load.
check_version: bool
Whether to throw a NotImplementedError if the minimum
configuration management client version reported by the update
file is newer than our client version.
"""
super().__init__()
# Save filename.
self._fname = fname
# Be lenient: if the user downloaded a release file and forgot to
# extract it, extract it for them transparently.
self._update_fname = None
self._tempdir = None
def extract(fin):
log.debug(
'"%s" looks like a release file, extracting update.tar.gz from it...',
self._fname,
)
self._tempdir = tempfile.TemporaryDirectory()
self._update_fname = os.path.join(
self._tempdir.__enter__(), "update.tar.gz"
)
with open(self._update_fname, "wb") as fout:
while True:
buf = fin.read(4096)
if not buf:
break
while buf:
buf = buf[fout.write(buf) :]
try:
log.debug('Determining file type of "%s"...', self._fname)
with tarfile.TarFile.open(self._fname, "r:*") as tar:
for name in tar.getnames():
if name.endswith("update.tar.gz"):
with tar.extractfile(name) as fin:
extract(fin)
break
else:
log.debug(
'"%s" looks like it might indeed be an update file.',
self._fname,
)
self._update_fname = self._fname
except tarfile.TarError:
try:
with zipfile.ZipFile(self._fname, "r") as zip:
for name in zip.namelist():
if name.endswith("update.tar.gz"):
with zip.open(name, "r") as fin:
extract(fin)
break
except zipfile.BadZipFile:
pass
if self._update_fname is None:
raise ValueError("invalid update file")
# Read the tar file.
try:
log.debug('Scanning update tar file "%s"...', self._update_fname)
with tarfile.TarFile.open(self._update_fname, "r:gz") as tar:
fmts = set()
meta_json = None
models = set()
metadata = {}
while True:
info = tar.next()
if info is None:
break
name = info.name
log.debug(" %s", name)
if name.startswith("."):
name = name[1:]
if name.startswith("/") or name.startswith("\\"):
name = name[1:]
name, *tail = re.split(r"/|\\", name, maxsplit=1)
if name == "meta.json" and not tail:
fmts.add("multi")
meta_json = info
elif name.startswith("only_"):
name = name[5:]
if name not in models:
fmts.add("multi")
metadata[name] = {"manufacturer": "qblox", "model": name}
models.add(name)
elif name == "common":
fmts.add("multi")
else:
if name not in models:
fmts.add("legacy")
metadata[name] = {"manufacturer": "qblox", "model": name}
models.add(name)
log.debug("Scan complete")
log.debug("")
if meta_json is not None:
with tar.extractfile(meta_json) as f:
metadata.update(json.loads(f.read()))
if len(fmts) != 1:
raise ValueError("invalid update file")
self._format = next(iter(fmts))
self._models = {
model: DeviceInfo.from_dict(metadata[model])
for model in sorted(models)
}
self._metadata = metadata.get("meta", {})
except tarfile.TarError:
raise ValueError("invalid update file")
# Check client version.
if check_version:
if (
self._metadata.get("meta", {}).get("min_cfg_man_client", (0, 0, 0))
> VERSION
):
raise NotImplementedError(
"update file format is too new. Please update Qblox Instruments first"
)
# ------------------------------------------------------------------------
[docs]
def close(self):
"""
Cleans up any operating resources that we may have claimed.
Parameters
----------
Returns
-------
"""
if hasattr(self, "_tempdir") and self._tempdir is not None:
self._tempdir.cleanup()
self._tempdir = None
# ------------------------------------------------------------------------
def __del__(self):
self.close()
# ------------------------------------------------------------------------
def __enter__(self):
return self
# ------------------------------------------------------------------------
def __exit__(self, type, value, traceback):
self.close()
# ------------------------------------------------------------------------
[docs]
def needs_confirmation(self) -> Optional[str]:
"""
Returns whether the update file requests the user to confirm something
before application, and if so, what message should be printed.
Parameters
----------
Returns
-------
Optional[str]
None if there is nothing exceptional about this file, otherwise
this is the confirmation message.
"""
return self._metadata.get("confirm", None)
# ------------------------------------------------------------------------
def __str__(self):
return self._fname
# ------------------------------------------------------------------------
def __repr__(self):
return repr(self._fname)
# ------------------------------------------------------------------------
[docs]
def summarize(self) -> str:
"""
Returns a summary of the update file format.
Parameters
----------
Returns
-------
str
Update file summary.
"""
if self._format == "legacy":
return f"legacy update file for {next(iter(self._models))}"
return f"update file for {', '.join(self._models)}"
# ------------------------------------------------------------------------
[docs]
def pprint(self, output: Callable[[str], None] = log.info) -> None:
"""
Pretty-prints the update file metadata.
Parameters
----------
output: Callable[[str], None]
The function used for printing. Each call represents a line.
Returns
-------
"""
min_client = self._metadata.get("min_cfg_man_client", None)
if min_client is not None:
if self._format != "legacy":
min_client = (0, 2, 0)
min_client = ".".join(map(str, min_client))
query_message = self._metadata.get("confirm", "None")
output(f"Update file : {self._fname}")
output(f"File format : {self._format}")
output(f"Minimum client version : {min_client}")
output(f"Query message : {query_message}")
output(f"Contains updates for : {len(self._models)} product(s)")
for model, di in self._models.items():
output(f" Model : {model}")
for key, pretty in (
("sw", "Application"),
("fw", "FPGA firmware"),
("kmod", "Kernel module"),
("cfg_man", "Cfg. manager"),
):
try:
output(f" {pretty + ' version':<21}: {di[key]}"),
except KeyError:
continue
# ------------------------------------------------------------------------
[docs]
def load(self, ci: ConnectionInfo) -> BinaryIO:
"""
Loads an update file, checking whether the given update file is
compatible within the given connection context. Returns a file-like
object opened in binary read mode if compatible, or throws a
ValueError if there is a problem.
Parameters
----------
ci: ConnectionInfo
Connection information object retrieved from autoconf(), to verify
that the update file is compatible, or to make it compatible, if
possible.
Returns
-------
BinaryIO
Binary file-like object for the update file. Will at least be
opened for reading, and rewound to the start of the file. This may
effectively be ``open(fname, "rb")``, but could also be a
``tempfile.TemporaryFile`` to an update file specifically
converted to be compatible with the given environment. It is the
responsibility of the caller to close the file.
Raises
------
ValueError
If there is a problem with the given update file.
"""
# Check whether the update includes data for all the devices we need to
# support.
for model in ci.all_models:
if model not in self._models:
raise ValueError(f"update file is not compatible with {model} devices")
# If we're connected to the server via the legacy update protocol, we
# must also supply a legacy update file. So if this is not already in
# the legacy format, we have to downconvert the file format.
if ci.protocol == "legacy" and self._format != "legacy":
if len(ci.all_models) != 1:
raise ValueError(
"cannot update multiple devices at once with legacy configuration managers"
)
log.info(
"Converting multi-device update to legacy update file for %s...",
ci.device.model,
)
with tarfile.open(self._update_fname, "r:gz") as tar:
common = {}
specific = {}
infos = []
log.debug("Scanning input tar file...")
while True:
info = tar.next()
if info is None:
break
log.debug(" %s", info.name)
infos.append(info)
for info in infos:
# Split filename into the name of the root directory of the
# tar file and the corresponding root path on the device.
name = info.name
if name.startswith("."):
name = name[1:]
if name.startswith("/") or name.startswith("\\"):
name = name[1:]
tar_dir, *root_path = re.split(r"[\\/]", name, maxsplit=1)
if root_path:
root_path = "/" + root_path[0]
else:
root_path = "/"
# Save the info blocks for the files relevant to us.
if tar_dir == "only_" + ci.device.model:
specific[root_path] = info
elif tar_dir == "common":
common[root_path] = info
# Device-specific files override common files.
files = common
files.update(specific)
# Create a new tar.gz file with the files for this device
# specifically.
log.debug("Recompressing in legacy format...")
file_obj = tempfile.TemporaryFile("w+b")
try:
with tarfile.open(None, "w:gz", file_obj) as tar_out:
for idx, (path, info) in enumerate(sorted(files.items())):
log.progress(
idx / len(files),
"Recompressing update archive in legacy format...",
)
# Determine the path in the new tarfile.
out_info = copy.copy(info)
if path == "/":
out_info.name = f"./{ci.device.model}"
else:
out_info.name = f"./{ci.device.model}{path}"
log.debug(" %s", out_info.name)
tar_out.addfile(out_info, tar.extractfile(info))
finally:
log.clear_progress()
log.debug("Legacy update file complete")
log.debug("")
# Rewind back to the start of the file to comply with
# postconditions.
file_obj.seek(0)
return file_obj
# No need to change the contents of the update file, so just open the
# file as-is.
return open(self._update_fname, "rb")