Source code for qblox_scheduler.compilation

# Repository: https://gitlab.com/qblox/packages/software/qblox-scheduler
# Licensed according to the LICENSE file on the main branch
#
# Copyright 2020-2025, Quantify Consortium
# Copyright 2025, Qblox B.V.
"""Compiler for the qblox_scheduler."""

from __future__ import annotations  # noqa: I001

import logging
from typing import TYPE_CHECKING, Literal, overload

import networkx as nx

from qblox_scheduler.operations.hardware_operations.inline_q1asm import InlineQ1ASM
from qblox_scheduler.enums import SchedulingStrategy
from qblox_scheduler.json_utils import load_json_schema, validate_json
from qblox_scheduler.operations.operation import Operation
from qblox_scheduler.operations.control_flow_library import (
    ControlFlowOperation,
    LoopOperation,
    LoopStrategy,
)
from qblox_scheduler.schedules.schedule import (
    Schedulable,
    TimeableSchedule,
    TimeableScheduleBase,
    TimingConstraint,
)

if TYPE_CHECKING:
    from qblox_scheduler.backends.graph_compilation import CompilationConfig

[docs] logger = logging.getLogger(__name__)
@overload
[docs] def _determine_absolute_timing( schedule: TimeableSchedule, time_unit: Literal["physical", "ideal", None] = "physical", config: CompilationConfig | None = None, ) -> TimeableSchedule: ...
@overload def _determine_absolute_timing( schedule: Operation, time_unit: Literal["physical", "ideal", None] = "physical", config: CompilationConfig | None = None, ) -> Operation | TimeableSchedule: ... def _determine_absolute_timing( schedule: Operation | TimeableSchedule, time_unit: Literal[ "physical", "ideal", None ] = "physical", # should be included in CompilationConfig config: CompilationConfig | None = None, ): """ Determine the absolute timing of a schedule based on the timing constraints. This function determines absolute timings for every operation in the :attr:`~.TimeableScheduleBase.schedulables`. It does this by: 1. iterating over all and elements in the :attr:`~.TimeableScheduleBase.schedulables`. 2. determining the absolute time of the reference operation - reference point :code:`"ref_pt"` of the reference operation defaults to :code:`"end"` in case it is not set (i.e., is :code:`None`). 3. determining the start of the operation based on the :code:`rel_time` and :code:`duration` of operations - reference point :code:`"ref_pt_new"` of the added operation defaults to :code:`"start"` in case it is not set. Parameters ---------- schedule The schedule for which to determine timings. config Compilation config for :class:`~qblox_scheduler.backends.graph_compilation.ScheduleCompiler`. time_unit Whether to use physical units to determine the absolute time or ideal time. When :code:`time_unit == "physical"` the duration attribute is used. When :code:`time_unit == "ideal"` the duration attribute is ignored and treated as if it is :code:`1`. When :code:`time_unit == None` it will revert to :code:`"physical"`. Returns ------- : The modified `.TimeableSchedule`` where the absolute time for each operation has been determined. Raises ------ NotImplementedError If the scheduling strategy is not SchedulingStrategy.ASAP """ time_unit = time_unit or "physical" if time_unit not in (valid_time_units := ("physical", "ideal")): raise ValueError(f"Undefined time_unit '{time_unit}'! Must be one of {valid_time_units}") if isinstance(schedule, TimeableScheduleBase): return _determine_absolute_timing_schedule(schedule, time_unit, config) elif isinstance(schedule, ControlFlowOperation): schedule.body = _determine_absolute_timing(schedule.body, time_unit, config) return schedule elif schedule.duration is None: raise RuntimeError( f"Cannot determine timing for operation {schedule.name}. Operation data: {schedule!r}" ) else: return schedule
[docs] def _determine_absolute_timing_schedule( schedule: TimeableSchedule, time_unit: Literal["physical", "ideal", None], config: CompilationConfig | None, ) -> TimeableSchedule: scheduling_strategy = _determine_scheduling_strategy(config) if not schedule.schedulables: raise ValueError(f"schedule '{schedule.name}' contains no schedulables.") for op_key in schedule.operations: if isinstance(schedule.operations[op_key], TimeableSchedule): if schedule.operations[op_key].get("duration", None) is None: schedule.operations[op_key] = _determine_absolute_timing( schedule=schedule.operations[op_key], time_unit=time_unit, config=config, ) elif isinstance(schedule.operations[op_key], ControlFlowOperation): schedule.operations[op_key] = _determine_absolute_timing( schedule=schedule.operations[op_key], time_unit=time_unit, config=config, ) # Note: type checker can not seem to infer the Operation fields elif isinstance(schedule.operations[op_key], Operation) and ( time_unit == "physical" and not schedule.operations[op_key].valid_pulse # type: ignore and not schedule.operations[op_key].valid_acquisition # type: ignore # TODO (SE-650): move to qblox backend. and not isinstance(schedule.operations[op_key], InlineQ1ASM) ): # Gates do not have a defined duration, so only ideal timing is defined raise RuntimeError( f"Operation {schedule.operations[op_key].name} is not a valid pulse or acquisition." f" Please check whether the device compilation has been performed." f" Operation data: {schedule.operations[op_key]!r}" ) _make_timing_constraints_explicit(schedule, scheduling_strategy) references_graph = _populate_references_graph(schedule) _validate_schedulable_references(schedule, references_graph) schedulables_sorted_by_reference = nx.topological_sort(references_graph) for i, schedulable_name in enumerate(schedulables_sorted_by_reference): i: int schedulable_name: str schedulable: Schedulable = schedule.schedulables[schedulable_name] timing_constraints: list[TimingConstraint] = schedulable.data["timing_constraints"] operation: Operation | TimeableSchedule = schedule.operations[ schedulable.data["operation_id"] ] if scheduling_strategy == SchedulingStrategy.ASAP: for timing_constraint in timing_constraints: abs_time = _get_start_time(schedule, timing_constraint, operation, time_unit) if "abs_time" not in schedulable or abs_time > schedulable["abs_time"]: schedulable.data["abs_time"] = abs_time else: schedulable.data["abs_time"] = _get_start_time( schedule, timing_constraints[0], operation, time_unit ) schedule = _normalize_absolute_timing(schedule) schedule["duration"] = schedule.get_schedule_duration() if time_unit == "ideal": schedule["depth"] = schedule["duration"] + 1 return schedule
[docs] def _determine_scheduling_strategy(config: CompilationConfig | None = None) -> SchedulingStrategy: if config is not None and config.device_compilation_config is not None: return config.device_compilation_config.scheduling_strategy return SchedulingStrategy.ASAP
[docs] def _validate_schedulable_references( schedule: TimeableSchedule, references_graph: nx.DiGraph ) -> None: """Check the schedulable references for circular references.""" for node in references_graph.nodes: if node not in schedule.schedulables: raise ValueError(f"Node {node} not found in schedulables.") if not nx.is_directed_acyclic_graph(references_graph): raise TypeError( "`schedulable_references` is not a Directed Acyclic Graph. This is most likely " "caused by a circular reference in the Timing Constraints." )
[docs] def _populate_references_graph(schedule: TimeableSchedule) -> nx.DiGraph: """Add nodes and edges to the graph containing schedulable references.""" graph = nx.DiGraph() # Add nodes graph.add_nodes_from(schedule.schedulables.keys()) # Add edges for schedulable_name, schedulable in schedule.schedulables.items(): schedulable_name: str schedulable: Schedulable graph.add_edges_from( (timing_constraint.ref_schedulable, schedulable_name) for timing_constraint in schedulable.data["timing_constraints"] if timing_constraint.ref_schedulable is not None ) return graph
[docs] def _make_timing_constraints_explicit( schedule: TimeableSchedule, strategy: SchedulingStrategy ) -> None: default_schedulable_by_schedulable: list[tuple[str, str | None]] = ( _determine_default_ref_schedulables_by_schedulable(schedule, strategy) ) for ( schedulable_name, default_reference_schedulable_name, ) in default_schedulable_by_schedulable: schedulable_name: str default_reference_schedulable_name: str | None _make_timing_constraints_explicit_for_schedulable( schedule=schedule, schedulable_name=schedulable_name, default_reference_schedulable_name=default_reference_schedulable_name, strategy=strategy, )
[docs] def _make_timing_constraints_explicit_for_schedulable( schedule: TimeableSchedule, schedulable_name: str, default_reference_schedulable_name: str | None, strategy: SchedulingStrategy, ) -> None: schedulable: Schedulable = schedule.schedulables[schedulable_name] given_timing_constraints: list[TimingConstraint] = schedulable.data["timing_constraints"] # Support only one timing constraint for now if strategy == SchedulingStrategy.ALAP and len(given_timing_constraints) != 1: raise NotImplementedError("Only exactly one timing constraint per Schedulable supported.") timing_constraint: TimingConstraint = given_timing_constraints[0] if timing_constraint.ref_schedulable is None: timing_constraint.ref_schedulable = default_reference_schedulable_name if timing_constraint.ref_pt is None: timing_constraint.ref_pt = _determine_default_ref_pt(strategy) if timing_constraint.ref_pt_new is None: timing_constraint.ref_pt_new = _determine_default_ref_pt_new(strategy) if timing_constraint.rel_time is None: timing_constraint.rel_time = 0.0
[docs] def _determine_default_ref_pt(strategy: SchedulingStrategy) -> Literal["start", "end"]: if strategy == SchedulingStrategy.ASAP: return "end" if strategy == SchedulingStrategy.ALAP: return "start" raise ValueError(f"Cannot determine default `ref_pt`. Unknown scheduling strategy: {strategy}")
[docs] def _determine_default_ref_pt_new(strategy: SchedulingStrategy) -> Literal["start", "end"]: if strategy == SchedulingStrategy.ASAP: return "start" if strategy == SchedulingStrategy.ALAP: return "end" raise ValueError( f"Cannot determine default `ref_pt_new`. Unknown scheduling strategy: {strategy}" )
[docs] def _determine_default_ref_schedulables_by_schedulable( schedule: TimeableSchedule, strategy: SchedulingStrategy ) -> list[tuple[str, str | None]]: schedulable_names: list[str] = list(schedule.schedulables) if strategy == SchedulingStrategy.ASAP: default_schedulable_names: list[str | None] = [None] + list(schedule.schedulables)[:-1] elif strategy == SchedulingStrategy.ALAP: default_schedulable_names: list[str | None] = list(schedule.schedulables)[1:] + [None] else: raise ValueError(f"Scheduling strategy {strategy} not one of `ASAP` or `ALAP`.") return [ (schedulable_name, default_schedulable_name) for schedulable_name, default_schedulable_name in zip( schedulable_names, default_schedulable_names, strict=False ) ]
[docs] def _get_start_time( schedule: TimeableSchedule, t_constr: TimingConstraint, curr_op: Operation | TimeableSchedule, time_unit: Literal["physical", "ideal", None], ) -> float: if t_constr.ref_schedulable: ref_schedulable: Schedulable = schedule.schedulables[t_constr.ref_schedulable] ref_op: Operation | TimeableSchedule = schedule.operations[ref_schedulable["operation_id"]] time_ref_op = ref_schedulable["abs_time"] # duration = 1 is useful when e.g., drawing a circuit diagram. if time_unit == "physical": duration_ref_op = ref_op.duration else: duration_ref_op = ( ref_op.body.get("depth", 1) if isinstance(ref_op, ControlFlowOperation) else ref_op.get("depth", 1) ) else: time_ref_op = 0 duration_ref_op = 0 # Type checker does not know that ref_op.duration is not None if time_unit == # "physical" assert duration_ref_op is not None # Nor that rel_time is always float instead of also possibly a string assert isinstance(t_constr["rel_time"], (int, float)) ref_pt = t_constr.ref_pt or "end" if ref_pt == "start": t0 = time_ref_op elif ref_pt == "center": t0 = time_ref_op + duration_ref_op / 2 elif ref_pt == "end": t0 = time_ref_op + duration_ref_op else: raise NotImplementedError(f'Timing "{ref_pt=}" not supported by backend.') if time_unit == "physical": duration_new_op = curr_op.duration else: duration_new_op = ( curr_op.body.get("depth", 1) if isinstance(curr_op, ControlFlowOperation) else curr_op.get("depth", 1) ) assert duration_new_op is not None ref_pt_new = t_constr.ref_pt_new or "start" if ref_pt_new == "start": abs_time = t0 + t_constr.rel_time elif ref_pt_new == "center": abs_time = t0 + t_constr.rel_time - duration_new_op / 2 elif ref_pt_new == "end": abs_time = t0 + t_constr.rel_time - duration_new_op else: raise NotImplementedError(f'Timing "{ref_pt_new=}" not supported by backend.') return abs_time
[docs] def _normalize_absolute_timing( schedule: TimeableSchedule, config: CompilationConfig | None = None, # noqa: ARG001 ) -> TimeableSchedule: # TODO: Support normalization of absolute timing in subschedules # See test_negative_absolute_timing_is_normalized_with_subschedule in test_compilation.py # and https://gitlab.com/quantify-os/quantify-scheduler/-/issues/489 min_time = min(schedulable["abs_time"] for schedulable in schedule.schedulables.values()) if min_time < 0: for schedulable in schedule.schedulables.values(): schedulable["abs_time"] -= min_time return schedule
@overload
[docs] def _unroll_loops( schedule: TimeableSchedule, config: CompilationConfig | None = None, ) -> TimeableSchedule: ...
@overload def _unroll_loops( schedule: Operation, config: CompilationConfig | None = None, ) -> Operation | TimeableSchedule: ... def _unroll_loops( schedule: TimeableSchedule | Operation, config: CompilationConfig | None = None, ): # This is a recursive function, the argument `schedule` is not always a `TimeableSchedule` type, # so we rename it at the beginning to not cause confusion. op = schedule if isinstance(op, TimeableSchedule): for inner_op_key, inner_op in op.operations.items(): op.operations[inner_op_key] = _unroll_loops( schedule=inner_op, config=config, ) return op elif isinstance(op, ControlFlowOperation): if isinstance(op, LoopOperation) and op.strategy == LoopStrategy.UNROLLED: return _unroll_single_loop(op) else: op.body = _unroll_loops( schedule=op.body, config=config, ) return op else: return op
[docs] def _unroll_single_loop(op: LoopOperation) -> TimeableSchedule: unrolled_schedule = TimeableSchedule() for rep in range(op.repetitions): variables = {} if op.domain is not None: for var, domain in op.domain.items(): variables[var] = domain[rep] sub_body = op.body.substitute(variables) if sub_body is op.body: # Actually copy operations when unrolling loops, # so they are logically separate. sub_body = op.body.clone() unrolled_schedule.add(sub_body) return unrolled_schedule
[docs] def validate_config(config: dict, scheme_fn: str) -> bool: """ Validate a configuration using a schema. Parameters ---------- config The configuration to validate scheme_fn The name of a json schema in the qblox_scheduler.schemas folder. Returns ------- : True if valid """ scheme = load_json_schema(__file__, scheme_fn) validate_json(config, scheme) return True
[docs] def plot_schedulable_references_graph(schedule: TimeableSchedule) -> None: """ Show the schedulable reference graph. Can be used as a debugging tool to spot any circular references. """ graph = _populate_references_graph(schedule) nx.draw(graph, with_labels=True)