Source code for qblox_instruments.ieee488_2.dummy_transport

# ----------------------------------------------------------------------------
# Description    : Transport layer (abstract, IP, file, dummy)
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2020)
# ----------------------------------------------------------------------------


# -- include -----------------------------------------------------------------

import re
import abc
from dataclasses import dataclass

from typing import Optional, Tuple, Union, Iterable, Dict
from qblox_instruments import PulsarType, ClusterType, TypeHandle
from qblox_instruments.ieee488_2 import Transport


# -- class -------------------------------------------------------------------

[docs]@dataclass class DummyBinnedAcquisitionData: """ Class to hold data for the dummy hardware for the binned acquisition. This class contains all values for one bin. """ data: Tuple[float, float] thres: int avg_cnt: int
# -- class -------------------------------------------------------------------
[docs]@dataclass class DummyScopeAcquisitionData: """ Class to hold data for the dummy hardware for the scope acquisition. This class contains all values for the scope acquisition on one module. """ data: Iterable[Tuple[float, float]] out_of_range: Tuple[bool, bool] avg_cnt: Tuple[int, int]
# -- class -------------------------------------------------------------------
[docs]class DummyTransport(abc.ABC, Transport): """ Class to replace device with dummy device to support software stack testing without hardware. The class implements all mandatory and required SCPI calls. Call reponses are largely artifically constructed to be inline with the call's functionality (e.g. `*IDN?` returns valid, but artificial IDN data.) """ # ------------------------------------------------------------------------
[docs] def __init__(self, dummy_type: Union[PulsarType, ClusterType]): """ Create dummy transport class. Parameters ---------- dummy_type : Union[PulsarType, ClusterType] Dummy instrument type (e.g. Pulsar QCM, Pulsar QRM) Returns ---------- Raises ---------- """ # Set instrument type handle self._type_handle = TypeHandle(dummy_type) # Initialize variables self._cmd_hist = [] self._data_out = 0 self._bin_out = None self._system_error = [] # Set command dictionary self._cmds = { "*CMDS?": self._get_cmds, "*IDN?": self._get_idn, "*RST": self._reset, "SYSTem:ERRor:NEXT?": self._get_system_error, "SYSTem:ERRor:COUNt?": self._get_system_error_cnt, "STATus:GENeral:STATE?": self._get_system_state, }
# ------------------------------------------------------------------------ @property def instrument_class(self) -> str: """ Get instrument class (e.g. Pulsar, Cluster). Parameters ---------- Returns ---------- str Instrument class Raises ---------- """ return self._type_handle.instrument_class # ------------------------------------------------------------------------ @property def instrument_type(self) -> str: """ Get instrument type (e.g. MM, QRM, QCM). Parameters ---------- Returns ---------- str Instrument type Raises ---------- """ return self._type_handle.instrument_type # ------------------------------------------------------------------------
[docs] def close(self) -> None: """ Close and resets base dummy transport class. Parameters ---------- Returns ---------- Raises ---------- """ self._cmd_hist = [] self._data_out = 0 self._bin_out = None self._system_error = []
# ------------------------------------------------------------------------
[docs] def write(self, cmd_str: str) -> None: """ Write command to dummy. Stores command in command history. Parameters ---------- cmd_str : str Command Returns ---------- Raises ---------- """ cmd_parts, cmd_params, cmd_args = self._parse_cmd(cmd_str) self._execute_cmd(cmd_parts, cmd_params, cmd_args)
# ------------------------------------------------------------------------
[docs] def write_binary(self, data: bytes) -> None: """ Write binary data to dummy. Stores command in command history. Parameters ---------- data : bytes Binary data Returns ---------- Raises ---------- """ cmd_parts = data.split("#".encode()) cmd_str = cmd_parts[0].decode() bin_in = "#".encode() + "#".encode().join(cmd_parts[1:]) cmd_parts, cmd_params, cmd_args = self._parse_cmd(cmd_str) self._execute_cmd(cmd_parts, cmd_params, cmd_args, bin_in)
# ------------------------------------------------------------------------
[docs] def read_binary(self, size: int) -> bytes: """ Read binary data from dummy. Parameters ---------- size : int Number of bytes Returns ---------- bytes Binary data array of length "size". Raises ---------- """ bin_var = self._bin_out[:size] self._bin_out = self._bin_out[size:] return bin_var
# ------------------------------------------------------------------------
[docs] def readline(self) -> str: """ Read data from dummy. Parameters ---------- Returns ---------- str String with data. Raises ---------- """ return str(self._data_out)
# ------------------------------------------------------------------------
[docs] @abc.abstractmethod def delete_dummy_binned_acquisition_data(self, sequencer: Optional[int] = None, acq_index_name: Optional[str] = None): """ Delete all dummy binned acquisition data for the dummy. Parameters ---------- sequencer : Optional[int] Sequencer. acq_index_name : Optional[str] Acquisition index name. Returns ---------- Raises ---------- ValueError If the slot_idx doesn't make sense for the transport. """ pass
# ------------------------------------------------------------------------
[docs] @abc.abstractmethod def set_dummy_binned_acquisition_data( self, sequencer: int, acq_index_name: str, data: Iterable[Union[DummyBinnedAcquisitionData, None]]): """ Set dummy binned acquisition data for the dummy. Parameters ---------- sequencer : int Sequencer. acq_index_name : str Acquisition index name. data : Iterable[Union[DummyBinnedAcquisitionData, None]] Dummy data for the binned acquisition. An iterable of all the bin values. Returns ---------- Raises ---------- ValueError If the slot_idx doesn't make sense for the transport. """ pass
# ------------------------------------------------------------------------
[docs] @abc.abstractmethod def delete_dummy_scope_acquisition_data( self): """ Set dummy scope acquisition data for the dummy. Parameters ---------- Returns ---------- Raises ---------- ValueError If the slot_idx doesn't make sense for the transport. """ pass
# ------------------------------------------------------------------------
[docs] @abc.abstractmethod def set_dummy_scope_acquisition_data( self, data: DummyScopeAcquisitionData): """ Set dummy scope acquisition data for the dummy. Parameters ---------- data : DummyScopeAcquisitionData Dummy data for the scope acquisition. Returns ---------- Raises ---------- ValueError If the slot_idx doesn't make sense for the transport. """ pass
# ------------------------------------------------------------------------ def _parse_cmd(self, cmd_str: str) -> Tuple: """ Parse command and split it into command, parameters and arguments. The command is stored in the command history (see :func:`~.qblox_instruments.ieee488_2.DummyTransport.get_cmd_hist`). Parameters ---------- cmd_str : str Command Returns ---------- list Reformatted command sections list Command parameters list Command arguments Raises ---------- """ # Create command part list # Substitute command parameters with # cmd_list = cmd_str.split(" ") cmd_sub = re.sub("[0-9]+", "#", cmd_list[0]) cmd_parts = cmd_sub.split(':') # Get all command parameters cmd_params = re.findall("[0-9]+", cmd_list[0]) # Get all command arguments # Remove any " characters cmd_args = cmd_list[1].split(",") if len(cmd_list) > 1 else [] cmd_args = [arg.strip('"') for arg in cmd_args] # Append command to command history self._cmd_hist.append(cmd_sub) return cmd_parts, cmd_params, cmd_args # ------------------------------------------------------------------------ def _execute_cmd( self, cmd_parts: list, cmd_params: list, cmd_args: list, bin_in: Optional[bytes] = None ) -> None: """ Execute associated command method found in command dictionary. If the command is not in the command dictionary, respond with the default response ('0'). Parameters ---------- cmd_parts : list Reformated command sections cmd_params : list Command parameters cmd_args : list Command arguments bin_in : Optional[bytes] Binary data that needs to be send by the command. Returns ---------- Raises ---------- """ cmd_str = ':'.join(cmd_parts) if cmd_str in self._cmds: self._cmds[cmd_str](cmd_params, cmd_args, bin_in) else: self._data_out = 0 self._bin_out = self._encode_bin("0".encode()) # ------------------------------------------------------------------------ @staticmethod def _encode_bin(data: bytes, end_of_line: bool = True) -> None: """ Encode binary data to be compatible with IEEE488.2. Parameters ---------- data : bytes Binary data. end_of_line: bool Indicates if end-of-line needs to be added. Returns ---------- Raises ---------- """ header_b = str(len(data)).encode() header_a = ("#" + str(len(header_b))).encode() if end_of_line: out = header_a + header_b + data + "\r\n".encode() else: out = header_a + header_b + data return out # ------------------------------------------------------------------------ @staticmethod def _decode_bin(data: bytes) -> bytes: """ Decode IEEE488.2 binary data. Parameters ---------- data : bytes Binary data. Returns ---------- Raises ---------- RunTimeError Header error. """ header_a = data[:2].decode() # Read '#N' data = data[2:] if header_a[0] != "#": raise RuntimeError("Header error: received {}".format(header_a)) header_b = data[: int(header_a[1])].decode() data = data[int(header_a[1]) :] return data[: int(header_b)] # ------------------------------------------------------------------------
[docs] def get_cmd_hist(self) -> list: """ Get list of every executed command since the initialization or reset of the class. Parameters ---------- Returns ---------- list List of executed command strings including arguments (does not include binary data argument). Raises ---------- """ return self._cmd_hist
# ------------------------------------------------------------------------ def _get_cmds(self, cmd_params: list, cmd_args: list, bin_in: bytes) -> None: """ Get SCPI commands. Parameters ---------- cmd_params : list Command parameters indicated by '#' in the command. cmd_args : list Command arguments. bin_in : bytes Binary input data. Returns ---------- Raises ---------- """ self._data_out = ( "THe:CAke:Is:A:LIe;" "cake;" "str;" "get_cake;" "lie;" "cake;" "str;" "0;" "Your trusty AI companion promised you a cake.;" ) # ------------------------------------------------------------------------ def _get_idn(self, cmd_params: list, cmd_args: list, bin_in: bytes) -> None: """ Get device identity and build information. Parameters ---------- cmd_params : list Command parameters indicated by '#' in the command. cmd_args : list Command arguments. bin_in : bytes Binary input data. Returns ---------- Raises ---------- """ self._data_out = ( "Qblox," "{}_{}," "whatever," "fwVersion=0.0.0 fwBuild=28/11/1967-00:00:00 fwHash=0xDEADBEAF fwDirty=0 " "kmodVersion=0.0.0 kmodBuild=15/07/1943-00:00:00 kmodHash=0x0D15EA5E kmodDirty=0 " "swVersion=0.0.0 swBuild=11/05/1924-00:00:00 swHash=0xBEEFBABE swDirty=0" ).format(self.instrument_class, self.instrument_type) # ------------------------------------------------------------------------ def _reset(self, cmd_params: list, cmd_args: list, bin_in: bytes) -> None: """ Reset dummy. Parameters ---------- cmd_params : list Command parameters indicated by '#' in the command. cmd_args : list Command arguments. bin_in : bytes Binary input data. Returns ---------- Raises ---------- """ self.close() # ------------------------------------------------------------------------ def _get_system_error(self, cmd_params: list, cmd_args: list, bin_in: bytes) -> None: """ Get system error from queue (see `SCPI <https://www.ivifoundation.org/docs/scpi-99.pdf>`_). Parameters ---------- cmd_params : list Command parameters indicated by '#' in the command. cmd_args : list Command arguments. bin_in : bytes Binary input data. Returns ---------- Raises ---------- """ if len(self._system_error) > 0: self._data_out = "0," + self._system_error[0] self._system_error = self._system_error[1:] else: self._data_out = "No error" # ------------------------------------------------------------------------ def _get_system_error_cnt(self, cmd_params: list, cmd_args: list, bin_in: bytes) -> None: """ Get number of system errors (see `SCPI <https://www.ivifoundation.org/docs/scpi-99.pdf>`_). Parameters ---------- cmd_params : list Command parameters indicated by '#' in the command. cmd_args : list Command arguments. bin_in : bytes Binary input data. Returns ---------- Raises ---------- """ self._data_out = str(len(self._system_error)) # ------------------------------------------------------------------------ def _get_system_state(self, cmd_params: list, cmd_args: list, bin_in: bytes) -> None: """ Get system status. Parameters ---------- cmd_params : list Command parameters indicated by '#' in the command. cmd_args : list Command arguments. bin_in : bytes Binary input data. Returns ---------- Raises ---------- """ self._data_out = ( "CRITICAL;" "TEMPERATURE OUT-OF-RANGE," )