Source code for qblox_scheduler.experiments.schedules
# Repository: https://gitlab.com/qblox/packages/software/qblox-scheduler
# Licensed according to the LICENSE file on the main branch
#
# Copyright 2025, Qblox B.V.
"""Module containing the the step to execute a schedule."""
from __future__ import annotations
from typing import TYPE_CHECKING
from qblox_scheduler.backends.graph_compilation import SerialCompiler
from qblox_scheduler.experiments.experiment import Step
from qblox_scheduler.schedules.schedule import (
CompiledSchedule,
TimeableSchedule,
)
if TYPE_CHECKING:
from xarray import Dataset
from qblox_scheduler.device_under_test import QuantumDevice
from qblox_scheduler.schedules.schedule import TimeableScheduleBase
[docs]
class ExecuteSchedule(Step):
"""Experiment step that runs a schedule."""
def __init__(self, schedule: TimeableScheduleBase) -> None:
super().__init__(f"run schedule {schedule.name}")
self.data["schedule_info"] = {
"schedule": schedule,
}
@property
[docs]
def schedule(self) -> TimeableScheduleBase:
"""The schedule to run."""
return self.data["schedule_info"]["schedule"]
[docs]
def run(self, device: QuantumDevice, timeout: int = 10) -> Dataset:
"""Run a schedule on the quantum device."""
schedule = self.schedule
if isinstance(schedule, CompiledSchedule):
compiled_schedule = schedule
elif isinstance(schedule, TimeableSchedule):
compiler = SerialCompiler(name="compiler")
compiled_schedule = compiler.compile(
schedule=schedule,
config=device.generate_compilation_config(),
)
else:
raise RuntimeError(f"uncompileable schedule {schedule}")
inst_coord = device.instr_instrument_coordinator
if not inst_coord:
raise RuntimeError(
"QuantumDevice needs to have an active instrument coordinator to run a schedule"
)
inst_coord.stop(allow_failure=True)
inst_coord.prepare(compiled_schedule)
inst_coord.start()
inst_coord.wait_done(timeout_sec=timeout)
return inst_coord.retrieve_acquisition()