Source code for qcodes.instrument_drivers.stanford_research.SR86x

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, ClassVar

import numpy as np

from qcodes.instrument import (
    ChannelList,
    InstrumentBaseKWArgs,
    InstrumentChannel,
    VisaInstrument,
    VisaInstrumentKWArgs,
)
from qcodes.parameters import ArrayParameter
from qcodes.validators import ComplexNumbers, Enum, Ints, Numbers

if TYPE_CHECKING:
    from collections.abc import Callable, Sequence

    from typing_extensions import Unpack

    from qcodes.parameters import Parameter

log = logging.getLogger(__name__)


class SR86xBufferReadout(ArrayParameter):
    """
    The parameter array that holds read out data. We need this to be compatible
    with qcodes.Measure

    Args:
        name: Name of the parameter.
        instrument: The instrument to add this parameter to.

    """

    def __init__(self, name: str, instrument: SR86x, **kwargs: Any) -> None:
        unit = "deg"
        if name in ["X", "Y", "R"]:
            unit = "V"

        super().__init__(
            name,
            shape=(1,),  # dummy initial shape
            unit=unit,
            setpoint_names=("Time",),
            setpoint_labels=("Time",),
            setpoint_units=("s",),
            instrument=instrument,
            docstring="Holds an acquired (part of the) data buffer of one channel.",
            **kwargs,
        )

        self._capture_data: np.ndarray | None = None

    def prepare_readout(self, capture_data: np.ndarray) -> None:
        """
        Prepare this parameter for readout.

        Args:
            capture_data: The data to capture.

        """
        self._capture_data = capture_data

        data_len = len(capture_data)
        self.shape = (data_len,)
        self.setpoint_units = ("",)
        self.setpoint_names = ("sample_nr",)
        self.setpoint_labels = ("Sample number",)
        self.setpoints = (tuple(np.arange(0, data_len)),)

    def get_raw(self) -> np.ndarray:
        """
        Public method to access the capture data
        """
        if self._capture_data is None:
            raise ValueError(
                f"Cannot return data for parameter {self.name}. "
                f"Please prepare for readout by calling "
                f"'get_capture_data' with appropriate "
                f"configuration settings"
            )

        return self._capture_data


[docs] class SR86xBuffer(InstrumentChannel): """ Buffer module for the SR86x drivers. This driver has been verified to work with the SR860 and SR865. For reference, please consult the SR860 manual: http://thinksrs.com/downloads/PDFs/Manuals/SR860m.pdf """ def __init__( self, parent: SR86x, name: str, **kwargs: Unpack[InstrumentBaseKWArgs] ) -> None: super().__init__(parent, name, **kwargs) self.capture_length_in_kb: Parameter = self.add_parameter( "capture_length_in_kb", label="get/set capture length", get_cmd="CAPTURELEN?", set_cmd="CAPTURELEN {}", set_parser=self._set_capture_len_parser, get_parser=int, unit="kB", ) """Parameter capture_length_in_kb""" self.bytes_per_sample = 4 self.min_capture_length_in_kb = 1 # i.e. minimum buffer size self.max_capture_length_in_kb = 4096 # i.e. maximum buffer size # Maximum amount of kB that can be read per single CAPTUREGET command self.max_size_per_reading_in_kb = 64 self.capture_config: Parameter = ( self.add_parameter( # Configure which parameters we want to capture "capture_config", label="capture configuration", get_cmd="CAPTURECFG?", set_cmd="CAPTURECFG {}", val_mapping={"X": "0", "X,Y": "1", "R,T": "2", "X,Y,R,T": "3"}, ) ) """Parameter capture_config""" self.capture_rate_max: Parameter = self.add_parameter( "capture_rate_max", label="capture rate maximum", get_cmd="CAPTURERATEMAX?", get_parser=float, ) """Parameter capture_rate_max""" self.capture_rate: Parameter = self.add_parameter( "capture_rate", label="capture rate raw", get_cmd="CAPTURERATE?", set_cmd="CAPTURERATE {}", get_parser=float, set_parser=self._set_capture_rate_parser, ) """Parameter capture_rate""" max_rate = self.capture_rate_max() self.available_frequencies = [max_rate / 2**i for i in range(20)] self.capture_status: Parameter = ( self.add_parameter( # Are we capturing at the moment? "capture_status", label="capture status", get_cmd="CAPTURESTAT?" ) ) """Parameter capture_status""" self.count_capture_bytes: Parameter = self.add_parameter( "count_capture_bytes", label="captured bytes", get_cmd="CAPTUREBYTES?", unit="B", get_parser=int, docstring="Number of bytes captured so far in the buffer. Can be " "used to track live progress.", ) """Number of bytes captured so far in the buffer. Can be used to track live progress.""" self.count_capture_kilobytes: Parameter = self.add_parameter( "count_capture_kilobytes", label="captured kilobytes", get_cmd="CAPTUREPROG?", unit="kB", docstring="Number of kilobytes captured so far in the buffer, " "rounded-up to 2 kilobyte chunks. Capture must be " "stopped before requesting the value of this " "parameter. If the acquisition wrapped during operating " "in Continuous mode, then the returned value is " "simply equal to the current capture length.", ) """ Number of kilobytes captured so far in the buffer, rounded-up to 2 kilobyte chunks. Capture must be stopped before requesting the value of this parameter. If the acquisition wrapped during operating in Continuous mode, then the returned value is simply equal to the current capture length. """ for parameter_name in ["X", "Y", "R", "T"]: self.add_parameter(parameter_name, parameter_class=SR86xBufferReadout)
[docs] def snapshot_base( self, update: bool | None = False, params_to_skip_update: Sequence[str] | None = None, ) -> dict[Any, Any]: if params_to_skip_update is None: params_to_skip_update = [] # we omit count_capture_kilobytes from the snapshot because # it can only be read after a completed capture and will # timeout otherwise when the snapshot is updated, e.g. at # station creation time params_to_skip_update = list(params_to_skip_update) params_to_skip_update.append("count_capture_kilobytes") snapshot = super().snapshot_base(update, params_to_skip_update) return snapshot
def _set_capture_len_parser(self, capture_length_in_kb: int) -> int: """ Parse the capture length in kB according to the way buffer treats it (refer to the manual for details). The given value has to fit in the range and has to be even, otherwise this function raises exceptions. Args: capture_length_in_kb: The desired capture length in kB. Returns: capture_length_in_kb """ if capture_length_in_kb % 2: raise ValueError("The capture length should be an even number") if ( not self.min_capture_length_in_kb <= capture_length_in_kb <= self.max_capture_length_in_kb ): raise ValueError( f"The capture length should be between " f"{self.min_capture_length_in_kb} and " f"{self.max_capture_length_in_kb}" ) return capture_length_in_kb
[docs] def set_capture_rate_to_maximum(self) -> None: """ Sets the capture rate to maximum. The maximum capture rate is retrieved from the device, and depends on the current value of the time constant. """ self.capture_rate(self.capture_rate_max())
def _set_capture_rate_parser(self, capture_rate_hz: float) -> int: """ According to the manual, the capture rate query returns a value in Hz, but then setting this value it is expected to give a value n, where the capture rate in Hz is given by capture_rate_hz = max_rate / 2 ** n. Please see page 136 of the manual. Here n is an integer in the range [0, 20]. Args: capture_rate_hz: The desired capture rate in Hz. If the desired rate is more than 1 Hz from the nearest valid rate, a warning is issued and the nearest valid rate it used. Returns: n_round """ max_rate = self.capture_rate_max() n = np.log2(max_rate / capture_rate_hz) n_round = int(round(n)) if not 0 <= n_round <= 20: raise ValueError( f"The chosen frequency is invalid. Please " f"consult the SR860 manual at page 136. " f"The maximum capture rate is {max_rate}" ) nearest_valid_rate = max_rate / 2**n_round if abs(capture_rate_hz - nearest_valid_rate) > 1: available_frequencies = ", ".join( str(f) for f in self.available_frequencies ) log.warning(f"Warning: Setting capture rate to {nearest_valid_rate:.5} Hz") log.warning(f"The available frequencies are: {available_frequencies}") return n_round
[docs] def start_capture(self, acquisition_mode: str, trigger_mode: str) -> None: """ Start an acquisition. Please see page 137 of the manual for a detailed explanation. Args: acquisition_mode: "ONE" | "CONT" trigger_mode: "IMM" | "TRIG" | "SAMP" """ if acquisition_mode not in ["ONE", "CONT"]: raise ValueError("The acquisition mode needs to be either 'ONE' or 'CONT'") if trigger_mode not in ["IMM", "TRIG", "SAMP"]: raise ValueError( "The trigger mode needs to be either 'IMM', 'TRIG' or 'SAMP'" ) cmd_str = f"CAPTURESTART {acquisition_mode}, {trigger_mode}" self.write(cmd_str)
[docs] def stop_capture(self) -> None: """Stop a capture""" self.write("CAPTURESTOP")
def _get_list_of_capture_variable_names(self) -> list[str]: """ Retrieve the list of names of variables (readouts) that are set to be captured """ return self.capture_config().split(",") def _get_number_of_capture_variables(self) -> int: """ Retrieve the number of variables (readouts) that are set to be captured """ capture_variables = self._get_list_of_capture_variable_names() n_variables = len(capture_variables) return n_variables def _calc_capture_size_in_kb(self, sample_count: int) -> int: """ Given the number of samples to capture, calculate the capture length that the buffer needs to be set to in order to fit the requested number of samples. Note that the number of activated readouts is taken into account. """ n_variables = self._get_number_of_capture_variables() total_size_in_kb = int( np.ceil(n_variables * sample_count * self.bytes_per_sample / 1024) ) # Make sure that the total size in kb is an even number, as expected by # the instrument if total_size_in_kb % 2: total_size_in_kb += 1 return total_size_in_kb
[docs] def set_capture_length_to_fit_samples(self, sample_count: int) -> None: """ Set the capture length of the buffer to fit the given number of samples. Args: sample_count: Number of samples that the buffer has to fit """ total_size_in_kb = self._calc_capture_size_in_kb(sample_count) self.capture_length_in_kb(total_size_in_kb)
[docs] def wait_until_samples_captured(self, sample_count: int) -> None: """ Wait until the given number of samples is captured. This function is blocking and has to be used with caution because it does not have a timeout. Args: sample_count: Number of samples that needs to be captured """ n_captured_bytes = 0 n_variables = self._get_number_of_capture_variables() n_bytes_to_capture = sample_count * n_variables * self.bytes_per_sample while n_captured_bytes < n_bytes_to_capture: n_captured_bytes = self.count_capture_bytes()
[docs] def get_capture_data(self, sample_count: int) -> dict[str, np.ndarray]: """ Read the given number of samples of the capture data from the buffer. Args: sample_count: number of samples to read from the buffer Returns: The keys in the dictionary correspond to the captured variables. For instance, if before the capture, the capture config was set as 'capture_config("X,Y")', then the keys will be "X" and "Y". The values in the dictionary are numpy arrays of numbers. """ total_size_in_kb = self._calc_capture_size_in_kb(sample_count) capture_variables = self._get_list_of_capture_variable_names() n_variables = self._get_number_of_capture_variables() values = self._get_raw_capture_data(total_size_in_kb) # Remove zeros which mark the end part of the buffer that is not # filled with captured data values = values[values != 0] values = values.reshape((-1, n_variables)).T values = values[:, :sample_count] data = {k: v for k, v in zip(capture_variables, values)} for capture_variable in capture_variables: buffer_parameter = getattr(self, capture_variable) buffer_parameter.prepare_readout(data[capture_variable]) return data
def _get_raw_capture_data(self, size_in_kb: int) -> np.ndarray: """ Read data from the buffer from its beginning avoiding the instrument limit of 64 kilobytes per reading. Args: size_in_kb :Size of the data that needs to be read; if it exceeds the capture length, an exception is raised. Returns: A one-dimensional numpy array of the requested data. Note that the returned array contains data for all the variables that are mentioned in the capture config. """ current_capture_length = self.capture_length_in_kb() if size_in_kb > current_capture_length: raise ValueError( f"The size of the requested data ({size_in_kb}kB) " f"is larger than current capture length of the " f"buffer ({current_capture_length}kB)." ) values: np.ndarray = np.array([]) data_size_to_read_in_kb = size_in_kb n_readings = 0 while data_size_to_read_in_kb > 0: offset = n_readings * self.max_size_per_reading_in_kb if data_size_to_read_in_kb > self.max_size_per_reading_in_kb: size_of_this_reading = self.max_size_per_reading_in_kb else: size_of_this_reading = data_size_to_read_in_kb data_from_this_reading = self._get_raw_capture_data_block( size_of_this_reading, offset_in_kb=offset ) values = np.append(values, data_from_this_reading) data_size_to_read_in_kb -= size_of_this_reading n_readings += 1 return values def _get_raw_capture_data_block( self, size_in_kb: int, offset_in_kb: int = 0 ) -> np.ndarray: """ Read data from the buffer. The maximum amount of data that can be read with this function (size_in_kb) is 64kB (this limitation comes from the instrument). The offset argument can be used to navigate along the buffer. An exception will be raised if either size_in_kb or offset_in_kb are longer that the *current* capture length (number of kB of data that is captured so far rounded up to 2kB chunks). If (offset_in_kb + size_in_kb) is longer than the *current* capture length, the instrument returns the wrapped data. For more information, refer to the description of the "CAPTUREGET" command in the manual. Args: size_in_kb: Amount of data in kB that is to be read from the buffer offset_in_kb: Offset within the buffer of where to read the data; for example, when 0 is specified, the data is read from the start of the buffer. Returns: A one-dimensional numpy array of the requested data. Note that the returned array contains data for all the variables that are mentioned in the capture config. """ if size_in_kb > self.max_size_per_reading_in_kb: raise ValueError( f"The size of the requested data ({size_in_kb}kB) " f"is larger than maximum size that can be read " f"at once ({self.max_size_per_reading_in_kb}kB)." ) # Calculate the size of the data captured so far, in kB, rounded up # to 2kB chunks size_of_currently_captured_data = int( np.ceil(np.ceil(self.count_capture_bytes() / 1024) / 2) * 2 ) if size_in_kb > size_of_currently_captured_data: raise ValueError( f"The size of the requested data ({size_in_kb}kB) " f"cannot be larger than the size of currently " f"captured data rounded up to 2kB chunks " f"({size_of_currently_captured_data}kB)" ) if offset_in_kb > size_of_currently_captured_data: raise ValueError( f"The offset for reading the requested data " f"({offset_in_kb}kB) cannot be larger than the " f"size of currently captured data rounded up to " f"2kB chunks " f"({size_of_currently_captured_data}kB)" ) values = self._parent.visa_handle.query_binary_values( f"CAPTUREGET? {offset_in_kb}, {size_in_kb}", datatype="f", is_big_endian=False, expect_termination=False, ) # the sr86x does not include an extra termination char on binary # messages so we set expect_termination to False return np.array(values)
[docs] def capture_one_sample_per_trigger( self, trigger_count: int, start_triggers_pulsetrain: Callable[..., Any] ) -> dict[str, np.ndarray]: """ Capture one sample per each trigger, and return when the specified number of triggers has been received. Args: trigger_count: Number of triggers to capture samples for start_triggers_pulsetrain: By calling this *non-blocking* function, the train of trigger pulses should start Returns: The keys in the dictionary correspond to the captured variables. For instance, if before the capture, the capture config was set as 'capture_config("X,Y")', then the keys will be "X" and "Y". The values in the dictionary are numpy arrays of numbers. """ self.set_capture_length_to_fit_samples(trigger_count) self.start_capture("ONE", "SAMP") start_triggers_pulsetrain() self.wait_until_samples_captured(trigger_count) self.stop_capture() return self.get_capture_data(trigger_count)
[docs] def capture_samples_after_trigger( self, sample_count: int, send_trigger: Callable[..., Any] ) -> dict[str, np.ndarray]: """ Capture a number of samples after a trigger has been received. Please refer to page 135 of the manual for details. Args: sample_count: Number of samples to capture send_trigger: By calling this *non-blocking* function, one trigger should be sent that will initiate the capture Returns: The keys in the dictionary correspond to the captured variables. For instance, if before the capture, the capture config was set as 'capture_config("X,Y")', then the keys will be "X" and "Y". The values in the dictionary are numpy arrays of numbers. """ self.set_capture_length_to_fit_samples(sample_count) self.start_capture("ONE", "TRIG") send_trigger() self.wait_until_samples_captured(sample_count) self.stop_capture() return self.get_capture_data(sample_count)
[docs] def capture_samples(self, sample_count: int) -> dict[str, np.ndarray]: """ Capture a number of samples at a capture rate, starting immediately. Unlike the "continuous" capture mode, here the buffer does not get overwritten with the new data once the buffer is full. The function blocks until the required number of samples is acquired, and returns them. Args: sample_count: Number of samples to capture Returns: The keys in the dictionary correspond to the captured variables. For instance, if before the capture, the capture config was set as 'capture_config("X,Y")', then the keys will be "X" and "Y". The values in the dictionary are numpy arrays of numbers. """ self.set_capture_length_to_fit_samples(sample_count) self.start_capture("ONE", "IMM") self.wait_until_samples_captured(sample_count) self.stop_capture() return self.get_capture_data(sample_count)
class SR86xDataChannel(InstrumentChannel): """ Implements a data channel of SR86x lock-in amplifier. Parameters that are assigned to these channels get plotted on the display of the instrument. Moreover, there are commands that allow to conveniently retrieve the values of the parameters that are currently assigned to the data channels. This class relies on the available parameter names that should be mentioned in the lock-in amplifier class in `PARAMETER_NAMES` attribute. Args: parent: an instance of SR86x driver name: data channel name that is to be used to reference it from the parent cmd_id: this ID is used in VISA commands to refer to this data channel, usually is an integer number cmd_id_name: this name can also be used in VISA commands along with channel_id; it is not used in this implementation, but is added for reference color: every data channel is also referred to by the color with which it is being plotted on the instrument's screen; added here only for reference """ def __init__( self, parent: SR86x, name: str, cmd_id: str, cmd_id_name: str | None = None, color: str | None = None, **kwargs: Unpack[InstrumentBaseKWArgs], ) -> None: super().__init__(parent, name, **kwargs) self._cmd_id = cmd_id self._cmd_id_name = cmd_id_name self._color = color self.assigned_parameter: Parameter = self.add_parameter( "assigned_parameter", label=f"Data channel {cmd_id} parameter", docstring=f"Allows to set and get the " f"parameter that is assigned to data " f"channel {cmd_id}", set_cmd=f"CDSP {cmd_id}, {{}}", get_cmd=f"CDSP? {cmd_id}", val_mapping=self.parent.PARAMETER_NAMES, ) """Allows to set and get the parameter that is assigned to the channel""" @property def cmd_id(self) -> str: return self._cmd_id @property def cmd_id_name(self) -> str | None: return self._cmd_id_name @property def color(self) -> str | None: return self._color
[docs] class SR86x(VisaInstrument): """ Base class for Stanford SR86x Lock-in Amplifier drivers. This class should not be instantiated directly instead one of the model specific sub classes should be used. """ _VOLT_TO_N: ClassVar[dict[int | float, int]] = { 1: 0, 500e-3: 1, 200e-3: 2, 100e-3: 3, 50e-3: 4, 20e-3: 5, 10e-3: 6, 5e-3: 7, 2e-3: 8, 1e-3: 9, 500e-6: 10, 200e-6: 11, 100e-6: 12, 50e-6: 13, 20e-6: 14, 10e-6: 15, 5e-6: 16, 2e-6: 17, 1e-6: 18, 500e-9: 19, 200e-9: 20, 100e-9: 21, 50e-9: 22, 20e-9: 23, 10e-9: 24, 5e-9: 25, 2e-9: 26, 1e-9: 27, } _N_TO_VOLT: ClassVar[dict[int, int | float]] = {v: k for k, v in _VOLT_TO_N.items()} _CURR_TO_N: ClassVar[dict[float, int]] = { 1e-6: 0, 500e-9: 1, 200e-9: 2, 100e-9: 3, 50e-9: 4, 20e-9: 5, 10e-9: 6, 5e-9: 7, 2e-9: 8, 1e-9: 9, 500e-12: 10, 200e-12: 11, 100e-12: 12, 50e-12: 13, 20e-12: 14, 10e-12: 15, 5e-12: 16, 2e-12: 17, 1e-12: 18, 500e-15: 19, 200e-15: 20, 100e-15: 21, 50e-15: 22, 20e-15: 23, 10e-15: 24, 5e-15: 25, 2e-15: 26, 1e-15: 27, } _N_TO_CURR: ClassVar[dict[int, float]] = {v: k for k, v in _CURR_TO_N.items()} _VOLT_ENUM = Enum(*_VOLT_TO_N.keys()) _CURR_ENUM = Enum(*_CURR_TO_N.keys()) _INPUT_SIGNAL_TO_N: ClassVar[dict[str, int]] = { "voltage": 0, "current": 1, } _N_TO_INPUT_SIGNAL: ClassVar[dict[int, str]] = { v: k for k, v in _INPUT_SIGNAL_TO_N.items() } PARAMETER_NAMES: ClassVar[dict[str, str]] = { "X": "0", # X output, 'X' "Y": "1", # Y output, 'Y' "R": "2", # R output, 'R' "P": "3", # theta output, 'THeta' "aux_in1": "4", # Aux In 1, 'IN1' "aux_in2": "5", # Aux In 2, 'IN2' "aux_in3": "6", # Aux In 3, 'IN3' "aux_in4": "7", # Aux In 4, 'IN4' "Xnoise": "8", # X noise, 'XNOise' "Ynoise": "9", # Y noise, 'YNOise' "aux_out1": "10", # Aux Out 1, 'OUT1' "aux_out2": "11", # Aux Out 2, 'OUT2' "phase": "12", # Reference Phase, 'PHAse' "amplitude": "13", # Sine Out Amplitude, 'SAMp' "sine_outdc": "14", # DC Level, 'LEVel' "frequency": "15", # Int. Ref. Frequency, 'FInt' "frequency_ext": "16", # Ext. Ref. Frequency, 'FExt' } _N_DATA_CHANNELS = 4 default_terminator = "\n" def __init__( self, name: str, address: str, max_frequency: float, reset: bool = False, **kwargs: Unpack[VisaInstrumentKWArgs], ): super().__init__(name, address, **kwargs) self._max_frequency = max_frequency # Reference commands self.frequency: Parameter = self.add_parameter( name="frequency", label="Frequency", unit="Hz", get_cmd="FREQ?", set_cmd="FREQ {}", get_parser=float, vals=Numbers(min_value=1e-3, max_value=self._max_frequency), ) """Parameter frequency""" self.sine_outdc: Parameter = self.add_parameter( name="sine_outdc", label="Sine out dc level", unit="V", get_cmd="SOFF?", set_cmd="SOFF {}", get_parser=float, vals=Numbers(min_value=-5, max_value=5), ) """Parameter sine_outdc""" self.amplitude: Parameter = self.add_parameter( name="amplitude", label="Amplitude", unit="V", get_cmd="SLVL?", set_cmd="SLVL {}", get_parser=float, vals=Numbers(min_value=0, max_value=2), ) """Parameter amplitude""" self.harmonic: Parameter = self.add_parameter( name="harmonic", label="Harmonic", get_cmd="HARM?", get_parser=int, set_cmd="HARM {:d}", vals=Ints(min_value=1, max_value=99), ) """Parameter harmonic""" self.phase: Parameter = self.add_parameter( name="phase", label="Phase", unit="deg", get_cmd="PHAS?", set_cmd="PHAS {}", get_parser=float, vals=Numbers(min_value=-3.6e5, max_value=3.6e5), ) """Parameter phase""" # Signal commands self.sensitivity: Parameter = self.add_parameter( name="sensitivity", label="Sensitivity", get_cmd="SCAL?", set_cmd="SCAL {:d}", get_parser=self._get_sensitivity, set_parser=self._set_sensitivity, ) """Parameter sensitivity""" self.filter_slope: Parameter = self.add_parameter( name="filter_slope", label="Filter slope", unit="dB/oct", get_cmd="OFSL?", set_cmd="OFSL {}", val_mapping={6: 0, 12: 1, 18: 2, 24: 3}, ) """Parameter filter_slope""" self.sync_filter: Parameter = self.add_parameter( name="sync_filter", label="Sync filter", get_cmd="SYNC?", set_cmd="SYNC {}", val_mapping={"OFF": 0, "ON": 1}, ) """Parameter sync_filter""" self.noise_bandwidth: Parameter = self.add_parameter( name="noise_bandwidth", label="Noise bandwidth", unit="Hz", get_cmd="ENBW?", get_parser=float, ) """Parameter noise_bandwidth""" self.signal_strength: Parameter = self.add_parameter( name="signal_strength", label="Signal strength indicator", get_cmd="ILVL?", get_parser=int, ) """Parameter signal_strength""" self.signal_input: Parameter = self.add_parameter( name="signal_input", label="Signal input", get_cmd="IVMD?", get_parser=self._get_input_config, set_cmd="IVMD {}", set_parser=self._set_input_config, vals=Enum(*self._INPUT_SIGNAL_TO_N.keys()), ) """Parameter signal_input""" self.input_range: Parameter = self.add_parameter( name="input_range", label="Input range", unit="V", get_cmd="IRNG?", set_cmd="IRNG {}", val_mapping={1: 0, 300e-3: 1, 100e-3: 2, 30e-3: 3, 10e-3: 4}, ) """Parameter input_range""" self.input_config: Parameter = self.add_parameter( name="input_config", label="Input configuration", get_cmd="ISRC?", set_cmd="ISRC {}", val_mapping={"a": 0, "a-b": 1}, ) """Parameter input_config""" self.input_shield: Parameter = self.add_parameter( name="input_shield", label="Input shield", get_cmd="IGND?", set_cmd="IGND {}", val_mapping={"float": 0, "ground": 1}, ) """Parameter input_shield""" self.input_gain: Parameter = self.add_parameter( name="input_gain", label="Input gain", unit="ohm", get_cmd="ICUR?", set_cmd="ICUR {}", val_mapping={1e6: 0, 100e6: 1}, ) """Parameter input_gain""" self.adv_filter: Parameter = self.add_parameter( name="adv_filter", label="Advanced filter", get_cmd="ADVFILT?", set_cmd="ADVFILT {}", val_mapping={"OFF": 0, "ON": 1}, ) """Parameter adv_filter""" self.input_coupling: Parameter = self.add_parameter( name="input_coupling", label="Input coupling", get_cmd="ICPL?", set_cmd="ICPL {}", val_mapping={"ac": 0, "dc": 1}, ) """Parameter input_coupling""" self.time_constant: Parameter = self.add_parameter( name="time_constant", label="Time constant", unit="s", get_cmd="OFLT?", set_cmd="OFLT {}", val_mapping={ 1e-6: 0, 3e-6: 1, 10e-6: 2, 30e-6: 3, 100e-6: 4, 300e-6: 5, 1e-3: 6, 3e-3: 7, 10e-3: 8, 30e-3: 9, 100e-3: 10, 300e-3: 11, 1: 12, 3: 13, 10: 14, 30: 15, 100: 16, 300: 17, 1e3: 18, 3e3: 19, 10e3: 20, 30e3: 21, }, ) """Parameter time_constant""" self.external_reference_trigger: Parameter = self.add_parameter( name="external_reference_trigger", label="External reference trigger mode", get_cmd="RTRG?", set_cmd="RTRG {}", val_mapping={ "SIN": 0, "POS": 1, "POSTTL": 1, "NEG": 2, "NEGTTL": 2, }, docstring="The triggering mode for synchronization of the " "internal reference signal with the externally provided " "one", ) """The triggering mode for synchronization of the internal reference signal with the externally provided one""" self.reference_source: Parameter = self.add_parameter( name="reference_source", label="Reference source", get_cmd="RSRC?", set_cmd="RSRC {}", val_mapping={"INT": 0, "EXT": 1, "DUAL": 2, "CHOP": 3}, docstring="The source of the reference signal", ) """The source of the reference signal""" self.external_reference_trigger_input_resistance: Parameter = ( self.add_parameter( name="external_reference_trigger_input_resistance", label="External reference trigger input resistance", get_cmd="REFZ?", set_cmd="REFZ {}", val_mapping={ "50": 0, "50OHMS": 0, 0: 0, "1M": 1, "1MEG": 1, 1: 1, }, docstring="Input resistance of the input for the external " "reference signal", ) ) """Input resistance of the input for the external reference signal""" # Auto functions self.add_function("auto_range", call_cmd="ARNG") self.add_function("auto_scale", call_cmd="ASCL") self.add_function("auto_phase", call_cmd="APHS") # Data transfer # first 4 parameters from a list of 16 below. self.X: Parameter = self.add_parameter( "X", label="In-phase Magnitude", get_cmd="OUTP? 0", get_parser=float, unit="V", ) """Parameter X""" self.Y: Parameter = self.add_parameter( "Y", label="Out-phase Magnitude", get_cmd="OUTP? 1", get_parser=float, unit="V", ) """Parameter Y""" self.R: Parameter = self.add_parameter( "R", label="Magnitude", get_cmd="OUTP? 2", get_parser=float, unit="V" ) """Parameter R""" self.P: Parameter = self.add_parameter( "P", label="Phase", get_cmd="OUTP? 3", get_parser=float, unit="deg" ) """Parameter P""" self.complex_voltage: Parameter = self.add_parameter( "complex_voltage", label="Voltage", get_cmd=self._get_complex_voltage, unit="V", vals=ComplexNumbers(), ) """Parameter complex_voltage""" # CH1/CH2 Output Commands self.X_offset: Parameter = self.add_parameter( "X_offset", label="X offset ", unit="%", get_cmd="COFP? 0", set_cmd="COFP 0, {}", get_parser=float, vals=Numbers(min_value=-999.99, max_value=999.99), ) """Parameter X_offset""" self.Y_offset: Parameter = self.add_parameter( "Y_offset", label="Y offset", unit="%", get_cmd="COFP? 1", set_cmd="COFP 1, {}", get_parser=float, vals=Numbers(min_value=-999.99, max_value=999.99), ) """Parameter Y_offset""" self.R_offset: Parameter = self.add_parameter( "R_offset", label="R offset", unit="%", get_cmd="COFP? 2", set_cmd="COFP 2, {}", get_parser=float, vals=Numbers(min_value=-999.99, max_value=999.99), ) """Parameter R_offset""" self.X_expand: Parameter = self.add_parameter( "X_expand", label="X expand multiplier", get_cmd="CEXP? 0", set_cmd="CEXP 0, {}", val_mapping={"OFF": "0", "X10": "1", "X100": "2"}, ) """Parameter X_expand""" self.Y_expand: Parameter = self.add_parameter( "Y_expand", label="Y expand multiplier", get_cmd="CEXP? 1", set_cmd="CEXP 1, {}", val_mapping={"OFF": 0, "X10": 1, "X100": 2}, ) """Parameter Y_expand""" self.R_expand: Parameter = self.add_parameter( "R_expand", label="R expand multiplier", get_cmd="CEXP? 2", set_cmd="CEXP 2, {}", val_mapping={"OFF": 0, "X10": 1, "X100": 2}, ) """Parameter R_expand""" # Aux input/output for i in [0, 1, 2, 3]: self.add_parameter( f"aux_in{i}", label=f"Aux input {i}", get_cmd=f"OAUX? {i}", get_parser=float, unit="V", ) self.add_parameter( f"aux_out{i}", label=f"Aux output {i}", get_cmd=f"AUXV? {i}", get_parser=float, set_cmd=f"AUXV {i}, {{}}", unit="V", ) # Data channels: # 'DAT1' (green), 'DAT2' (blue), 'DAT3' (yellow), 'DAT4' (orange) data_channels = ChannelList( self, "data_channels", SR86xDataChannel, snapshotable=False ) for num, color in zip( range(self._N_DATA_CHANNELS), ("green", "blue", "yellow", "orange") ): cmd_id = f"{num}" cmd_id_name = f"DAT{num + 1}" ch_name = f"data_channel_{num + 1}" data_channel = SR86xDataChannel(self, ch_name, cmd_id, cmd_id_name, color) data_channels.append(data_channel) self.add_submodule(ch_name, data_channel) self.add_submodule("data_channels", data_channels.to_channel_tuple()) # Interface self.add_function("reset", call_cmd="*RST") self.add_function("disable_front_panel", call_cmd="OVRM 0") self.add_function("enable_front_panel", call_cmd="OVRM 1") buffer = SR86xBuffer(self, f"{self.name}_buffer") self.add_submodule("buffer", buffer) self.input_config() self.connect_message() def _set_units(self, unit: str) -> None: for param in [self.X, self.Y, self.R, self.sensitivity]: param.unit = unit def _get_complex_voltage(self) -> complex: x, y = self.get_values("X", "Y") return x + 1.0j * y def _get_input_config(self, s: int) -> str: mode = self._N_TO_INPUT_SIGNAL[int(s)] if mode == "voltage": self.sensitivity.vals = self._VOLT_ENUM self._set_units("V") else: self.sensitivity.vals = self._CURR_ENUM self._set_units("A") return mode def _set_input_config(self, s: str) -> int: if s == "voltage": self.sensitivity.vals = self._VOLT_ENUM self._set_units("V") else: self.sensitivity.vals = self._CURR_ENUM self._set_units("A") return self._INPUT_SIGNAL_TO_N[s] def _get_sensitivity(self, s: int) -> float: if self.signal_input() == "voltage": return self._N_TO_VOLT[int(s)] else: return self._N_TO_CURR[int(s)] def _set_sensitivity(self, s: float) -> int: if self.signal_input() == "voltage": return self._VOLT_TO_N[s] else: return self._CURR_TO_N[s]
[docs] def get_values(self, *parameter_names: str) -> tuple[float, ...]: """ Get values of 2 or 3 parameters that are measured by the lock-in amplifier. These values are guaranteed to come from the same measurement cycle as opposed to getting values of parameters one by one (for example, by calling `sr.X()`, and then `sr.Y()`. Args: *parameter_names: 2 or 3 names of parameters for which the values are requested; valid names can be found in `PARAMETER_NAMES` attribute of the driver class Returns: a tuple of 2 or 3 floating point values """ if not 2 <= len(parameter_names) <= 3: raise KeyError( "It is only possible to request values of 2 or 3 parameters " "at a time." ) for name in parameter_names: if name not in self.PARAMETER_NAMES: raise KeyError( f"{name} is not a valid parameter name. Refer " f"to `PARAMETER_NAMES` for a list of valid " f"parameter names" ) p_ids = [self.PARAMETER_NAMES[name] for name in parameter_names] output = self.ask(f'SNAP? {",".join(p_ids)}') return tuple(float(val) for val in output.split(","))
[docs] def get_data_channels_values(self) -> tuple[float, ...]: """ Queries the current values of the data channels Returns: tuple of 4 values of the data channels """ output = self.ask("SNAPD?") return tuple(float(val) for val in output.split(","))
[docs] def get_data_channels_parameters( self, query_instrument: bool = True ) -> tuple[str, ...]: """ Convenience method to query a list of parameters which the data channels are currently assigned to. Args: query_instrument: If set to False, the internally cashed names of the parameters will be returned; if True, then the names will be queried through the instrument Returns: a tuple of 4 strings of parameter names """ if query_instrument: method_name = "get" else: method_name = "get_latest" return tuple( getattr(getattr(self.data_channels[i], "assigned_parameter"), method_name)() for i in range(self._N_DATA_CHANNELS) )
[docs] def get_data_channels_dict(self, requery_names: bool = False) -> dict[str, float]: """ Returns a dictionary where the keys are parameter names currently assigned to the data channels, and values are the values of those parameters. Args: requery_names: if False, the currently assigned parameter names will not be queries from the instrument in order to save time on communication, in this case the cached assigned parameter names will be used for the keys of the dicitonary; if True, the assigned parameter names will be queried from the instrument Returns: a dictionary where keys are names of parameters assigned to the data channels, and values are the values of those parameters """ parameter_names = self.get_data_channels_parameters(requery_names) parameter_values = self.get_data_channels_values() return dict(zip(parameter_names, parameter_values))