# Repository: https://gitlab.com/qblox/packages/software/qblox-scheduler
# Licensed according to the LICENSE file on the main branch
#
# Copyright 2025, Qblox B.V.
"""
The HardwareAgent object is the go-to object when running experiments on Qblox hardware.
It provides an interface to easily set up the hardware, run experiments, and receive results.
"""
from __future__ import annotations
import json
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, TypeVar
from qblox_instruments import Cluster, ClusterType
from qblox_instruments.types import DebugLevel
from qcodes.instrument import find_or_create_instrument
from qblox_scheduler.analysis.data_handling import OutputDirectoryManager
from qblox_scheduler.backends.graph_compilation import SerialCompiler
from qblox_scheduler.backends.qblox.data import save_to_experiment as save_to_experiment_
from qblox_scheduler.backends.qblox_backend import (
QbloxHardwareCompilationConfig,
QbloxModuleNotFoundError,
)
from qblox_scheduler.backends.types.qblox import ClusterDescription
from qblox_scheduler.device_under_test.device_element import DeviceElement
from qblox_scheduler.device_under_test.quantum_device import QuantumDevice
from qblox_scheduler.experiments.schedules import ExecuteSchedule
from qblox_scheduler.instrument_coordinator.components.base import instrument_to_component_name
from qblox_scheduler.instrument_coordinator.components.qblox import ClusterComponent
from qblox_scheduler.instrument_coordinator.instrument_coordinator import InstrumentCoordinator
from qblox_scheduler.schedule import Schedule
from qblox_scheduler.schedules.schedule import CompiledSchedule, TimeableSchedule
from quantify_core.data.handling import gen_tuid
from quantify_core.measurement.control import MeasurementControl
if TYPE_CHECKING:
from collections.abc import Sequence
from pydantic import BaseModel
from xarray import Dataset
from qblox_scheduler.device_under_test.edge import Edge
[docs]
def _get_pydantic_value_or_set_default(
obj: dict | BaseModel, attribute: str, default: T = None
) -> T:
# Quick hack to get an attribute of a pydantic object,
# whether it's still in dict form or as instance.
# Used since the hardware_configuration might not be ok yet for pydantic validation
if isinstance(obj, dict):
if attribute in obj:
return obj[attribute]
obj[attribute] = default
return default
return obj.__getattribute__(attribute)
[docs]
def _load_quantum_device_from_file(file: str | Path) -> QuantumDevice:
file = Path(file)
if file.suffix == ".json":
return QuantumDevice.from_json_file(file)
elif file.suffix == ".yaml":
return QuantumDevice.from_yaml_file(file)
else:
raise ValueError(f"Error opening {file}.\nUnsupported file type '{file.suffix}'")
[docs]
class HardwareAgent:
"""
Hardware definition of qblox backend.
Contains definitions for the cluster, the hardware configuration,
and related classes to manage the instruments.
Parameters
----------
hardware_configuration
The hardware configuration.
Either
- a QbloxHardwareCompilationConfig instance
- a dictionary as pydantic model of the QbloxHardwareCompilationConfig
- a json file that stores the above dictionary
The cluster fields in "hardware_description" will have its `modules` field autogenerated,
based on its `ip` field when they are not defined.
This `ip` field is mandatory.
quantum_device_configuration
The quantum device configuration.
Either
- a json dictionary of the QuantumDevice
- a path to the file that stores the json dictionary
debug
Debug level of the cluster. By default, it is set to `DebugLevel.MINIMAL_CHECK`.
Available debug levels are:
- `DebugLevel.MINIMAL_CHECK`: Check compatibility between the hardware firmware version and qblox-instruments version, and check for errors when starting and stopping sequencers.
- `DebugLevel.ERROR_CHECK`: Check for errors after every low-level cluster command.
- `DebugLevel.NO_CHECK`: Do not check for errors.
- `DebugLevel.VERSION_AND_ERROR_CHECK`: Combination of `DebugLEVEL.MINIMAL_CHECK` and `DebugLevel.ERROR_CHECK`.
default_output_dir
Default output directory where the data will be stored.
""" # noqa: E501
[docs]
_ElementType = TypeVar("_ElementType", bound=DeviceElement)
[docs]
_num_instances_initiated = 0
def __init__(
self,
hardware_configuration: QbloxHardwareCompilationConfig | dict | str | Path,
quantum_device_configuration: dict | str | Path | QuantumDevice | None = None,
*,
debug: DebugLevel = DebugLevel.MINIMAL_CHECK,
output_dir: Path | str | None = None,
) -> None:
hardware_config: QbloxHardwareCompilationConfig | dict
if isinstance(hardware_configuration, (str, Path)):
with Path(hardware_configuration).open("r") as file:
hardware_config = json.load(file)
else:
hardware_config = hardware_configuration
[docs]
self._hardware_configuration = hardware_config
[docs]
self._clusters: dict[str, ClusterComponent] = {}
match quantum_device_configuration:
case None:
self._quantum_device = QuantumDevice(
f"quantum_device_{HardwareAgent._num_instances_initiated}" # type: ignore[reportCallIssue] # TODO: Remove after refactoring CompatModel.__init__
)
case str() | Path():
self._quantum_device = _load_quantum_device_from_file(quantum_device_configuration)
case dict():
self._quantum_device = QuantumDevice.from_dict(quantum_device_configuration)
case QuantumDevice():
self._quantum_device = quantum_device_configuration
case _:
raise TypeError(
"Argument for 'quantum_device_configuration' does not match any supported type."
)
[docs]
self._instrument_coordinator = (
InstrumentCoordinator(
name=f"instrument_coordinator_{HardwareAgent._num_instances_initiated}",
add_default_generic_icc=False,
)
if not self._quantum_device.instr_instrument_coordinator
else self._quantum_device.instr_instrument_coordinator
)
[docs]
self._measurement_control = MeasurementControl(
name=f"measurement_control_{HardwareAgent._num_instances_initiated}",
)
self._quantum_device.instr_instrument_coordinator = self._instrument_coordinator
HardwareAgent._num_instances_initiated += 1
[docs]
self._latest_compiled_schedule: CompiledSchedule | None = None
OutputDirectoryManager.set_datadir(output_dir)
@property
[docs]
def hardware_configuration(self) -> QbloxHardwareCompilationConfig:
"""
The (validated) hardware configuration of this hardware.
Only available after the connections with the clusters
have been established using `connect_clusters` if initially given as a file/dictionary.
Note that this is **not** in json/dictionary format.
"""
if not isinstance(self._hardware_configuration, QbloxHardwareCompilationConfig):
raise TypeError("Hardware not initialized yet, please do so with `connect_clusters`")
return self._hardware_configuration
[docs]
def add_device_elements(self, device_elements: Sequence[DeviceElement]) -> None:
"""
Add multiple device elements to the quantum device.
Parameters
----------
device_elements
list of device elements to be added
"""
for element in device_elements:
self._quantum_device.add_element(element)
[docs]
def add_edges(self, edges: list[Edge]) -> None:
"""
Add multiple edges to the quantum device.
Parameters
----------
edges
list of edges to be added
"""
for edge in edges:
self._quantum_device.add_edge(edge)
[docs]
def connect_clusters(self) -> None:
"""
Connect the defined clusters to the hardware given their identifiers.
Will also supplement the modules to the given hardware description and/or verify them.
"""
if isinstance(self._hardware_configuration, QbloxHardwareCompilationConfig):
hardware_description = self._hardware_configuration.hardware_description
else:
if "hardware_description" not in self._hardware_configuration:
raise ValueError("Hardware configuration misses the hardware description.")
hardware_description = self._hardware_configuration["hardware_description"]
# When _clusters is empty, get the IP from the hardware description
if not self._clusters:
for cluster_name, cluster_description in hardware_description.items():
if not isinstance(cluster_description, (ClusterDescription, dict)):
raise TypeError(
f"Incorrect hardware description format for cluster '{cluster_name}'"
)
instrument_type = _get_pydantic_value_or_set_default(
cluster_description, "instrument_type"
)
if instrument_type == "Cluster":
cluster = self._create_cluster(cluster_name, cluster_description)
ic_cluster_name = instrument_to_component_name(cluster_name)
if ClusterComponent.exist(ic_cluster_name, ClusterComponent):
ClusterComponent.find_instrument(ic_cluster_name, ClusterComponent).close()
cluster_component: ClusterComponent = ClusterComponent(cluster)
self._clusters[cluster_name] = cluster_component
self._instrument_coordinator.add_component(cluster_component)
modules = _get_pydantic_value_or_set_default(cluster_description, "modules", {})
for i, instrument in cluster_component.get_module_descriptions().items():
if str(i) not in modules and i not in modules:
if isinstance(
self._hardware_configuration, QbloxHardwareCompilationConfig
):
modules[i] = instrument
else:
modules[i] = {"instrument_type": instrument.instrument_type}
try:
self._hardware_configuration = QbloxHardwareCompilationConfig.model_validate(
self._hardware_configuration
)
except QbloxModuleNotFoundError as exception:
raise QbloxModuleNotFoundError(
f"{exception.args[0]}\n"
f"This can also be because no module at the given index exists in the cluster,"
f"yet it is defined in the connectivity."
) from exception
self._quantum_device.hardware_config = self._hardware_configuration
self._verify_hardware_configuration()
@property
[docs]
def quantum_device(self) -> QuantumDevice:
"""
The quantum device active in this hardware.
The quantum device contains the device elements
and how the connectivity between them is defined.
"""
return self._quantum_device
@property
[docs]
def instrument_coordinator(self) -> InstrumentCoordinator:
"""
The instrument coordinator active in this hardware.
The instrument coordinator is responsible for executing
the (compiled) schedule on the instruments in this backend.
"""
return self._instrument_coordinator
@property
[docs]
def measurement_control(self) -> MeasurementControl:
"""
The instrument coordinator active in this hardware.
The instrument coordinator is responsible for executing
the (compiled) schedule on the instruments in this backend.
"""
return self._measurement_control
[docs]
def get_clusters(self) -> dict[str, Cluster]:
"""
Get all the instantiated clusters.
Returns
-------
A dictionary mapping cluster names to Cluster objects.
"""
if not self._clusters:
self.connect_clusters()
return {
cluster_name: cluster_component.cluster
for cluster_name, cluster_component in self._clusters.items()
}
@property
[docs]
def latest_compiled_schedule(self) -> CompiledSchedule | None:
"""Get the latest compiled schedule, if one exists."""
return self._latest_compiled_schedule
[docs]
def run(
self,
schedule: Schedule | TimeableSchedule | CompiledSchedule,
*,
timeout: int = 10,
save_to_experiment: bool = True,
save_snapshot: bool = True,
) -> Dataset:
"""
Run a schedule on the hardware.
Parameters
----------
schedule
The schedule to run.
timeout
The timeout for retrieving the results, in seconds
save_to_experiment
Whether to save the dataset to an experiment directory. The dataset and (optionally)
snapshot will be saved in <datadir>/<tuid>/dataset.hdf5 and
<datadir>/<tuid>/snapshot.json, where datadir is specified by
:func:`~qblox_scheduler.analysis.data_handling.OutputDirectoryManager.set_datadir` and
:func:`~qblox_scheduler.analysis.data_handling.OutputDirectoryManager.get_datadir`
save_snapshot
Whether to save a snapshot of the experiment
Returns
-------
Acquisition result dataset if any acquisitions are used in the schedule.
Empty dataset otherwise.
Raises
------
TimeoutError
When hardware doesn't return results in the given timeout.
"""
if isinstance(schedule, (TimeableSchedule, CompiledSchedule)):
new_schedule = Schedule(schedule.name)
new_schedule.add(ExecuteSchedule(schedule))
return self.run(
new_schedule,
timeout=timeout,
save_to_experiment=save_to_experiment,
save_snapshot=save_snapshot,
)
self.connect_clusters()
dataset = schedule._experiment.run(self.quantum_device, timeout=timeout)
if save_snapshot or save_to_experiment:
tuid = gen_tuid()
save_to_experiment_(
dataset=dataset,
save_dataset=save_to_experiment,
save_snapshot=save_snapshot,
tuid=tuid,
)
return dataset
[docs]
def compile(self, schedule: TimeableSchedule) -> CompiledSchedule:
"""
Compile the schedule to the hardware.
Compilation is already done in the `run` method,
so run this method only if the compiled schedule requires to be investigated.
Parameters
----------
schedule
The TimeableSchedule to compile for this hardware
Returns
-------
The compiled schedule
"""
compiler = SerialCompiler()
compiled_schedule = compiler.compile(
schedule=schedule, config=self._quantum_device.generate_compilation_config()
)
self._latest_compiled_schedule = compiled_schedule
return compiled_schedule
@staticmethod
[docs]
def set_output_data_dir(datadir: Path | str | None = None) -> None:
"""
Sets the output data directory.
Parameters
----------
datadir
Path of the data directory. If set to ``None``, resets the datadir to the
default datadir (``<top_level>/data``).
"""
OutputDirectoryManager.set_datadir(datadir=datadir)
[docs]
def _verify_hardware_configuration(self) -> None:
"""
Verifies if the given hardware configuration can be run on the cluster.
Raises
------
TypeError
When the hardware_configuration has not been correctly initialized yet.
ValueError
When the hardware configuration does not match the modules installed on the cluster.
"""
if not isinstance(self._hardware_configuration, QbloxHardwareCompilationConfig):
raise TypeError(
"Hardware config is not yet correctly initialized. "
"It can be initialized by calling `QbloxHardware.connect_clusters()`"
)
num_clusters = sum(
isinstance(x, ClusterDescription)
for x in self._hardware_configuration.hardware_description.values()
)
if len(self._clusters) != num_clusters:
raise ValueError(
"Hardware config is not yet correctly initialized. "
"It can be initialized by calling `QbloxHardware.connect_clusters()`"
)
for cluster_name, cluster_info in self._hardware_configuration.hardware_description.items():
if not isinstance(cluster_info, ClusterDescription):
continue
cluster_module_types = self._clusters[cluster_name].get_module_descriptions()
for i, module_info in cluster_info.modules.items():
index = int(i)
if index not in cluster_module_types:
raise ValueError(
f"No module at index {index} installed in {cluster_name}. "
f"Yet one is defined manually in the hardware description."
)
if type(module_info) is not type(cluster_module_types[index]):
raise ValueError(
f"Cluster `{cluster_name}` has a module "
f"{cluster_module_types[index].instrument_type} "
f"installed at index {index}, "
f"not a {module_info.instrument_type} as defined manually in the "
f"hardware description."
)
[docs]
def _create_cluster(
self, cluster_name: str, cluster_description: dict | ClusterDescription
) -> Cluster:
ip = _get_pydantic_value_or_set_default(cluster_description, "ip")
if ip is None:
warnings.warn(
f"{cluster_name}: Trying to instantiate cluster with ip 'None'."
"Creating a dummy cluster.",
UserWarning,
)
return self._create_dummy_cluster(cluster_name, cluster_description)
return find_or_create_instrument(
instrument_class=Cluster,
name=cluster_name,
recreate=True,
identifier=ip,
debug=self._debug,
)
[docs]
def _create_dummy_cluster(
self, cluster_name: str, cluster_description: dict | ClusterDescription
) -> Cluster:
module_types = {
"QCM": ClusterType.CLUSTER_QCM,
"QRM": ClusterType.CLUSTER_QRM,
"QCM_RF": ClusterType.CLUSTER_QCM_RF,
"QRM_RF": ClusterType.CLUSTER_QRM_RF,
"QTM": ClusterType.CLUSTER_QTM,
"QDM": ClusterType.CLUSTER_QDM,
"QRC": ClusterType.CLUSTER_QRC,
}
modules_value = _get_pydantic_value_or_set_default(
ClusterDescription.model_validate(cluster_description), "modules", {}
)
modules: dict = {} if modules_value is None else modules_value
dummy_config = {
int(slot): module_types[
_get_pydantic_value_or_set_default(module, "instrument_type", "")
]
for slot, module in modules.items()
}
return find_or_create_instrument(
instrument_class=Cluster, name=cluster_name, recreate=True, dummy_cfg=dummy_config
)