# ----------------------------------------------------------------------------
# Description : Generic native interface functions
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2020)
# ----------------------------------------------------------------------------
# -- include -----------------------------------------------------------------
import copy
import sys
import dis
import numpy
import struct
import re
import time
import json
import fastjsonschema
from enum import Enum
from collections import namedtuple
from functools import wraps, partial
from typing import Any, Callable, Dict, List, Optional, Union
from inspect import getmembers, isfunction
from qblox_instruments import DeviceInfo
from qblox_instruments.scpi import scpi_error_check
# -- definitions -------------------------------------------------------------
# State enum base class
[docs]class StateEnum(Enum):
"""
State enum base class that arranges child enum string representations.
"""
def __repr__(self) -> str:
return "<{}.{}>".format(str(type(self)).split("'")[1], self.name)
def __str__(self) -> str:
return str(self.name)
def __eq__(self, other: Any) -> bool:
if type(self) == type(other):
return str(self) == str(other)
elif other in [str(val) for val in type(self)]:
return str(self) == other
else:
raise KeyError("{} is not of type {}".format(other, type(self)))
def __key__(self):
return str(self)
def __hash__(self):
return hash(self.__key__())
# State tuple base class
[docs]class StateTuple:
"""
State tuple base class that arranges child tuple string representations.
"""
def __str__(self) -> str:
# Status, flags and slot_flags are inherited from the child class
# using virtual inheritance, so we retrieve these attributes through
# getattr to not upset Pylint
status = getattr(self, "status")
flags = getattr(self, "flags")
if len(flags) > 0:
flags = ", ".join([str(flag) for flag in flags])
else:
flags = "NONE"
pretty_str = "Status: {}, Flags: {}".format(status, flags)
if hasattr(self, "slot_flags"):
slot_flags = getattr(self, "slot_flags")
pretty_str += ", Slot flags: {}".format(slot_flags)
return pretty_str
# System status enum
[docs]class SystemStatus(StateEnum):
"""
System status enum.
"""
BOOTING = "System is booting."
OKAY = "System is okay."
CRITICAL = "An error indicated by the flags occured, but has been resolved."
ERROR = "An error indicated by the flags is occuring."
# System status flags enum
[docs]class SystemStatusFlags(StateEnum):
"""
System status flags enum.
"""
CARRIER_PLL_UNLOCKED = "Carrier board PLL is unlocked."
FPGA_PLL_UNLOCKED = "FPGA PLL is unlocked."
LO_PLL_UNLOCKED = "Local oscillator PLL is unlocked."
FPGA_TEMPERATURE_OUT_OF_RANGE = "FPGA temperature is out-of-range."
CARRIER_TEMPERATURE_OUT_OF_RANGE = "Carrier board temperature is out-of-range."
AFE_TEMPERATURE_OUT_OF_RANGE = "Analog-frontend board temperature is out-of-range."
LO_TEMPERATURE_OUT_OF_RANGE = "Local oscillator board temperature is out-of-range."
BACKPLANE_TEMPERATURE_OUT_OF_RANGE = "Backplane board temperature is out-of-range."
MODULE_NOT_CONNECTED = "Module is not connected."
MODULE_FIRMWARE_INCOMPATIBLE = "Module firmware is incompatible with the rest of the system."
MODULE_FF_CALIB_FAILED = "The feedback network calibration failed."
# Namedtuple representing the slot status flags
NUM_SLOTS = 20
[docs]class SystemStatusSlotFlags(namedtuple(
"SystemStatusSlotFlags",
["slot{}".format(slot) for slot in range(1, NUM_SLOTS+1)])
):
"""
Tuple containing lists of Cluster slot status flag enums of type
:class:`~qblox_instruments.SystemStatusFlags`. Each Cluster slot has its
own status flag list attribute named `slot<X>`.
"""
__name__ = "SystemStatusSlotFlags"
__slots__ = ()
def __new__(cls, slot_flags: Dict = {}):
slot_flag_lists = NUM_SLOTS *[[]]
for slot in range(0, NUM_SLOTS):
slot_str = "slot{}".format(slot+1)
if slot_str in slot_flags:
slot_flag_lists[slot] = slot_flags[slot_str]
return super().__new__(cls, *slot_flag_lists)
def __repr__(self):
slot_str_list = []
for slot in range(0, NUM_SLOTS):
if len(self[slot]) > 0:
slot_str_list.append("slot{}={}".format(slot+1, self[slot]))
return "{}({})".format(self.__name__, ", ".join(slot_str_list))
def __str__(self):
slot_str_list = []
for slot in range(0, NUM_SLOTS):
for flag in self[slot]:
slot_str_list.append("SLOT{}_{}".format(slot+1, flag))
if len(slot_str_list) > 0:
return ", ".join(slot_str_list)
else:
return "NONE"
# Namedtuple representing the system status
[docs]class SystemState(namedtuple("SystemState", ["status", "flags", "slot_flags"]), StateTuple):
"""
System status tuple returned by :func:`!get_system_state`. The tuple
contains a system status enum of type
:class:`~qblox_instruments.SystemStatus`, a list of associated system
status flag enums of type
:class:`~qblox_instruments.SystemStatusFlags` and a tuple of type
:class:`~qblox_instruments.SystemStatusSlotFlags` containing Cluster slot
status flags.
"""
pass
SystemState.status.__doc__ = """
System status enum of type :class:`~qblox_instruments.SystemStatus`.
"""
SystemState.flags.__doc__ = """
List of system status flag enums of type
:class:`~qblox_instruments.SystemStatusFlags`.
"""
SystemState.slot_flags.__doc__ = """
Tuple of type :class:`~qblox_instruments.SystemStatusSlotFlags containing
Cluster slot status flags
"""
# Sequencer status enum
[docs]class SequencerStatus(StateEnum):
"""
Sequencer status enum.
"""
IDLE = "Sequencer waiting to be armed and started."
ARMED = "Sequencer is armed and ready to start."
RUNNING = "Sequencer is running."
Q1_STOPPED = "Classical part of the sequencer has stopped; waiting for real-time part to stop."
STOPPED = "Sequencer has completely stopped."
# Sequencer status flags enum
[docs]class SequencerStatusFlags(StateEnum):
"""
Sequencer status flags enum.
"""
DISARMED = "Sequencer was disarmed."
FORCED_STOP = "Sequencer was stopped while still running."
SEQUENCE_PROCESSOR_Q1_ILLEGAL_INSTRUCTION = "Classical sequencer part executed an unknown instruction."
SEQUENCE_PROCESSOR_RT_EXEC_ILLEGAL_INSTRUCTION = "Real-time sequencer part executed an unknown instruction."
SEQUENCE_PROCESSOR_RT_EXEC_COMMAND_UNDERFLOW = "Real-time sequencer part command queue underflow."
AWG_WAVE_PLAYBACK_INDEX_INVALID_PATH_0 = "AWG path 0 tried to play an unknown waveform."
AWG_WAVE_PLAYBACK_INDEX_INVALID_PATH_1 = "AWG path 1 tried to play an unknown waveform."
ACQ_WEIGHT_PLAYBACK_INDEX_INVALID_PATH_0 = "Acquisition path 0 tried to play an unknown weight."
ACQ_WEIGHT_PLAYBACK_INDEX_INVALID_PATH_1 = "Acquisition path 1 tried to play an unknown weight."
ACQ_SCOPE_DONE_PATH_0 = "Scope acquisition for path 0 has finished."
ACQ_SCOPE_OUT_OF_RANGE_PATH_0 = "Scope acquisition data for path 0 was out-of-range."
ACQ_SCOPE_OVERWRITTEN_PATH_0 = "Scope acquisition data for path 0 was overwritten."
ACQ_SCOPE_DONE_PATH_1 = "Scope acquisition for path 1 has finished."
ACQ_SCOPE_OUT_OF_RANGE_PATH_1 = "Scope acquisition data for path 1 was out-of-range."
ACQ_SCOPE_OVERWRITTEN_PATH_1 = "Scope acquisition data for path 1 was overwritten."
ACQ_BINNING_DONE = "Acquisition binning completed."
ACQ_BINNING_FIFO_ERROR = "Acqusition binning encountered internal FIFO error."
ACQ_BINNING_COMM_ERROR = "Acqusition binning encountered internal communication error."
ACQ_BINNING_OUT_OF_RANGE = "Acquisition binning data out-of-range."
ACQ_INDEX_INVALID = "Acquisition tried to process an invalid acquisition."
ACQ_BIN_INDEX_INVALID = "Acquisition tried to process an invalid bin."
TRIGGER_NETWORK_CONFLICT = "Trigger network has encountered a conflict."
TRIGGER_NETWORK_MISSED_INTERNAL_TRIGGER = "Trigger network missed an internal trigger."
OUTPUT_OVERFLOW = "Output overflow."
CLOCK_INSTABILITY = "Clock source instability occurred."
ACQ_INTEGRATOR_OUT_OF_RANGE_PATH_0 = "Acquisition integration input data for path 0 was out-of-range."
ACQ_INTEGRATOR_OUT_OF_RANGE_PATH_1 = "Acquisition integration input data for path 1 was out-of-range."
# Namedtuple representing the sequencer status
[docs]class SequencerState(namedtuple("SequencerState", ["status", "flags"]), StateTuple):
"""
Sequencer status tuple returned by :func:`!get_sequencer_state`. The tuple
contains a sequencer status
enum of type :class:`~qblox_instruments.SequencerStatus` and a list of
associated sequencer status flag
enums of type :class:`~qblox_instruments.SequencerStatusFlags`.
"""
pass
SequencerState.status.__doc__ = """
Sequencer status enum of type :class:`~qblox_instruments.SequencerStatus`.
"""
SequencerState.flags.__doc__ = """
List of sequencer status flag enums of type
:class:`~qblox_instruments.SequencerStatusFlags`.
"""
# Maximum program length allowed
MAX_PROGRAM_LENGTH = 10 * (128*1024*8 + 1024)
# JSON schema to validate sequence dictionaries with
QCM_SEQUENCE_JSON_SCHEMA = {
"title": "Sequence container",
"description": "Contains all waveforms, weights and acquisitions and a program required for a sequence.",
"type": "object",
"required": ["program", "waveforms"],
"properties": {
"program": {
"description": "Sequencer assembly program in string format.",
"type": "string",
},
"waveforms": {
"description": "Waveform dictionary containing one or multiple AWG waveform(s).",
"type": "object",
},
"weights": {
"description": "Weight dictionary containing one or multiple acquisition weights(s).",
"type": "object",
},
"acquisitions": {
"description": "Acquisition dictionary containing information about one or multiple acquisition(s).",
"type": "object",
},
},
}
# JSON schema to validate QRM sequence dictionaries with
QRM_SEQUENCE_JSON_SCHEMA = copy.deepcopy(QCM_SEQUENCE_JSON_SCHEMA)
QRM_SEQUENCE_JSON_SCHEMA["required"] = [
"program",
"waveforms",
"weights",
"acquisitions",
]
# JSON schema to validate wavefrom and weight dictionaries with
WAVE_JSON_SCHEMA = {
"title": "Waveform/weight container",
"description": "Waveform/weight dictionary for a single waveform.",
"type": "object",
"required": ["data"],
"properties": {
"data": {
"description": "List of waveform samples.",
"type": "array"
},
"index": {
"description": "Optional waveform index number.",
"type": "number"
},
},
}
# JSON schema to validate acquisition dictionaries with
ACQ_JSON_SCHEMA = {
"title": "Acquisition container",
"description": "Acquisition dictionary for a single acquisition.",
"type": "object",
"required": ["num_bins"],
"properties": {
"num_bins": {
"description": "Number of bins in acquisition.",
"type": "number"
},
"index": {
"description": "Optional waveform index number.",
"type": "number"
},
},
}
# -- class -------------------------------------------------------------------
[docs]class FuncRefs:
"""
Function reference container intended to hold references to methods of the
instrument's SCPI and native interfaces that are called by methods in
:mod:`~qblox_instruments.native.generic_func`. In effect, this class enables
passing parametrized methods to the
:mod:`~qblox_instruments.native.generic_func` functions so that those
functions can be reused between different instruments.
"""
# ------------------------------------------------------------------------
[docs] def __init__(self, instrument: Optional[Any] = None):
"""
Create function reference container.
Parameters
----------
instrument : Any
Instrument parent object of the function references.
Returns
----------
Raises
----------
"""
# Store instrument reference
self._instrument = instrument
# Create list of instrument function names referenced in this module's
# functions and manually add any missing functions that the
# convenience method failed to pick up.
func_names = FuncRefs._get_referenced_funcs(sys.modules[__name__])
func_names.append("_write")
func_names.append("_flush_line_end")
func_names.append("_get_awg_waveforms")
func_names.append("_get_acq_weights")
func_names.append("_get_acq_acquisition_data")
func_names.append("_get_acq_acquisitions")
# Create dictionary of instrument functions and associated attributes.
# Initialize the associated attributes to None since they are not
# registered yet.
self._funcs = {}
for name in func_names:
self._funcs[name] = None
# Add instrument functions as temporary attributes to this class, so
# that when called it throws a NotImplemented exception. Do this
# through a wrapper function to ensure that the function name is
# unique in each error string. These attributes should be overwritten
# using the register method after this class is instantiated.
def create_unique_func(name: str) -> Callable[..., None]:
def raise_not_implemented_error(*args, **kwargs) -> None:
raise NotImplementedError(
'"{}" has not yet been registered to this function reference container.'.format(name)
)
return raise_not_implemented_error
for name in func_names:
setattr(self, name, create_unique_func(name))
# ------------------------------------------------------------------------
@property
def instrument(self) -> Any:
"""
Return function references parent object.
Parameters
----------
Returns
----------
Any
Instrument parent object of the function references.
Raises
----------
"""
return self._instrument
# ------------------------------------------------------------------------
@property
def funcs(self) -> Dict:
"""
Return dictionary of instrument function names and their associate
references, referenced in this module's functions so that the
referenced functions can be registered to this object using the
register method.
Parameters
----------
Returns
----------
dict
Dictionary of required instrument function names and associated
references.
Raises
----------
"""
return self._funcs
# ------------------------------------------------------------------------
[docs] def register(self, ref: Callable[[Any], Any], attr_name: Optional[str] = None) -> None:
"""
Register function reference as attribute to object.
Parameters
----------
ref : Callable[[Any], Any]
Function reference to register.
attr_name : Optional[str]
Attribute name to register function to. If attribute name
is not provided. The function is registered to the name of
the reference argument.
Returns
----------
Raises
----------
AttributeError
Could not get name of reference.
KeyError
Attribute name is not found in function name list.
"""
if attr_name is None:
if hasattr(ref, "__name__"):
attr_name = ref.__name__
else:
raise AttributeError(
"Could not get name of function reference."
)
if attr_name in self.funcs:
self.funcs[attr_name] = ref
setattr(self, attr_name, ref)
else:
raise KeyError(
"Attribute name that is being registered ({}) is not in the instrument function list".format(attr_name)
)
# ------------------------------------------------------------------------
@staticmethod
def _get_referenced_funcs(module: Any, arg_name: str = "funcs") -> List:
"""
Get all referenced instrument function names using FuncRefs for the
functions in the given Python module. This method looks very
specifically for the use of the input argument name specified by
arg_name of type FuncRefs to find any referenced function names. Note
that this is a convenience method that does not work for all use
cases. For instance, it does not work for decorated functions.
Parameters
----------
arg_name : str
Argurment name to search for.
Returns
----------
List
List of required instrument function names.
Raises
----------
"""
# Get functions from module, disassemble them and search for FuncRefs
# references using name specified by input argument. Finally split
# out the function and attribute calls and return the found results.
funcs = []
for func in getmembers(module, isfunction):
instr_list = list(dis.Bytecode(func[1]))
for idx, instr in enumerate(instr_list):
if instr.opname == "LOAD_FAST" and instr.argval == arg_name:
next_instr = instr_list[idx + 1]
if ((next_instr.opname == "LOAD_METHOD" or
next_instr.opname == "LOAD_ATTR") and not
hasattr(FuncRefs, next_instr.argval)):
funcs.append(instr_list[idx + 1].argval)
return list(set(funcs))
# -- decorator ---------------------------------------------------------------
[docs]def copy_docstr(src_func):
"""
Decorator that copies the docstring from the provided function to the
decorated function.
Parameters
----------
src_func
Function from which to copy the docstring.
Returns
----------
Raises
----------
"""
def actual_copy_docstr(func):
@wraps(func)
def decorator_wrapper(*args, **kwargs):
return func(*args, **kwargs)
decorator_wrapper.__doc__ = src_func.__doc__
return decorator_wrapper
return actual_copy_docstr
# -- helper functions --------------------------------------------------------
[docs]def check_sequencer_index(sequencer: int) -> None:
"""
Check if sequencer index is within range. We just check if the index is a
positive integer here, because sending a negative number breaks the
underlying SCPI command. The upperbound is checked by the instrument.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
Raises
----------
ValueError
Sequencer index is out-of-range (i.e. < 1).
"""
if sequencer < 0:
raise ValueError(
"Sequencer index is out-of-range ({})".format(sequencer)
)
# ---------------------------------------------------------------------
def _check_program_length(program: str) -> None:
"""
Checks if the program length is above the limit. If it is, an
attempt is made to shorten the program by removing comments and
unnecessary whitespaces. If the program is still too large, a
Runtime error is raised.
Parameters
----------
program : str
Sequence program to be updated to the device
Returns
----------
Raises
----------
RuntimeError
Too large program string.
"""
if len(program.encode('utf-8')) > MAX_PROGRAM_LENGTH:
checked_program = re.sub(r'#.*|^\s*', '', program, 0, re.MULTILINE)
checked_program = re.sub(r'[^\S\r\n]+', ' ', checked_program)
if len(checked_program.encode('utf-8')) > MAX_PROGRAM_LENGTH:
raise RuntimeError(
"Program length too large, expected something below {} bytes but got {} bytes.".format(MAX_PROGRAM_LENGTH, len(checked_program.encode('utf-8')))
)
else:
return checked_program
else:
return program
# ----------------------------------------------------------------------------
[docs]def check_qrm_type(is_qrm_type: bool) -> None:
"""
Check if module is of type QRM. If not throw a NotImplemented exception.
This helper function can be used to catch execution of QRM functionality
on non-QRM type modules.
Parameters
----------
is_qrm_type : bool
Is QRM module type.
Returns
----------
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
if not is_qrm_type:
raise NotImplementedError(
"This functionality not available on this module."
)
# ----------------------------------------------------------------------------
[docs]def create_read_bin(
read_bin_func: Callable[[str, bool], bytes],
cmd: str
) -> Callable[[Optional[int], Optional[str]], bytes]:
"""
Create binary read function that can provide a binary read with a
preconfigured command. This is usefull for functions like
`_get_awg_waveforms`, that need a specific binary read command to kick
off a stream of binary blocks.
Parameters
----------
read_bin_func : Callable[[str, bool], bytes]
SCPI layer binary read method.
cmd : str
Unformated command string.
Returns
----------
Callable[[Optional[int], Optional[str]], bytes]
Binary read function with preconfigured command that takes the
optional sequencer index and optional name string as arguments.
Raises
----------
"""
def read_bin(sequencer: Optional[int] = None, name: Optional[str] = None) -> bytes:
if sequencer is None:
new_cmd = cmd
else:
if name is None:
new_cmd = cmd.format(sequencer)
else:
new_cmd = cmd.format(sequencer, name)
return read_bin_func(new_cmd, False)
return read_bin
# -- functions ---------------------------------------------------------------
# Note that the arguments in the docstrings of the following functions do not
# reflect the arguments of the functions themselves. Instead they reflect the
# arguments of the native instrument layer's methods that call these
# functions. The copy_docstr decorator is used to copy the docstring to the
# calling method, so that not only the functionality but also the docstring
# can be shared across the methods of the native instrument layers.
[docs]def get_scpi_commands(funcs: FuncRefs) -> Dict:
"""
Get SCPI commands and convert to dictionary.
Parameters
----------
Returns
----------
dict
Dictionary containing all available SCPI commands, corresponding
parameters, arguments and Python methods and finally a descriptive
comment.
Raises
----------
"""
# Split function
def split(cmd_elem: str) -> List:
if cmd_elem != "None" and cmd_elem != "":
return cmd_elem.split(",")
else:
return []
# Format command string
cmds = funcs._get_scpi_commands()
cmd_elem_list = cmds.split(";")[:-1]
cmd_list = numpy.reshape(cmd_elem_list, (int(len(cmd_elem_list) / 9), 9))
cmd_dict = {
cmd[0]: {
"scpi_in_type": split(cmd[1]),
"scpi_out_type": split(cmd[2]),
"python_func": cmd[3],
"python_in_type": split(cmd[4]),
"python_in_var": split(cmd[5]),
"python_out_type": split(cmd[6]),
"comment": cmd[8].replace("\t", "\n"),
} for cmd in cmd_list
}
return cmd_dict
# ----------------------------------------------------------------------------
[docs]def get_idn(funcs: FuncRefs) -> Dict:
"""
Get device identity and build information and convert them to a
dictionary.
Parameters
----------
Returns
----------
dict
Dictionary containing manufacturer, model, serial number and build
information. The build information is subdivided into FPGA firmware,
kernel module software, application software and driver software build
information. Each of those consist of the version, build date,
build Git hash and Git build dirty indication.
Raises
----------
"""
return DeviceInfo.from_idn(funcs._get_idn()).to_idn_dict()
# ----------------------------------------------------------------------------
[docs]def get_system_state(funcs: FuncRefs) -> SystemState:
"""
Get general system state and convert it to a
:class:`~qblox_instruments.SystemState`.
Parameters
----------
Returns
----------
SystemStatus
Tuple containing general system status and corresponding flags.
Raises
----------
"""
# Format status string
state = funcs._get_system_state()
state_elem_list = re.sub(" |-", "_", state).split(";")
if state_elem_list[-1] != "":
state_flag_list = state_elem_list[-1].split(",")[:-1]
else:
state_flag_list = []
# Split system status flags from slot status flags
system_flags = []
slot_flags = {}
for flag in state_flag_list:
flag_parts = flag.split("_")
if flag_parts[0] != "SLOT":
system_flags.append(SystemStatusFlags[flag])
else:
slot = "slot" + flag_parts[1]
flag = SystemStatusFlags['_'.join(flag_parts[2:])]
if slot not in slot_flags:
slot_flags[slot] = [flag]
else:
slot_flags[slot].append(flag)
return SystemState(
SystemStatus[state_elem_list[0]],
system_flags,
SystemStatusSlotFlags(slot_flags),
)
# ----------------------------------------------------------------------------
[docs]def set_acq_scope_config(funcs: FuncRefs, config: Dict) -> None:
"""
Set configuration of the scope acquisition. The configuration consists of
multiple parameters in a C struct format. If an invalid sequencer index
is given or the configation struct does not have the correct format, an
error is set in system error.
Parameters
----------
config : dict
Configuration dictionary.
Returns
----------
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
funcs._set_acq_scope_config(config)
# ----------------------------------------------------------------------------
[docs]def get_acq_scope_config(funcs: FuncRefs) -> Dict:
"""
Get configuration of the scope acquisition. The configuration consists of
multiple parameters in a C struct format. If an invalid sequencer index is
given, an error is set in system error.
Parameters
----------
Returns
----------
dict
Configuration dictionary.
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
return funcs._get_acq_scope_config()
# --------------------------------------------------------------------------
[docs]def set_acq_scope_config_val(funcs: FuncRefs, keys: Any, val: Any) -> None:
"""
Set value of specific scope acquisition parameter.
Parameters
----------
keys : Union[List[str], str]
Configuration key to access, or hierarchical list thereof
val: Any
Value to set parameter to.
Returns
----------
Raises
----------
"""
# Normalize keys argument.
if not isinstance(keys, str) and hasattr(keys, '__iter__'):
keys = tuple(keys)
else:
keys = [keys]
# Get configuration.
cfg = get_acq_scope_config(funcs)
# Walk to the right "directory" in the hierarchy.
d = cfg
for key in keys[:-1]:
if (isinstance(d, list) and (not isinstance(key, int) or key < 0 or key >= len(d))) or \
(isinstance(d, dict) and (not isinstance(key, str) or key not in d)):
raise KeyError(f'acq_scope_dict[{"][".join(str(e) for e in keys)}] is not a valid scope path, failed at {key}')
d = d[key]
key = keys[-1]
if (isinstance(d, list) and (not isinstance(key, int) or key < 0 or key >= len(d))) or \
(isinstance(d, dict) and (not isinstance(key, str) or key not in d)):
raise KeyError(f'acq_scope_dict[{"][".join(str(e) for e in keys)}] is not a valid scope path, failed at {key}')
# Make sure val has the right JSON type; try casting it if not.
cur = d[keys[-1]]
for typ in (int, float, str, bool):
if isinstance(cur, typ):
try:
val = typ(val)
except (ValueError, TypeError) as e:
raise TypeError(f'{".".join(str(e) for e in keys)} should be of type {typ.__name__}: {e}')
break
else:
# Probably a dict or array! Can't set those directly.
raise KeyError(f'{".".join(str(e) for e in keys)} is an incomplete scope path')
# Set the new value.
if isinstance(d[keys[-1]], bool):
d[keys[-1]] = True if val == 1 else False
else:
d[keys[-1]] = val
set_acq_scope_config(funcs, cfg)
# --------------------------------------------------------------------------
[docs]def get_acq_scope_config_val(funcs: FuncRefs, keys: Any) -> Any:
"""
Get value of specific scope acquisition parameter.
Parameters
----------
keys : Union[List[str], str]
Configuration key to access, or hierarchical list thereof
Returns
----------
Any
Parameter value.
Raises
----------
"""
# Normalize keys argument.
if not isinstance(keys, str) and hasattr(keys, '__iter__'):
keys = tuple(keys)
else:
keys = [keys]
val = get_acq_scope_config(funcs)
for key in keys:
if (isinstance(val, list) and (not isinstance(key, int) or key < 0 or key >= len(val))) or \
(isinstance(val, dict) and (not isinstance(key, str) or key not in val)):
raise KeyError(f'cfg_dict[{"][".join(str(e) for e in keys)}] is not a valid sequencer path, failed at {key}')
val = val[key]
return val
# ----------------------------------------------------------------------------
[docs]def set_sequencer_program(funcs: FuncRefs, sequencer: int, program: str) -> None:
"""
Assemble and set Q1ASM program for the indexed sequencer. If assembling
failes, an RuntimeError is thrown with the assembler log attached.
Parameters
----------
sequencer : int
Sequencer index.
program : str
Q1ASM program.
Returns
----------
Raises
----------
RuntimeError
Assembly failed.
"""
check_sequencer_index(sequencer)
try:
funcs._set_sequencer_program(sequencer, _check_program_length(program))
except:
print(funcs.get_assembler_log())
raise
# ----------------------------------------------------------------------------
[docs]def set_sequencer_config(funcs: FuncRefs, sequencer: int, config: Dict) -> None:
"""
Set configuration of the indexed sequencer. The configuration consists
dictionary containing multiple parameters that will be converted into a
JSON object supported by the Pulsar QRM.
Parameters
----------
sequencer : int
Sequencer index.
config : dict
Configuration dictionary.
Returns
----------
Raises
----------
"""
# Get current configuration and merge dictionaries.
check_sequencer_index(sequencer)
funcs._set_sequencer_config(sequencer, config)
# ----------------------------------------------------------------------------
[docs]def get_sequencer_config(funcs: FuncRefs, sequencer: int) -> Dict:
"""
Get configuration of the indexed sequencer. The configuration consists
dictionary containing multiple parameters that will be converted from a
JSON object provided by the Pulsar QRM.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Configuration dictionary.
Raises
----------
"""
check_sequencer_index(sequencer)
return funcs._get_sequencer_config(sequencer)
# --------------------------------------------------------------------------
[docs]def set_sequencer_config_val(funcs: FuncRefs, sequencer: int, keys: Any, val: Any) -> None:
"""
Set value of specific sequencer parameter.
Parameters
----------
sequencer : int
Sequencer index.
keys : Union[List[str], str]
Configuration key to access, or hierarchical list thereof
val : Any
Value to set parameter to.
Returns
----------
Raises
----------
"""
# Normalize keys argument.
if not isinstance(keys, str) and hasattr(keys, '__iter__'):
keys = tuple(keys)
else:
keys = [keys]
# Get configuration.
cfg = get_sequencer_config(funcs, sequencer)
# Walk to the right "directory" in the hierarchy.
d = cfg
for key in keys[:-1]:
if (isinstance(d, list) and (not isinstance(key, int) or key < 0 or key >= len(d))) or \
(isinstance(d, dict) and (not isinstance(key, str) or key not in d)):
raise KeyError(f'cfg_dict[{"][".join(str(e) for e in keys)}] is not a valid sequencer path, failed at {key}')
if key == "acq" or key == "awg":
d = d[key][0]
else:
d = d[key]
key = keys[-1]
if (isinstance(d, list) and (not isinstance(key, int) or key < 0 or key >= len(d))) or \
(isinstance(d, dict) and (not isinstance(key, str) or key not in d)):
raise KeyError(f'cfg_dict[{"][".join(str(e) for e in keys)}] is not a valid sequencer path, failed at {key}')
# Make sure val has the right JSON type; try casting it if not.
cur = d[keys[-1]]
for typ in (int, float, str, bool):
if isinstance(cur, typ):
try:
val = typ(val)
except (ValueError, TypeError) as e:
raise TypeError(f'{".".join(str(e) for e in keys)} should be of type {typ.__name__}: {e}')
break
else:
# Probably a dict or array! Can't set those directly.
raise KeyError(f'{".".join(str(e) for e in keys)} is an incomplete sequencer path')
# Set the new value.
if isinstance(d[keys[-1]], bool):
d[keys[-1]] = True if val == 1 else False
else:
d[keys[-1]] = val
set_sequencer_config(funcs, sequencer, cfg)
# --------------------------------------------------------------------------
[docs]def get_sequencer_config_val(funcs: FuncRefs, sequencer: int, keys: Any) -> Any:
"""
Get value of specific sequencer parameter.
Parameters
----------
sequencer : int
Sequencer index.
keys : Union[List[str], str]
Configuration key to access, or hierarchical list thereof
Returns
----------
Any
Parameter value.
Raises
----------
"""
# Normalize keys argument.
if not isinstance(keys, str) and hasattr(keys, '__iter__'):
keys = tuple(keys)
else:
keys = [keys]
val = get_sequencer_config(funcs, sequencer)
for key in keys:
if (isinstance(val, list) and (not isinstance(key, int) or key < 0 or key >= len(val))) or \
(isinstance(val, dict) and (not isinstance(key, str) or key not in val)):
raise KeyError(f'cfg_dict[{"][".join(str(e) for e in keys)}] is not a valid sequencer path, failed at {key}')
if key == "acq" or key == "awg":
val = val[key][0]
else:
val = val[key]
return val
# ----------------------------------------------------------------------------
[docs]def set_sequencer_config_rotation_matrix(
funcs: FuncRefs,
sequencer: int,
phase_incr: float
) -> None:
"""
Sets the integration result phase rotation matrix in the acquisition path.
Parameters
----------
sequencer : int
Sequencer index.
phase_incr : float
Phase increment in degrees.
Returns
----------
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
cfg_dict = get_sequencer_config(funcs, sequencer)
cfg_dict["acq"][0]["th_acq"]["rotation_matrix_a11"] = numpy.cos(numpy.deg2rad(360 - phase_incr))
cfg_dict["acq"][0]["th_acq"]["rotation_matrix_a12"] = numpy.sin(numpy.deg2rad(360 - phase_incr))
set_sequencer_config(funcs, sequencer, cfg_dict)
# ----------------------------------------------------------------------------
[docs]def get_sequencer_config_rotation_matrix(
funcs: FuncRefs,
sequencer: int
) -> float:
"""
Gets the integration result phase rotation matrix in the acquisition path.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
float
Phase increment in degrees.
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
cfg = get_sequencer_config(funcs, sequencer)
vector = cfg["acq"][0]["th_acq"]["rotation_matrix_a11"] + cfg["acq"][0]["th_acq"]["rotation_matrix_a12"] * 1j
phase_incr = numpy.angle(vector, deg=True)
if phase_incr == 0:
return 0
elif phase_incr >= 0:
return 360 - phase_incr
else:
return -1.0 * phase_incr
# -----------------------------------------------------------------------------
[docs]def set_sequencer_channel_map(funcs: FuncRefs, sequencer: int, output: int, enable: bool) -> None:
"""
Set enable of the indexed sequencer's path to output.
If an invalid sequencer index is given or the channel map is not valid, an
error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
output : int
Output index.
enable : bool
Sequencer path to output enable
Returns
----------
Raises
----------
"""
check_sequencer_index(sequencer)
channel_map_bin = funcs._get_sequencer_channel_map(sequencer)
channel_map = list(
struct.unpack("I" * int(len(channel_map_bin) / 4), channel_map_bin)
)
if output not in channel_map and enable:
channel_map.append(output)
elif output in channel_map and not enable:
channel_map.remove(output)
funcs._set_sequencer_channel_map(
sequencer,
struct.pack("I" * len(channel_map), *channel_map)
)
# -----------------------------------------------------------------------------
[docs]def get_sequencer_channel_map(funcs: FuncRefs, sequencer: int, output: int) -> bool:
"""
Get enable of the indexed sequencer's path to output.
If an invalid sequencer index is given or the channel map is not valid, an
error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
output : int
Output index.
Returns
----------
bool
Sequencer path to output enable.
Raises
----------
"""
check_sequencer_index(sequencer)
channel_map_bin = funcs._get_sequencer_channel_map(sequencer)
channel_map = list(
struct.unpack("I" * int(len(channel_map_bin) / 4), channel_map_bin)
)
return output in channel_map
# ----------------------------------------------------------------------------
[docs]def arm_sequencer(funcs: FuncRefs, scpi_cmd_prefix: str) -> None:
"""
Prepare the indexed sequencer to start by putting it in the armed state.
If no sequencer index is given, all sequencers are armed. Any sequencer
that was already running is stopped and rearmed. If an invalid sequencer
index is given, an error is set in system error.
Parameters
----------
sequencer : Optional[int]
Sequencer index.
Returns
----------
Raises
----------
RuntimeError
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
# The SCPI command prefix is set by the native instrument layer so that
# it can select to arm a specific sequencer (e.g. "SLOT1:SEQuencer0") or
# all sequencers (e.g. "SLOT:SEQuencer")
# The actual SCPI call is wrapped in a function to make use of the
# scpi_error_check method.
@scpi_error_check
def arm_sequencer_func(instrument: Any):
funcs._write("{}:ARM".format(scpi_cmd_prefix))
arm_sequencer_func(funcs.instrument)
# ----------------------------------------------------------------------------
[docs]def start_sequencer(funcs: FuncRefs, scpi_cmd_prefix: str) -> None:
"""
Start the indexed sequencer, thereby putting it in the running state.
If an invalid sequencer index is given or the indexed sequencer was not
yet armed, an error is set in system error. If no sequencer index is
given, all armed sequencers are started and any sequencer not in the armed
state is ignored. However, if no sequencer index is given and no
sequencers are armed, and error is set in system error.
Parameters
----------
sequencer : Optional[int]
Sequencer index.
Returns
----------
Raises
----------
RuntimeError
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
# The SCPI command prefix is set by the native instrument layer so that
# it can select to start a specific sequencer (e.g. "SLOT1:SEQuencer0") or
# all sequencers (e.g. "SLOT:SEQuencer")
# The actual SCPI call is wrapped in a function to make use of the
# scpi_error_check method.
@scpi_error_check
def start_sequencer_func(instrument: Any):
funcs._write("{}:START".format(scpi_cmd_prefix))
start_sequencer_func(funcs.instrument)
# ----------------------------------------------------------------------------
[docs]def stop_sequencer(funcs: FuncRefs, scpi_cmd_prefix: str) -> None:
"""
Stop the indexed sequencer, thereby putting it in the stopped state. If
an invalid sequencer index is given, an error is set in system error. If
no sequencer index is given, all sequencers are stopped.
Parameters
----------
sequencer : Optional[int]
Sequencer index.
Returns
----------
Raises
----------
RuntimeError
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
# The SCPI command prefix is set by the native instrument layer so that
# it can select to stop a specific sequencer (e.g. "SLOT1:SEQuencer0") or
# all sequencers (e.g. "SLOT:SEQuencer")
# The actual SCPI call is wrapped in a function to make use of the
# scpi_error_check method.
@scpi_error_check
def stop_sequencer_func(instrument: Any):
funcs._write("{}:STOP".format(scpi_cmd_prefix))
stop_sequencer_func(funcs.instrument)
# ----------------------------------------------------------------------------
[docs]def get_sequencer_state(
funcs: FuncRefs,
sequencer: int,
timeout: int = 0,
timeout_poll_res:
float = 0.02
) -> SequencerState:
"""
Get the sequencer state. If an invalid sequencer index is given, an error
is set in system error. If the timeout is set to zero, the function
returns the state immediately. If a positive non-zero timeout is set, the
function blocks until the sequencer completes. If the sequencer hasn't
stopped before the timeout expires, a TimeoutError is thrown.
Parameters
----------
sequencer : int
Sequencer index.
timeout : int
Timeout in minutes.
timeout_poll_res : float
Timeout polling resolution in seconds.
Returns
----------
SequencerState
Tuple containing sequencer status and corresponding flags.
Raises
----------
TimeoutError
Timeout
"""
# Format status string
check_sequencer_index(sequencer)
state = funcs._get_sequencer_state(sequencer)
state_elem_list = re.sub(" |-", "_", state).split(";")
if state_elem_list[-1] != "":
state_flag_list = state_elem_list[-1].split(",")[:-1]
else:
state_flag_list = []
state_tuple = SequencerState(
SequencerStatus[state_elem_list[0]],
[SequencerStatusFlags[flag] for flag in state_flag_list],
)
elapsed_time = 0.0
timeout = timeout * 60.0
while ((state_tuple.status == SequencerStatus.RUNNING or
state_tuple.status == SequencerStatus.Q1_STOPPED) and
elapsed_time < timeout):
time.sleep(timeout_poll_res)
state_tuple = get_sequencer_state(funcs, sequencer)
elapsed_time += timeout_poll_res
if elapsed_time >= timeout:
raise TimeoutError(
"Sequencer {} did not stop in timeout period of {} minutes.".format(sequencer, int(timeout / 60))
)
return state_tuple
# ----------------------------------------------------------------------------
def _add_awg_waveform(
funcs: FuncRefs,
sequencer: int,
name: str,
waveform: List[float],
index: Optional[int] = None,
) -> None:
"""
Add new waveform to AWG waveform list of indexed sequencer's AWG path. If
an invalid sequencer index is given or if the waveform causes the waveform
memory limit to be exceeded or if the waveform samples are out-of-range,
an error is set in the system error. The waveform names 'all' and 'ALL'
are reserved and adding waveforms with those names will also result in an
error being set in system error. The optional index argument is used to
specify an index for the waveform in the waveform list which is used by
the sequencer Q1ASM program to refer to the waveform. If no index is
given, the next available waveform index is selected (starting from 0).
If an invalid waveform index is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Waveform name.
waveform : list
List of floats in the range of 1.0 to -1.0 representing the waveform.
index : Optional[int]
Waveform index of the waveform in the waveform list.
Returns
----------
Raises
----------
"""
funcs._add_awg_waveform(sequencer, name, len(waveform), False)
funcs._set_awg_waveform_data(sequencer, name, waveform)
if index is not None:
funcs._set_awg_waveform_index(sequencer, name, index)
# ----------------------------------------------------------------------------
# Note: decorator uses instrument argument
@scpi_error_check
def _get_awg_waveforms(instrument: Any, funcs: FuncRefs, sequencer: int) -> Dict:
"""
Get all waveforms in waveform list of indexed sequencer's AWG path. If an
invalid sequencer index is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Dictionary with waveforms.
Raises
----------
RuntimeError
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
# SCPI call
num_waveforms = struct.unpack("I", funcs._get_awg_waveforms(sequencer))[0]
if num_waveforms == 0:
funcs._flush_line_end()
waveform_dict = {}
for wave_it in range(0, num_waveforms):
# Get name and index
name = str(funcs._read_bin("", False), "utf-8")
index = struct.unpack("I", funcs._read_bin("", False))[0]
# Get data
data = funcs._read_bin("", wave_it >= (num_waveforms - 1))
data = struct.unpack("f" * int(len(data) / 4), data)
# Add to dictionary
waveform_dict[name] = {"index": index, "data": list(data)}
return waveform_dict
# ----------------------------------------------------------------------------
def _add_acq_weight(
funcs: FuncRefs,
sequencer: int,
name: str,
weight: List[float],
index: Optional[int] = None,
) -> None:
"""
Add new weight to acquisition weight list of indexed sequencer's
acquisition path. If an invalid sequencer index is given or if the weight
causes the weight memory limit to be exceeded or if the weight samples are
out-of-range, an error is set in the system error. The weight names 'all'
and 'ALL' are reserved and adding weights with those names will also
result in an error being set in system error. The optional index argument
is used to specify an index for the weight in the weight list which is
used by the sequencer Q1ASM program to refer to the weight. If no index
is given, the next available weight index is selected (starting from 0).
If an invalid weight index is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Weight name.
weight : list
List of floats in the range of 1.0 to -1.0 representing the weight.
index : Optional[int]
Weight index of the weight in the weight list.
Returns
----------
Raises
----------
"""
funcs._add_acq_weight(sequencer, name, len(weight), False)
funcs._set_acq_weight_data(sequencer, name, weight)
if index is not None:
funcs._set_acq_weight_index(sequencer, name, index)
# ----------------------------------------------------------------------------
# Note: decorator uses instrument argument
@scpi_error_check
def _get_acq_weights(instrument: Any, funcs: FuncRefs, sequencer: int) -> Dict:
"""
Get all weights in weight list of indexed sequencer's acquisition path.
If an invalid sequencer index is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Dictionary with weights.
Raises
----------
RuntimeError
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
# SCPI call
num_weights = struct.unpack("I", funcs._get_acq_weights(sequencer))[0]
if num_weights == 0:
funcs._flush_line_end()
weight_dict = {}
for weight_it in range(0, num_weights):
# Get name and index
name = str(funcs._read_bin("", False), "utf-8")
index = struct.unpack("I", funcs._read_bin("", False))[0]
# Get data
data = funcs._read_bin("", weight_it >= (num_weights - 1))
data = struct.unpack("f" * int(len(data) / 4), data)
# Add to dictionary
weight_dict[name] = {"index": index, "data": list(data)}
return weight_dict
# ----------------------------------------------------------------------------
def _add_acq_acquisition(
funcs: FuncRefs,
sequencer: int,
name: str,
num_bins: int,
index: Optional[int] = None,
) -> None:
"""
Add new acquisition to acquisition list of indexed sequencer's acquisition
path. If an invalid sequencer index is given or if the required
acquisition memory cannot be allocated, an error is set in system error.
The acquisition names 'all' and 'ALL' are reserved and adding those will
also result in an error being set in system error. If no index is given,
the next available weight index is selected (starting from 0). If an
invalid weight index is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Acquisition name.
num_bins : int
Number of bins in acquisition. Maximum is 2^24.
index : Optional[int]
Waveform index of the acquisition in the acquisition list.
Returns
----------
Raises
----------
"""
funcs._add_acq_acquisition(sequencer, name, num_bins)
if index is not None:
funcs._set_acq_acquisition_index(sequencer, name, index)
# The _get_acq_data_and_convert method potentially does a division by zero
# that we want to ignore.
numpy.seterr(divide='ignore', invalid='ignore')
# ----------------------------------------------------------------------------
def _get_acq_data_and_convert(
funcs: FuncRefs,
init_read_func: Callable[[Optional[int], Optional[str]], bytes],
flush_line_end: bool
) -> Dict:
"""
Get acquisition data and convert it to a dictionary.
Parameters
----------
init_read_func : Callable[[Optional[int], Optional[str]], bytes]
Function that performs the initial binary read.
flush_line_end : bool
Indication to flush final characters after final read.
Returns
----------
dict
Dictionary with data of single acquisition.
Raises
----------
"""
acquisition_dict = {
"scope": {
"path0": {"data": [], "out-of-range": False, "avg_cnt": 0},
"path1": {"data": [], "out-of-range": False, "avg_cnt": 0},
},
"bins": {
"integration": {"path0": [], "path1": []},
"threshold": [],
"avg_cnt": [],
},
}
sample_width = 12
max_sample_value = 2 ** (sample_width - 1) - 1
max_sample_value_sqrd = max_sample_value ** 2
# Retrieve scope data
scope_data = init_read_func()
path0_scope_raw = numpy.array(struct.unpack("i" * int(len(scope_data) / 4), scope_data))
path0_oor = struct.unpack("?", funcs._read_bin("", False))[0]
path0_avg_cnt = struct.unpack("I", funcs._read_bin("", False))[0]
scope_data = funcs._read_bin("", False)
path1_scope_raw = numpy.array(struct.unpack("i" * int(len(scope_data) / 4), scope_data))
path1_oor = struct.unpack("?", funcs._read_bin("", False))[0]
path1_avg_cnt = struct.unpack("I", funcs._read_bin("", False))[0]
# Normalize scope data
path0_scope = numpy.where(
path0_avg_cnt > 0,
path0_scope_raw / max_sample_value / path0_avg_cnt,
path0_scope_raw / max_sample_value
)
path1_scope = numpy.where(
path1_avg_cnt > 0,
path1_scope_raw / max_sample_value / path1_avg_cnt,
path1_scope_raw / max_sample_value
)
# Retrieve bin data and convert to long values
bins = funcs._read_bin("", flush_line_end)
bin_data = numpy.frombuffer(bins, dtype=numpy.int64).reshape(-1,4)
valid = bin_data[:,0].astype(bool)
path0_raw = bin_data[:,1]
path1_raw = bin_data[:,2]
thres_avg_cnt_raw = bin_data[:,3].astype(numpy.uint64)
# Thresholded and average count are stored in the same long value so need
# to be separated. Thresholded is first in stream, thus lower 32 bits.
thres_raw = thres_avg_cnt_raw & 0xFFFFFFFF
avg_cnt = thres_avg_cnt_raw >> 32
# Normalize bin data
path0_raw = numpy.where(avg_cnt > 0, path0_raw / avg_cnt, path0_raw)
path1_raw = numpy.where(avg_cnt > 0, path1_raw / avg_cnt, path1_raw)
thres_raw = numpy.where(avg_cnt > 0, thres_raw / avg_cnt, thres_raw)
path0_data = numpy.where(valid, path0_raw / max_sample_value_sqrd, numpy.nan)
path1_data = numpy.where(valid, path1_raw / max_sample_value_sqrd, numpy.nan)
thres_data = numpy.where(valid, thres_raw, numpy.nan)
avg_cnt_data = numpy.where(valid, avg_cnt, numpy.nan)
# Set final results
acquisition_dict["scope"]["path0"]["data"] = path0_scope.tolist()
acquisition_dict["scope"]["path0"]["out-of-range"] = path0_oor
acquisition_dict["scope"]["path0"]["avg_cnt"] = path0_avg_cnt
acquisition_dict["scope"]["path1"]["data"] = path1_scope.tolist()
acquisition_dict["scope"]["path1"]["out-of-range"] = path1_oor
acquisition_dict["scope"]["path1"]["avg_cnt"] = path1_avg_cnt
acquisition_dict["bins"]["integration"]["path0"] = path0_data.tolist()
acquisition_dict["bins"]["integration"]["path1"] = path1_data.tolist()
acquisition_dict["bins"]["threshold"] = thres_data.tolist()
acquisition_dict["bins"]["avg_cnt"] = avg_cnt_data.tolist()
return acquisition_dict
# ----------------------------------------------------------------------------
# Note: decorator uses instrument argument
[docs]@scpi_error_check
def get_acq_acquisition_data(
instrument: Any,
funcs: FuncRefs,
sequencer: int,
name: str
) -> Dict:
"""
Get acquisition data of acquisition in acquisition list of indexed
sequencer's acquisition path. The acquisition scope and bin data is
normalized to a range of -1.0 to 1.0 taking both the bit widths of the
processing path and average count into considaration. For the binned
integration results, the integration length is not handled during
normalization and therefore these values have to be divided by their
respective integration lenghts. If an invalid sequencer index is given or
if a non-existing acquisition name is given, an error is set in system
error.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Acquisition name.
Returns
----------
dict
Dictionary with data of single acquisition.
Raises
----------
RuntimeError
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
# SCPI call
check_sequencer_index(sequencer)
return _get_acq_data_and_convert(
funcs,
partial(funcs._get_acq_acquisition_data, sequencer, name),
True,
)
# ----------------------------------------------------------------------------
# Note: decorator uses instrument argument
@scpi_error_check
def _get_acq_acquisitions(instrument: Any, funcs: FuncRefs, sequencer: int) -> Dict:
"""
Get all acquisitions in acquisition list of indexed sequencer's
acquisition path. If an invalid sequencer index is given, an error is set
in system error.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Dictionary with acquisitions.
Raises
----------
RuntimeError
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
# SCPI call
num_acq = struct.unpack("I", funcs._get_acq_acquisitions(sequencer))[0]
if num_acq == 0:
funcs._flush_line_end()
acquisition_dict = {}
for acq_it in range(0, num_acq):
# Get name and index
name = str(funcs._read_bin("", False), "utf-8")
index = struct.unpack("I", funcs._read_bin("", False))[0]
# Get data
acq = _get_acq_data_and_convert(
funcs,
partial(funcs._read_bin, "", False),
acq_it >= (num_acq - 1)
)
# Add to dictionary
acquisition_dict[name] = {"index": index, "acquisition": acq}
return acquisition_dict
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
[docs]def add_weights(funcs: FuncRefs, sequencer: int, weights: Dict) -> None:
"""
Add all weights in JSON compatible dictionary to the aquisition weight
list of indexed sequencer. The dictionary must be structured as follows:
- name : weight name.
- data: weight samples in a range of 1.0 to -1.0.
- index: optional waveweightform index used by the sequencer Q1ASM
program to refer to the weight.
Parameters
----------
sequencer : int
Sequencer index.
weights : dict
JSON compatible dictionary with one or more weigths.
Returns
----------
Raises
----------
KeyError
Missing weight data of weight in dictionary.
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
check_sequencer_index(sequencer)
for name in weights:
if "data" in weights[name]:
if "index" in weights[name]:
_add_acq_weight(
funcs,
sequencer,
name,
weights[name]["data"],
weights[name]["index"],
)
else:
_add_acq_weight(
funcs,
sequencer,
name,
weights[name]["data"]
)
else:
raise KeyError(
"Missing data key for {} in acquisiton weight dictionary".format(name)
)
# ----------------------------------------------------------------------------
[docs]def delete_weight(
funcs: FuncRefs,
sequencer: int,
name: str = "",
all: bool = False
) -> None:
"""
Delete a weight specified by name in the acquisition weight list of
indexed sequencer or delete all weights if `all` is True.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Weight name
all : bool
All weights
Returns
----------
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
check_sequencer_index(sequencer)
funcs._delete_acq_weight(sequencer, "all" if all else name)
# ----------------------------------------------------------------------------
[docs]def get_weights(funcs: FuncRefs, sequencer: int) -> Dict:
"""
Get all weigths in the acquisition weight lists of indexed sequencer.
The returned dictionary is structured as follows:
-name : weight name.
- data: weight samples in a range of 1.0 to -1.0.
- index: weight index used by the sequencer Q1ASM program to refer
to the weight.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Dictionary with weights.
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
check_sequencer_index(sequencer)
return _get_acq_weights(funcs.instrument, funcs, sequencer)
# ----------------------------------------------------------------------------
[docs]def get_acquisition_state(
funcs: FuncRefs,
sequencer: int,
timeout: int = 0,
timeout_poll_res: float = 0.02,
check_seq_state: bool = True,
) -> bool:
"""
Return acquisition binning completion state of the indexed sequencer. If
an invalid sequencer is given, an error is set in system error. If the
timeout is set to zero, the function returns the state immediately. If a
positive non-zero timeout is set, the function blocks until the acquisition
binning completes. If the acquisition hasn't completed before the timeout
expires, a TimeoutError is thrown. Note that when sequencer state checking
is enabled, the sequencer state is checked using get_sequencer_state with
the selected timeout period first and then the acquisition state is checked
with the same timeout period. This means that the total timeout period is
two times the set timeout period.
Parameters
----------
sequencer : int
Sequencer index.
timeout : int
Timeout in minutes.
timeout_poll_res : float
Timeout polling resolution in seconds.
check_seq_state : bool
Check if sequencer is done before checking acquisition state.
Returns
----------
bool
Indicates the acquisition binning completion state (False = uncompleted,
True = completed).
Raises
----------
TimeoutError
Timeout
NotImplementedError
Functionality not available on this module.
"""
# Check if sequencer has stopped
check_qrm_type(funcs.is_qrm_type())
if check_seq_state:
seq_state = get_sequencer_state(
funcs,
sequencer,
timeout,
timeout_poll_res
)
if seq_state.status != SequencerStatus.STOPPED:
return False
else:
seq_state = get_sequencer_state(funcs, sequencer)
# Get acquisition status
acq_state = SequencerStatusFlags.ACQ_BINNING_DONE in seq_state.flags
elapsed_time = 0.0
timeout = timeout * 60.0
while acq_state is False and elapsed_time < timeout:
time.sleep(timeout_poll_res)
seq_state = get_sequencer_state(funcs, sequencer)
acq_state = SequencerStatusFlags.ACQ_BINNING_DONE in seq_state.flags
elapsed_time += timeout_poll_res
if elapsed_time >= timeout:
raise TimeoutError(
"Acquisitions on sequencer {} did not complete in timeout period of {} minutes.".format(sequencer, int(timeout / 60))
)
return acq_state
# ----------------------------------------------------------------------------
[docs]def add_acquisitions(funcs: FuncRefs, sequencer: int, acquisitions: Dict) -> None:
"""
Add all waveforms and weights in JSON compatible dictionary to AWG
waveform and aquisition weight lists of indexed sequencer. The dictionary
must be structured as follows:
- name: acquisition name.
- num_bins: number of bins in acquisition.
- index: optional acquisition index used by the sequencer Q1ASM
program to refer to the acquition.
Parameters
----------
sequencer : int
Sequencer index.
acquisitions : dict
JSON compatible dictionary with one or more acquisitions.
Returns
----------
Raises
----------
KeyError
Missing dictionary key in acquisitions.
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
check_sequencer_index(sequencer)
for name in acquisitions:
if "num_bins" in acquisitions[name]:
if "index" in acquisitions[name]:
_add_acq_acquisition(
funcs,
sequencer,
name,
acquisitions[name]["num_bins"],
acquisitions[name]["index"],
)
else:
_add_acq_acquisition(
funcs,
sequencer,
name,
acquisitions[name]["num_bins"]
)
else:
raise KeyError(
"Missing num_bins key for {} in acquisition dictionary".format(name)
)
# ----------------------------------------------------------------------------
[docs]def delete_acquisition(
funcs: FuncRefs,
sequencer: int,
name: str = "",
all: bool = False
) -> None:
"""
Delete an acquisition specified by name in the acquisition list of indexed
sequencer or delete all acquisitions if `all` is True.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Weight name
all : bool
All weights
Returns
----------
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
check_sequencer_index(sequencer)
funcs._delete_acq_acquisition(sequencer, "all" if all else name)
# ----------------------------------------------------------------------------
[docs]def delete_acquisition_data(
funcs: FuncRefs,
sequencer: int,
name: str = "",
all: bool = False
) -> None:
"""
Delete data from an acquisition specified by name in the acquisition list
of indexed sequencer or delete data in all acquisitions if `all` is True.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Weight name
Returns
----------
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
check_sequencer_index(sequencer)
funcs._delete_acq_acquisition_data(sequencer, "all" if all else name)
# ----------------------------------------------------------------------------
[docs]def store_scope_acquisition(funcs: FuncRefs, sequencer: int, name: str) -> None:
"""
After an acquisition has completed, store the scope acquisition results
in the acquisition specified by name of the indexed sequencers. If an
invalid sequencer index is given an error is set in system error. To get
access to the acquisition results, the sequencer will be stopped when
calling this function.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Acquisition name.
Returns
----------
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
check_sequencer_index(sequencer)
funcs._set_acq_acquisition_data(sequencer, name)
# ----------------------------------------------------------------------------
[docs]def get_acquisitions(funcs: FuncRefs, sequencer: int) -> Dict:
"""
Get all acquisitions in acquisition lists of indexed sequencer. The
acquisition scope and bin data is normalized to a range of -1.0 to 1.0
taking both the bit widths of the processing path and average count into
considaration. For the binned integration results, the integration length
is not handled during normalization and therefore these values have to be
divided by their respective integration lenghts. The returned dictionary
is structured as follows:
- name: acquisition name
- index: acquisition index used by the sequencer Q1ASM program to
refer to the acquisition.
- acquisition: acquisition dictionary
- scope: Scope data
- path0: input path 0
- data: acquisition samples in a range of 1.0 to -1.0.
- out-of-range: out-of-range indication for the entire
acquisition (False = in-range, True =
out-of-range).
- avg_cnt: number of averages.
- path1: input path 1
- data: acquisition samples in a range of 1.0 to -1.0.
- out-of-range: out-of-range indication for the entire
acquisition (False = in-range, True =
out-of-range).
- avg_cnt: number of averages.
- bins: bin data
- integration: integration data
- path_0: input path 0 integration result bin list
- path_1: input path 1 integration result bin list
- threshold: threshold result bin list
- valid: list of valid indications per bin
- avg_cnt: list of number of averages per bin
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Dictionary with acquisitions.
Raises
----------
NotImplementedError
Functionality not available on this module.
"""
check_qrm_type(funcs.is_qrm_type())
check_sequencer_index(sequencer)
return _get_acq_acquisitions(funcs.instrument, funcs, sequencer)
# --------------------------------------------------------------------------
_validate_qcm_sequence = fastjsonschema.compile(QCM_SEQUENCE_JSON_SCHEMA)
_validate_qrm_sequence = fastjsonschema.compile(QRM_SEQUENCE_JSON_SCHEMA)
_validat_wave = fastjsonschema.compile(WAVE_JSON_SCHEMA)
_validate_acq = fastjsonschema.compile(ACQ_JSON_SCHEMA)
[docs]def set_sequence(
funcs: FuncRefs,
sequencer: int,
sequence: Union[str, Dict[str, Any]],
validation_enable: bool = True,
) -> None:
"""
Set sequencer program, AWG waveforms, acquisition weights and acquisitions
from a JSON file or from a dictionary directly. The JSON file or
dictionary need to apply the schema specified by
`QCM_SEQUENCE_JSON_SCHEMA`, `QRM_SEQUENCE_JSON_SCHEMA`, `WAVE_JSON_SCHEMA`
and `ACQ_JSON_SCHEMA`.
Parameters
----------
sequencer : int
Sequencer index.
sequence : Union[str, Dict[str, Any]]
Path to sequence file or dictionary.
validation_enable : bool
Enable JSON schema validation on sequence.
Returns
----------
Raises
----------
JsonSchemaValueException
Invalid JSON object.
"""
# Set dictionary
if isinstance(sequence, dict):
sequence_dict = sequence
else:
with open(sequence, "r") as file:
sequence_dict = json.load(file)
# Validate dictionary
if validation_enable:
if funcs.is_qrm_type():
_validate_qrm_sequence(sequence_dict)
else:
_validate_qcm_sequence(sequence_dict)
for name in sequence_dict["waveforms"]:
_validat_wave(sequence_dict["waveforms"][name])
if funcs.is_qrm_type():
for name in sequence_dict["weights"]:
_validat_wave(sequence_dict["weights"][name])
for name in sequence_dict["acquisitions"]:
_validate_acq(sequence_dict["acquisitions"][name])
# Set sequence
set_sequencer_program(funcs, sequencer, sequence_dict["program"])
delete_waveform(funcs, sequencer, all=True)
add_waveforms(funcs, sequencer, sequence_dict["waveforms"])
if funcs.is_qrm_type():
delete_weight(funcs, sequencer, all=True)
add_weights(funcs, sequencer, sequence_dict["weights"])
delete_acquisition(funcs, sequencer, all=True)
add_acquisitions(funcs, sequencer, sequence_dict["acquisitions"])