Source code for qblox_scheduler.analysis.time_of_flight_analysis
# Repository: https://gitlab.com/qblox/packages/software/qblox-scheduler
# Licensed according to the LICENSE file on the main branch
#
# Copyright 2020-2025, Quantify Consortium
# Copyright 2025, Qblox B.V.
"""Module containing analysis class for time of flight measurement."""
from __future__ import annotations
import matplotlib.pyplot as plt
import numpy as np
import xarray as xr
from qblox_scheduler.analysis import base_analysis as ba
from quantify_core.visualization import mpl_plotting as qpl
from quantify_core.visualization.SI_utilities import format_value_string
[docs]
class TimeOfFlightAnalysis(ba.BaseAnalysis):
"""Analysis for time of flight measurement."""
[docs]
def run(
    self, acquisition_delay: float = 4e-9, playback_delay: float = 146e-9
) -> ba.BaseAnalysis:
    """
    Record the two delays in nanoseconds and execute the analysis steps.

    Both delays are given in seconds; they are converted to nanoseconds and
    rounded to the nearest integer before being stored on the instance as
    ``acquisition_delay_ns`` and ``playback_delay_ns``.
    Assumes that the sample time is always 1 ns.

    Parameters
    ----------
    acquisition_delay
        Time from the start of the pulse to the start of the measurement
        in seconds. By default 4 ns.
    playback_delay
        Time from the start of playback to appearance of pulse at the output
        of the instrument in seconds. By default 146 ns, which is the playback
        delay for Qblox instruments.

    Returns
    -------
    :
        The instance of the analysis object (the result of the parent
        class's ``run``).

    """
    ns_per_second = 1e9

    # Stored as whole nanoseconds; the other analysis steps read these
    # attributes to offset sample indices.
    self.acquisition_delay_ns = round(acquisition_delay * ns_per_second)
    self.playback_delay_ns = round(playback_delay * ns_per_second)

    return super().run()
[docs]
def process_data(self) -> None:
    """
    Populate the :code:`.dataset_processed`.

    The ``y0`` variable of the raw dataset is exposed as ``Magnitude`` along a
    ``Time`` coordinate. The coordinate counts samples starting from
    ``acquisition_delay_ns`` and is labelled in ns.
    """
    # Assert that this step is run only after `extract_data` was run. Not having
    # this statement will lead to type issues when accessing the data variables.
    assert self.dataset is not None, "No dataset is loaded for this analysis"

    trace = self.dataset.y0.data
    # One coordinate value per sample, shifted by the acquisition delay.
    time_axis = np.arange(trace.shape[0]) + self.acquisition_delay_ns

    processed = xr.Dataset(
        {"Magnitude": (("Time",), trace)},
        coords={"Time": time_axis},
    )
    self.dataset_processed = processed

    # Metadata for the magnitude trace; units are taken over from the raw data.
    magnitude_attrs = processed.Magnitude.attrs
    magnitude_attrs.update(name="Magnitude", long_name="Magnitude")
    magnitude_attrs["units"] = self.dataset.y0.units

    # Metadata for the time coordinate.
    processed.Time.attrs.update(
        name="Time",
        long_name="Trace acquisition sample",
        units="ns",
    )
[docs]
def analyze_fit_results(self) -> None:
    """
    Check fit success and populates :code:`.quantities_of_interest`.

    A threshold is derived from the first ``playback_delay_ns // 3`` samples of
    the processed magnitude (mean plus four standard deviations). The time of
    flight is taken at the first sample above that threshold whose immediate
    successor is also above it, offset by ``acquisition_delay_ns``.

    On success ``fit_success``, ``tof``, ``nco_prop_delay`` (both in seconds,
    rounded to 9 decimals) and a summary ``fit_msg`` are stored. Otherwise only
    ``fit_success`` (``False``) and an explanatory ``fit_msg`` are stored.
    """
    qoi = self.quantities_of_interest
    magnitude = self.dataset_processed.Magnitude

    # Baseline statistics come from the leading part of the trace.
    baseline = magnitude[0 : self.playback_delay_ns // 3]
    threshold = np.mean(baseline) + 4 * float(np.std(baseline))

    above = np.where(np.abs(magnitude) > threshold)[0]
    # Positions within `above` whose next entry is the directly following sample.
    adjacent = np.where(np.diff(above) == 1)[0]

    if not above.size or not adjacent.size:
        qoi["fit_success"] = False
        qoi["fit_msg"] = (
            "Can not find the Time of flight,\n"
            "try to reduce the acquisition_delay "
            f"(current value: {self.acquisition_delay_ns} ns)."
        )
        return

    qoi["fit_success"] = True

    # Work in integer nanoseconds first, convert to seconds when storing.
    tof_ns = int(above[adjacent[0]]) + self.acquisition_delay_ns
    nco_prop_delay_ns = tof_ns - self.playback_delay_ns
    qoi["tof"] = round(tof_ns * 1e-9, ndigits=9)
    qoi["nco_prop_delay"] = round(nco_prop_delay_ns * 1e-9, ndigits=9)

    summary_parts = (
        format_value_string(
            "Time of flight",
            qoi["tof"],
            unit="s",
            end_char="\n",
        ),
        format_value_string(
            "Playback delay",
            self.playback_delay_ns,
            unit="ns",
            end_char="\n",
        ),
        format_value_string(
            "NCO propagation delay",
            qoi["nco_prop_delay"],
            unit="s",
            end_char="",
        ),
    )
    qoi["fit_msg"] = "Summary:\n" + "".join(summary_parts)