# ----------------------------------------------------------------------------
# Description : IEEE488.2 interface
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2020)
# ----------------------------------------------------------------------------
# -- include -----------------------------------------------------------------
from __future__ import annotations
from functools import wraps
from typing import TYPE_CHECKING
from qblox_instruments.types import DebugLevel
if TYPE_CHECKING:
from typing import Any, Callable
from qblox_instruments.ieee488_2 import Transport
from qblox_instruments.native import Cluster
# -- function ----------------------------------------------------------------
def gpib_error_check( # noqa: ANN201
minimal_check: bool | Callable[..., Any | None] = False,
):
"""
Factory function for a decorator that catches and checks for errors on an IEEE488.2 call.
Parameters
----------
minimal_check
If True, this decorator will always check for errors unless the debug
level is ``DebugLevel.NO_CHECK``. By default False.
Returns
-------
Callable
The decorator.
Raises
------
RuntimeError
An error was found in system error.
"""
def decorator(func: Callable[..., Any | None]) -> Callable[..., Any | None]:
@wraps(func)
def wrapper(self: Ieee488_2 | Cluster, *args: Any, **kwargs: Any) -> Any | None:
if self._debug in (DebugLevel.ERROR_CHECK, DebugLevel.VERSION_AND_ERROR_CHECK) or (
self._debug == DebugLevel.MINIMAL_CHECK and minimal_check is True
):
error = None
try:
return func(self, *args, **kwargs)
except OSError:
raise
except Exception as err:
error = err
finally:
self._check_error(error)
else:
return func(self, *args, **kwargs)
return wrapper
# if used without parentheses
if callable(minimal_check):
return decorator(minimal_check)
# if used with parentheses
return decorator
# -- class -------------------------------------------------------------------
[docs]
class Ieee488_2: # noqa: N801
"""
Class that implements the IEEE488.2 interface.
"""
__slots__ = ["_debug", "_transport"]
# ------------------------------------------------------------------------
[docs]
def __init__(self, transport: Transport, debug: DebugLevel = DebugLevel.MINIMAL_CHECK) -> None:
"""
Creates IEEE488.2 interface object.
Parameters
----------
transport : Transport
Transport class responsible for the lowest level of communication
(e.g. ethernet).
debug : DebugLevel
Debug level. See :class:`~qblox_instruments.types.DebugLevel` for more
information. By default None, which means that for a connection to a dummy
cluster, `DebugLevel.ERROR_CHECK` will be used, and for a real cluster,
`DebugLevel.MINIMAL_CHECK`.
"""
self._transport = transport
self._debug = debug
# ------------------------------------------------------------------------
def _write(self, cmd_str: str) -> None:
"""
Write command to instrument.
Parameters
----------
cmd_str : str
Command string.
"""
self._transport.write(cmd_str)
# ------------------------------------------------------------------------
def _write_bin(self, cmd_str: str, bin_block: bytes) -> None:
"""
Write command and binary data block to instrument.
Parameters
----------
cmd_str : str
Command string.
bin_block : bytes
Binary data array to send.
"""
self._bin_block_write(cmd_str + " ", bin_block)
# ------------------------------------------------------------------------
def _read(self, cmd_str: str) -> str:
"""
Write command to instrument and read response. Using an empty command
string will skip the write and only read.
Parameters
----------
cmd_str : str
Command string.
Returns
-------
str
Command response string.
"""
self._transport.write(cmd_str)
return self._transport.readline().rstrip() # Remove trailing white space, CR, LF
# ------------------------------------------------------------------------
def _read_bin(self, cmd_str: str, flush_line_end: bool = True) -> bytes:
"""
Write command to instrument and read binary data block. Using an empty
command string will skip the write and only read to allow reading
concatenated binary blocks.
Parameters
----------
cmd_str : str
Command string.
flush_line_end : bool
Flush end of line characters.
Returns
-------
bytes
Binary data array received.
"""
if cmd_str != "":
self._transport.write(cmd_str)
return self._bin_block_read(flush_line_end)
# ------------------------------------------------------------------------
def _write_read_bin(self, cmd_str: str, bin_block: bytes, flush_line_end: bool = True) -> bytes:
"""
Write command and binary data block to instrument and read binary data
block. Using an empty command string will skip the write and only read
to allow reading concatenated binary blocks.
Parameters
----------
cmd_str : str
Command string.
bin_block : bytes
Binary data array to send.
flush_line_end : bool
Flush end of line characters.
Returns
-------
bytes
Binary data array received.
"""
if cmd_str != "":
self._write_bin(cmd_str, bin_block)
return self._bin_block_read(flush_line_end)
# ------------------------------------------------------------------------
def _bin_block_write(self, cmd_str: str, bin_block: bytes) -> None:
"""
Write IEEE488.2 binary data block to instrument.
Parameters
----------
cmd_str : str
Command string.
bin_block : bytes
Binary data array to send.
"""
header = cmd_str + Ieee488_2._build_header_string(len(bin_block))
self._transport.write_binary(header.encode(), bin_block)
self._transport.write("") # Add a Line Terminator
# ------------------------------------------------------------------------
def _bin_block_read(self, flush_line_end: bool = True) -> bytes:
"""
Read IEEE488.2 binary data block from instrument.
Parameters
----------
flush_line_end : bool
Flush end of line characters.
Returns
-------
bytes
Binary data array received.
Raises
------
RunTimeError
Header error.
"""
header_a = self._transport.read_binary(2) # Read '#N'
header_a_str = header_a.decode()
# character ',' is a valid delimiter and we need to discard it
if header_a_str[0] == ",":
# read again to get the digit part of '#N'
digit = self._transport.read_binary(1)
digit_str = digit.decode()
header_a_str = header_a_str[1] + digit_str # continue as before
if header_a_str[0] != "#":
s = f"Header error: received {header_a}"
raise RuntimeError(s)
digit_cnt = int(header_a_str[1])
header_b = self._transport.read_binary(digit_cnt)
byte_cnt = int(header_b.decode())
bin_block = self._transport.read_binary(byte_cnt)
if flush_line_end:
self._flush_line_end()
return bin_block
# ------------------------------------------------------------------------
@staticmethod
def _build_header_string(byte_cnt: int) -> str:
"""
Generate IEEE488.2 binary data block header.
Parameters
----------
byte_cnt : int
Size of the binary data block in bytes.
Returns
-------
str
Header string.
"""
byte_cnt_str = str(byte_cnt)
digit_cnt_str = str(len(byte_cnt_str))
bin_header_str = "#" + digit_cnt_str + byte_cnt_str
return bin_header_str
# ------------------------------------------------------------------------
def _flush_line_end(self) -> None:
"""
Flush end of line <CR><LF> characters.
"""
self._transport.read_binary(2) # Consume <CR><LF>
# -------------------------------------------------------------------------
def _check_error(self, err: Exception | None = None) -> None:
"""
Check system error for errors. Empties and prints the complete error
queue.
Parameters
----------
err : Optional[Exception]
Exception to reraise.
Raises
------
Exception
An exception was passed as input argument.
"""
if err:
raise err from err
# ------------------------------------------------------------------------
# IEEE488.2 constants
# ------------------------------------------------------------------------
# fmt: off
# *ESR and *ESE bits
_ESR_OPERATION_COMPLETE = 0x01
_ESR_REQUEST_CONTROL = 0x02
_ESR_QUERY_ERROR = 0x04
_ESR_DEVICE_DEPENDENT_ERROR = 0x08
_ESR_EXECUTION_ERROR = 0x10
_ESR_COMMAND_ERROR = 0x20
_ESR_USER_REQUEST = 0x40
_ESR_POWER_ON = 0x80
# fmt: on
# ------------------------------------------------------------------------
# IEEE488.2 commands
# ------------------------------------------------------------------------
# -------------------------------------------------------------------------
@gpib_error_check
def _get_idn(self) -> str:
"""
Get device identity.
Parameters
----------
None
Returns
-------
str
Concatenated list of strings separated by the semicolon character.
The IDN consists of four strings.
Raises
------
Exception
An error is reported in system error and debug <= 1.
"""
return self._read("*IDN?")
# -------------------------------------------------------------------------
@gpib_error_check
def _reset(self) -> None:
"""
Reset device and clear all status and event registers.
Parameters
----------
None
Returns
-------
None
Raises
------
Exception
An error is reported in system error and debug <= 1.
"""
self._write("*RST")
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def clear(self) -> None:
"""
Clear all status and event registers (see `SCPI <https://www.ivifoundation.org/downloads/SCPI/scpi-99.pdf>`_).
Parameters
----------
None
Returns
-------
None
Raises
------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
"""
self._write("*CLS")
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def get_status_byte(self) -> int:
"""
Get status byte register. Register is only cleared when feeding registers are cleared.
Parameters
----------
None
Returns
-------
int
Status byte register.
Raises
------
Exception
An error is reported in system error and debug <= 1.
"""
return int(self._read("*STB?"))
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def set_service_request_enable(self, reg: int) -> None:
"""
Set service request enable register.
Parameters
----------
reg : int
Service request enable register.
Returns
-------
None
Raises
------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
"""
if not isinstance(reg, int):
raise TypeError(
f"Unexpected type for input argument reg, expected int but got {type(reg).__qualname__}." # noqa: E501
)
self._write(f"*SRE {reg}")
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def get_service_request_enable(self) -> int:
"""
Get service request enable register. The register is cleared after reading it.
Parameters
----------
None
Returns
-------
int
Service request enable register.
Raises
------
Exception
An error is reported in system error and debug <= 1.
"""
return int(self._read("*SRE?"))
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def set_standard_event_status_enable(self, reg: int) -> None:
"""
Set standard event status enable register.
Parameters
----------
reg : int
Standard event status enable register.
Returns
-------
None
Raises
------
Exception
Invalid input parameter type.
Exception
An error is reported in system error and debug <= 1.
"""
if not isinstance(reg, int):
raise TypeError(
f"Unexpected type for input argument reg, expected int but got {type(reg).__qualname__}." # noqa: E501
)
self._write(f"*ESE {reg}")
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def get_standard_event_status_enable(self) -> int:
"""
Get standard event status enable register. The register is cleared after reading it.
Parameters
----------
None
Returns
-------
int
Standard event status enable register.
Raises
------
Exception
An error is reported in system error and debug <= 1.
All errors are read from system error and listed in the exception.
"""
return int(self._read("*ESE?"))
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def get_standard_event_status(self) -> int:
"""
Get standard event status register. The register is cleared after reading it.
Parameters
----------
None
Returns
-------
int
Standard event status register.
Raises
------
Exception
An error is reported in system error and debug <= 1.
"""
return int(self._read("*ESR?"))
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def set_operation_complete(self) -> None:
"""
Set device in operation complete query active state.
Parameters
----------
None
Returns
-------
None
Raises
------
Exception
An error is reported in system error and debug <= 1.
"""
self._write("*OPC")
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def get_operation_complete(self) -> bool:
"""
Get operation complete state.
Parameters
----------
None
Returns
-------
bool
Operation complete state (False = running, True = completed).
Raises
------
Exception
An error is reported in system error and debug <= 1.
"""
return bool(int(self._read("*OPC?")))
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def test(self) -> bool:
"""
Run self-test.
Parameters
----------
None
Returns
-------
bool
Test result (False = failed, True = success).
Raises
------
Exception
An error is reported in system error and debug <= 1.
"""
return bool(int(self._read("*TST?")))
# -------------------------------------------------------------------------
[docs]
@gpib_error_check
def wait(self) -> None:
"""
Wait until operations completed before continuing.
Parameters
----------
None
Returns
-------
None
Raises
------
Exception
An error is reported in system error and debug <= 1.
"""
self._write("*WAI")