Source code for qeg_nmr_qua.experiment.experiment_2d

import matplotlib.pyplot as plt
import warnings

from qeg_nmr_qua.experiment.macros import (
    readout_mode,
    safe_mode,
    drive_mode,
)
from qeg_nmr_qua.experiment.experiment import Experiment
from qeg_nmr_qua.config.config import OPXConfig
from qeg_nmr_qua.config.settings import ExperimentSettings

from qualang_tools.results import fetching_tool, progress_counter
from qualang_tools.plot import interrupt_on_close
from qualang_tools.units import unit
from qualang_tools.loops import from_array
from qm import QuantumMachine
from qm.jobs.running_qm_job import RunningQmJob
from qm.qua import (
    assign,
    wait,
    reset_frame,
    measure,
    save,
    program,
    declare,
    stream_processing,
    declare_stream,
    for_,
    if_,
    else_,
    fixed,
    demod,
    reset_frame,
)

u = unit(coerce_to_integer=True)


[docs] class Experiment2D(Experiment): """ Class to create and run 2D NMR experiments using the QUA programming language. Inherits from the base :class:`Experiment` class and implements methods specific to 2D experiments, which can have a broad range of applications such as measuring relaxation times (T1, T2), performing pulse amplitude sweeps, and performing two-point correlation measurements under Hamiltonian engineering pulse sequences. 2D experiments involve sweeping one parameter (e.g., pulse amplitude, delay time, evolution time) while measuring the system's response. This is typically done by defining a variable vector that contains the values to be swept. The experiment loops over this vector, applying the corresponding parameter value in each iteration. In this class's implementation, the swept parameter is varied first, then the averaging loop is performed. During longer experiments, this ordering should help mitigate the effects of slow drifts in system parameters. """
[docs] def __init__( self, settings: ExperimentSettings, config: OPXConfig = None, connect: bool = True, ): super().__init__(settings=settings, config=config, connect=connect) self.sweep_axis = None # Axis for live plotting and data saving self.sweep_label = "Swept Variable" # Label for sweep axis self.use_fixed_lst = [None] # whether to use fixed point for looping variables self.var_vec_lst = [None] # variable vector for looped experiments
[docs] def update_sweep_axis(self, new_axis): """ Updates the sweep axis for live plotting and data saving. If this method is not called, the first element of the variable vector collection :attr:`var_vec_list` will be used as the sweep axis by default. It can be convienient to change the sweep axis to a more physically meaningful quantity (e.g., converting pulse amplitude rescaling factor physical Vpp units). """ if len(new_axis) != len(self.var_vec_lst[0]): raise ValueError( "New sweep axis must have the same length as the variable vector." ) self.sweep_axis = new_axis
[docs] def update_sweep_label(self, new_label): """ Updates the label for the sweep axis in live plotting. If this method is not called, the default label "Swept Variable" will be used. """ self.sweep_label = new_label
[docs] def validate_experiment(self): """ Checks to make sure that the experiment contains variable operations, since it is a 2D experiment. Variable operations require looping which is supported in 2D experiments. Raises: ValueError: No variable vector was found in the experiment commands. """ if self.var_vec_lst[0] is None: raise ValueError( "Experiment2D requires variable vectors. Use Experiment1D, or similar, instead." ) if len(self.var_vec_lst) > 1: warnings.warn( "Experiment2D only supports one variable vector, but more were found." )
[docs] def create_experiment(self): """ Creates the Quantum Machine program for the experiment, and returns the experiment object as a qua ``program()``. This is used by the :meth:`~Experiment.execute_experiment` and :meth:`~Experiment.simulate_experiment` methods. Returns: program: The QUA program for the experiment defined by this class's commands. """ self.validate_experiment() with program() as experiment: # define the variables and datastreams n = declare(int) # QUA variable for the averaging loop loop_idx = declare(int) # QUA variable for floquet loops n_st = declare_stream() # Stream for the averaging iteration 'n' dummy = declare( int, 0 ) # dummy variable for loops without a declared variable vector I1 = declare(fixed) Q1 = declare(fixed) I2 = declare(fixed) Q2 = declare(fixed) I_st = declare_stream() Q_st = declare_stream() t1 = declare(int) t2 = declare(int) if self.use_fixed_lst[0]: var = declare(fixed) else: var = declare(int) with for_(n, 0, n < self.n_avg, n + 1): # averaging loop with for_( *from_array(var, self.var_vec_lst[0]) ): # inner loop over variable vector with if_(dummy > 0): wait(self.wait_between_scans, self.probe_key) with else_(): assign(dummy, dummy + 1) assign(dummy, dummy + 1) drive_mode(switch=self.rx_switch_key, amplifier=self.amplifier_key) for command in self._commands: self.translate_command(command, (var,), loop_idx) # wait for ringdown to decay, blank amplifier, set to receive mode safe_mode(switch=self.rx_switch_key, amplifier=self.amplifier_key) wait(self.pre_scan_delay) readout_mode( switch=self.rx_switch_key, amplifier=self.amplifier_key ) # measure the FID signal via resonator and helper elements with for_(t1, 0, t1 < self.measure_sequence_len, t1 + 2): measure( "no_pulse_readout", self.probe_key, demod.full("rotated_cos", I1, "out1"), demod.full("rotated_sin", Q1, "out1"), ) save(I1, I_st) save(Q1, Q_st) wait(self.loop_wait_cycles, self.probe_key) wait( self.loop_wait_cycles, self.helper_key ) # Delay the second measurement loop with for_(t2, 1, t2 < self.measure_sequence_len, t2 + 2): measure( "no_pulse_readout", self.helper_key, demod.full("rotated_cos", I2, "out1"), demod.full("rotated_sin", Q2, "out1"), ) save(I2, I_st) save(Q2, Q_st) wait(self.loop_wait_cycles, self.helper_key) safe_mode(switch=self.rx_switch_key, amplifier=self.amplifier_key) reset_frame(self.probe_key, self.helper_key) save(n, n_st) with stream_processing(): n_st.save("iteration") I_st.buffer(self.measure_sequence_len).buffer( len(self.var_vec_lst[0]) ).average().save("I") Q_st.buffer(self.measure_sequence_len).buffer( len(self.var_vec_lst[0]) ).average().save("Q") return experiment
[docs] def data_processing( self, qm: QuantumMachine, job: RunningQmJob, live: bool, wait_on_close: bool = True, title_prefix: str = "", ): """ Handles live data processing for a 2D experiment during execution. This method fetches data from the Quantum Machine job, processes it into voltage units via digital demodulation, and generates live plots when `live` is set to `True`. The plot includes 2D color plots of the I and Q signals as functions of the swept variable and acquisition time, as well as a line plot of the primary signal, determined to be the first element of each FID's I data. This captures the essential observable for 2D NMR experiments, such as calibrations of T1, T2, pulse amplitude sweeps, and Hamiltonian engineering measurements of two-point correlations. After the experiment completes, the final data is saved into the :attr:`save_data_dict` for later analysis and storage. Args: qm (QuantumMachine): The Quantum Machine instance used to run the experiment. job (RunningQmJob): The job object representing the running experiment. live (bool): Flag indicating whether to generate live plots during data acquisition. wait_on_close (bool): Whether to wait for user to close plot after completion. title_prefix (str): Prefix to add to plot title. """ # Fetching tool -- even if we aren't doing live plotting, we use it to fetch data # continually during the experiment's execution results = fetching_tool( job, data_list=["I", "Q", "iteration"], mode="live", ) if live: fig_live, (ax1, ax2, ax3) = plt.subplots( 1, 3, sharex=False, figsize=(16, 6.4) ) # Only interrupt on close if we're waiting for user input if wait_on_close: interrupt_on_close(fig_live, job) try: while results.is_processing(): I, Q, iteration = results.fetch_all() progress_counter(iteration, self.n_avg, start_time=results.start_time) # Convert results into Volts I = u.demod2volts(I, self.readout_len) Q = u.demod2volts(Q, self.readout_len) if live: # 2D color plot: pulse amplitude vs I axis = ( self.sweep_axis if self.sweep_axis is not None else self.var_vec_lst[0] ) if title_prefix: fig_live.suptitle(title_prefix, fontsize=12, fontweight="bold") ax1.cla() im1 = ax1.pcolormesh( axis, self.tau_sweep / u.us, I.T * 1e6, shading="auto", cmap="viridis", ) ax1.set_ylabel("Delay (µs)") ax1.set_xlabel(self.sweep_label) ax1.set_title("I") if not hasattr(ax1, "_colorbar"): ax1._colorbar = plt.colorbar(im1, ax=ax1, label="I (V)") else: ax1._colorbar.update_normal(im1) # 2D color plot: pulse amplitude vs tau for Q ax2.cla() im2 = ax2.pcolormesh( axis, self.tau_sweep / u.us, Q.T * 1e6, shading="auto", cmap="viridis", ) ax2.set_ylabel("Delay (µs)") ax2.set_xlabel(self.sweep_label) ax2.set_title("Q") if not hasattr(ax2, "_colorbar"): ax2._colorbar = plt.colorbar(im2, ax=ax2, label="Q (µV)") else: ax2._colorbar.update_normal(im2) ax3.cla() ax3.plot(axis, I.T[0] * 1e6, label="I") ax3.set_xlabel(self.sweep_label) ax3.set_ylabel("I (µV)") ax3.set_title("Primary signal") ax3.legend() fig_live.tight_layout() fig_live.canvas.draw_idle() plt.pause(0.1) except KeyboardInterrupt: print("Experiment interrupted by user.") if live and wait_on_close: # Keep the interactive plot open after acquisition until the user closes it message = "Acquisition finished. Close the plot window to continue." print(message) try: # Add a centered text box on the figure (figure coordinates) fig_live.text( 0.04, 0.02, message, ha="left", va="bottom", fontsize=8, bbox=dict(facecolor="white", alpha=0.7, edgecolor="none"), ) fig_live.canvas.draw_idle() except Exception as e: print(e) while plt.fignum_exists(fig_live.number): plt.pause(0.5) # Close the figure if we're not waiting for user to close it if live and not wait_on_close: plt.close(fig_live) fig_live = None self.save_data_dict.update({"I_data": I}) self.save_data_dict.update({"Q_data": Q}) self.save_data_dict.update({"swept_variable": self.var_vec_lst[0]}) self.save_data_dict.update({"sweep_axis": axis}) self.save_data_dict.update({"sweep_label": self.sweep_label}) self.save_data_dict.update({"fig_live": fig_live}) self.save_data()