Source code for qblox_instruments.ieee488_2.ip_transport
# ----------------------------------------------------------------------------
# Description    : Transport layer (abstract, IP, file, dummy)
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2020)
# ----------------------------------------------------------------------------
# -- include -----------------------------------------------------------------
import os
import socket
import sys
from qblox_instruments.ieee488_2 import Transport
# -- class -------------------------------------------------------------------
[docs]
class IpTransport(Transport):
    """
    Class for data transport of IP socket.
    """
    __slots__ = ["_socket"]
    # ------------------------------------------------------------------------
[docs]
    def __init__(
        self,
        host: str,
        port: int = 5025,
        timeout: float = 60.0,
        snd_buf_size: int = 512 * 1024,
    ) -> None:
        """
        Create IP socket transport class.
        Parameters
        ----------
        host : str
            Instrument IP address.
        port : int
            Instrument port.
        timeout : float
            Instrument call timeout in seconds.
        snd_buf_size : int
            Instrument buffer size for transmissions to instrument.
        """
        # 1. Setup timeout (before connecting)
        # 2. Enlarge buffer
        # 3. Send immediately
        # 4. Setup keep alive pinging
        # 5. Connect
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.settimeout(3)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, snd_buf_size)
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._set_keepalive()
        self._socket.connect((host, port))
        # Timeout after connecting
        self._socket.settimeout(timeout) 
    # ------------------------------------------------------------------------
    def _set_keepalive(
        self, after_idle_sec: int = 60, interval_sec: int = 60, max_fails: int = 5
    ) -> None:
        """
        This function instructs the TCP socket to send a heart beat every n
        seconds to detect dead connections. It's the TCP equivalent of the
        IRC ping-pong protocol and allows for better cleanup / detection
        of dead TCP connections.
        It activates after 60 second (after_idle_sec) of idleness, then sends
        a keepalive ping once every 60 seconds (interval_sec), and closes the
        connection after 5 failed ping (max_fails), or 300 seconds by default.
        Parameters
        ----------
        after_idle_sec : int
            Activate keepalive after n seconds.
        interval_sec : int
            Packet interval in seconds.
        max_fails : int
            Maximum number of failed packets.
        """
        if os.name == "nt":  # Windows
            after_idle_sec *= 1000
            interval_sec *= 1000
            # pylint: disable=no-member
            self._socket.ioctl(socket.SIO_KEEPALIVE_VALS, (1, after_idle_sec, interval_sec))
        elif sys.platform == "darwin":  # MacOS
            TCP_KEEPALIVE = 0x10  # From /usr/include, not exported by Python's socket module
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            self._socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, interval_sec)
        else:  # Linux
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
            self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
            self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)
    # ------------------------------------------------------------------------
    def __del__(self) -> None:
        """
        Delete IP socket transport class.
        """
        self.close()
    # ------------------------------------------------------------------------
[docs]
    def close(self) -> None:
        """
        Close IP socket.
        """
        self._socket.close() 
    # ------------------------------------------------------------------------
[docs]
    def write(self, cmd_str: str) -> None:
        """
        Write command to instrument over IP socket.
        Parameters
        ----------
        cmd_str : str
            Command
        """
        out_str = cmd_str + "\n"
        self.write_binary(out_str.encode("ascii")) 
    # ------------------------------------------------------------------------
[docs]
    def write_binary(self, *data: bytes) -> None:
        """
        Write binary data to instrument over IP socket.
        Parameters
        ----------
        *data : bytes
            Binary data
        """
        for item in data:
            exp_len = len(item)
            act_len = 0
            while act_len < exp_len:
                act_len += self._socket.send(item[act_len:exp_len]) 
    # ------------------------------------------------------------------------
[docs]
    def read_binary(self, size: int) -> bytes:
        """
        Read binary data from instrument over IP socket.
        Parameters
        ----------
        size : int
            Number of bytes
        Returns
        ----------
        bytes
            Binary data array of length "size".
        """
        data = self._socket.recv(size)
        act_len = len(data)
        exp_len = size
        while act_len != exp_len:
            data += self._socket.recv(exp_len - act_len)
            act_len = len(data)
        return data 
    # ------------------------------------------------------------------------
[docs]
    def readline(self) -> str:
        """
        Read data from instrument over IP socket.
        Returns
        ----------
        str
            String with data.
        """
        return self._socket.makefile("rb").readline().decode("utf-8", errors="replace")