Source code for qblox_scheduler.backends.qblox.switch_outputs

# Repository: https://gitlab.com/qblox/packages/software/qblox-scheduler
# Licensed according to the LICENSE file on the main branch
#
# Copyright 2025, Qblox B.V.
"""Module containing logic to handle automatic output (RF) switching."""

from __future__ import annotations

from typing import TYPE_CHECKING

from qblox_scheduler.backends.qblox import constants
from qblox_scheduler.backends.qblox.helpers import is_pulse
from qblox_scheduler.compilation import _shift_timing_at, _shift_timing_from
from qblox_scheduler.operations.control_flow_library import ControlFlowOperation
from qblox_scheduler.operations.hardware_operations.pulse_library import RFSwitchToggle
from qblox_scheduler.operations.pulse_library import IdlePulse
from qblox_scheduler.schedules import Schedulable, TimeableSchedule

if TYPE_CHECKING:
    from qblox_scheduler.backends.graph_compilation import CompilationConfig
    from qblox_scheduler.operations import Operation


[docs] def switch_outputs( schedule: TimeableSchedule, config: CompilationConfig, ) -> TimeableSchedule: """ Apply automatic RF output switching to the given schedule based on the operations. Parameters ---------- schedule The schedule to which automatic output switching will be applied. config The configuration containing hardware options. Returns ------- A tuple containing the schedule with RF output switching applied, and the amount of timing added in case of automatic operation insertion. """ schedule, _delta = _switch_outputs(schedule, config) return schedule
[docs] def _switch_outputs( schedule: TimeableSchedule, config: CompilationConfig, ) -> tuple[TimeableSchedule, float]: # Save list of original schedulables as we're adding to them in _add_switch_operation original_schedulables = list(schedule.schedulables.values()) # Maintain list of last switch operations switch_ops = {} # Total amount of time this schedule has been shifted forward delta = 0.0 # Shift to add to every operation due to e.g. recursive schedules having been shifted add_delta = 0.0 for i, schedulable in enumerate(original_schedulables): operation = schedule.operations[schedulable["operation_id"]] control_operation = None if isinstance(operation, ControlFlowOperation): control_operation = operation operation = control_operation.body if add_delta > 0: _shift_timing_at(schedulable, operation, add_delta) if isinstance(operation, TimeableSchedule): # Recursively apply pass to nested schedules new_operation, op_delta = _switch_outputs(operation, config) if control_operation is not None: control_operation.body = new_operation else: schedule.operations[schedulable["operation_id"]] = new_operation delta += op_delta add_delta += op_delta elif is_rf_pulse(operation): is_last = i == len(original_schedulables) - 1 ops, op_delta = _add_switch_operations( schedule, config, operation, schedulable, switch_ops, is_last, ) switch_ops.update(ops) delta += op_delta return schedule, delta
[docs] def is_rf_pulse(operation: Operation) -> bool: """ Check if the operation is an RF pulse. Parameters ---------- operation: The operation to check. Returns ------- : True if the operation is an RF pulse, False otherwise. """ if not is_pulse(operation): return False pulse_info = operation.data["pulse_info"] return not pulse_info.get("marker_pulse", False)
[docs] def _add_switch_operations( schedule: TimeableSchedule, config: CompilationConfig, original_operation: Operation, original_schedulable: Schedulable, switch_ops: dict[str, tuple[Operation, Schedulable]], is_last: bool, ) -> tuple[dict[str, tuple[Operation, Schedulable]], float]: auto_retime = False hardware_config = config.hardware_compilation_config if hardware_config is not None: auto_retime = hardware_config.compiler_options.retime_allowed # Get pulses and their durations switches = _gather_pulse_durations(original_operation, config) # Add operations delta = 0.0 added_ops = {} for target_port, (target_duration, target_clock) in switches.items(): original_start = original_schedulable["abs_time"] + original_operation["pulse_info"]["t0"] target_start = original_start - constants.RF_OUTPUT_RISE_TIME target_end = original_schedulable["abs_time"] + target_duration # Check starts and ends if target_start < 0: if not auto_retime: raise RuntimeError( f"no space to insert RF switch toggle in schedule: " f"{constants.RF_OUTPUT_RISE_TIME * 1e9} ns rise time required, " f"but operation starts at {original_start * 1e9} ns" ) shift_delay = -target_start _shift_timing_from(schedule, original_schedulable, shift_delay) delta += shift_delay target_start = 0 target_end += shift_delay if is_last and constants.RF_OUTPUT_FALL_TIME > 0: if not auto_retime: raise RuntimeError( f"no space to insert RF switch toggle in schedule: " f"{constants.RF_OUTPUT_FALL_TIME * 1e9} ns required afterwards, " "but schedulable is last operation. " "Please add additional idle time at the end of the schedule, " "by adding e.g. an `IdlePulse` to the schedule." ) idle_sched = schedule.add( IdlePulse(duration=constants.RF_OUTPUT_FALL_TIME), rel_time=target_duration, ref_op=original_schedulable, ref_pt="start", ) idle_sched["abs_time"] = original_start + target_duration delta += constants.RF_OUTPUT_FALL_TIME is_last = False # we added the pulse # Check if we can extend an existing op if target_port in switch_ops: switch_op, switch_sched = switch_ops[target_port] switch_end = switch_sched["abs_time"] + switch_op.duration if switch_end + constants.RF_OUTPUT_GRACE_TIME >= target_start: # We can! Extend this op. to_extend = target_end - switch_end if to_extend > 0: switch_op.data["pulse_info"]["duration"] += to_extend continue # No merging, create new op switch_op = RFSwitchToggle( duration=target_end - target_start, port=target_port, clock=target_clock, ) switch_sched = schedule.add( switch_op, rel_time=-constants.RF_OUTPUT_RISE_TIME, ref_op=original_schedulable, ref_pt="start", ) switch_sched["abs_time"] = target_start added_ops[target_port] = (switch_op, switch_sched) return added_ops, delta
[docs] def _gather_pulse_durations( operation: Operation, config: CompilationConfig ) -> dict[str, tuple[float, str]]: # Avoid circular import from qblox_scheduler.backends.qblox_backend import _get_module_cfg if not is_rf_pulse(operation): return {} pulse_info = operation.data["pulse_info"] target_port = pulse_info["port"] target_clock = pulse_info["clock"] target_duration = operation.duration try: module_cfg = _get_module_cfg(target_port, target_clock, config) except KeyError: return {} if getattr(module_cfg, "rf_output_on", None) != "auto": return {} return {target_port: (target_duration, target_clock)}