Source code for qblox_scheduler.analysis.data_handling

# Repository: https://gitlab.com/qblox/packages/software/qblox-scheduler
# Licensed according to the LICENSE file on the main branch
#
# Copyright 2020-2025, Quantify Consortium
# Copyright 2025, Qblox B.V.

"""Data handling utilities for Qblox Scheduler."""

import datetime
import sys
from pathlib import Path
from typing import Any, ClassVar, Literal, Optional

import rich
import xarray as xr
from dateutil.parser import parse

import quantify_core.data.dataset_adapters as da
from quantify_core.data.handling import snapshot as create_snapshot
from quantify_core.data.handling import write_dataset as qc_write_dataset
from quantify_core.data.types import TUID
from quantify_core.utilities.general import save_json


[docs] def _get_default_datadir(verbose: bool = False) -> Path: """ Returns (and optionally print) a default datadir path. Intended for fast prototyping, tutorials, examples, etc.. Parameters ---------- verbose If ``True`` prints the returned datadir. Returns ------- : The ``Path.home() / "qblox_data"`` path. """ datadir = (Path.home() / "qblox_data").resolve() if verbose: rich.print(f"Data will be saved in:\n{datadir}") return datadir
[docs] class OutputDirectoryManager: """ Manages output directory paths for Qblox Scheduler data storage. The class maintains a single instance throughout the application lifecycle, ensuring consistent directory management. Attributes ---------- _datadir : str or Path The current data directory path. Private attribute managed through setter and getter methods. """
[docs] DATADIR: ClassVar[Path] = _get_default_datadir()
@classmethod
[docs] def set_datadir(cls, datadir: Path | str | None = None) -> None: """ Sets the data directory. Parameters ---------- datadir : pathlib.Path or str or None Path of the data directory. If set to ``None``, resets the datadir to the default datadir (``<top_level>/data``). """ if isinstance(datadir, str): datadir = Path(datadir) if datadir is None: datadir = _get_default_datadir() try: Path(datadir).mkdir(exist_ok=True, parents=True) except PermissionError as e: raise PermissionError( f"Permission error while setting datadir {datadir}." "\nPlease make sure you have the correct permissions." ) from e cls.DATADIR = datadir
@classmethod
[docs] def get_datadir(cls) -> Path: """ Returns the current data directory. Returns ------- : The current data directory. """ if not Path.is_dir(cls.DATADIR): raise NotADirectoryError( "The datadir is not valid." "\nWe recommend to settle for a single common data directory for all \n" "notebooks/experiments within your measurement setup/PC.\n" "E.g. '~/qblox_data' (unix), or 'D:\\Data\\qblox_data' (Windows).\n" ) return cls.DATADIR
[docs] class AnalysisDataContainer: """ Class which represents all data related to an experiment. This allows the user to run experiments and store data. The class serves as an initial interface and uses the directory paths set by OutputDirectoryManager. """
[docs] DATASET_NAME: ClassVar[str] = "dataset.hdf5"
[docs] SNAPSHOT_FILENAME: ClassVar[str] = "snapshot.json"
[docs] _TUID_LENGTH: ClassVar[int] = 26 # Length of "YYYYmmDD-HHMMSS-sss-******"
def __init__(self, tuid: str, name: str): """ Creates an instance of the AnalysisDataContainer. Parameters ---------- tuid TUID to use name Name to append to the data directory path. """
[docs] self.tuid = tuid
# Date folder works as a container of TUIDs date_folder = tuid.split("-")[0]
[docs] self.day_folder = OutputDirectoryManager.get_datadir() / date_folder
Path.mkdir(self.day_folder, exist_ok=True) # A TUID folder that contains data and potentially snapshot
[docs] self.data_folder = ( (self.day_folder / f"{self.tuid}-{name}") if name else self.day_folder / f"{self.tuid}" )
Path.mkdir(self.data_folder, exist_ok=True) @property
[docs] def experiment_name(self) -> str: """The name of the experiment.""" return self.tuid[self._TUID_LENGTH :]
@classmethod
[docs] def load_dataset( cls, tuid: TUID, name: str = DATASET_NAME, ) -> xr.Dataset: """ Loads a dataset specified by a tuid. Parameters ---------- tuid A :class:`~quantify_core.data.types.TUID` string. It is also possible to specify only the first part of a tuid. name Name of the dataset. Returns ------- : The dataset. """ day_folder = OutputDirectoryManager.get_datadir() / Path(tuid.split("-")[0]) path = list(Path(day_folder).rglob(f"{tuid}*"))[0] / name return AnalysisDataContainer.load_dataset_from_path(path)
@classmethod
[docs] def load_dataset_from_path(cls, path: Path | str) -> xr.Dataset: """ Loads a :class:`~xarray.Dataset` with a specific engine preference. Before returning the dataset :meth:`AdapterH5NetCDF.recover() <quantify_core.data.dataset_adapters.AdapterH5NetCDF.recover>` is applied. This function tries to load the dataset until success with the following engine preference: - ``"h5netcdf"`` - ``"netcdf4"`` - No engine specified (:func:`~xarray.load_dataset` default) Parameters ---------- path Path to the dataset. Returns ------- : The loaded dataset. """ # pylint: disable=line-too-long exceptions = [] engines = ["h5netcdf", "netcdf4", None] for engine in engines: # there are three datasets that a user can load: # - "old" quantify datasets ( <2.0.0) # - "new" quantify datasets (>= 2.0.0) # - qblox-scheduler datasets try: dataset = xr.load_dataset(path, engine=engine) except Exception as exception: # noqa: BLE001, PERF203 exceptions.append(exception) else: # Only quantify_dataset_version=>2.0.0 requires the adapter if "quantify_dataset_version" in dataset.attrs: dataset = da.AdapterH5NetCDF.recover(dataset) return dataset # Do not let exceptions pass silently for exception, engine in zip(exceptions, engines[: engines.index(engine)]): # type: ignore # noqa: B020, B905 print( f"Failed loading dataset with '{engine}' engine. " f"Raised '{exception.__class__.__name__}':\n {exception}", ) # raise the last exception raise exception # type: ignore
[docs] def write_dataset(self, dataset: xr.Dataset) -> None: """ Writes the quantify dataset to the directory specified by `~.data_folder`. Parameters ---------- dataset The dataset to be written to the directory """ qc_write_dataset(self.data_folder / self.DATASET_NAME, dataset)
[docs] def save_snapshot( self, snapshot: Optional[dict[str, Any]] = None, compression: Literal["bz2", "gzip", "lzma"] | None = None, ) -> None: """ Writes the snapshot to disk as specified by `~.data_folder`. Parameters ---------- snapshot The snapshot to be written to the directory compression The compression type to use. Can be one of 'gzip', 'bz2', 'lzma'. Defaults to None, which means no compression. """ if snapshot is None: snapshot = create_snapshot() save_json( directory=self.data_folder, filename=self.SNAPSHOT_FILENAME, data=snapshot, compression=compression, )
@classmethod
[docs] def get_latest_tuid(cls, contains: str = "") -> TUID: """Returns the most recent tuid. .. tip:: This function is similar to :func:`~get_tuids_containing` but is preferred if one is only interested in the most recent :class:`~quantify_core.data.types.TUID` for performance reasons. Parameters ---------- contains An optional string contained in the experiment name. Returns ------- : The latest TUID. Raises ------ FileNotFoundError No data found. """ # `max_results=1, reverse=True` makes sure the tuid is found efficiently asap return AnalysisDataContainer.get_tuids_containing(contains, max_results=1, reverse=True)[0]
@classmethod
[docs] # pylint: disable=too-many-locals def get_tuids_containing( cls, contains: str = "", t_start: datetime.datetime | str | None = None, t_stop: datetime.datetime | str | None = None, max_results: int = sys.maxsize, reverse: bool = False, ) -> list[TUID]: """Returns a list of tuids containing a specific label. .. tip:: If one is only interested in the most recent :class:`~quantify_core.data.types.TUID`, :func:`~get_latest_tuid` is preferred for performance reasons. Parameters ---------- contains A string contained in the experiment name. t_start datetime to search from, inclusive. If a string is specified, it will be converted to a datetime object using :obj:`~dateutil.parser.parse`. If no value is specified, will use the year 1 as a reference t_start. t_stop datetime to search until, exclusive. If a string is specified, it will be converted to a datetime object using :obj:`~dateutil.parser.parse`. If no value is specified, will use the current time as a reference t_stop. max_results Maximum number of results to return. Defaults to unlimited. reverse If False, sorts tuids chronologically, if True sorts by most recent. Returns ------- list A list of :class:`~quantify_core.data.types.TUID`: objects. Raises ------ FileNotFoundError No data found. """ datadir = OutputDirectoryManager.get_datadir() if isinstance(t_start, str): t_start = parse(t_start) elif t_start is None: t_start = datetime.datetime(1, 1, 1) if isinstance(t_stop, str): t_stop = parse(t_stop) elif t_stop is None: t_stop = datetime.datetime.now() # date range filters, define here to make the next line more readable d_start = t_start.strftime("%Y%m%d") d_stop = t_stop.strftime("%Y%m%d") def lower_bound(dir_name: str) -> bool: return dir_name >= d_start if d_start else True def upper_bound(dir_name: str) -> bool: return dir_name <= d_stop if d_stop else True daydirs = list( filter( lambda x: ( x.name.isdigit() and len(x.name) == 8 and lower_bound(x.name) and upper_bound(x.name) ), datadir.iterdir(), ), ) daydirs.sort(reverse=reverse) if len(daydirs) == 0: err_msg = f"There are no valid day directories in the data folder '{datadir}'" if t_start or t_stop: err_msg += f", for the range {t_start or ''} to {t_stop or ''}" raise FileNotFoundError(err_msg) tuids = [] for daydir in daydirs: expdirs = list( filter( lambda x: ( len(x.name) > 25 and x.is_dir() and (contains in x.name) # label is part of exp_name and TUID.is_valid(x.name[:26]) # tuid is valid and (t_start <= TUID.datetime_seconds(x.name) < t_stop) ), Path.iterdir(datadir / daydir), ), ) expdirs.sort(reverse=reverse) for expname in expdirs: # Check for inconsistent folder structure for datasets portability if daydir != expname.name[:8]: raise FileNotFoundError( f"Experiment container '{expname}' is in wrong day directory '{daydir}'", ) tuids.append(TUID(expname.name[:26])) if len(tuids) == max_results: return tuids if len(tuids) == 0: raise FileNotFoundError(f"No experiment found containing '{contains}'") return tuids
@classmethod
[docs] def locate_experiment_container(cls, tuid: str) -> Path: """Returns the experiment container for the given tuid.""" day_folder = Path(tuid.split("-")[0]) # Based on the tuid check if there is a respective folder(s) folder_list = list( Path(OutputDirectoryManager.get_datadir() / day_folder).rglob(f"{tuid}*") ) if len(folder_list) == 0: raise FileNotFoundError( f"Experiment container with given TUID {tuid}\ was not found" ) return folder_list[0]