# Repository: https://gitlab.com/qblox/packages/software/qblox-scheduler
# Licensed according to the LICENSE file on the main branch
#
# Copyright 2020-2025, Quantify Consortium
# Copyright 2025, Qblox B.V.
"""Data handling utilities for Qblox Scheduler."""
import datetime
import sys
from pathlib import Path
from typing import Any, ClassVar, Literal, Optional
import rich
import xarray as xr
from dateutil.parser import parse
import quantify_core.data.dataset_adapters as da
from quantify_core.data.handling import snapshot as create_snapshot
from quantify_core.data.handling import write_dataset as qc_write_dataset
from quantify_core.data.types import TUID
from quantify_core.utilities.general import save_json
# [docs]
def _get_default_datadir(verbose: bool = False) -> Path:
"""
Returns (and optionally print) a default datadir path.
Intended for fast prototyping, tutorials, examples, etc..
Parameters
----------
verbose
If ``True`` prints the returned datadir.
Returns
-------
:
The ``Path.home() / "qblox_data"`` path.
"""
datadir = (Path.home() / "qblox_data").resolve()
if verbose:
rich.print(f"Data will be saved in:\n{datadir}")
return datadir
# [docs]
class OutputDirectoryManager:
    """
    Manages output directory paths for Qblox Scheduler data storage.

    The class maintains a single instance throughout
    the application lifecycle, ensuring consistent directory management.

    Attributes
    ----------
    DATADIR : pathlib.Path
        The current data directory path, managed through the
        :meth:`set_datadir` and :meth:`get_datadir` class methods.
    """

    # Class-wide current data directory; initialised to the default location.
    DATADIR: ClassVar[Path] = _get_default_datadir()

    @classmethod
    def set_datadir(cls, datadir: Path | str | None = None) -> None:
        """
        Sets the data directory.

        Parameters
        ----------
        datadir : pathlib.Path or str or None
            Path of the data directory. If set to ``None``, resets the datadir to the
            default datadir (``Path.home() / "qblox_data"``).
        """
        # Normalise the input to a Path, falling back to the default location.
        if datadir is None:
            datadir = _get_default_datadir()
        elif isinstance(datadir, str):
            datadir = Path(datadir)
        try:
            # Create the directory (and parents) if it does not exist yet.
            Path(datadir).mkdir(exist_ok=True, parents=True)
        except PermissionError as e:
            raise PermissionError(
                f"Permission error while setting datadir {datadir}."
                "\nPlease make sure you have the correct permissions."
            ) from e
        cls.DATADIR = datadir

    @classmethod
    def get_datadir(cls) -> Path:
        """
        Returns the current data directory.

        Returns
        -------
        :
            The current data directory.

        Raises
        ------
        NotADirectoryError
            If the configured data directory does not exist on disk.
        """
        if not cls.DATADIR.is_dir():
            raise NotADirectoryError(
                "The datadir is not valid."
                "\nWe recommend to settle for a single common data directory for all \n"
                "notebooks/experiments within your measurement setup/PC.\n"
                "E.g. '~/qblox_data' (unix), or 'D:\\Data\\qblox_data' (Windows).\n"
            )
        return cls.DATADIR
# [docs]
class AnalysisDataContainer:
    """
    Class which represents all data related to an experiment. This allows the user to
    run experiments and store data. The class serves as an
    initial interface and uses the directory paths set by OutputDirectoryManager.
    """

    # Filename of the dataset stored inside each experiment container.
    DATASET_NAME: ClassVar[str] = "dataset.hdf5"

    # Filename of the (optionally compressed) snapshot stored next to the dataset.
    SNAPSHOT_FILENAME: ClassVar[str] = "snapshot.json"

    _TUID_LENGTH: ClassVar[int] = 26  # Length of "YYYYmmDD-HHMMSS-sss-******"

    def __init__(self, tuid: str, name: str):
        """
        Creates an instance of the AnalysisDataContainer.

        Creates the day folder (``<datadir>/<YYYYmmDD>``) and the experiment
        container folder inside it if they do not already exist.

        Parameters
        ----------
        tuid
            TUID to use
        name
            Name to append to the data directory path.
        """
        # BUG FIX: the original never assigned ``self.tuid`` although it is read
        # below and in ``experiment_name``, causing an AttributeError.
        self.tuid = tuid
        # Date folder works as a container of TUIDs
        date_folder = tuid.split("-")[0]
        self.day_folder = OutputDirectoryManager.get_datadir() / date_folder
        Path.mkdir(self.day_folder, exist_ok=True)
        # A TUID folder that contains data and potentially snapshot
        self.data_folder = (
            (self.day_folder / f"{self.tuid}-{name}") if name else self.day_folder / f"{self.tuid}"
        )
        Path.mkdir(self.data_folder, exist_ok=True)

    @property
    def experiment_name(self) -> str:
        """The name of the experiment.

        NOTE(review): this slices past the fixed 26-character TUID prefix, so it
        returns an empty string unless ``self.tuid`` also carries a name suffix —
        confirm against callers.
        """
        return self.tuid[self._TUID_LENGTH :]

    @classmethod
    def load_dataset(
        cls,
        tuid: TUID,
        name: str = DATASET_NAME,
    ) -> xr.Dataset:
        """
        Loads a dataset specified by a tuid.

        Parameters
        ----------
        tuid
            A :class:`~quantify_core.data.types.TUID` string. It is also possible to specify
            only the first part of a tuid.
        name
            Name of the dataset.

        Returns
        -------
        :
            The dataset.

        Raises
        ------
        FileNotFoundError
            If no experiment container matching ``tuid`` exists.
        """
        # Reuse the container lookup so a missing TUID raises a descriptive
        # FileNotFoundError instead of a bare IndexError.
        path = cls.locate_experiment_container(tuid) / name
        return cls.load_dataset_from_path(path)

    @classmethod
    def load_dataset_from_path(cls, path: Path | str) -> xr.Dataset:
        """
        Loads a :class:`~xarray.Dataset` with a specific engine preference.

        Before returning the dataset :meth:`AdapterH5NetCDF.recover()
        <quantify_core.data.dataset_adapters.AdapterH5NetCDF.recover>` is applied.

        This function tries to load the dataset until success with the following engine
        preference:

        - ``"h5netcdf"``
        - ``"netcdf4"``
        - No engine specified (:func:`~xarray.load_dataset` default)

        Parameters
        ----------
        path
            Path to the dataset.

        Returns
        -------
        :
            The loaded dataset.
        """  # pylint: disable=line-too-long
        exceptions: list[Exception] = []
        engines = ["h5netcdf", "netcdf4", None]
        for engine in engines:
            # there are three datasets that a user can load:
            # - "old" quantify datasets ( <2.0.0)
            # - "new" quantify datasets (>= 2.0.0)
            # - qblox-scheduler datasets
            try:
                dataset = xr.load_dataset(path, engine=engine)
            except Exception as exception:  # noqa: BLE001, PERF203
                exceptions.append(exception)
            else:
                # Only quantify_dataset_version=>2.0.0 requires the adapter
                if "quantify_dataset_version" in dataset.attrs:
                    dataset = da.AdapterH5NetCDF.recover(dataset)
                return dataset
        # Do not let exceptions pass silently: report every failed attempt.
        # BUG FIX: the original zipped over a truncated engine list, which both
        # skipped the final failure report and re-raised the second-to-last
        # exception instead of the last one.
        for failed_engine, exc in zip(engines, exceptions):
            print(
                f"Failed loading dataset with '{failed_engine}' engine. "
                f"Raised '{exc.__class__.__name__}':\n {exc}",
            )
        # raise the last exception
        raise exceptions[-1]

    def write_dataset(self, dataset: xr.Dataset) -> None:
        """
        Writes the quantify dataset to the directory specified by
        `~.data_folder`.

        Parameters
        ----------
        dataset
            The dataset to be written to the directory
        """
        qc_write_dataset(self.data_folder / self.DATASET_NAME, dataset)

    def save_snapshot(
        self,
        snapshot: Optional[dict[str, Any]] = None,
        compression: Literal["bz2", "gzip", "lzma"] | None = None,
    ) -> None:
        """
        Writes the snapshot to disk as specified by
        `~.data_folder`.

        Parameters
        ----------
        snapshot
            The snapshot to be written to the directory
        compression
            The compression type to use. Can be one of 'gzip', 'bz2', 'lzma'.
            Defaults to None, which means no compression.
        """
        if snapshot is None:
            # Capture the current instrument snapshot if none was supplied.
            snapshot = create_snapshot()
        save_json(
            directory=self.data_folder,
            filename=self.SNAPSHOT_FILENAME,
            data=snapshot,
            compression=compression,
        )

    @classmethod
    def get_latest_tuid(cls, contains: str = "") -> TUID:
        """Returns the most recent tuid.

        .. tip::

            This function is similar to :func:`~get_tuids_containing` but is preferred if
            one is only interested in the most recent
            :class:`~quantify_core.data.types.TUID` for performance reasons.

        Parameters
        ----------
        contains
            An optional string contained in the experiment name.

        Returns
        -------
        :
            The latest TUID.

        Raises
        ------
        FileNotFoundError
            No data found.
        """
        # `max_results=1, reverse=True` makes sure the tuid is found efficiently asap
        return cls.get_tuids_containing(contains, max_results=1, reverse=True)[0]

    @classmethod
    # pylint: disable=too-many-locals
    def get_tuids_containing(
        cls,
        contains: str = "",
        t_start: datetime.datetime | str | None = None,
        t_stop: datetime.datetime | str | None = None,
        max_results: int = sys.maxsize,
        reverse: bool = False,
    ) -> list[TUID]:
        """Returns a list of tuids containing a specific label.

        .. tip::

            If one is only interested in the most recent
            :class:`~quantify_core.data.types.TUID`, :func:`~get_latest_tuid` is preferred
            for performance reasons.

        Parameters
        ----------
        contains
            A string contained in the experiment name.
        t_start
            datetime to search from, inclusive. If a string is specified, it will be
            converted to a datetime object using :obj:`~dateutil.parser.parse`.
            If no value is specified, will use the year 1 as a reference t_start.
        t_stop
            datetime to search until, exclusive. If a string is specified, it will be
            converted to a datetime object using :obj:`~dateutil.parser.parse`.
            If no value is specified, will use the current time as a reference t_stop.
        max_results
            Maximum number of results to return. Defaults to unlimited.
        reverse
            If False, sorts tuids chronologically, if True sorts by most recent.

        Returns
        -------
        list
            A list of :class:`~quantify_core.data.types.TUID`: objects.

        Raises
        ------
        FileNotFoundError
            No data found.
        """
        datadir = OutputDirectoryManager.get_datadir()
        # Normalise the time-range bounds to datetime objects.
        if isinstance(t_start, str):
            t_start = parse(t_start)
        elif t_start is None:
            t_start = datetime.datetime(1, 1, 1)
        if isinstance(t_stop, str):
            t_stop = parse(t_stop)
        elif t_stop is None:
            t_stop = datetime.datetime.now()

        # date range filters, define here to make the next line more readable
        d_start = t_start.strftime("%Y%m%d")
        d_stop = t_stop.strftime("%Y%m%d")

        def _in_date_range(dir_name: str) -> bool:
            # Lexicographic comparison is valid because names are zero-padded YYYYmmDD.
            return d_start <= dir_name <= d_stop

        daydirs = [
            entry
            for entry in datadir.iterdir()
            if entry.name.isdigit() and len(entry.name) == 8 and _in_date_range(entry.name)
        ]
        daydirs.sort(reverse=reverse)
        if len(daydirs) == 0:
            err_msg = f"There are no valid day directories in the data folder '{datadir}'"
            if t_start or t_stop:
                err_msg += f", for the range {t_start or ''} to {t_stop or ''}"
            raise FileNotFoundError(err_msg)

        tuids: list[TUID] = []
        for daydir in daydirs:
            expdirs = [
                exp
                for exp in daydir.iterdir()
                if (
                    len(exp.name) > 25
                    and exp.is_dir()
                    and (contains in exp.name)  # label is part of exp_name
                    and TUID.is_valid(exp.name[: cls._TUID_LENGTH])  # tuid is valid
                    and (t_start <= TUID.datetime_seconds(exp.name) < t_stop)
                )
            ]
            expdirs.sort(reverse=reverse)
            for expname in expdirs:
                # Check for inconsistent folder structure for datasets portability.
                # BUG FIX: compare the day folder *name* to the TUID date prefix;
                # the original compared a Path against a str, which is always
                # unequal and made every lookup raise FileNotFoundError.
                if daydir.name != expname.name[:8]:
                    raise FileNotFoundError(
                        f"Experiment container '{expname}' is in wrong day directory '{daydir}'",
                    )
                tuids.append(TUID(expname.name[: cls._TUID_LENGTH]))
                if len(tuids) == max_results:
                    return tuids
        if len(tuids) == 0:
            raise FileNotFoundError(f"No experiment found containing '{contains}'")
        return tuids

    @classmethod
    def locate_experiment_container(cls, tuid: str) -> Path:
        """Returns the experiment container for the given tuid.

        Raises
        ------
        FileNotFoundError
            If no folder matching ``tuid`` exists under the day directory.
        """
        day_folder = Path(tuid.split("-")[0])
        # Based on the tuid check if there is a respective folder(s)
        folder_list = list(
            Path(OutputDirectoryManager.get_datadir() / day_folder).rglob(f"{tuid}*")
        )
        if len(folder_list) == 0:
            # BUG FIX: the original message contained a backslash-continued string
            # literal that embedded a long run of indentation spaces.
            raise FileNotFoundError(
                f"Experiment container with given TUID {tuid} was not found"
            )
        return folder_list[0]