Source code for qcodes.instrument_drivers.Keysight.Infiniium

import re
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Literal

import numpy as np
from pyvisa import VisaIOError
from pyvisa.constants import StatusCode

import qcodes.validators as vals
from qcodes.instrument import (
    ChannelList,
    InstrumentBase,
    InstrumentBaseKWArgs,
    InstrumentChannel,
    InstrumentModule,
    VisaInstrument,
    VisaInstrumentKWArgs,
)
from qcodes.parameters import (
    Parameter,
    ParameterBase,
    ParameterWithSetpoints,
    create_on_off_val_mapping,
)

if TYPE_CHECKING:
    from collections.abc import Sequence

    from typing_extensions import Unpack


class DSOTimeAxisParam(Parameter):
    """
    Time axis parameter for the Infiniium series DSO.
    """

    def __init__(self, xorigin: float, xincrement: float, points: int, **kwargs: Any):
        """
        Initialize time axis. If values are unknown, they can be initialized to zero and
        filled in later.
        """
        super().__init__(**kwargs)

        self.xorigin = xorigin
        self.xincrement = xincrement
        self.points = points

    def get_raw(self) -> np.ndarray:
        """
        Return the array corresponding to this time axis.
        """
        return np.linspace(
            self.xorigin,
            self.xorigin + self.points * self.xincrement,
            self.points,
            endpoint=False,
        )


class DSOFrequencyAxisParam(Parameter):
    """
    Frequency axis parameter for the Infiniium series DSO.
    """

    def __init__(self, xorigin: float, xincrement: float, points: int, **kwargs: Any):
        """
        Initialize frequency axis. If values are unknown, they can be initialized
        to zero and filled in later.
        """
        super().__init__(**kwargs)

        self.xorigin = xorigin
        self.xincrement = xincrement
        self.points = points

    def get_raw(self) -> np.ndarray:
        """
        Return the array corresponding to this time axis.
        """
        return np.linspace(
            self.xorigin,
            self.xorigin + self.points * self.xincrement,
            self.points,
            endpoint=False,
        )


class DSOTraceParam(ParameterWithSetpoints):
    """
    Trace parameter for the Infiniium series DSO
    """

    UNIT_MAP: ClassVar[dict[int, str]] = {
        0: "UNKNOWN",
        1: "V",
        2: "s",
        3: "''",
        4: "A",
        5: "dB",
    }

    def __init__(
        self,
        name: str,
        instrument: "KeysightInfiniiumChannel | KeysightInfiniiumFunction",
        channel: str,
        **kwargs: Any,
    ):
        """
        Initialize DSOTraceParam bound to a specific channel.
        """
        self._ch_valid = False
        super().__init__(name, instrument=instrument, **kwargs)
        self._channel = channel
        # This parameter will be updated prior to being retrieved if
        # self.root_instrument.auto_digitize is true.
        self._points = 0
        self._yoffset = 0.0
        self._yincrement = 0.0
        self._unit = 0

    @property
    def setpoints(self) -> "Sequence[ParameterBase]":
        """
        Overwrite setpoint parameter to update setpoints if auto_digitize is true
        """
        instrument = self.instrument
        if isinstance(instrument, KeysightInfiniiumChannel):
            root_instrument: KeysightInfiniium
            root_instrument = self.root_instrument  # type: ignore[assignment]
            cache_setpoints = root_instrument.cache_setpoints()
            if not cache_setpoints:
                self.update_setpoints()
            return (instrument.time_axis,)
        elif isinstance(instrument, KeysightInfiniiumFunction):
            if instrument.function().startswith("FFT"):
                self.update_fft_setpoints()
                return (instrument.frequency_axis,)
            else:
                self.update_setpoints()
                return (instrument.time_axis,)
        raise RuntimeError("Invalid type for parent instrument.")

    @setpoints.setter
    def setpoints(self, setpoints: Any) -> None:
        """
        Stub to allow initialization. Ignore any set attempts on setpoint as we
        figure it out on the fly.
        """
        return

    @property
    def unit(self) -> str:
        """
        Return the units for this measurement.
        """
        if self._ch_valid is False:
            return "''"
        elif self._unit != 0:
            return self.UNIT_MAP[self._unit]
        elif self.instrument is not None:
            self.instrument.write(f":WAV:SOUR {self._channel}")
            return self.instrument.ask(":WAV:YUN?")
        return "''"

    @unit.setter
    def unit(self, unit: Any) -> None:
        """
        Stub to allow initialization.
        """
        return

    def update_setpoints(self, preamble: "Sequence[str] | None" = None) -> None:
        """
        Update waveform parameters. Must be called before data
        acquisition if instr.cache_setpoints is False
        """
        instrument: KeysightInfiniiumChannel | KeysightInfiniiumFunction
        instrument = self.instrument  # type: ignore[assignment]
        if preamble is None:
            instrument.write(f":WAV:SOUR {self._channel}")
            preamble = instrument.ask(":WAV:PRE?").strip().split(",")
        self._points = int(preamble[2])
        self._yincrement = float(preamble[7])
        self._yoffset = float(preamble[8])
        self._unit = int(preamble[21])
        instrument.time_axis.points = int(preamble[2])
        instrument.time_axis.xorigin = float(preamble[5])
        instrument.time_axis.xincrement = float(preamble[4])
        self._ch_valid = True

    def update_fft_setpoints(self) -> None:
        """
        Update waveform parameters for an FFT.
        """
        instrument: KeysightInfiniiumFunction = self.instrument  # type: ignore[assignment]
        instrument.write(f":WAV:SOUR {self._channel}")
        preamble = instrument.ask(":WAV:PRE?").strip().split(",")
        self.update_setpoints(preamble)
        instrument.frequency_axis.points = int(preamble[2])
        instrument.frequency_axis.xorigin = float(preamble[5])
        instrument.frequency_axis.xincrement = float(preamble[4])

    def get_raw(self) -> np.ndarray:
        """
        Get waveform data from scope
        """
        if self.instrument is None:
            raise RuntimeError("Cannot get data without instrument")
        root_instr: KeysightInfiniium = self.root_instrument  # type: ignore[assignment]
        # Check if we can use cached trace parameters
        if not root_instr.cache_setpoints():
            self.update_setpoints()
        if not self._ch_valid:
            raise RuntimeError(
                "Trace parameters are unknown. If cache_setpoints is True, "
                "you must manually call instr.chX.update_setpoints at least"
                "once prior to measurement."
            )

        # Check if we should run a new sweep
        if root_instr.auto_digitize():
            root_instr.digitize()
        # Ask for waveform data
        root_instr.write(f":WAV:SOUR {self._channel}")
        root_instr.write(":WAV:DATA?")
        # Ignore first two bytes, which should be "#0"
        _ = root_instr.visa_handle.read_bytes(2)
        data: np.ndarray
        data = root_instr.visa_handle.read_binary_values(  # type: ignore[assignment]
            "h",
            container=np.ndarray,
            header_fmt="empty",
            expect_termination=True,
            data_points=self._points,
        )
        data = data.astype(np.float64)
        data = (data * self._yincrement) + self._yoffset
        return data


class AbstractMeasurementSubsystem(InstrumentModule):
    """
    Submodule containing the measurement subsystem commands and associated
    parameters.

    Note: these commands are executed on the waveform in the scope buffer.
    If you need to ensure a fresh value, run dso.digitize() prior to reading
    the measurement value.
    """

    def __init__(
        self,
        parent: InstrumentBase,
        name: str,
        **kwargs: "Unpack[InstrumentBaseKWArgs]",
    ) -> None:
        """
        Add parameters to measurement subsystem. Note: This should not be initialized
        directly, rather initialize BoundMeasurementSubsystem
        or UnboundMeasurementSubsystem.
        """
        super().__init__(parent, name, **kwargs)

        ###################################
        # Voltage Parameters
        self.amplitude: Parameter = Parameter(
            name="amplitude",
            instrument=self,
            label="Voltage amplitude",
            get_cmd=self._create_query("VAMP"),
            get_parser=float,
            unit="V",
            snapshot_value=False,
        )
        self.average: Parameter = Parameter(
            name="average",
            instrument=self,
            label="Voltage average",
            get_cmd=self._create_query("VAV", "DISP"),
            get_parser=float,
            unit="V",
            snapshot_value=False,
        )
        self.base: Parameter = Parameter(
            name="base",
            instrument=self,
            label="Statistical base",
            get_cmd=self._create_query("VBAS"),
            get_parser=float,
            unit="V",
            snapshot_value=False,
        )
        # Threshold Voltage Measurements - this measurement ignores overshoot
        # in the data
        self.vlow: Parameter = Parameter(
            name="vlow",
            instrument=self,
            label="Lower threshold voltage",
            get_cmd=self._create_query("VLOW"),
            get_parser=float,
            unit="V",
            snapshot_value=False,
        )
        self.vmid: Parameter = Parameter(
            name="vmid",
            instrument=self,
            label="Middle threshold voltage",
            get_cmd=self._create_query("VMID"),
            get_parser=float,
            unit="V",
            snapshot_value=False,
        )
        self.vup: Parameter = Parameter(
            name="vup",
            instrument=self,
            label="Upper threshold voltage",
            get_cmd=self._create_query("VUPP"),
            get_parser=float,
            unit="V",
            snapshot_value=False,
        )
        # Limit values - the minimum/maximum shown on screen
        self.vmin: Parameter = Parameter(
            name="vmin",
            instrument=self,
            label="Voltage minimum",
            get_cmd=self._create_query("VMIN"),
            get_parser=float,
            unit="V",
            snapshot_value=False,
        )
        self.vmax: Parameter = Parameter(
            name="vmax",
            instrument=self,
            label="Voltage maximum",
            get_cmd=self._create_query("VMAX"),
            get_parser=float,
            unit="V",
            snapshot_value=False,
        )
        # Waveform Parameters
        self.overshoot: Parameter = Parameter(
            name="overshoot",
            instrument=self,
            label="Voltage overshoot",
            get_cmd=self._create_query("VOV"),
            get_parser=float,
            unit="V",
            snapshot_value=False,
        )
        self.vpp = Parameter(
            name="vpp",
            instrument=self,
            label="Voltage peak-to-peak",
            get_cmd=self._create_query("VPP"),
            get_parser=float,
            unit="V",
            snapshot_value=False,
        )
        self.vrms: Parameter = Parameter(
            name="vrms",
            instrument=self,
            label="Voltage RMS",
            get_cmd=self._create_query("VRMS", "CYCL,AC"),
            get_parser=float,
            unit="V_rms",
            snapshot_value=False,
        )
        self.vrms_dc: Parameter = Parameter(
            name="vrms_dc",
            instrument=self,
            label="Voltage RMS with DC Component",
            get_cmd=self._create_query("VRMS", "CYCL,DC"),
            get_parser=float,
            unit="V_rms",
            snapshot_value=False,
        )

        ###################################
        # Time Parameters
        self.rise_time: Parameter = Parameter(
            name="rise_time",
            instrument=self,
            label="Rise time",
            get_cmd=self._create_query("RIS"),
            get_parser=float,
            unit="s",
            snapshot_value=False,
        )
        self.fall_time: Parameter = Parameter(
            name="fall_time",
            instrument=self,
            label="Fall time",
            get_cmd=self._create_query("FALL"),
            get_parser=float,
            unit="s",
            snapshot_value=False,
        )
        self.duty_cycle: Parameter = Parameter(
            name="duty_cycle",
            instrument=self,
            label="Duty Cycle",
            get_cmd=self._create_query("DUTY"),
            get_parser=float,
            unit="%",
            snapshot_value=False,
        )
        self.period: Parameter = Parameter(
            name="period",
            instrument=self,
            label="Period",
            get_cmd=self._create_query("PER"),
            get_parser=float,
            unit="s",
            snapshot_value=False,
        )
        self.frequency: Parameter = Parameter(
            name="frequency",
            instrument=self,
            label="Signal frequency",
            get_cmd=self._create_query("FREQ"),
            get_parser=float,
            unit="Hz",
            docstring="""
                                     measure the frequency of the first
                                     complete cycle on the screen using
                                     the mid-threshold levels of the waveform
                                     """,
            snapshot_value=False,
        )
        self.slew_rate: Parameter = Parameter(
            name="slew_rate",
            instrument=self,
            label="Slew rate",
            get_cmd=self._create_query("SLEW"),
            get_parser=float,
            unit="S",
            snapshot_value=False,
        )

        ###################################
        # Deprecated parameter aliases
        self.rms = self.vrms_dc
        self.rms_no_dc = self.vrms
        self.min = self.vmin
        self.middle = self.vmid
        self.max = self.vmax
        self.lower = self.vlow

    def _create_query(self, cmd: str, pre_cmd: str = "", post_cmd: str = "") -> str:
        """
        Create a query string with the correct source included
        """
        chan_str = self._channel
        if chan_str:
            if pre_cmd:
                chan_str = f",{chan_str}"
            if post_cmd:
                chan_str = f"{chan_str},"
        elif pre_cmd and post_cmd:
            pre_cmd = f"{pre_cmd},"
        return f":MEAS:{cmd}? {pre_cmd}{chan_str}{post_cmd}".strip()


[docs] class KeysightInfiniiumBoundMeasurement(AbstractMeasurementSubsystem): def __init__( self, parent: "KeysightInfiniiumChannel | KeysightInfiniiumFunction", name: str, **kwargs: "Unpack[InstrumentBaseKWArgs]", ): """ Initialize measurement subsystem bound to a specific channel """ # Bind the channel self._channel = parent.channel_name # Initialize measurement parameters super().__init__(parent, name, **kwargs)
BoundMeasurement = KeysightInfiniiumBoundMeasurement """ Alias for backwards compatibility """
[docs] class KeysightInfiniiumUnboundMeasurement(AbstractMeasurementSubsystem): def __init__( self, parent: "KeysightInfiniium", name: str, **kwargs: "Unpack[InstrumentBaseKWArgs]", ): """ Initialize measurement subsystem where target is set by the parameter `source`. """ # Blank channel self._channel = "" # Initialize measurement parameters super().__init__(parent, name, **kwargs) self.source = Parameter( name="source", instrument=self, label="Primary measurement source", set_cmd=self._set_source, get_cmd=self._get_source, snapshot_value=False, ) def _validate_source(self, source: str) -> str: """Validate and set the source.""" valid_channels = f"CHAN[1-{self.root_instrument.no_channels}]" if re.fullmatch(valid_channels, source): if not int(self.ask(f"CHAN{source[-1]}:DISP?")): raise ValueError(f"Channel {source[-1]} not turned on.") return source if re.fullmatch("DIFF[1-2]", source): diff_chan = (int(source[-1]) - 1) * 2 + 1 if int(self.ask(f"CHAN{diff_chan}:DIFF?")) != 1: raise ValueError(f"Differential channel {source[-1]} not turned on.") return source if re.fullmatch("COMM[1-2]", source): diff_chan = (int(source[-1]) - 1) * 2 + 1 if int(self.ask(f"CHAN{diff_chan}:DIFF?")) != 1: raise ValueError(f"Differential channel {source[-1]} not turned on.") return source if re.fullmatch("WMEM[1-4]", source): return source match = re.fullmatch("FUNC([1-9]{1,2})", source) if match: func_chan = int(match.groups()[0]) if not (1 <= func_chan <= 16): raise ValueError( f"Function number should be in the range 1-16. Got {func_chan}." ) if not int(self.ask(f"FUNC{func_chan}:DISP?")): raise ValueError(f"Function {func_chan} is not enabled.") return f"FUNC{func_chan}" raise ValueError( f"Invalid measurement source {source}. Valid values are: (" "CHAN[1-4], DIFF[1-2], COMM[1-2], WMEM[1-4], FUNC[1-16])." ) def _set_source(self, source: str) -> None: source = self._validate_source(source) self._channel = source # Then set the measurement source self.write(f":MEAS:SOUR {self._channel}") def _get_source(self) -> str: if self._channel == "": source = self.ask(":MEAS:SOUR?") self._channel = source.strip().split(",")[0] return self._channel
UnboundMeasurement = KeysightInfiniiumUnboundMeasurement """ Alias for backwards compatibility """
[docs] class KeysightInfiniiumFunction(InstrumentChannel): def __init__( self, parent: "KeysightInfiniium", name: str, channel: int, **kwargs: "Unpack[InstrumentBaseKWArgs]", ): """ Initialize an infiniium channel. """ self._channel = channel super().__init__(parent, name, **kwargs) # display self.display = Parameter( name="display", instrument=self, label=f"Function {channel} display on/off", set_cmd=f"FUNC{channel}:DISP {{}}", get_cmd=f"FUNC{channel}:DISP?", val_mapping=create_on_off_val_mapping(on_val=1, off_val=0), ) # Retrieve basic settings of the function self.function: Parameter = Parameter( name="function", instrument=self, label=f"Function {channel} function", get_cmd=self._get_func, vals=vals.Strings(), ) self.source: Parameter = Parameter( name="source", instrument=self, label=f"Function {channel} source", get_cmd=f"FUNC{channel}?", ) # Trace settings self.points: Parameter = Parameter( name="points", instrument=self, label=f"Function {channel} points", get_cmd=self._get_points, ) self.frequency_axis = DSOFrequencyAxisParam( name="frequency_axis", instrument=self, label="Frequency", unit="Hz", xorigin=0.0, xincrement=0.0, points=1, vals=vals.Arrays(shape=(self.points,)), snapshot_value=False, ) self.time_axis = DSOTimeAxisParam( name="time_axis", instrument=self, label="Time", unit="s", xorigin=0.0, xincrement=0.0, points=1, vals=vals.Arrays(shape=(self.points,)), snapshot_value=False, ) self.trace = DSOTraceParam( name="trace", instrument=self, label=f"Function {channel} trace", channel=self.channel_name, vals=vals.Arrays(shape=(self.points,)), snapshot_value=False, ) # Measurement subsystem self.add_submodule( "measure", KeysightInfiniiumBoundMeasurement(self, "measure") ) @property def channel(self) -> int: return self._channel @property def channel_name(self) -> str: return f"FUNC{self._channel}" def _get_points(self) -> int: """ Return the number of points in the current function. This may be different to the number of points in the source as often functions modify the number of points. """ self.write(f":WAV:SOUR {self.channel_name}") return int(self.ask(":WAV:POIN?")) def _get_func(self) -> str: """ Return the function applied to the sources for this function """ try: self.write(":SYST:HEAD ON") func, _ = self.ask(f":{self.channel_name}?").strip().split() match = re.fullmatch(f":{self.channel_name}:([\\w]+)", func) if match: return match.groups()[0] raise ValueError( f"Couldn't extract function for {self.channel_name}. Got {func}" ) finally: self.write(":SYST:HEAD OFF")
InfiniiumFunction = KeysightInfiniiumFunction """ Alias for backwards compatibility """
[docs] class KeysightInfiniiumChannel(InstrumentChannel): def __init__( self, parent: "KeysightInfiniium", name: str, channel: int, **kwargs: "Unpack[InstrumentBaseKWArgs]", ): """ Initialize an infiniium channel. """ self._channel = channel super().__init__(parent, name, **kwargs) # input # On MXR/EXR-Series oscilloscopes: # DC — DC coupling, 1 MΩ impedance. # DC50 | DCFifty — DC coupling, 50Ω impedance. # AC — AC coupling, 1 MΩ impedance. # LFR1 | LFR2 — AC 1 MΩ input impedance. # When no probe is attached, the coupling for each channel can be AC, DC, DC50, or DCFifty. # If you have an 1153A probe attached, the valid parameters are DC, LFR1, and LFR2 (low-frequency reject). self.input: Parameter = Parameter( name="input", instrument=self, label=f"Channel {channel} input coupling & impedance", set_cmd=f"CHAN{channel}:INP {{}}", get_cmd=f"CHAN{channel}:INP?", vals=vals.Enum("DC", "DC50", "AC", "LFR1", "LFR2"), ) # display self.display: Parameter = Parameter( name="display", instrument=self, label=f"Channel {channel} display on/off", set_cmd=f"CHAN{channel}:DISP {{}}", get_cmd=f"CHAN{channel}:DISP?", val_mapping=create_on_off_val_mapping(on_val=1, off_val=0), ) # scaling self.offset: Parameter = Parameter( name="offset", instrument=self, label=f"Channel {channel} offset", set_cmd=f"CHAN{channel}:OFFS {{}}", unit="V", get_cmd=f"CHAN{channel}:OFFS?", get_parser=float, ) self.range: Parameter = Parameter( name="range", instrument=self, label=f"Channel {channel} range", unit="V", set_cmd=f"CHAN{channel}:RANG {{}}", get_cmd=f"CHAN{channel}:RANG?", get_parser=float, vals=vals.Numbers(), ) # Trigger level self.trigger_level: Parameter = Parameter( name="trigger_level", instrument=self, label=f"Channel {channel} trigger level", unit="V", set_cmd=f":TRIG:LEV CHAN{channel},{{}}", get_cmd=f":TRIG:LEV? CHAN{channel}", get_parser=float, vals=vals.Numbers(), ) # Trace data self.time_axis = DSOTimeAxisParam( name="time_axis", instrument=self, label="Time", unit="s", xorigin=0.0, xincrement=0.0, points=1, vals=vals.Arrays(shape=(self.parent.acquire_points,)), snapshot_value=False, ) self.trace = DSOTraceParam( name="trace", instrument=self, label=f"Channel {channel} trace", unit="V", channel=self.channel_name, vals=vals.Arrays(shape=(self.parent.acquire_points,)), snapshot_value=False, ) # Measurement subsystem self.add_submodule( "measure", KeysightInfiniiumBoundMeasurement(self, "measure") ) @property def channel(self) -> int: return self._channel @property def channel_name(self) -> str: return f"CHAN{self._channel}"
[docs] def update_setpoints(self) -> None: """ Update time axis and offsets for this channel. Calling this function is required when instr.cache_setpoints is True whenever the scope parameters are changed. """ self.trace.update_setpoints()
InfiniiumChannel = KeysightInfiniiumChannel """ Alias for backwards compatibility """
[docs] class KeysightInfiniium(VisaInstrument): """ This is the QCoDeS driver for the Keysight Infiniium oscilloscopes """ default_timeout = 20 default_terminator = "\n" def __init__( self, name: str, address: str, channels: int = 4, silence_pyvisapy_warning: bool = False, **kwargs: "Unpack[VisaInstrumentKWArgs]", ): """ Initialises the oscilloscope. Args: name: Name of the instrument used by QCoDeS address: Instrument address as used by VISA timeout: Visa timeout, in secs. channels: The number of channels on the scope. silence_pyvisapy_warning: Don't warn about pyvisa-py at startup **kwargs: kwargs are forwarded to base class. """ super().__init__(name, address, **kwargs) self.connect_message() # Check if we are using pyvisa-py as our visa lib and warn users that # this may cause long digitize operations to fail if ( self.visa_handle.visalib.library_path == "py" and not silence_pyvisapy_warning ): self.log.warning( "Timeout not handled correctly in pyvisa_py. This may cause" " long acquisitions to fail. Either use ni/keysight visalib" " or set timeout to longer than longest expected acquisition" " time." ) # switch the response header off else none of our parameters will work self.write(":SYSTem:HEADer OFF") # Then set up the data format used to retrieve waveforms self.write(":WAVEFORM:FORMAT WORD") self.write(":WAVEFORM:BYTEORDER LSBFirst") self.write(":WAVEFORM:STREAMING ON") # Query the oscilloscope parameters # Set sample rate, bandwidth and memory depth limits self._query_capabilities() # Number of channels can't be queried on most older scopes. Use a parameter # for now. self.no_channels = channels # Run state self.run_mode: Parameter = Parameter( name="run_mode", instrument=self, label="run mode", get_cmd=":RST?", vals=vals.Enum("RUN", "STOP", "SING"), ) # Timing Parameters self.timebase_range: Parameter = Parameter( name="timebase_range", instrument=self, label="Range of the time axis", unit="s", get_cmd=":TIM:RANG?", set_cmd=":TIM:RANG {}", vals=vals.Numbers(5e-12, 20), get_parser=float, ) self.timebase_position: Parameter = Parameter( name="timebase_position", instrument=self, label="Offset of the time axis", unit="s", get_cmd=":TIM:POS?", set_cmd=":TIM:POS {}", vals=vals.Numbers(), get_parser=float, ) self.timebase_roll_enabled: Parameter = Parameter( name="timebase_roll_enabled", instrument=self, label="Is rolling mode enabled", get_cmd=":TIM:ROLL:ENABLE?", set_cmd=":TIM:ROLL:ENABLE {}", val_mapping={True: 1, False: 0}, ) # Trigger self.trigger_mode: Parameter = Parameter( name="trigger_mode", instrument=self, label="Trigger mode", get_cmd=":TRIG:MODE?", ) self.trigger_sweep: Parameter = Parameter( name="trigger_sweep", instrument=self, label="Trigger sweep mode", get_cmd=":TRIG:SWE?", set_cmd=":TRIG:SWE {}", vals=vals.Enum("AUTO", "TRIG"), ) self.trigger_state: Parameter = Parameter( name="trigger_state", instrument=self, label="Trigger state", get_cmd=":AST?", vals=vals.Enum("ARM", "TRIG", "ATRIG", "ADONE"), snapshot_value=False, ) # Edge trigger parameters # Note that for now we only support parameterized edge triggers - this may # be something worth expanding. # To set trigger level, use the "trigger_level" parameter in each channel self.trigger_edge_source: Parameter = Parameter( name="trigger_edge_source", instrument=self, label="Source channel for the edge trigger", get_cmd=":TRIGger:EDGE:SOURce?", set_cmd=":TRIGger:EDGE:SOURce {}", vals=vals.Enum( *( [f"CHAN{i}" for i in range(1, self.no_channels + 1)] + [f"DIG{i}" for i in range(16 + 1)] + ["AUX", "LINE"] ) ), ) self.trigger_edge_slope: Parameter = Parameter( name="trigger_edge_slope", instrument=self, label="slope of the edge trigger", get_cmd=":TRIGger:EDGE:SLOPe?", set_cmd=":TRIGger:EDGE:SLOPe {}", vals=vals.Enum("POS", "POSITIVE", "NEG", "NEGATIVE", "EITH"), ) self.trigger_level_aux: Parameter = Parameter( name="trigger_level_aux", instrument=self, label="Trigger level AUX", unit="V", get_cmd=":TRIGger:LEVel? AUX", set_cmd=":TRIGger:LEVel AUX,{}", get_parser=float, vals=vals.Numbers(), ) # Aquisition # If sample points, rate and timebase_scale are set in an # incomensurate way, the scope only displays part of the waveform self.acquire_points: Parameter = Parameter( name="acquire_points", instrument=self, label="sample points", get_cmd=":ACQ:POIN?", set_cmd=":ACQ:POIN {}", get_parser=int, vals=vals.Numbers(min_value=self.min_pts, max_value=self.max_pts), ) self.sample_rate: Parameter = Parameter( name="sample_rate", instrument=self, label="sample rate", get_cmd=":ACQ:SRAT?", set_cmd=":ACQ:SRAT {}", unit="Hz", get_parser=float, vals=vals.Numbers(min_value=self.min_srat, max_value=self.max_srat), ) # Note: newer scopes allow a per-channel bandwidth. This is not implemented yet. self.bandwidth: Parameter = Parameter( name="bandwidth", instrument=self, label="bandwidth", get_cmd=":ACQ:BAND?", set_cmd=":ACQ:BAND {}", unit="Hz", get_parser=float, vals=vals.Numbers(min_value=self.min_bw, max_value=self.max_bw), ) self.acquire_interpolate: Parameter = Parameter( name="acquire_interpolate", instrument=self, get_cmd=":ACQ:INTerpolate?", set_cmd=":ACQuire:INTerpolate {}", vals=vals.Enum(0, 1, "INT1", "INT2", "INT4", "INT8", "INT16", "INT32"), ) self.acquire_mode: Parameter = Parameter( name="acquire_mode", instrument=self, label="Acquisition mode", get_cmd="ACQuire:MODE?", set_cmd="ACQuire:MODE {}", vals=vals.Enum( "ETIMe", "RTIMe", "PDETect", "HRESolution", "SEGMented", "SEGPdetect", "SEGHres", ), ) self.average: Parameter = Parameter( name="average", instrument=self, label="Averages", get_cmd=self._get_avg, set_cmd=self._set_avg, vals=vals.Ints(min_value=1, max_value=10486575), ) # Automatically digitize before acquiring a trace self.auto_digitize: Parameter = Parameter( name="auto_digitize", instrument=self, label="Auto digitize", set_cmd=None, get_cmd=None, val_mapping=create_on_off_val_mapping(), docstring=( "Digitize before each waveform download. " "If you need to acquire from multiple channels simultaneously " "or you wish to acquire with the scope running freely, " "set this value to False." ), initial_value=True, ) self.cache_setpoints: Parameter = Parameter( name="cache_setpoints", instrument=self, label="Cache setpoints", set_cmd=None, get_cmd=None, val_mapping=create_on_off_val_mapping(), docstring=( "Cache setpoints. If false, the preamble is queried before each" " acquisition, which may add latency to measurements. If you" " are taking repeated measurements, set this to True and update" " setpoints manually by calling `instr.chX.update_setpoints()`." ), initial_value=False, ) # Channels _channels = ChannelList( self, "channels", KeysightInfiniiumChannel, snapshotable=False ) for i in range(1, self.no_channels + 1): channel = KeysightInfiniiumChannel(self, f"chan{i}", i) _channels.append(channel) self.add_submodule(f"ch{i}", channel) self.add_submodule("channels", _channels.to_channel_tuple()) # Functions _functions = ChannelList( self, "functions", KeysightInfiniiumFunction, snapshotable=False ) for i in range(1, 16 + 1): function = KeysightInfiniiumFunction(self, f"func{i}", i) _functions.append(function) self.add_submodule(f"func{i}", function) # Have to call channel list "funcs" here as functions is a # reserved name in Instrument. self.add_submodule("funcs", _functions.to_channel_tuple()) # Submodules meassubsys = KeysightInfiniiumUnboundMeasurement(self, "measure") self.add_submodule("measure", meassubsys) def _query_capabilities(self) -> None: """ Query scope capabilities (sample rate, bandwidth, memory depth) """ try: # Bandwidth self.min_bw, self.max_bw = 0.0, 99.0e9 # Set default limits bw = self.ask(":ACQ:BAND:TESTLIMITS?") match = re.fullmatch( r"1,<numeric>([0-9.]+E\+[0-9]+):([0-9.]+E\+[0-9]+)", bw ) if match: self.min_bw, self.max_bw = (float(f) for f in match.groups()) self.log.info(f"Scope BW: {self.min_bw}-{self.max_bw}") self._meta_attrs.extend(("min_bw", "max_bw")) else: self.log.warning( f"Unable to query bandwidth limits (inv. format ({bw})). " f"Setting limits to default." ) except VisaIOError as e: self.log.warning( f"Unable to query bandwidth limits ({e}). Setting limits to default." ) # Memory depth try: self.min_pts, self.max_pts = 16, 1_000_000_000 mem = self.ask(":ACQ:POIN:TESTLIMITS?") match = re.match("1,<numeric>([0-9]+):([0-9]+)", mem) if match: self.min_pts, self.max_pts = (int(p) for p in match.groups()) self.log.info(f"Scope memory: {self.min_pts}-{self.max_pts}") self._meta_attrs.extend(("min_pts", "max_pts")) else: self.log.warning( f"Unable to query memory depth (inv. format ({mem})). " "Setting limits to default." ) except VisaIOError as e: self.log.warning( f"Unable to query memory depth ({e}). Setting limits to default." ) # Sample Rate try: # Set BW to auto in order to query this bw_set: float | Literal["AUTO"] = float(self.ask(":ACQ:BAND?")) if np.isclose(bw_set, self.max_bw): # Auto returns max bandwidth bw_set = "AUTO" self.write(":ACQ:BAND AUTO") self.min_srat, self.max_srat = 10.0, 99.0e9 # Set large limits srat = self.ask(":ACQ:SRAT:TESTLIMITS?") self.write(f":ACQ:BAND {bw_set}") match = re.fullmatch( r"1,<numeric>([0-9.]+E\+[0-9]+):([0-9.]+E\+[0-9]+)", srat ) if match: self.min_srat, self.max_srat = (float(f) for f in match.groups()) self.log.info(f"Scope sample rate: {self.min_srat}-{self.max_srat}") self._meta_attrs.extend(("min_srat", "max_srat")) else: self.log.warning( f"Unable to query sample rate (inv. format ({srat})). " "Setting limits to default." ) except VisaIOError as e: self.log.warning( f"Unable to query sample rate ({e}). Setting limits to default." ) def _get_avg(self) -> int: """ Return the number of averages, or 1 if averaging is disabled. """ enabled = int(self.ask(":ACQ:AVER?")) if not enabled: return 1 else: return int(self.ask(":ACQ:AVER:COUN?")) def _set_avg(self, count: int) -> None: """ Set the number of averages, or disable if 1. """ if count == 1: self.write(":ACQ:AVER 0") else: self.write(f":ACQ:AVER:COUN {count}") self.write(":ACQ:AVER 1") # Simple oscilloscope commands
[docs] def run(self) -> None: """ Set the scope in run mode. """ self.write(":RUN") self.run_mode()
[docs] def stop(self) -> None: """ Set the scope in stop mode. """ self.write(":STOP") self.run_mode()
[docs] def single(self) -> None: """ Take a single acquisition """ self.write(":SING") self.run_mode()
[docs] def update_all_setpoints(self) -> None: """ Update the setpoints for all enabled channels. This method may be run at the beginning of a measurement rather than looping through each channel manually. """ for channel in self.channels: if channel.display(): channel.update_setpoints()
[docs] def digitize(self, timeout: int | None = None) -> None: """ Digitize a full waveform and block until the acquisition is complete. Warning: If using pyvisa_py as your visa library, this will not work with acquisitions longer than a single timeout period. If you require long acquisitions either use Keysight/NI Visa or set timeout to be longer than the expected acquisition time. """ old_timeout = self.visa_handle.timeout if timeout is not None: self.visa_handle.timeout = timeout # 1 second timeout try: self.visa_handle.write(":DIGITIZE;*OPC?") ret = None # Wait until we receive the "complete" reply while ret != "1": try: ret = self.visa_handle.read() except VisaIOError as e: # Ignore timeout errors - we could still be waiting for a trigger # or taking a long acquisition if e.error_code != StatusCode.error_timeout: self.log.exception( "Unexpected VisaError while waiting for acquisition." ) raise # Raise all other visa errors except KeyboardInterrupt: self.log.error( "Keyboard interrupt while waiting to digitize. Check your trigger?" ) raise # Pass error upwards finally: # Clear the device to unblock any failed digitize self.device_clear() if timeout is not None: self.visa_handle.timeout = old_timeout
[docs] def screenshot( self, path: str | Path = "./screenshot", with_time: bool = False, time_fmt: str = "%Y-%m-%d_%H-%M-%S", divider: str = "_", ) -> np.ndarray | None: """Save screen to {path} with {image_type}: bmp, jpg, gif, tif, png return np.array if sucessfully saved, else return None """ from datetime import datetime from io import BytesIO from os.path import splitext from PIL.Image import open as pil_open if isinstance(path, Path): path = str(path) time_str = datetime.now().strftime(time_fmt) if with_time else "" img_name, img_type = splitext(path) img_path = ( f"{img_name}{divider if with_time else ''}{time_str}{img_type.lower()}" ) try: with open(img_path, "wb") as f: screen_bytes = self.visa_handle.query_binary_values( f":DISPlay:DATA? {img_type.upper()[1:]}", # without . # https://docs.python.org/3/library/struct.html#format-characters datatype="B", # Capitcal B for unsigned byte container=bytes, ) f.write(screen_bytes) # type: ignore[arg-type] print(f"Screen image written to {img_path}") return np.asarray(pil_open(BytesIO(screen_bytes))) # type: ignore[arg-type] except Exception as e: self.log.error(f"Failed to save screenshot, Error occurred: \n{e}") return None
Infiniium = KeysightInfiniium """ Alias for backwards compatibility """