Source code for qblox_instruments.native.spi_rack_modules.s4g_module

# ----------------------------------------------------------------------------
# Description    : S4g SPI module native interface
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2021)
# ----------------------------------------------------------------------------

import numpy as np
import threading
from time import sleep

from typing import List, Optional, Tuple

from spirack.S4g_module import S4g_module as S4gApi
from qblox_instruments.native.spi_rack_modules import SpiModuleBase


class DummyS4gApi:
    """
    Mock implementation of
    `spirack API <https://github.com/mtiggelman/SPI-rack/blob/master/spirack/S4g_module.py>`_,
    for use with the dummy drivers.
    """

    # DAC software span constants
    range_max_uni = 0
    range_max_bi = 2
    range_min_bi = 4

    DAC_RESOLUTION = 2**18
    NUMBER_OF_DACS = 4

    def __init__(
        self,
        spi_rack,
        module: int,
        max_current: float = 50e-3,
        reset_currents: bool = True,
    ):
        """
        Instantiates the mock communication layer with identical parameters to
        the `spirack.S4g_module.S4g_module` constructor.

        Parameters
        ----------
        spi_rack : DummySpiApi
            Mock SPI_rack class object via which the communication runs
        module : int
            module number set on the hardware
        max_current : float
            maximum range of the S4g, configured in hardware
        reset_currents : bool
            if True, then reset all currents to zero and change the span to
            `range_max_bi`

        Returns
        ----------
        """
        self.parent = spi_rack
        self._currents = [0.0] * self.NUMBER_OF_DACS
        init_span = self.range_max_bi if reset_currents else 0
        self._num_dacs = self.NUMBER_OF_DACS
        self._spans = [init_span] * self.NUMBER_OF_DACS
        self.max_current = max_current
        self.address = module

    def change_span_update(self, dac: int, span: int) -> None:
        """
        Mocks the `change_span_update` function of the API.

        Parameters
        ----------
        dac : int
            Current output of which to change the span
        span : int
            values for the span as mentioned in the datasheet, use from
            SPAN_MAPPING

        Returns
        ----------
        """
        self._spans[dac] = span

    def change_span(self, dac, span) -> None:
        """
        Mocks the `change_span` function of the API

        Parameters
        ----------
        dac : int
            Current output of which to change the span
        span : int
            values for the span as mentioned in the datasheet, use from
            SPAN_MAPPING

        Returns
        ----------
        """
        self.change_span_update(dac, span)

    def set_current(self, dac: int, current: float) -> None:
        """
        Mocks the `set_voltage` function of the API

        Parameters
        ----------
        dac: int
            Current output of which to update the current
        voltage: float
            new DAC current
        """
        self._currents[dac] = current

    def get_settings(self, dac: int) -> Tuple[float, int]:
        """
        Mocks the `get_settings` function of the API

        Parameters
        ----------
        dac : int
            Current output of which the settings will be read

        Returns
        ----------
        float
            current
        int
            span
        """
        return self._currents[dac], self._spans[dac]

    def get_stepsize(self, dac) -> float:
        """
        Mocks the `get_stepsize` function of the API

        Parameters
        ----------
        dac : int
            Current output of which the stepsize is calculated

        Returns
        ----------
        float
            Smallest current step possible with DAC
        """
        if self._spans[dac] == self.range_max_bi:
            return 2 * self.max_current / self.DAC_RESOLUTION

        return self.max_current / self.DAC_RESOLUTION


[docs] class S4gModule(SpiModuleBase): """ Native driver for the S4g SPI module. """ NUMBER_OF_DACS = 4 # Set by hardware constraints
[docs] def __init__( self, parent, name: str, address: int, reset_currents: bool = False, dac_names: Optional[List[str]] = None, is_dummy: bool = False, ): """ Instantiates the driver object. This is the object that should be instantiated by the :func:`~qblox_instruments.native.SpiRack.add_spi_module` function. Parameters ---------- parent Reference to the :class:`~qblox_instruments.native.SpiRack` parent object. This is handled by the :func:`~qblox_instruments.native.SpiRack.add_spi_module` function. name : str Name given to the InstrumentChannel. address : int Module number set on the hardware. reset_currents : bool If True, then reset all currents to zero and change the span to `range_max_bi`. dac_names : Optional[List[str]] List of all the names to use for the dac channels. If no list is given or is None, the default name "dac{i}" is used for the i-th dac channel. is_dummy : bool If true, do not connect to physical hardware, but use dummy module. Returns ---------- Raises ---------- ValueError Length of the dac names list does not match the number of dacs. """ super().__init__(parent, name, address) api = DummyS4gApi if is_dummy else S4gApi self.api = api( parent.spi_rack, module=address, reset_currents=reset_currents, ) self._channels = [] for dac in range(self.NUMBER_OF_DACS): if dac_names is None: ch_name = f"dac{dac}" elif len(dac_names) == self.NUMBER_OF_DACS: ch_name = dac_names[dac] else: raise ValueError(f"Length of dac_names must be {self.NUMBER_OF_DACS}") channel = S4gDacChannel(self, ch_name) self._channels.append(channel)
[docs] def set_dacs_zero(self) -> None: """ Sets all currents of all outputs to 0. Returns ---------- """ for ch in self._channels: ch.current(0)
class S4gDacChannel: """ Native driver for the dac channels of the S4g module. This class is used by the :class:`~qblox_instruments.native.spi_rack_modules.S4gModule` to define the individual dac channels and should not be used directly. """ def __init__(self, parent: S4gModule, name: str): """ Constructor for the dac channel instrument channel. Parameters ---------- parent : S4gModule Reference to the parent :class:`~qblox_instruments.native.spi_rack_modules.S4gModule` name : str Name for the instrument channel Returns ---------- Raises ---------- """ self._api = parent.api self._chan_name = name self._ramping_enabled = False self._ramp_rate = 1e-3 self._ramp_max_step = 0.5e-3 self._is_ramping = False self._ramp_thread = None def _get_span(self, dac: int) -> int: """ Gets the span set by the module. Parameters ---------- dac : int the dac of which to get the span Returns ---------- int The current span """ _, span = self._api.get_settings(dac) return span def _set_span(self, dac: int, val: int) -> None: """ Sets the span set by the module. Parameters ---------- dac : int the dac of which to set the span val : int the integer representation of the new span Raises ---------- ValueError When the present current on the output does not fit into the new span """ cur, _ = self._api.get_settings(dac) # check if cur fits into the new span if val == self._api.range_max_uni: maxI = self._api.max_current minI = 0.0 elif val == self._api.range_max_bi: maxI = self._api.max_current minI = -self._api.max_current elif val == self._api.range_min_bi: maxI = self._api.max_current / 2.0 minI = -(self._api.max_current / 2.0) if cur < minI or cur > maxI: raise ValueError( f"Present current is {cur*1e3} mA, which is out of the range [{minI*1e3} mA, {maxI*1e3} mA], the newly selected span" ) self._api.change_span(dac, val) self.set_current_instant(dac, cur) def _get_current(self, dac: int) -> float: """ Gets the current set by the module. Parameters ---------- dac : int the dac of which to get the current Returns ---------- float The output current reported by the hardware """ current, _ = self._api.get_settings(dac) return current def _set_current(self, dac: int, val: float) -> None: """ Sets the current either through ramping or instantly. Parameters ---------- dac : int the dac of which to set the current val : float The new value of the current Returns ---------- """ if self._ramping_enabled: self._set_current_ramp(dac, val) else: self.set_current_instant(dac, val) def _set_current_ramp(self, dac, val) -> None: """ Ramps the current in steps set by `ramp_max_step` with a rate set by `ramp_rate`. Ramping is non-blocking so the user should check `is_ramping() == False` to see if the final value is reached. Parameters ---------- dac : int the dac of which to set the current val : float The new value of the current after ramping Returns ---------- """ class RampThread(threading.Thread): """ Inner class that defines a thread that can be safely killed. """ def __init__(self, *args, **kwargs): super().__init__(target=self.worker_function, *args, **kwargs) self._stopped = False def worker_function(self, ch, dac, vals, dt): """ Conducts the actual ramping """ for val in vals: if self._stopped: return sleep(dt) ch.set_current_instant(dac, val) ch._is_ramping = False def exit(self): """ Stops the thread. """ self._stopped = True # Start of function code # Stop ramping to the value set previously, and ramp to new value. if self._is_ramping: self._ramp_thread.exit() self._is_ramping = True val_begin = self._get_current(dac) num_steps = int(np.ceil(np.abs(val_begin - val) / self._ramp_max_step)) vals = np.linspace(val_begin, val, num_steps) dt = np.abs(val_begin - val) / self._ramp_rate / num_steps th = RampThread(args=(self, dac, vals, dt)) self._ramp_thread = th th.start() def set_current_instant(self, dac, val) -> None: """ Wrapper function around the set_current API call. Instantaneously sets the current. Parameters ---------- dac : int the dac of which to set the current val : float The new value of the current Returns ---------- """ self._api.get_settings(dac) self._api.set_current(dac, val)