#------------------------------------------------------------------------------
# Description : Pulsar QRM native interface
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2020)
#------------------------------------------------------------------------------
#-- include -------------------------------------------------------------------
import numpy
import struct
import time
#Add Qblox instruments build info
from qblox_instruments import build
#Add SCPI support
from pulsar_qrm.pulsar_qrm_scpi_ifc import pulsar_qrm_scpi_ifc, scpi_error_check
#-- class ---------------------------------------------------------------------
[docs]class pulsar_qrm_ifc(pulsar_qrm_scpi_ifc):
"""
Class that provides the native API for the Pulsar QrM. It provides methods to control all
functions and features provided by the Pulsar QRM, like sequencer, waveform and acquistion handling.
"""
#--------------------------------------------------------------------------
[docs] def __init__(self, transport_inst, debug = 0):
"""
Creates Pulsar QRM native interface object.
Parameters
----------
transport_inst : :class:`~ieee488_2.transport`
Transport class responsible for the lowest level of communication (e.g. ethernet).
debug : int
Debug level (0 = normal, 1 = no version check, >1 = no version or error checking).
Returns
----------
:class:`~pulsar_qrm.pulsar_qrm_ifc`
Pulsar QRM native interface object.
Raises
----------
Exception
Debug level is 0 and there is a version mismatch.
"""
#Initialize parent class.
super(pulsar_qrm_ifc, self).__init__(transport_inst, debug)
#--------------------------------------------------------------------------
def _get_scpi_commands(self):
"""
Get SCPI commands.
Parameters
----------
Returns
----------
dict
Dictionary containing all available SCPI commands, corresponding parameters, arguments and Python methods and finally a descriptive comment.
Raises
----------
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
#Format command string
cmds = super()._get_scpi_commands()
cmd_elem_list = cmds.split(';')[:-1]
cmd_list = numpy.reshape(cmd_elem_list, (int(len(cmd_elem_list) / 9), 9))
cmd_dict = {cmd[0]: {"scpi_in_type": cmd[1].split(',') if cmd[1] != "None" and cmd[1] != "" else [],
"scpi_out_type": cmd[2].split(',') if cmd[2] != "None" and cmd[2] != "" else [],
"python_func": cmd[3],
"python_in_type": cmd[4].split(',') if cmd[4] != "None" and cmd[4] != "" else [],
"python_in_var": cmd[5].split(',') if cmd[5] != "None" and cmd[5] != "" else [],
"python_out_type": cmd[6].split(',') if cmd[6] != "None" and cmd[6] != "" else [],
"comment": cmd[8]} for cmd in cmd_list}
return cmd_dict
#--------------------------------------------------------------------------
[docs] def get_idn(self):
"""
Get device identity and build information.
Parameters
----------
Returns
----------
dict
Dictionary containing manufacturer, model, serial number and build information. The build information is subdivided into FPGA firmware,
kernel module software, application software and driver software build information. Each of those consist of the version, build date,
build Git hash and Git build dirty indication.
Raises
----------
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
#Format IDN string
idn = self._get_idn()
idn_elem_list = idn.split(',')
idn_build_list = idn_elem_list[-1].split(' ')
idn_dict = {"manufacturer": idn_elem_list[0],
"model": idn_elem_list[1],
"serial_number": idn_elem_list[2],
"firmware": {"fpga": {"version": idn_build_list[0].split("=")[-1],
"date": idn_build_list[1].split("=")[-1],
"hash": idn_build_list[2].split("=")[-1],
"dirty": int(idn_build_list[3].split("=")[-1]) > 0},
"kernel_mod": {"version": idn_build_list[4].split("=")[-1],
"date": idn_build_list[5].split("=")[-1],
"hash": idn_build_list[6].split("=")[-1],
"dirty": int(idn_build_list[7].split("=")[-1]) > 0},
"application": {"version": idn_build_list[8].split("=")[-1],
"date": idn_build_list[9].split("=")[-1],
"hash": idn_build_list[10].split("=")[-1],
"dirty": int(idn_build_list[11].split("=")[-1]) > 0},
"driver": build.get_build_info()}}
return idn_dict
#--------------------------------------------------------------------------
[docs] def get_system_status(self):
"""
Get general system state.
Parameters
----------
Returns
----------
dict
Dictionary containing general status and corresponding flags:
:Status:
- OKAY: System is okay.
- CRITICAL: An error indicated by the flags occured, but has been resolved.
- ERROR: An error indicated by the flags is occuring.
:Flags:
- CARRIER_PLL_UNLOCK: Carrier board PLL is unlocked.
- FPGA_PLL_UNLOCK: FPGA PLL is unlocked.
- FPGA_TEMP_OR: FPGA temperature is out-of-range.
- CARRIER_TEMP_OR: Carrier board temperature is out-of-range.
- AFE_TEMP_OR: Analog frontend board temperature is out-of-range.
Raises
----------
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
#Format status string
status = self._get_system_status()
status_elem_list = status.split(';')
status_flag_list = status_elem_list[-1].split(',')[:-1] if status_elem_list[-1] != '' else []
status_dict = {"status": status_elem_list[0],
"flags": status_flag_list}
return status_dict
@staticmethod
#--------------------------------------------------------------------------
def _get_acq_scope_config_format():
"""
Get format for converting the configuration dictionary to a C struct.
Parameters
----------
Returns
----------
Raises
----------
"""
acq_scope_cfg_format = 'I????II'
acq_scope_float_cfg_format = "ff"
return acq_scope_cfg_format + acq_scope_float_cfg_format
#--------------------------------------------------------------------------
def _set_acq_scope_config(self, cfg_dict):
"""
Set configuration of the scope acquisition. The configuration consists of multiple parameters in a C struct format.
If an invalid sequencer index is given or the configation struct does not have the correct format, an error is set in
system error.
Parameters
----------
config : dict
Configuration dictionary.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
#Get current configuration and merge dictionaries
cfg_dict = {**self._get_acq_scope_config(), **cfg_dict}
#Set new configuration
cfg = [#Scope acquisition
cfg_dict["sel_acq"], #Acquisition select
cfg_dict["avg_en_acq_path_0"], #Averaging enable path 0
cfg_dict["avg_en_acq_path_1"], #Averaging enable path 1
cfg_dict["trig_mode_acq_path_0"], #Trigger mode path 0
cfg_dict["trig_mode_acq_path_1"], #Trigger mode path 1
0, #Trigger level path 0 (unused)
0, #Trigger level path 1 (unused)
#Scope acquisition floating point values to be converted
cfg_dict["trig_lvl_acq_path_0_float"], #Trigger level path 0 as float
cfg_dict["trig_lvl_acq_path_1_float"]] #Trigger level path 1 as float
super()._set_acq_scope_config(struct.pack(pulsar_qrm_ifc._get_acq_scope_config_format(), *cfg))
#--------------------------------------------------------------------------
def _get_acq_scope_config(self):
"""
Get configuration of the scope acquisition. The configuration consists of multiple parameters in a C struct format.
If an invalid sequencer index is given, an error is set in system error.
Parameters
----------
Returns
----------
bytes
Configuration struct.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
cfg = struct.unpack(pulsar_qrm_ifc._get_acq_scope_config_format(), super()._get_acq_scope_config())
cfg_dict = {#Scope acquisition
"sel_acq": cfg[0],
"avg_en_acq_path_0": cfg[1],
"avg_en_acq_path_1": cfg[2],
"trig_mode_acq_path_0": cfg[3],
"trig_mode_acq_path_1": cfg[4],
"trig_lvl_acq_path_0": cfg[5],
"trig_lvl_acq_path_1": cfg[6],
#Scope acquisition floating point values
"trig_lvl_acq_path_0_float": cfg[7],
"trig_lvl_acq_path_1_float": cfg[8]}
return cfg_dict
#--------------------------------------------------------------------------
def _set_sequencer_program(self, sequencer, program):
"""
Assemble and set Q1ASM program for the indexed sequencer. If assembling failes, an exception is thrown with the
assembler log.
Parameters
----------
sequencer : int
Sequencer index.
program : str
Q1ASM program.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
Assembly failed.
"""
try:
super()._set_sequencer_program(sequencer, program)
except:
print(self.get_assembler_log())
raise
@staticmethod
#--------------------------------------------------------------------------
def _get_sequencer_cfg_format():
"""
Get format for converting the configuration dictionary to a C struct.
Parameters
----------
Returns
----------
Raises
----------
"""
seq_proc_cfg_format = '?I'
awg_cfg_format = '??IIIIIIIIIII?IIIIIII??I'
awg_float_cfg_format = 'fdffffff'
acq_cfg_format = 'II?IIIq'
acq_float_cfg_format = 'ffd'
return seq_proc_cfg_format + awg_cfg_format + awg_float_cfg_format + acq_cfg_format + acq_float_cfg_format
#--------------------------------------------------------------------------
def _set_sequencer_config(self, sequencer, cfg_dict):
"""
Set configuration of the indexed sequencer. The configuration consists dictionary containing multiple parameters
that will be converted into a C struct supported by the Pulsar QRM.
Parameters
----------
sequencer : int
Sequencer index.
config : dict
Configuration dictionary.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
#Get current configuration and merge dictionaries
cfg_dict = {**self._get_sequencer_config(sequencer), **cfg_dict}
#Set new configuration
cfg = [#Sequence processor
cfg_dict["sync_en"], #Sequence processor synchronization enable
0, #Sequence processor program counter start (unused)
#AWG
cfg_dict["cont_mode_en_awg_path_0"], #Continuous mode enable for AWG path 0
cfg_dict["cont_mode_en_awg_path_1"], #Continuous mode enable for AWG path 1
cfg_dict["cont_mode_waveform_idx_awg_path_0"], #continuous mode waveform index for AWG path 0
cfg_dict["cont_mode_waveform_idx_awg_path_1"], #Continuous mode waveform index for AWG path 1
cfg_dict["upsample_rate_awg_path_0"], #Upsample rate for AWG path 0
cfg_dict["upsample_rate_awg_path_1"], #Upsample rate for AWG path 1
0, #Gain for AWG path 0 (unused)
0, #Gain for AWG path 1 (unused)
0, #Offset for AWG path 0 (unused)
0, #Offset for AWG path 1 (unused)
0, #Phase increment; ultra-fine (unused)
0, #Phase increment; fine (unused)
0, #Phase increment; coarse (unused)
0, #Phase increment; sign (unused)
0, #Phase; ultra-fine (unused)
0, #Phase; fine (unused)
0, #Phase; coarse (unused)
0, #Mixer correction matrix a11 (unsued)
0, #Mixer correction matrix a12 (unsued)
0, #Mixer correction matrix a21 (unsued)
0, #Mixer correction matrix a22 (unsued)
cfg_dict["mod_en_awg"], #Modulation enable for AWG paths 0 and 1
cfg_dict["mrk_ovr_en"], #Marker override enable
cfg_dict["mrk_ovr_val"], #Marker override value
#AWG floating point values to be converted
cfg_dict["phase_offs_degree"], #Phase offset in degrees
cfg_dict["freq_hz"], #Frequency in Hz
cfg_dict["gain_awg_path_0_float"], #Gain for AWG path 0 as float
cfg_dict["gain_awg_path_1_float"], #Gain for AWG path 1 as float
cfg_dict["offset_awg_path_0_float"], #Offset for AWG path 0 as float
cfg_dict["offset_awg_path_1_float"], #Offset for AWG path 1 as float
cfg_dict["mixer_corr_phase_offset_degree_float"], #NCO compensation for mixer: phase offset
cfg_dict["mixer_corr_gain_ratio_float"], #NCO compensation for mixer: gain imbalance
#Acquisition
cfg_dict["upsample_rate_acq_path_0"], #Upsample rate for acquisition path 0
cfg_dict["upsample_rate_acq_path_1"], #Upsample rate for acquisition path 1
cfg_dict["demod_en_acq"], #Demodulation enable
cfg_dict["non_weighed_integration_len"], #Non weighed integration length
0, #Rotation matrix A11 (unused)
0, #Rotation matrix A12 (unused)
0, #Discretization threshold (unused)
#Acquisition floating point values to be converted
cfg_dict["rotation_matrix_a11"], #Rotation matrix A11
cfg_dict["rotation_matrix_a12"], #Rotation matrix A12
cfg_dict["discr_threshold"]] #Discretization threshold
super()._set_sequencer_config(sequencer, struct.pack(pulsar_qrm_ifc._get_sequencer_cfg_format(), *cfg))
#--------------------------------------------------------------------------
def _get_sequencer_config(self, sequencer):
"""
Get configuration of the indexed sequencer. The configuration consists dictionary containing multiple parameters
that will be converted from a C struct provided by the Pulsar QRM.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Configuration dictionary.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
cfg = struct.unpack(pulsar_qrm_ifc._get_sequencer_cfg_format(), super()._get_sequencer_config(sequencer))
cfg_dict = {#Sequence processor
"sync_en": cfg[0],
#AWG
"cont_mode_en_awg_path_0": cfg[2],
"cont_mode_en_awg_path_1": cfg[3],
"cont_mode_waveform_idx_awg_path_0": cfg[4],
"cont_mode_waveform_idx_awg_path_1": cfg[5],
"upsample_rate_awg_path_0": cfg[6],
"upsample_rate_awg_path_1": cfg[7],
"mod_en_awg": cfg[23],
"mrk_ovr_en": cfg[24],
"mrk_ovr_val": cfg[25],
#AWG floating point values
"phase_offs_degree": cfg[26],
"freq_hz": cfg[27],
"gain_awg_path_0_float": cfg[28],
"gain_awg_path_1_float": cfg[29],
"offset_awg_path_0_float": cfg[30],
"offset_awg_path_1_float": cfg[31],
"mixer_corr_phase_offset_degree_float": cfg[32],
"mixer_corr_gain_ratio_float": cfg[33],
#Acquisition
"upsample_rate_acq_path_0": cfg[34],
"upsample_rate_acq_path_1": cfg[35],
"demod_en_acq": cfg[36],
"non_weighed_integration_len": cfg[37],
#Acquisition floating point values
"rotation_matrix_a11": cfg[41],
"rotation_matrix_a12": cfg[42],
"discr_threshold": cfg[43]}
return cfg_dict
#--------------------------------------------------------------------------
def _set_sequencer_config_rotation_matrix(self, sequencer, phase_incr):
"""
Sets the integration result phase rotation matrix in the acquisition path.
Parameters
----------
sequencer : int
Sequencer index.
phase_incr : float
Phase increment in degrees.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
self._set_sequencer_config(sequencer, {"rotation_matrix_a11": numpy.cos(numpy.deg2rad(360 - phase_incr)),
"rotation_matrix_a12": numpy.sin(numpy.deg2rad(360 - phase_incr))})
#--------------------------------------------------------------------------
def _get_sequencer_config_rotation_matrix(self, sequencer):
"""
Gets the integration result phase rotation matrix in the acquisition path.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
float
Phase increment in degrees.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
cfg = self._get_sequencer_config(sequencer)
phase_incr = numpy.angle(cfg["rotation_matrix_a11"] + cfg["rotation_matrix_a12"] * 1j, deg=True)
return 0 if phase_incr == 0 else \
360 - phase_incr if phase_incr >= 0 else \
-1.0 * phase_incr
#---------------------------------------------------------------------------
def _set_sequencer_channel_map(self, sequencer, output, enable):
"""
Set enable of the indexed sequencer's path to output.
If an invalid sequencer index is given or the channel map is not valid, an error is set in
system error.
Parameters
----------
sequencer : int
Sequencer index.
output : int
output index.
enable : bool
Sequencer path to output enable
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
channel_map_bin = super()._get_sequencer_channel_map(sequencer)
channel_map = list(struct.unpack("I"*int(len(channel_map_bin)/4),channel_map_bin))
if not output in channel_map and enable:
channel_map.append(output)
elif output in channel_map and not enable:
channel_map.remove(output)
super()._set_sequencer_channel_map(sequencer, struct.pack('I'*len(channel_map),*channel_map))
#---------------------------------------------------------------------------
def _get_sequencer_channel_map(self, sequencer, output):
"""
Get enable of the indexed sequencer's path to output.
If an invalid sequencer index is given or the channel map is not valid, an error is set in
system error.
Parameters
----------
sequencer : int
Sequencer index.
output : int
output index.
Returns
----------
bytes
channel map list.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
channel_map_bin = super()._get_sequencer_channel_map(sequencer)
channel_map = list(struct.unpack("I"*int(len(channel_map_bin)/4),channel_map_bin))
return output in channel_map
#--------------------------------------------------------------------------
[docs] def arm_sequencer(self, sequencer=None):
"""
Prepare the indexed sequencer to start by putting it in the armed state. If no sequencer index is given, all sequencers are armed.
Any sequencer that was already running is stopped and rearmed. If an invalid sequencer index is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
if sequencer is not None:
self._arm_sequencer(sequencer)
else:
#Arm all sequencers (SCPI call)
scpi_error_check(
self._write('SEQuencer:ARM')
)
#--------------------------------------------------------------------------
[docs] def start_sequencer(self, sequencer=None):
"""
Start the indexed sequencer, thereby putting it in the running state. If an invalid sequencer index is given or the indexed sequencer was
not yet armed, an error is set in system error. If no sequencer index is given, all armed sequencers are started and any sequencer not in
the armed state is ignored. However, if no sequencer index is given and no sequencers are armed, and error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
if sequencer is not None:
self._start_sequencer(sequencer)
else:
#Start all sequencers (SCPI call)
scpi_error_check(
self._write('SEQuencer:START')
)
#--------------------------------------------------------------------------
[docs] def stop_sequencer(self, sequencer=None):
"""
Stop the indexed sequencer, thereby putting it in the stopped state. If an invalid sequencer index is given, an error is set in system error.
If no sequencer index is given, all sequencers are stopped.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
if sequencer is not None:
self._stop_sequencer(sequencer)
else:
#Stop all sequencers (SCPI call)
scpi_error_check(
self._write('SEQuencer:STOP')
)
#--------------------------------------------------------------------------
[docs] def get_sequencer_state(self, sequencer, timeout=0, timeout_poll_res=0.1):
"""
Get the sequencer state. If an invalid sequencer index is given, an error is set in system error. If the timeout is set to zero, the function returns the state immediately.
If a positive non-zero timeout is set, the function blocks until the sequencer completes. If the sequencer hasn't stopped before the timeout expires, a timeout exception
is thrown.
Parameters
----------
sequencer : int
Sequencer index.
timeout : int
Timeout in minutes.
timeout_poll_res : float
Timeout polling resolution in seconds.
Returns
----------
list
Concatinated list of strings separated by the semicolon character. Status is indicated by one status string and an optional number of flags respectively ordered as:
:Status:
- IDLE: Sequencer waiting to be armed and started.
- ARMED: Sequencer is armed and ready to start.
- RUNNING: Sequencer is running.
- Q1 STOPPED: Classical part of the sequencer has stopped; waiting for real-time part to stop.
- STOPPED: Sequencer has completely stopped.
:Flags:
- DISARMED: Sequencer was disarmed.
- FORCED STOP: Sequencer was stopped while still running.
- SEQUENCE PROCESSOR Q1 ILLEGAL INSTRUCTION: Classical sequencer part executed an unknown instruction.
- SEQUENCE PROCESSOR RT EXEC ILLEGAL INSTRUCTION: Real-time sequencer part executed an unknown instruction.
- AWG WAVE PLAYBACK INDEX INVALID PATH 0: AWG path 0 tried to play an unknown waveform.
- AWG WAVE PLAYBACK INDEX INVALID PATH 1: AWG path 1 tried to play an unknown waveform.
- ACQ WEIGHT PLAYBACK INDEX INVALID PATH 0: Acquisition path 0 tried to play an unknown weight.
- ACQ WEIGHT PLAYBACK INDEX INVALID PATH 1: Acquisition path 1 tried to play an unknown weight.
- ACQ SCOPE DONE PATH 0: Scope acquisition for path 0 has finished.
- ACQ SCOPE OUT-OF-RANGE PATH 0: Scope acquisition data for path 0 was out-of-range.
- ACQ SCOPE OVERWRITTEN PATH 0: Scope acquisition data for path 0 was overwritten.
- ACQ SCOPE DONE PATH 1: Scope acquisition for path 1 has finished.
- ACQ SCOPE OUT-OF-RANGE PATH 1: Scope acquisition data for path 1 was out-of-range.
- ACQ SCOPE OVERWRITTEN PATH 1: Scope acquisition data for path 1 was overwritten.
- ACQ BINNING DONE: Acquisition binning completed.
- ACQ BINNING FIFO ERROR: Acqusition binning encountered internal FIFO error.
- ACQ BINNING COMM ERROR: Acqusition binning encountered internal communication error.
- ACQ BINNING OUT-OF-RANGE: Acquisition binning data out-of-range.
- ACQ INDEX INVALID: Acquisition tried to process an invalid acquisition.
- ACQ BIN INDEX INVALID: Acquisition tried to process an invalid bin.
- CLOCK INSTABILITY: Clock source instability occurred.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
TypeError
Timeout is not an integer.
TimeoutError
Timeout
"""
#Check timeout value
if not isinstance(timeout, int):
raise TypeError("Timeout is not an integer.")
#Format status string
status = self._get_sequencer_state(sequencer)
status_elem_list = status.split(';')
status_flag_list = status_elem_list[-1].split(',')[:-1] if status_elem_list[-1] != '' else []
status_dict = {"status": status_elem_list[0],
"flags": status_flag_list}
elapsed_time = 0.0
timeout = timeout * 60.0
while (status_dict["status"] == "RUNNING" or status_dict["status"] == "Q1 STOPPED") and elapsed_time < timeout:
time.sleep(timeout_poll_res)
status_dict = self.get_sequencer_state(sequencer)
elapsed_time += timeout_poll_res
if elapsed_time >= timeout:
raise TimeoutError("Sequencer {} did not stop in timeout period of {} minutes.".format(sequencer, int(timeout / 60)))
return status_dict
#--------------------------------------------------------------------------
def _add_awg_waveform(self, sequencer, name, waveform, index=None):
"""
Add new waveform to AWG waveform list of indexed sequencer's AWG path. If an invalid sequencer index is given or if the waveform causes the waveform memory limit to be exceeded or
if the waveform samples are out-of-range, an error is set in the system error. The waveform names 'all' and 'ALL' are reserved and adding waveforms with those names will also result
in an error being set in system error. The optional index argument is used to specify an index for the waveform in the waveform list which is used by the sequencer Q1ASM program to
refer to the waveform. If no index is given, the next available waveform index is selected (starting from 0). If an invalid waveform index is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Waveform name.
waveform : list
List of floats in the range of 1.0 to -1.0 representing the waveform.
index : int
Waveform index of the waveform in the waveform list.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
super()._add_awg_waveform(sequencer, name, len(waveform), False)
self._set_awg_waveform_data(sequencer, name, waveform)
if index is not None:
self._set_awg_waveform_index(sequencer, name, index)
#---------------------------------------------------------------------------
@scpi_error_check
def _get_awg_waveforms(self, sequencer):
"""
Get all waveforms in waveform list of indexed sequencer's AWG path. If an invalid sequencer index is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Dictionary with waveforms.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
#Check input types.
self._check_in_type(locals(), ['int'])
#SCPI call
num_waveforms = struct.unpack('I', self._read_bin('SEQuencer{}:AWG:WLISt?'.format(sequencer), False))[0]
if num_waveforms == 0:
self._flush_line_end()
waveform_dict = {}
for wave_it in range(0, num_waveforms):
#Get name and index
name = str(self._read_bin('', False), 'utf-8')
index = struct.unpack('I', self._read_bin('', False))[0]
#Get data
if wave_it < (num_waveforms - 1):
data = self._read_bin('', False)
else:
data = self._read_bin('', True)
data = struct.unpack('f'*int(len(data)/4), data)
#Add to dictionary
waveform_dict[name] = {"index": index,
"data": list(data)}
return waveform_dict
#--------------------------------------------------------------------------
def _add_acq_weight(self, sequencer, name, weight, index=None):
"""
Add new weight to acquisition weight list of indexed sequencer's acquisition path. If an invalid sequencer index is given or if the weight causes the weight memory limit to
be exceeded or if the weight samples are out-of-range, an error is set in the system error. The weight names 'all' and 'ALL' are reserved and adding weights with those names
will also result in an error being set in system error. The optional index argument is used to specify an index for the weight in the weight list which is used by the sequencer
Q1ASM program to refer to the weight. If no index is given, the next available weight index is selected (starting from 0). If an invalid weight index is given, an error is set
in system error.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Weight name.
weight : list
List of floats in the range of 1.0 to -1.0 representing the weight.
index : int
Weight index of the weight in the weight list.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
super()._add_acq_weight(sequencer, name, len(weight), False)
super()._set_acq_weight_data(sequencer, name, weight)
if index is not None:
self._set_acq_weight_index(sequencer, name, index)
#---------------------------------------------------------------------------
@scpi_error_check
def _get_acq_weights(self, sequencer):
"""
Get all weights in weight list of indexed sequencer's acquisition path. If an invalid sequencer index is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Dictionary with weights.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
#Check input types.
self._check_in_type(locals(), ['int'])
#SCPI call
num_weights = struct.unpack('I', self._read_bin('SEQuencer{}:ACQ:WLISt?'.format(sequencer), False))[0]
if num_weights == 0:
self._flush_line_end()
weight_dict = {}
for weight_it in range(0, num_weights):
#Get name and index
name = str(self._read_bin('', False), 'utf-8')
index = struct.unpack('I', self._read_bin('', False))[0]
#Get data
if weight_it < (num_weights - 1):
data = self._read_bin('', False)
else:
data = self._read_bin('', True)
data = struct.unpack('f'*int(len(data)/4), data)
#Add to dictionary
weight_dict[name] = {"index": index,
"data": list(data)}
return weight_dict
#--------------------------------------------------------------------------
def _add_acq_acquisition(self, sequencer, name, num_bins, index=None):
"""
Add new acquisition to acquisition list of indexed sequencer's acquisition path. If an invalid sequencer index is given or if the required acquisition memory
cannot be allocated, an error is set in system error. The acquisition names 'all' and 'ALL' are reserved and adding those will also result in an
error being set in system error. If no index is given, the next available weight index is selected (starting from 0). If an invalid weight index is given, an
error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Acquisition name.
num_bins : int
Number of bins in acquisition. Maximum is 2^24.
index : int
Waveform index of the acquisition in the acquisition list.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
super()._add_acq_acquisition(sequencer, name, num_bins)
if index is not None:
self._set_acq_acquisition_index(sequencer, name, index)
#---------------------------------------------------------------------------
def _get_acq_data_and_convert(self, scpi_cmd, flush_line_end):
"""
Get acquisition data and convert it to a dictionary.
Parameters
----------
scpi_cmd : str
SCPI command string used to set initial SCPI command.
flush_line_end : bool
Indication to flush final characters after final read.
Returns
----------
dict
Dictionary with data of single acquisition.
Raises
----------
"""
acquisition_dict = {"scope": {"path0": {"data": [],
"out-of-range": False,
"avg_cnt": 0},
"path1": {"data": [],
"out-of-range": False,
"avg_cnt": 0}},
"bins": {"integration": {"path0": [],
"path1": []},
"threshold": [],
"avg_cnt": []}}
sample_width = 12
max_sample_value = 2**(sample_width-1)-1
#Retrieve scope data
scope_data = self._read_bin(scpi_cmd, False)
acquisition_dict["scope"]["path0"]["data"] = list(struct.unpack('i'*int(len(scope_data)/4), scope_data))
acquisition_dict["scope"]["path0"]["out-of-range"] = struct.unpack('?', self._read_bin('', False))[0]
acquisition_dict["scope"]["path0"]["avg_cnt"] = struct.unpack('I', self._read_bin('', False))[0]
scope_data = self._read_bin('', False)
acquisition_dict["scope"]["path1"]["data"] = list(struct.unpack('i'*int(len(scope_data)/4), scope_data))
acquisition_dict["scope"]["path1"]["out-of-range"] = struct.unpack('?', self._read_bin('', False))[0]
acquisition_dict["scope"]["path1"]["avg_cnt"] = struct.unpack('I', self._read_bin('', False))[0]
#Normalize scope data
avg_cnt = acquisition_dict["scope"]["path0"]["avg_cnt"]
acquisition_dict["scope"]["path0"]["data"] = [val / max_sample_value if avg_cnt == 0 else val / max_sample_value / avg_cnt for val in acquisition_dict["scope"]["path0"]["data"]]
avg_cnt = acquisition_dict["scope"]["path1"]["avg_cnt"]
acquisition_dict["scope"]["path1"]["data"] = [val / max_sample_value if avg_cnt == 0 else val / max_sample_value / avg_cnt for val in acquisition_dict["scope"]["path1"]["data"]]
#Retrieve bin data
bins = self._read_bin('', flush_line_end)
bin_format = 'QqqLL'
num_bins = int(len(bins) / struct.calcsize('=' + bin_format))
bins = struct.unpack('=' + bin_format * num_bins, bins)
for bin_idx in range(0, num_bins):
valid = bins[bin_idx * len(bin_format)]
if bool(valid):
bin_path0 = bins[bin_idx * len(bin_format) + 1]
bin_path1 = bins[bin_idx * len(bin_format) + 2]
thres = bins[bin_idx * len(bin_format) + 3]
avg_cnt = bins[bin_idx * len(bin_format) + 4]
#Normalize
bin_path0 = bin_path0 / (max_sample_value**2) if avg_cnt == 0 else bin_path0 / (max_sample_value**2) / avg_cnt
bin_path1 = bin_path1 / (max_sample_value**2) if avg_cnt == 0 else bin_path1 / (max_sample_value**2) / avg_cnt
thres = float(thres) if avg_cnt == 0 else thres / avg_cnt
else:
bin_path0 = float("NaN")
bin_path1 = float("NaN")
thres = float("NaN")
avg_cnt = float("NaN")
acquisition_dict["bins"]["integration"]["path0"].append(bin_path0)
acquisition_dict["bins"]["integration"]["path1"].append(bin_path1)
acquisition_dict["bins"]["threshold"].append(thres)
acquisition_dict["bins"]["avg_cnt"].append(avg_cnt)
return acquisition_dict
#---------------------------------------------------------------------------
@scpi_error_check
def _get_acq_acquisition_data(self, sequencer, name):
"""
Get acquisition data of acquisition in acquisition list of indexed sequencer's acquisition path. The acquisition scope and bin data is normalized to a
range of -1.0 to 1.0 taking both the bit widths of the processing path and average count into considaration. For the binned integration results, the integration
length is not handled during normalization and therefore these values have to be divided by their respective integration lenghts. If an invalid sequencer
index is given or if a non-existing acquisition name is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Acquisition name.
Returns
----------
dict
Dictionary with data of single acquisition.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
#Check input types.
self._check_in_type(locals(), ['int', 'str'])
#SCPI call
return self._get_acq_data_and_convert('SEQuencer{}:ACQ:ALISt:ACQuisition:DATA? "{}"'.format(sequencer, name), True)
#---------------------------------------------------------------------------
@scpi_error_check
def _get_acq_acquisitions(self, sequencer):
"""
Get all acquisitions in acquisition list of indexed sequencer's acquisition path. If an invalid sequencer index is given, an error is set in system error.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Dictionary with acquisitions.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
#Check input types.
self._check_in_type(locals(), ['int'])
#SCPI call
num_acq = struct.unpack('I', self._read_bin('SEQuencer{}:ACQ:ALISt?'.format(sequencer), False))[0]
if num_acq == 0:
self._flush_line_end()
acquisition_dict = {}
for acq_it in range(0, num_acq):
#Get name and index
name = str(self._read_bin('', False), 'utf-8')
index = struct.unpack('I', self._read_bin('', False))[0]
#Get data
if acq_it < (num_acq - 1):
acq = self._get_acq_data_and_convert('', False)
else:
acq = self._get_acq_data_and_convert('', True)
#Add to dictionary
acquisition_dict[name] = {"index": index,
"acquisition": acq}
return acquisition_dict
#--------------------------------------------------------------------------
def _add_waveforms(self, sequencer, waveform_dict):
"""
Add all waveforms in JSON compatible dictionary to the AWG waveform list of indexed
sequencer. The dictionary must be structured as follows:
- name: waveform name.
- data: waveform samples in a range of 1.0 to -1.0.
- index: optional waveform index used by the sequencer Q1ASM program to refer to the waveform.
Parameters
----------
sequencer : int
Sequencer index.
waveform_dict : dict
JSON compatible dictionary with one or more waveforms and weigths.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
KeyError
Missing waveform data of waveform in dictionary.
"""
for name in waveform_dict:
if "data" in waveform_dict[name]:
if "index" in waveform_dict[name]:
self._add_awg_waveform(sequencer, name, waveform_dict[name]["data"], waveform_dict[name]["index"])
else:
self._add_awg_waveform(sequencer, name, waveform_dict[name]["data"])
else:
raise KeyError("Missing data key for {} in AWG waveform dictionary".format(name))
#--------------------------------------------------------------------------
def _delete_waveforms(self, sequencer):
"""
Delete all waveforms in the AWG waveform list of indexed sequencer.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
self._delete_awg_waveform(sequencer, "all")
#--------------------------------------------------------------------------
#--------------------------------------------------------------------------
def _add_weights(self, sequencer, weight_dict):
"""
Add all weights in JSON compatible dictionary to the aquisition weight list of indexed
sequencer. The dictionary must be structured as follows:
- name : weight name.
- data: weight samples in a range of 1.0 to -1.0.
- index: optional waveweightform index used by the sequencer Q1ASM program to refer to the weight.
Parameters
----------
sequencer : int
Sequencer index.
weight_dict : dict
JSON compatible dictionary with one or more weigths.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
KeyError
Missing weight data of weight in dictionary.
"""
for name in weight_dict:
if "data" in weight_dict[name]:
if "index" in weight_dict[name]:
self._add_acq_weight(sequencer, name, weight_dict[name]["data"], weight_dict[name]["index"])
else:
self._add_acq_weight(sequencer, name, weight_dict[name]["data"])
else:
raise KeyError("Missing data key for {} in acquisiton weight dictionary".format(name))
#--------------------------------------------------------------------------
def _delete_weights(self, sequencer):
"""
Delete all weights in the acquisition weights lists of indexed sequencer.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
self._delete_acq_weight(sequencer, "all")
#--------------------------------------------------------------------------
[docs] def get_weights(self, sequencer):
"""
Get all weigths in the acquisition weight lists of indexed sequencer.
The returned dictionary is structured as follows:
-name : weight name.
- data: weight samples in a range of 1.0 to -1.0.
- index: weight index used by the sequencer Q1ASM program to refer to the weight.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Dictionary with weights.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
return self._get_acq_weights(sequencer)
#--------------------------------------------------------------------------
[docs] def get_acquisition_state(self, sequencer, timeout=0, timeout_poll_res=0.1):
"""
Return acquisition binning completion state of the indexed sequencer. If an invalid sequencer is given, an error is
set in system error. If the timeout is set to zero, the function returns the state immediately. If a positive non-zero timeout is set,
the function blocks until the acquisition binning completes. If the acquisition hasn't completed before the timeout expires, a
timeout exception is thrown.
Parameters
----------
sequencer : int
Sequencer index.
timeout : int
Timeout in minutes.
timeout_poll_res : float
Timeout polling resolution in seconds.
Returns
----------
bool
Indicates the acquisition binning completion state (False = uncompleted, True = completed).
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
TypeError
Timeout is not an integer.
TimeoutError
Timeout
"""
#Check timeout value
if not isinstance(timeout, int):
raise TypeError("Timeout is not an integer.")
state = self.get_sequencer_state(sequencer)
status = "ACQ BINNING DONE" in state["flags"]
elapsed_time = 0.0
timeout = timeout * 60.0
while status != True and elapsed_time < timeout:
time.sleep(timeout_poll_res)
state = self.get_sequencer_state(sequencer)
status = "ACQ BINNING DONE" in state["flags"]
elapsed_time += timeout_poll_res
if elapsed_time >= timeout:
raise TimeoutError("Acquisitions on sequencer {} did not complete in timeout period of {} minutes.".format(sequencer, int(timeout / 60)))
return status
#--------------------------------------------------------------------------
def _add_acquisitions(self, sequencer, acquisition_dict):
"""
Add all waveforms and weights in JSON compatible dictionary to AWG waveform and aquisition weight lists of indexed
sequencer. The dictionary must be structured as follows:
- name: acquisition name.
- num_bins: number of bins in acquisition.
- index: optional acquisition index used by the sequencer Q1ASM program to refer to the acquition.
Parameters
----------
sequencer : int
Sequencer index.
acquisition_dict : dict
JSON compatible dictionary with one or more acquisitions.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
KeyError
Missing dictionary key in acquisitions.
"""
for name in acquisition_dict:
if "num_bins" in acquisition_dict[name]:
if "index" in acquisition_dict[name]:
self._add_acq_acquisition(sequencer, name, acquisition_dict[name]["num_bins"], acquisition_dict[name]["index"])
else:
self._add_acq_acquisition(sequencer, name, acquisition_dict[name]["num_bins"])
else:
raise KeyError("Missing num_bins key for {} in acquisition dictionary".format(name))
#--------------------------------------------------------------------------
def _delete_acquisitions(self, sequencer):
"""
Delete all acquisitions from acquisition list of indexed sequencer.
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
self._delete_acq_acquisition(sequencer, "all")
#---------------------------------------------------------------------------
[docs] def store_scope_acquisition(self, sequencer, name):
"""
After an acquisition has completed, store the scope acquisition results in the acquisition specified by name of the indexed sequencers. If an
invalid sequencer index is given an error is set in system error. To get access to the acquisition results, the sequencer will be stopped when calling
this function.
Parameters
----------
sequencer : int
Sequencer index.
name : str
Acquisition name.
Returns
----------
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
self._set_acq_acquisition_data(sequencer, name)
#--------------------------------------------------------------------------
[docs] def get_acquisitions(self, sequencer):
"""
Get all acquisitions in acquisition lists of indexed sequencer. The acquisition scope and bin data is normalized to a range of -1.0 to 1.0 taking
both the bit widths of the processing path and average count into considaration. For the binned integration results, the integration length is
not handled during normalization and therefore these values have to be divided by their respective integration lenghts. The returned dictionary
is structured as follows:
- name: acquisition name
- index: acquisition index used by the sequencer Q1ASM program to refer to the acquisition.
- acquisition: acquisition dictionary
- scope: Scope data
- path0: input path 0
- data: acquisition samples in a range of 1.0 to -1.0.
- out-of-range: out-of-range indication for the entire acquisition (False = in-range, True = out-of-range).
- avg_cnt: number of averages.
- path1: input path 1
- data: acquisition samples in a range of 1.0 to -1.0.
- out-of-range: out-of-range indication for the entire acquisition (False = in-range, True = out-of-range).
- avg_cnt: number of averages.
- bins: bin data
- integration: integration data
- path_0: input path 0 integration result bin list
- path_1: input path 1 integration result bin list
- threshold: threshold result bin list
- valid: list of valid indications per bin
- avg_cnt: list of number of averages per bin
Parameters
----------
sequencer : int
Sequencer index.
Returns
----------
dict
Dictionary with acquisitions.
Raises
----------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
return self._get_acq_acquisitions(sequencer)