Source code for qblox_scheduler.backends.qblox.operation_handling.pulses

# Repository: https://gitlab.com/qblox/packages/software/qblox-scheduler
# Licensed according to the LICENSE file on the main branch
#
# Copyright 2020-2025, Quantify Consortium
# Copyright 2025, Qblox B.V.
"""Classes for handling pulses."""

from __future__ import annotations

import logging
from abc import ABC
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

import numpy as np

from qblox_scheduler._check_unsupported_expression import check_unsupported_expression
from qblox_scheduler.backends.qblox import constants, helpers, q1asm_instructions
from qblox_scheduler.backends.qblox.enums import ChannelMode
from qblox_scheduler.backends.qblox.operation_handling.base import IOperationStrategy
from qblox_scheduler.backends.types.qblox import (
    ClusterModuleDescription,
    RFDescription,
    StaticAnalogModuleProperties,
)
from qblox_scheduler.helpers.waveforms import normalize_waveform_data
from qblox_scheduler.operations.expressions import BinaryExpression, Expression
from qblox_scheduler.operations.variables import Variable

if TYPE_CHECKING:
    from qblox_scheduler.backends.qblox.qasm_program import (
        QASMProgram,
    )
    from qblox_scheduler.backends.types import qblox as types

[docs] logger = logging.getLogger(__name__)
[docs] class PulseStrategyPartial(IOperationStrategy, ABC): """ Contains the logic shared between all the pulses. Parameters ---------- operation_info The operation info that corresponds to this pulse. channel_name Specifies the channel identifier of the hardware config (e.g. `complex_output_0`). """
[docs] _amplitude_path_I: float | Variable | None
[docs] _amplitude_path_Q: float | Variable | None
def __init__(self, operation_info: types.OpInfo, channel_name: str) -> None:
[docs] self._pulse_info: types.OpInfo = operation_info
[docs] self.channel_name = channel_name
@property
[docs] def operation_info(self) -> types.OpInfo: """Property for retrieving the operation info.""" return self._pulse_info
[docs] def _get_i_and_q_gain_from_pulse_info( pulse_info: dict[str, Any], ) -> tuple[float | Variable | None, float | Variable | None]: wf_func = pulse_info["wf_func"] if wf_func in ( "qblox_scheduler.waveforms.square", "qblox_scheduler.waveforms.soft_square", "qblox_scheduler.waveforms.chirp", "qblox_scheduler.waveforms.interpolated_complex_waveform", ): amp_param = "gain" if "interpolated_complex_waveform" in wf_func else "amp" amp = pulse_info[amp_param] if isinstance(amp, Sequence): amplitude_path_I = amp[0] amplitude_path_Q = amp[1] elif isinstance(amp, complex): amplitude_path_I = amp.real amplitude_path_Q = amp.imag else: amplitude_path_I = amp amplitude_path_Q = 0 return amplitude_path_I, amplitude_path_Q elif wf_func == "qblox_scheduler.waveforms.drag": return pulse_info["amplitude"], pulse_info["amplitude"] return None, None
[docs] def _get_var_from_supported_expression(expression: Expression) -> Variable: match expression: case Variable(): return expression case BinaryExpression( lhs=Variable() as lhs, operator="*" | "/", rhs=(int() | float()) as rhs ): return lhs case BinaryExpression(lhs=(int() | float()) as lhs, operator="*", rhs=Variable() as rhs): return rhs case _: raise NotImplementedError(f"Unsupported expression: {expression}.")
[docs] class GenericPulseStrategy(PulseStrategyPartial): """ Default class for handling pulses. No assumptions are made with regards to the pulse shape and no optimizations are done. Parameters ---------- operation_info The operation info that corresponds to this pulse. channel_name Specifies the channel identifier of the hardware config (e.g. `complex_output_0`). """ def __init__(self, operation_info: types.OpInfo, channel_name: str) -> None: super().__init__( operation_info=operation_info, channel_name=channel_name, )
[docs] self._amplitude_path_I: float | Variable | None = None
[docs] self._amplitude_path_Q: float | Variable | None = None
[docs] self._waveform_index0: int | None = None
[docs] self._waveform_index1: int | None = None
[docs] self._waveform_len: int | None = None
[docs] def generate_data(self, wf_dict: dict[str, Any]) -> None: r""" Generates the data and adds them to the ``wf_dict`` (if not already present). In complex mode (e.g. ``complex_output_0``), the NCO produces real-valued data (:math:`I_\\text{IF}`) on sequencer path_I and imaginary data (:math:`Q_\\text{IF}`) on sequencer path_Q. .. math:: \\underbrace{\\begin{bmatrix} \\cos\\omega t & -\\sin\\omega t \\\\ \\sin\\omega t & \\phantom{-}\\cos\\omega t \\end{bmatrix}}_\\text{NCO} \\begin{bmatrix} I \\\\ Q \\end{bmatrix} = \\begin{bmatrix} I \\cdot \\cos\\omega t - Q \\cdot\\sin\\omega t \\\\ I \\cdot \\sin\\omega t + Q \\cdot\\cos\\omega t \\end{bmatrix} \\begin{matrix} \\ \\text{(path_I)} \\\\ \\ \\text{(path_Q)} \\end{matrix} = \\begin{bmatrix} I_\\text{IF} \\\\ Q_\\text{IF} \\end{bmatrix} In real mode (e.g. ``real_output_0``), the NCO produces :math:`I_\\text{IF}` on path_I .. math:: \\underbrace{\\begin{bmatrix} \\cos\\omega t & -\\sin\\omega t \\\\ \\sin\\omega t & \\phantom{-}\\cos\\omega t \\end{bmatrix}}_\\text{NCO} \\begin{bmatrix} I \\\\ Q \\end{bmatrix} = \\begin{bmatrix} I \\cdot \\cos\\omega t - Q \\cdot\\sin\\omega t\\\\ - \\end{bmatrix} \\begin{matrix} \\ \\text{(path_I)} \\\\ \\ \\text{(path_Q)} \\end{matrix} = \\begin{bmatrix} I_\\text{IF} \\\\ - \\end{bmatrix} Note that the fields marked with `-` represent waveforms that are not relevant for the mode. Parameters ---------- wf_dict The dictionary to add the waveform to. N.B. the dictionary is modified in function. domains The domains used in the schedule, keyed by variable. This is added as temporarily to ensure we do not upload unnecessary waveforms. The domain information will be used to figure out whether or not a "Q" path waveform needs to be uploaded. Raises ------ ValueError Data is complex (has an imaginary component), but the channel_name is not set as complex (e.g. ``complex_output_0``). """ op_info = self.operation_info amplitude_path_I, amplitude_path_Q = _get_i_and_q_gain_from_pulse_info( self.operation_info.data ) # Simplify expressions so that we can easily recognize multiplication with or division by a # constant. if isinstance(amplitude_path_I, Expression): amplitude_path_I = amplitude_path_I.reduce() if isinstance(amplitude_path_Q, Expression): amplitude_path_Q = amplitude_path_Q.reduce() # If the amplitudes are still expressions after the reduction step, check if we can handle # this expression (i.e., it is just a variable, or a simple variable * constant or variable # / constant expression). if isinstance(amplitude_path_I, Expression): var_path_I = _get_var_from_supported_expression(amplitude_path_I) op_info = op_info.substitute({var_path_I: 1}) else: var_path_I = None if isinstance(amplitude_path_Q, Expression): var_path_Q = _get_var_from_supported_expression(amplitude_path_Q) op_info = op_info.substitute({var_path_Q: 1}) else: var_path_Q = None check_unsupported_expression(*op_info.data.values(), operation_name=op_info.name) waveform_data = helpers.generate_waveform_data( op_info.data, sampling_rate=constants.SAMPLING_RATE ) # If neither I nor Q a variable: normalize both if var_path_I is None and var_path_Q is None: waveform_data, amp_real, amp_imag = normalize_waveform_data(waveform_data) # If one of I or Q is a variable, normalize the waveform that is not scaled by a variable. elif var_path_I is not None and var_path_Q is None: amp_real = None waveform_data_imag, _, amp_imag = normalize_waveform_data(np.imag(waveform_data)) if amp_imag != 0: waveform_data.imag = waveform_data_imag elif var_path_I is None and var_path_Q is not None: waveform_data_real, amp_real, _ = normalize_waveform_data(np.real(waveform_data)) waveform_data.real = waveform_data_real amp_imag = None # If both I and Q are variables, or there is a scaling variable for both: no normalization. else: amp_real = None amp_imag = None self._waveform_len = len(waveform_data) if np.any(np.iscomplex(waveform_data)) and ChannelMode.COMPLEX not in self.channel_name: raise ValueError( f"Complex valued {op_info!s} detected but the sequencer" f" is not expecting complex input. This can be caused by " f"attempting to play complex valued waveforms on an output" f" marked as real.\n\nException caused by {op_info!r}." ) def non_null(amp: float) -> bool: return abs(amp) >= 2 / constants.IMMEDIATE_SZ_GAIN idx_real = ( helpers.add_to_wf_dict_if_unique(wf_dict=wf_dict, waveform=waveform_data.real) if amp_real is None or non_null(amp_real) else None ) idx_imag = ( helpers.add_to_wf_dict_if_unique(wf_dict=wf_dict, waveform=waveform_data.imag) if amp_imag is None or non_null(amp_imag) else None ) self._waveform_index0, self._waveform_index1 = idx_real, idx_imag self._amplitude_path_I = var_path_I or amp_real self._amplitude_path_Q = var_path_Q or amp_imag
[docs] def insert_qasm(self, qasm_program: QASMProgram) -> None: """ Add the assembly instructions for the Q1 sequence processor that corresponds to this pulse. Parameters ---------- qasm_program The QASMProgram to add the assembly instructions to. """ if qasm_program.time_last_pulse_triggered is not None and ( qasm_program.elapsed_time - qasm_program.time_last_pulse_triggered < constants.MIN_TIME_BETWEEN_OPERATIONS ): raise ValueError( f"Attempting to start an operation at t=" f"{qasm_program.elapsed_time} ns, while the last operation was " f"started at t={qasm_program.time_last_pulse_triggered} ns. " f"Please ensure a minimum interval of " f"{constants.MIN_TIME_BETWEEN_OPERATIONS} ns between " f"operations.\n\nError caused by operation:\n" f"{self.operation_info!r}." ) qasm_program.time_last_pulse_triggered = qasm_program.elapsed_time # Only emit play command if at least one path has a signal # else update parameters as there might still be some lingering # from for example a voltage offset. index0 = self._waveform_index0 index1 = self._waveform_index1 if index0 is None and index1 is None: qasm_program.emit( q1asm_instructions.UPDATE_PARAMETERS, constants.MIN_TIME_BETWEEN_OPERATIONS, comment=f"{self.operation_info.name} has too low amplitude to be played, " f"updating parameters instead", ) else: assert self._amplitude_path_I is not None assert self._amplitude_path_Q is not None qasm_program.set_gain_from_amplitude( self._amplitude_path_I, self._amplitude_path_Q, self.operation_info, ) # If a channel doesn't have an index (index0 or index1 is None) means, # that for that channel we do not want to play any waveform; # it's also ensured in this case, that the gain is set to 0 for that channel; # but, the Q1ASM program needs a waveform index for both channels, # so we set the other waveform's index in this case as a dummy qasm_program.emit( q1asm_instructions.PLAY, index0 if (index0 is not None) else index1, index1 if (index1 is not None) else index0, constants.MIN_TIME_BETWEEN_OPERATIONS, # N.B. the waveform keeps playing comment=f"play {self.operation_info.name} ({self._waveform_len} ns)", ) qasm_program.elapsed_time += constants.MIN_TIME_BETWEEN_OPERATIONS
[docs] class DigitalOutputStrategy(PulseStrategyPartial, ABC): """ Interface class for :class:`MarkerPulseStrategy` and :class:`DigitalPulseStrategy`. Both classes work very similarly, since they are both strategy classes for the `~qblox_scheduler.operations.pulse_library.MarkerPulse`. The ``MarkerPulseStrategy`` is for the QCM/QRM modules, and the ``DigitalPulseStrategy`` for the QTM. """
[docs] def generate_data(self, wf_dict: dict[str, Any]) -> None: """Returns None as no waveforms are generated in this strategy.""" pass
[docs] class MarkerPulseStrategy(DigitalOutputStrategy): """If this strategy is used a digital pulse is played on the corresponding marker.""" def __init__( self, operation_info: types.OpInfo, channel_name: str, module_options: ClusterModuleDescription, ) -> None: super().__init__(operation_info, channel_name)
[docs] self.module_options = module_options
[docs] def insert_qasm(self, qasm_program: QASMProgram) -> None: """ Inserts the QASM instructions to play the marker pulse. Note that for RF modules the first two bits of set_mrk are used as switches for the RF outputs. Parameters ---------- qasm_program The QASMProgram to add the assembly instructions to. """ hw_properties = qasm_program.static_hw_properties if not isinstance(hw_properties, StaticAnalogModuleProperties): raise TypeError( f"Marker Operations are only supported for analog modules, " f"not for instrument {self.module_options.instrument_type}." ) if self.channel_name not in hw_properties.channel_name_to_digital_marker: raise ValueError( f"Unable to set markers on channel '{self.channel_name}' for " f"instrument {hw_properties.instrument_type} " f"and operation {self.operation_info.name}. " f"Supported channels: {list(hw_properties.channel_name_to_digital_marker.keys())}" ) marker = hw_properties.channel_name_to_digital_marker[self.channel_name] default_marker = 0 if ( isinstance(self.module_options, RFDescription) and self.module_options.rf_output_on and hw_properties.default_markers ): default_marker = hw_properties.default_markers[self.channel_name] if marker | default_marker == default_marker: # Marker has no effect raise RuntimeError( "Attempting to turn on an RF output on a module where " "`rf_output_on` is set to True (the default value). \n" "Turning the RF output on with an RFSwitchToggle Operation " "has no effect. \n" "Please set `rf_output_on` to False for this module " "in the hardware configuration." ) if self.operation_info.data["enable"]: marker |= default_marker qasm_program.emit( q1asm_instructions.SET_MARKER, marker, comment=f"set markers to {marker} (marker pulse)", ) else: qasm_program.emit( q1asm_instructions.SET_MARKER, default_marker, comment=f"set markers to {default_marker} (default, marker pulse)", )
[docs] class DigitalPulseStrategy(DigitalOutputStrategy): """ If this strategy is used a digital pulse is played on the corresponding digital output channel. """
[docs] def insert_qasm(self, qasm_program: QASMProgram) -> None: """ Inserts the QASM instructions to play the marker pulse. Note that for RF modules the first two bits of set_mrk are used as switches for the RF outputs. Parameters ---------- qasm_program The QASMProgram to add the assembly instructions to. """ if ChannelMode.DIGITAL not in self.channel_name: port = self.operation_info.data.get("port") clock = self.operation_info.data.get("clock") raise ValueError( f"{self.__class__.__name__} can only be used with a " f"digital channel. Please make sure that " f"'digital' keyword is included in the channel_name in the hardware configuration " f"for port-clock combination '{port}-{clock}' " f"(current channel_name is '{self.channel_name}')." f"Operation causing exception: {self.operation_info}" ) if self.operation_info.data["enable"]: fine_delay = helpers.convert_qtm_fine_delay_to_int( self.operation_info.data.get("fine_start_delay", 0) ) else: fine_delay = helpers.convert_qtm_fine_delay_to_int( self.operation_info.data.get("fine_end_delay", 0) ) qasm_program.emit( q1asm_instructions.SET_DIGITAL, int(self.operation_info.data["enable"]), 1, # Mask. Reserved for future use, set to 1. fine_delay, )