See also
A Jupyter notebook version of this tutorial can be downloaded here.
Combining Thresholded Bits via LINQ#
In quantum computing workflows, multiple qubits are often measured simultaneously and their binary readout results must be forwarded to a feedback sequencer as quickly as possible. LINQ-based feedback enables low-latency communication between sequencers, and the write-combine feature extends this by merging multiple simultaneously measured bits into a single data payload — reducing the number of messages the receiver must handle.
After enabling thresholded bit sharing via fb_acq_tb_id, configure write-combine with:
fb_acq_tb_cfg <write_combine: I1>, <bit_pos: I10>, <length: I7>, <duration: I16>
When write-combine is enabled, the sequencer writes its thresholded measurement result to the specified bit position. The bit immediately to the left is the valid bit, by default set to 1 when the data bit is used. This behavior can be changed with fb_acq_tb_valid — see the Q1ASM commands list for details.
When multiple sequencers share the same data ID (at the exact same time) with write-combine enabled, their results are merged into a single data payload according to each sequencer’s bit_pos. More information on the write-combine feature can be found in the LINQ-based feedback documentation.
This tutorial demonstrates how to use write-combine mode for a readout module in loopback mode. It walks through the following steps:
Calibrate the acquisition threshold and rotation for each sequencer.
Acquire a pulse (or nothing) based on a configurable boolean.
Transmit the thresholded result to a receiving sequencer via write-combine.
Read back the register and verify correctness.
Hardware Requirements#
A Qblox Cluster with at least 1 QRC in loopback (Output 1 → Input 1).
Setup#
First, we import the required packages and connect to the instrument.
[1]:
import numpy as np # noqa: I001 - Ruff will otherwise put below and it won't be included in generation.
from __future__ import annotations
from qcodes.instrument import find_or_create_instrument
from qblox_instruments import Cluster, ClusterType
Scan For Clusters#
We scan for the available devices connected via ethernet using the Plug & Play functionality of the Qblox Instruments package (see Plug & Play for more info).
!qblox-pnp list
[2]:
cluster_ip = "10.10.200.42"
cluster_name = "cluster0"
Connect to Cluster#
We now make a connection with the Cluster.
[3]:
cluster: Cluster = find_or_create_instrument(
Cluster,
recreate=True,
name=cluster_name,
identifier=cluster_ip,
dummy_cfg=(
{
2: ClusterType.CLUSTER_QCM,
4: ClusterType.CLUSTER_QRM,
6: ClusterType.CLUSTER_QCM_RF,
8: ClusterType.CLUSTER_QRM_RF,
10: ClusterType.CLUSTER_QTM,
12: ClusterType.CLUSTER_QRC,
16: ClusterType.CLUSTER_QSM,
}
if cluster_ip is None
else None
),
)
cluster.reset()
print(cluster.get_system_status())
Status: ERROR, Flags: FEEDBACK_NETWORK_CALIBRATION_FAILED, Slot flags: NONE
Get connected modules#
[4]:
# QRC modules
modules = cluster.get_connected_modules(lambda mod: mod.is_qrc_type)
# This uses the module of the correct type with the lowest slot index
module = list(modules.values())[0]
[5]:
module.disconnect_outputs()
module.disconnect_inputs()
module.sequencer0.connect_sequencer("io0")
module.sequencer0.nco_prop_delay_comp_en(True)
module.out0_att(20)
module.in0_att(20)
module.out0_in0_lo_freq(1e9)
module.sequencer0.nco_freq(10e6)
# The QRC has more input gain than other modules. To avoid overdriving the ADC, we set extra attenuation.
Calibrating the Threshold#
Cable length, LO and NCO frequency rotate the integrated IQ result away from the real axis, shifting the classifier boundary. Calibration measures and corrects for this offset; skipping it means the data bits in your write-combine payload will be unreliable.
For details on thresholded measurements, see the rotation and thresholding documentation.
[6]:
def calibrate_threshold(tof: int, pulse_length: int, active_sequencers: list) -> None:
"""Calibrate threshold and rotation for each sequencer."""
for sequencer in module.sequencers:
sequencer.sync_en(False)
prog = f"""
set_awg_offs 15000, 0
upd_param {tof - 4}
set_awg_offs 0, 0
acquire 0, 0, {pulse_length}
stop
"""
for sequencer in active_sequencers:
sequencer.sequence(
{
"waveforms": {},
"program": prog,
"acquisitions": {"single": {"index": 0, "num_bins": 1}},
"weights": {},
}
)
sequencer.arm_sequencer()
module.start_sequencer()
for sequencer in active_sequencers:
result = sequencer.get_acquisitions()["single"]["acquisition"]["bins"]["integration"][
"path0"
][0]
rotation = np.mod(-np.angle(result), 2 * np.pi)
threshold = (np.exp(1j * rotation) * result).real / 2
if np.isnan(threshold):
print(
f"{sequencer.name}: calibration returned NaN — "
"check loopback connection and pulse amplitude."
)
else:
sequencer.thresholded_acq_threshold(threshold)
sequencer.thresholded_acq_rotation(rotation * 360 / (2 * np.pi))
print(
f"{sequencer.name}: rotation = {np.degrees(rotation):.2f}°, threshold = {threshold:.4f}"
)
Write Combine with One Sequencer#
We demonstrate the write-combine feature with a single sequencer:
Sequencer 0 plays (or skips) a pulse, acquires it, and writes the thresholded result to bit 0 of a 1-byte data payload.
Sequencer 1 receives that payload and stores it in register
R0.
Each sender occupies two bits in the payload: the data bit at bit_pos and the valid bit at bit_pos + 1. Expected results with bit_pos = 0:
|
Register |
Meaning |
|---|---|---|
|
|
Valid bit set, data bit = 1 (pulse detected) |
|
|
Valid bit set, data bit = 0 (no pulse) |
Note: The table above assumes the default parameters below. If you change
BIT_POSthe bit pattern will shift accordingly.
Experiment Parameters#
[7]:
SEND_PULSE = True # Toggle to test both acquisition outcomes
PULSE_LENGTH = 100
TIME_OF_FLIGHT = 149
ID = 16 # Shared routing ID between sequencers
BIT_POS = 0 # Bit position for sequencer 0's thresholded result (must be even)
Calibrate Threshold#
[8]:
calibrate_threshold(TIME_OF_FLIGHT, PULSE_LENGTH, [module.sequencer0])
cluster0_module12_sequencer0: rotation = 180.00°, threshold = 4.1015
Configure Routing#
[9]:
cluster.clear_router()
module.set_local_route(ID)
Configure Module and Sequencers#
[10]:
module.sequencer0.sync_en(True)
module.sequencer1.sync_en(True)
module.sequencer0.integration_length_acq(PULSE_LENGTH)
Define Q1ASM Programs#
[11]:
offs = 15000 if SEND_PULSE else 0
# Sender: shares TB under ID, enables write-combine at the configured bit position.
prog_sender = f"""
fb_acq_tb_id {ID}, 4 # Share thresholded bits under ID
fb_acq_tb_cfg 1, {BIT_POS}, 1, 4 # write_combine=1, bit_pos={BIT_POS}, length=1 byte
wait_sync 4
set_awg_offs {offs}, 0
upd_param {TIME_OF_FLIGHT}
set_awg_offs 0,0
acquire 0, 0, {PULSE_LENGTH} # Acquire & threshold; result → bit {BIT_POS} of payload
stop
"""
# Receiver: waits for the sender's result to propagate, then pops the payload into R0.
# The 600 ns wait is a conservative bound: acquisition (PULSE_LENGTH) + threshold
# computation + LINQ routing latency. Reduce only after profiling on your hardware.
prog_receiver = f"""
wait_sync 4 # Synchronize with sequencer 0
wait 600 # Wait for data to propagate
fb_pop_data {ID}, R0 # Pop payload into R0
stop
"""
Upload Sequences#
[12]:
module.sequencer0.sequence(
{
"waveforms": {},
"program": prog_sender,
"acquisitions": {"single": {"index": 0, "num_bins": 1}},
"weights": {},
}
)
module.sequencer1.sequence(
{
"waveforms": {},
"program": prog_receiver,
"acquisitions": {},
"weights": {},
}
)
Run the Experiment#
[13]:
module.arm_sequencer(0)
module.arm_sequencer(1)
module.start_sequencer()
print("Sequencer 0 status:", module.get_sequencer_status(0))
print("Sequencer 1 status:", module.get_sequencer_status(1))
Sequencer 0 status: Status: OKAY, State: STOPPED, Exit Code: 0, Info Flags: ACQ_SCOPE_DONE_PATH_0, ACQ_SCOPE_DONE_PATH_1, ACQ_BINNING_DONE, ACQ_SCOPE_DONE_PATH_2, ACQ_SCOPE_DONE_PATH_3, Warning Flags: NONE, Error Flags: NONE, Log: []
Sequencer 1 status: Status: OKAY, State: STOPPED, Exit Code: 0, Info Flags: NONE, Warning Flags: NONE, Error Flags: NONE, Log: []
Verify Result#
Read register R0 from the receiving sequencer and compare against the expected value.
[14]:
write_combine_byte = module.sequencer1.get_register("R0")
expected = (0b11 if SEND_PULSE else 0b10) << BIT_POS
print(f"SEND_PULSE = {SEND_PULSE}")
print(f"Received : {write_combine_byte:08b}")
print(f"Expected : {expected:08b}")
print(
"✓ PASS"
if write_combine_byte == expected
else "✗ FAIL — check threshold and loopback connection"
)
SEND_PULSE = True
Received : 00000010
Expected : 00000011
✗ FAIL — check threshold and loopback connection
Write Combine with Two Sequencers#
Here we extend the experiment to two sender sequencers running on separate I/O channels:
Sequencer 0 acquires on
io0and writes its TB toBIT_POS_SEQ0in the payload.Sequencer 2 acquires on
io1and writes its TB toBIT_POS_SEQ2in the payload.Sequencer 1 receives the combined payload into register
R0.
Because both senders share the same ID at the same time, their results are automatically merged into a single byte before the receiver pops it. Each sender contributes two bits: its data bit at bit_pos and its valid bit at bit_pos + 1.
The layout of the received byte depends on your chosen bit positions. With the default BIT_POS_SEQ0 = 0 and BIT_POS_SEQ2 = 2:
bit position: 7 6 5 4 3 2 1 0
- - - - valid_s2 data_s2 valid_s0 data_s0
Experiment Parameters#
Toggle the pulse flags and bit positions freely. The expected byte is computed automatically from your choices, so the verification step never needs updating.
Note: Each sender occupies 2 bits (data + valid), so bit positions must be even and non-overlapping, and
2 * (max_bit_pos // 2 + 1)must fit withinPAYLOAD_LENGTHbytes.
[15]:
SEND_PULSE_SEQ0 = True # Toggle pulse for sequencer 0 (io0)
SEND_PULSE_SEQ2 = True # Toggle pulse for sequencer 2 (io1)
BIT_POS_SEQ0 = 0 # Bit position in payload for sequencer 0's TB (must be even)
BIT_POS_SEQ2 = 2 # Bit position in payload for sequencer 2's TB (must be even)
PAYLOAD_LENGTH = 1 # Total payload size in bytes (increase if bit positions require it)
PULSE_LENGTH = 100
TIME_OF_FLIGHT = 149
ID = 16
Validate Configuration#
We check that the chosen bit positions are valid before uploading anything to hardware.
[16]:
assert BIT_POS_SEQ0 % 2 == 0, "BIT_POS_SEQ0 must be even (data bit + valid bit pair)"
assert BIT_POS_SEQ2 % 2 == 0, "BIT_POS_SEQ2 must be even (data bit + valid bit pair)"
assert BIT_POS_SEQ0 != BIT_POS_SEQ2, "Bit positions must not overlap"
assert max(BIT_POS_SEQ0, BIT_POS_SEQ2) + 2 <= PAYLOAD_LENGTH * 8, (
f"Bit positions exceed payload length of {PAYLOAD_LENGTH} byte(s). "
f"Increase PAYLOAD_LENGTH or reduce bit positions."
)
Compute Expected Result#
We build the expected byte from first principles so the verification step requires no manual updates when parameters change.
[17]:
def build_expected_byte(
send_pulse_seq0: bool, send_pulse_seq2: bool, bit_pos_seq0: int, bit_pos_seq2: int
) -> int:
"""Construct the expected payload byte given pulse flags and bit positions."""
result = (0b11 if send_pulse_seq0 else 0b10) << bit_pos_seq0
result |= (0b11 if send_pulse_seq2 else 0b10) << bit_pos_seq2
return result
expected_byte = build_expected_byte(SEND_PULSE_SEQ0, SEND_PULSE_SEQ2, BIT_POS_SEQ0, BIT_POS_SEQ2)
print(f"Configuration : SEND_PULSE_SEQ0 = {SEND_PULSE_SEQ0}, SEND_PULSE_SEQ2 = {SEND_PULSE_SEQ2}")
print(f"Bit positions : SEQ0 → bit {BIT_POS_SEQ0}, SEQ2 → bit {BIT_POS_SEQ2}")
print(f"Expected byte : {expected_byte:08b}")
Configuration : SEND_PULSE_SEQ0 = True, SEND_PULSE_SEQ2 = True
Bit positions : SEQ0 → bit 0, SEQ2 → bit 2
Expected byte : 00001111
Configure Routing#
[18]:
cluster.clear_router()
module.set_local_route(ID)
Configure Module and Sequencers#
[19]:
module.disconnect_inputs()
module.disconnect_outputs()
module.sequencer0.connect_sequencer("io0")
module.sequencer2.connect_sequencer("io0")
module.sequencer0.nco_freq(100e6)
module.sequencer2.nco_freq(400e6)
Calibrate Threshold#
[20]:
calibrate_threshold(TIME_OF_FLIGHT, PULSE_LENGTH, [module.sequencer0, module.sequencer2])
cluster0_module12_sequencer0: rotation = 0.00°, threshold = 0.0146
cluster0_module12_sequencer2: rotation = 180.00°, threshold = 6.8926
[21]:
for seq in [module.sequencer0, module.sequencer1, module.sequencer2]:
seq.sync_en(True)
for seq in [module.sequencer0, module.sequencer2]:
seq.integration_length_acq(PULSE_LENGTH)
Define Q1ASM Programs#
[22]:
def make_sender_program(
event_id: int,
bit_pos: int,
payload_length: int,
send_pulse: bool,
time_of_flight: int,
pulse_length: int,
) -> str:
"""Build a sender Q1ASM program."""
offs = 15000 if send_pulse else 0
prog = f"""
fb_acq_tb_id {event_id}, 4
fb_acq_tb_cfg 1, {bit_pos}, {payload_length}, 4 # write_combine=1, bit_pos={bit_pos}, length={payload_length} byte(s)
wait_sync 4
set_awg_offs {offs}, 0
upd_param {time_of_flight}
set_awg_offs 0,0
acquire 0, 0, {pulse_length} # Acquire & threshold; result → bit {bit_pos} of payload
stop
"""
return prog
prog_sender0 = make_sender_program(
ID, BIT_POS_SEQ0, PAYLOAD_LENGTH, SEND_PULSE_SEQ0, TIME_OF_FLIGHT, PULSE_LENGTH
)
prog_sender2 = make_sender_program(
ID, BIT_POS_SEQ2, PAYLOAD_LENGTH, SEND_PULSE_SEQ2, TIME_OF_FLIGHT, PULSE_LENGTH
)
# Receiver: both senders share the same ID, so a single pop retrieves the merged byte.
prog_receiver = f"""
wait_sync 4 # Synchronize with sequencers 0 and 2
wait 600 # Wait for both results to merge
fb_pop_data {ID}, R0 # Pop combined payload into R0
stop
"""
Upload Sequences#
[23]:
module.sequencer0.sequence(
{
"waveforms": {},
"program": prog_sender0,
"acquisitions": {"single": {"index": 0, "num_bins": 1}},
"weights": {},
}
)
module.sequencer1.sequence(
{
"waveforms": {},
"program": prog_receiver,
"acquisitions": {},
"weights": {},
}
)
module.sequencer2.sequence(
{
"waveforms": {},
"program": prog_sender2,
"acquisitions": {"single": {"index": 0, "num_bins": 1}},
"weights": {},
}
)
Run the Experiment#
[24]:
for seq_idx in [0, 1, 2]:
module.arm_sequencer(seq_idx)
module.start_sequencer()
for seq_idx in [0, 1, 2]:
print(f"Sequencer {seq_idx} status:", module.get_sequencer_status(seq_idx))
Sequencer 0 status: Status: OKAY, State: STOPPED, Exit Code: 0, Info Flags: ACQ_SCOPE_DONE_PATH_0, ACQ_SCOPE_DONE_PATH_1, ACQ_BINNING_DONE, ACQ_SCOPE_DONE_PATH_2, ACQ_SCOPE_DONE_PATH_3, Warning Flags: NONE, Error Flags: NONE, Log: []
Sequencer 1 status: Status: OKAY, State: STOPPED, Exit Code: 0, Info Flags: NONE, Warning Flags: NONE, Error Flags: NONE, Log: []
Sequencer 2 status: Status: OKAY, State: STOPPED, Exit Code: 0, Info Flags: ACQ_BINNING_DONE, Warning Flags: NONE, Error Flags: NONE, Log: []
Verify Result#
Read register R0 from the receiving sequencer. Both sender contributions should be visible at their respective bit positions.
[25]:
write_combine_byte = module.sequencer1.get_register("R0")
print(f"SEND_PULSE_SEQ0={SEND_PULSE_SEQ0}, BIT_POS_SEQ0={BIT_POS_SEQ0}")
print(f"SEND_PULSE_SEQ2={SEND_PULSE_SEQ2}, BIT_POS_SEQ2={BIT_POS_SEQ2}")
print(f"Received : {write_combine_byte:08b}")
print(f"Expected : {expected_byte:08b}")
print(
"✓ PASS"
if write_combine_byte == expected_byte
else "✗ FAIL — check thresholds, loopback connections, and bit position assignments"
)
SEND_PULSE_SEQ0=True, BIT_POS_SEQ0=0
SEND_PULSE_SEQ2=True, BIT_POS_SEQ2=2
Received : 00001011
Expected : 00001111
✗ FAIL — check thresholds, loopback connections, and bit position assignments
Stop#
Finally, let’s stop the sequencers if they haven’t already and close the instrument connection. One can also display a detailed snapshot containing the instrument parameters before closing the connection by uncommenting the corresponding lines.
[26]:
# Stop all sequencers.
module.stop_sequencer()
# Print status of sequencers 0 and 1 (should now say it is stopped).
print(module.get_sequencer_status(0))
print(module.get_sequencer_status(1))
print()
Status: OKAY, State: STOPPED, Exit Code: 0, Info Flags: FORCED_STOP, ACQ_SCOPE_DONE_PATH_0, ACQ_SCOPE_DONE_PATH_1, ACQ_BINNING_DONE, ACQ_SCOPE_DONE_PATH_2, ACQ_SCOPE_DONE_PATH_3, Warning Flags: NONE, Error Flags: NONE, Log: []
Status: OKAY, State: STOPPED, Exit Code: 0, Info Flags: FORCED_STOP, Warning Flags: NONE, Error Flags: NONE, Log: []