Source code for qblox_instruments.pnp.resolve

# ----------------------------------------------------------------------------
# Description    : Name/address resolution logic
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2021)
# ----------------------------------------------------------------------------


# -- include -----------------------------------------------------------------

import re
import socket
import logging
import ipaddress
from collections import namedtuple
from typing import Optional, Union
from qblox_instruments.pnp import PlugAndPlay


# -- definitions -------------------------------------------------------------

# Port definitions for direct connections to an instrument.
MAIN_SCPI_PORT = 5025
MAIN_CFG_PORT = 5026
MAIN_ZMQ_RR_PORT = 20802
MAIN_ZMQ_PS_PORT = 20803

# Port definitions forwarded by the CMM to modules in the cluster, for direct
# module connections if needed.
MODULE_SCPI_PORT_OFFSET = 25000
MODULE_CFG_PORT_OFFSET = 26000
MODULE_ZMQ_RR_PORT_OFFSET = 23000
MODULE_ZMQ_PS_PORT_OFFSET = 24000

# Slot index of the CMM.
CMM_SLOT_INDEX = 0

# Structure returned by resolve(). See resolve() for more information.
AddressInfo = namedtuple(
    "AddressInfo",
    [
        "protocol",
        "address",
        "slot_index",
        "scpi_port",
        "cfg_port",
        "zmq_rr_port",
        "zmq_ps_port",
    ],
)

AddressInfo.__doc__ = """
Connection information structure. Can be constructed from an instrument
identifier via resolve().
"""

AddressInfo.protocol.__doc__ = """
:str: The protocol that must be used to connect. Can be:

 - ``"ip"`` for a normal IP-based connection; or
 - ``"pnp"`` when the device is not accessible due to IP address
   misconfiguration.
"""

AddressInfo.address.__doc__ = """
:str: IP address string for IP connections, or the device serial number for
plug & play.
"""

AddressInfo.slot_index.__doc__ = """
:Optional[int]: For clusters, this specifies which specific module to connect
to, if any. If None, the whole cluster will be treated as a single instrument.
For Pulsar devices, this should always be None.
"""

AddressInfo.scpi_port.__doc__ = """
:int: The port number for SCPI connections.
"""

AddressInfo.cfg_port.__doc__ = """
:int: The port number for configuration manager connections.
"""

AddressInfo.zmq_rr_port.__doc__ = """
:int: The port number for Qblox native ZeroMQ/CBOR REQ-REP connections.
"""

AddressInfo.zmq_ps_port.__doc__ = """
:int: The port number for Qblox native ZeroMQ/CBOR PUB-SUB connections.
"""


# -- functions ---------------------------------------------------------------

def _populate_ports(
    protocol: str,
    address: str,
    slot_index: Optional[int]
) -> AddressInfo:
    """
    Helper function for resolve(). Populates an :class:`~AddressInfo`
    structure with port information.

    Parameters
    ----------
    protocol: str
        Protocol, as determined by resolve().
    address: str
        IP address, as determined by resolve().
    slot_index: Optional[int]
        Slot index to retrieve port numbers for, or None to get the default
        port numbers.

    Returns
    -------
    AddressInfo
        The completed address information structure.
    """
    if slot_index is None or slot_index == CMM_SLOT_INDEX:
        return AddressInfo(
            protocol,
            address,
            slot_index,
            MAIN_SCPI_PORT,
            MAIN_CFG_PORT,
            MAIN_ZMQ_RR_PORT,
            MAIN_ZMQ_PS_PORT,
        )
    else:
        return AddressInfo(
            protocol,
            address,
            slot_index,
            MODULE_SCPI_PORT_OFFSET + slot_index,
            MODULE_CFG_PORT_OFFSET + slot_index,
            MODULE_ZMQ_RR_PORT_OFFSET + slot_index,
            MODULE_ZMQ_PS_PORT_OFFSET + slot_index,
        )


# ----------------------------------------------------------------------------
[docs] def resolve(identifier: Union[str, AddressInfo]) -> AddressInfo: """ Converts a device identifier to an IP address or (if only reachable via plug and play) a serial number. For IP connections, also returns the ports that the device should be listening on. Parameters ---------- identifier: Union[str, AddressInfo] If a string, this can be one of the following things: - ``"[ip://]<ip-address>[/#]"``, - ``"[pnp://]<device-name>[/#]"``, - ``"[pnp://]<serial-number>[/#]"``, or - ``"[ip://]<hostname>[/#]"``, where: - ``<ip-address>`` is a valid IPv4 or IPv6 address for the device to be resolved; - ``<device-name>`` is the customer-specified name of the instrument (programmed into it with qblox-cfg, qblox-pnp, or the ``set_name()`` method); - ``<serial-number>`` is the serial number of the instrument; - ``<hostname>`` is a hostname that will resolve to the IP address of the instrument via DNS (some routers support this if the device connects via DHCP; the hostname requested by the device will be its customer-specified name in lowercase, using dashes (``-``) for sequences of non-alphanumeric characters); - either ``ip://`` or ``pnp://`` may be prefixed to disambiguate between the various methods of device identification, if necessary; and - the optional ``/#`` suffix may be used for cluster devices to return information for a specific cluster module, where ``#`` is the slot index of the module. The four identification methods will be attempted in the sequence shown. If an :class:`~AddressInfo` object is passed as input, this function simply returns it as-is. Returns ------- AddressInfo Address information structure. Raises ------ ValueError If the identifier is invalid. RuntimeError If we failed to determine what this identifier is. """ # Fallthrough if input is already a resolved address. if isinstance(identifier, AddressInfo): return identifier # Store the full identifier string for error messages. full_identifier = identifier # Properly log what we're doing. logger = logging.getLogger("resolver") logger.info('Resolving device identifier "%s"...', full_identifier) # Split off slot index suffix, if specified. if re.match(".*/[0-9]+$", identifier): identifier, slot_index = identifier.rsplit("/", maxsplit=1) slot_index = int(slot_index) logger.info( "Restricting connection to module in slot %d only.", slot_index ) else: slot_index = None # Split off protocol hint. *hint, identifier = identifier.split("://", maxsplit=1) if hint: hint = hint[0] if hint not in ("pnp", "ip"): raise ValueError("protocol hint prefix must be pnp:// or ip://") else: hint = None # See if this is an IP address. if hint != "pnp": try: # Below line will throw a parse error if identifier is not a valid # IP address. ipaddress.ip_address(identifier) # No error, so this is an IP address. logger.info( '"%s" is a valid IP address; skipping name resolution.', identifier ) return _populate_ports("ip", identifier, slot_index) except ValueError: pass # See if we can reach this device via plug & play. if hint != "ip": logger.info( 'Trying to resolve "%s" as device name or serial number ' "via plug & play...", identifier, ) data = None try: with PlugAndPlay() as p: data = p.describe(identifier) except Exception: pass if data is not None: remote_ip = data.get("identity", {}).get("ip", None) local_ip_and_prefix = data.get("connected_via", None) if remote_ip is not None and local_ip_and_prefix is not None: local_net = ipaddress.IPv4Network(local_ip_and_prefix, strict=False) remote_net = ipaddress.IPv4Network("{}/32".format(remote_ip), strict=False) if local_net.overlaps(remote_net): # Resolved via plug & play, but device should be # accessible via regular TCP connections. logger.info( 'Found "%s" at IP address %s via plug & play', identifier, remote_ip, ) return _populate_ports("ip", remote_ip, slot_index) else: # IP misconfiguration, only plug & play is available. Use # serial number instead of whatever the device responded # to if possible. logger.warning( 'Found "%s" at IP address %s via %s, but that won\'t ' "work; only plug & play connectivity is available", identifier, remote_ip, local_ip_and_prefix, ) address = data.get("identity", {}).get("ser", identifier) return _populate_ports("pnp", address, slot_index) # See if we can resolve the identifier via DNS. if hint != "pnp": logger.info('Trying to resolve "%s" via DNS lookup...', identifier) try: # Below line will throw a parse error if identifier is not a valid # IP address. data = socket.getaddrinfo(identifier, None) if data: remote_ip = data[0][4][0] logger.info( 'Found "%s" at IP address %s via DNS lookup', identifier, remote_ip ) return _populate_ports("ip", remote_ip, slot_index) except OSError: pass raise RuntimeError( 'Failed to resolve "{}" as something we can connect to'.format(full_identifier) )