# Source code for qcodes.instrument_drivers.rohde_schwarz.RTO1000

# All manual references are to R&S RTO Digital Oscilloscope User Manual
# for firmware 3.65, 2017

import logging
import time
import warnings
from typing import TYPE_CHECKING, Any

import numpy as np
from packaging import version

import qcodes.validators as vals
from qcodes.instrument import (
    Instrument,
    InstrumentBaseKWArgs,
    InstrumentChannel,
    VisaInstrument,
    VisaInstrumentKWArgs,
)
from qcodes.parameters import ArrayParameter, Parameter, create_on_off_val_mapping

if TYPE_CHECKING:
    from typing_extensions import Unpack

# Module-level logger; used below for acquisition progress messages and
# for the old-firmware warning issued when the instrument is connected.
log = logging.getLogger(__name__)


class ScopeTrace(ArrayParameter):
    """
    Array parameter that reads one complete trace from a scope channel.

    ``prepare_trace`` must be called before the parameter is read; it
    configures the export format on the instrument and computes the time
    setpoints. ``get_raw`` then downloads the binary block and converts the
    integer samples to volts.
    """

    def __init__(
        self, name: str, instrument: InstrumentChannel, channum: int, **kwargs: Any
    ) -> None:
        """
        The ScopeTrace parameter is attached to a channel of the oscilloscope.

        For now, we only support reading out the entire trace.

        Args:
            name: Name of the parameter.
            instrument: The channel this trace belongs to.
            channum: Channel number used in the ``CHANnel<n>:...`` commands.
            **kwargs: Forwarded to ``ArrayParameter``.
        """
        super().__init__(
            name=name,
            shape=(1,),
            label="Voltage",  # TODO: Is this sometimes dbm?
            unit="V",
            setpoint_names=("Time",),
            setpoint_labels=("Time",),
            setpoint_units=("s",),
            docstring="Holds scope trace",
            snapshot_value=False,
            instrument=instrument,
            **kwargs,
        )

        self.channel = instrument
        self.channum = channum
        # Flipped to True by prepare_trace(); the root instrument resets it
        # whenever a setting that invalidates the setpoints is changed.
        self._trace_ready = False

    def prepare_trace(self) -> None:
        """
        Prepare the scope for returning data, calculate the setpoints

        Raises:
            NotImplementedError: If the waveform header reports more than
                one value per sample.
        """
        instr = self.root_instrument
        assert instr is not None

        # We always use 16 bit integers for the data format
        instr.dataformat("INT,16")
        # ensure little-endianess
        instr.write("FORMat:BORder LSBFirst")
        # only export y-values
        instr.write("EXPort:WAVeform:INCXvalues OFF")
        # only export one channel
        instr.write("EXPort:WAVeform:MULTichannel OFF")

        # now get setpoints; the header is a comma separated list of
        # start time, stop time, number of samples and values per sample
        hdr = instr.ask(f"CHANnel{self.channum}:DATA:HEADER?")
        hdr_vals = list(map(float, hdr.split(",")))
        t_start = hdr_vals[0]
        t_stop = hdr_vals[1]
        no_samples = int(hdr_vals[2])
        values_per_sample = hdr_vals[3]

        # NOTE (WilliamHPNielsen):
        # If samples are multi-valued, we need a `MultiParameter`
        # instead of an `ArrayParameter`.
        if values_per_sample > 1:
            raise NotImplementedError(
                "There are several values per sample "
                "in this trace (are you using envelope"
                " or peak detect?). We currently do "
                "not support saving such a trace."
            )

        self.shape = (no_samples,)
        self.setpoints = (tuple(np.linspace(t_start, t_stop, no_samples)),)

        self._trace_ready = True
        # we must ensure that all this took effect before proceeding
        instr.ask("*OPC?")

    def get_raw(self) -> np.ndarray:
        """
        Returns a trace

        In 'RUN Nx SINGLE' mode this blocks (polling every 0.25 s) until the
        instrument reports that all requested acquisitions are completed.

        Returns:
            The trace converted to physical values (integer samples scaled
            by the channel scale and shifted by the channel offset).

        Raises:
            ValueError: If ``prepare_trace`` has not been called since the
                trace was last invalidated.
        """

        instr = self.root_instrument
        assert instr is not None

        if not self._trace_ready:
            raise ValueError("Trace not ready! Please call prepare_trace().")

        if instr.run_mode() == "RUN Nx SINGLE":
            total_acquisitions = instr.num_acquisitions()
            completed_acquisitions = instr.completed_acquisitions()
            log.info("Acquiring %s traces.", total_acquisitions)
            while completed_acquisitions < total_acquisitions:
                log.info("Acquired %s:%s", completed_acquisitions, total_acquisitions)
                time.sleep(0.25)
                completed_acquisitions = instr.completed_acquisitions()

        log.info("Acquisition completed. Polling trace from instrument.")
        vh = instr.visa_handle
        vh.write(f"CHANnel{self.channum}:DATA?")
        raw_vals = vh.read_raw()

        # The reply is a definite-length binary block: '#', one digit giving
        # the number of length digits, the payload length in bytes, payload.
        num_length = int(raw_vals[1:2])
        no_bytes = int(raw_vals[2 : 2 + num_length])

        # cut off the header and the trailing '\n'
        raw_vals = raw_vals[2 + num_length : -1]

        dataformat = instr.dataformat.get_latest()

        # The quantisation constant must match the sample width; using the
        # 16 bit constant for 8 bit data would scale the trace down by 256.
        if dataformat == "INT,8":
            int_vals = np.frombuffer(raw_vals, dtype=np.int8, count=no_bytes)
            quant_levels = 253
        else:
            # prepare_trace() requests LSB-first data, so decode explicitly
            # as little-endian instead of relying on the host byte order.
            # Any format other than "INT,8" is decoded as 16 bit integers.
            int_vals = np.frombuffer(
                raw_vals, dtype=np.dtype("<i2"), count=no_bytes // 2
            )
            quant_levels = 253 * 256

        # now the integer values must be converted to physical
        # values

        scale = self.channel.scale()
        no_divs = 10  # TODO: Is this ever NOT 10?

        conv_factor = scale * no_divs / quant_levels
        output = conv_factor * int_vals + self.channel.offset()

        return output


class RohdeSchwarzRTO1000ScopeMeasurement(InstrumentChannel):
    """
    Class to hold a measurement of the scope.

    Each instance wraps the ``MEASurement<meas_nr>:...`` command group of
    the instrument and exposes it as QCoDeS parameters.
    """

    def __init__(
        self,
        parent: Instrument,
        name: str,
        meas_nr: int,
        **kwargs: "Unpack[InstrumentBaseKWArgs]",
    ) -> None:
        """
        Args:
            parent: The instrument to which the channel is attached
            name: The name of the measurement
            meas_nr: The number of the measurement in question. Must match the
                actual number as used by the instrument (1..8)
            **kwargs: Forwarded to base class.

        Raises:
            ValueError: If ``meas_nr`` is not in 1..8.
        """

        if meas_nr not in range(1, 9):
            raise ValueError("Invalid measurement number; Min: 1, max 8")

        self.meas_nr = meas_nr
        super().__init__(parent, name, **kwargs)

        # Validator shared by all source-selecting parameters below.
        self.sources = vals.Enum(
            "C1W1",
            "C1W2",
            "C1W3",
            "C2W1",
            "C2W2",
            "C2W3",
            "C3W1",
            "C3W2",
            "C3W3",
            "C4W1",
            "C4W2",
            "C4W3",
            "M1",
            "M2",
            "M3",
            "M4",
            "R1",
            "R2",
            "R3",
            "R4",
            "SBUS1",
            "SBUS2",
            "SBUS3",
            "SBUS4",
            "D0",
            "D1",
            "D2",
            "D3",
            "D4",
            "D5",
            "D6",
            "D7",
            "D8",
            "D9",
            "D10",
            "D11",
            "D12",
            "D13",
            "D14",
            "D15",
            "TRK1",
            "TRK2",
            "TRK3",
            "TRK4",
            "TRK5",
            "TRK6",
            "TRK7",
            "TRK8",
            "SG1TL1",
            "SG1TL2",
            "SG2TL1",
            "SG2TL2",
            "SG3TL1",
            "SG3TL2",
            "SG4TL1",
            "SG4TL2",
            "Z1V1",
            "Z1V2",
            "Z1V3",
            "Z1V4",
            "Z1I1",
            "Z1I2",
            "Z1I3",
            "Z1I4",
            "Z2V1",
            "Z2V2",
            "Z2V3",
            "Z2V4",
            "Z2I1",
            "Z2I2",
            "Z2I3",
            "Z2I4",
        )

        self.categories = vals.Enum(
            "AMPTime", "JITTer", "EYEJitter", "SPECtrum", "HISTogram", "PROTocol"
        )

        self.meas_type = vals.Enum(
            # Amplitude/time measurements
            "HIGH",
            "LOW",
            "AMPLitude",
            "MAXimum",
            "MINimum",
            "PDELta",
            "MEAN",
            "RMS",
            "STDDev",
            "POVershoot",
            "NOVershoot",
            "AREA",
            "RTIMe",
            "FTIMe",
            "PPULse",
            "NPULse",
            "PERiod",
            "FREQuency",
            "PDCYcle",
            "NDCYcle",
            "CYCarea",
            "CYCMean",
            "CYCRms",
            "CYCStddev",
            "PULCnt",
            "DELay",
            "PHASe",
            "BWIDth",
            "PSWitching",
            "NSWitching",
            "PULSetrain",
            "EDGecount",
            "SHT",
            "SHR",
            "DTOTrigger",
            "PROBemeter",
            "SLERising",
            "SLEFalling",
            # Jitter measurements
            "CCJitter",
            "NCJitter",
            "CCWidth",
            "CCDutycycle",
            "TIE",
            "UINTerval",
            "DRATe",
            "SKWDelay",
            "SKWPhase",
            # Eye diagram measurements
            "ERPercent",
            "ERDB",
            "EHEight",
            "EWIDth",
            "ETOP",
            "EBASe",
            "QFACtor",
            "RMSNoise",
            "SNRatio",
            "DCDistortion",
            "ERTime",
            "EFTime",
            "EBRate",
            "EAMPlitude",
            "PPJitter",
            "STDJitter",
            "RMSJitter",
            # Spectrum measurements
            "CPOWer",
            "OBWidth",
            "SBWidth",
            "THD",
            "THDPCT",
            "THDA",
            "THDU",
            "THDR",
            "HAR",
            "PLISt",
            # Histogram measurements
            "WCOunt",
            "WSAMples",
            "HSAMples",
            "HPEak",
            "PEAK",
            "UPEakvalue",
            "LPEakvalue",
            "HMAXimum",
            "HMINimum",
            "MEDian",
            "MAXMin",
            "HMEan",
            "HSTDdev",
            "M1STddev",
            "M2STddev",
            "M3STddev",
            "MKPositive",
            "MKNegative",
        )

        self.enable: Parameter = self.add_parameter(
            "enable",
            label=f"Measurement {meas_nr} enable",
            set_cmd=f"MEASurement{meas_nr}:ENABle {{}}",
            vals=vals.Enum("ON", "OFF"),
            docstring="Switches the measurement on or off.",
        )
        """Switches the measurement on or off."""

        self.source: Parameter = self.add_parameter(
            "source",
            label=f"Measurement {meas_nr} source",
            set_cmd=f"MEASurement{meas_nr}:SOURce {{}}",
            vals=self.sources,
            docstring="Set the source of a measurement if the "
            "measurement only needs one source.",
        )
        """Set the source of a measurement if the measurement only needs one source."""

        self.source_first: Parameter = self.add_parameter(
            "source_first",
            label=f"Measurement {meas_nr} first source",
            set_cmd=f"MEASurement{meas_nr}:FSRC {{}}",
            vals=self.sources,
            docstring="Set the first source of a measurement"
            " if the measurement needs multiple"
            " sources.",
        )
        """Set the first source of a measurement if the measurement needs multiple sources."""

        self.source_second: Parameter = self.add_parameter(
            "source_second",
            label=f"Measurement {meas_nr} second source",
            set_cmd=f"MEASurement{meas_nr}:SSRC {{}}",
            vals=self.sources,
            docstring="Set the second source of a measurement"
            " if the measurement needs multiple"
            " sources.",
        )
        """Set the second source of a measurement if the measurement needs multiple sources."""

        self.category: Parameter = self.add_parameter(
            "category",
            label=f"Measurement {meas_nr} category",
            set_cmd=f"MEASurement{meas_nr}:CATegory {{}}",
            vals=self.categories,
            docstring="Set the category of a measurement.",
        )
        """Set the category of a measurement."""

        self.main: Parameter = self.add_parameter(
            "main",
            label=f"Measurement {meas_nr} main",
            set_cmd=f"MEASurement{meas_nr}:MAIN {{}}",
            vals=self.meas_type,
            docstring="Set the main of a measurement.",
        )
        """Set the main of a measurement."""

        self.statistics_enable: Parameter = self.add_parameter(
            "statistics_enable",
            label=f"Measurement {meas_nr} enable statistics",
            set_cmd=f"MEASurement{meas_nr}:STATistics:ENABle {{}}",
            vals=vals.Enum("ON", "OFF"),
            docstring="Switches the statistics of the measurement on or off.",
        )
        """Switches the statistics of the measurement on or off."""

        # The set command has no placeholder, so the value passed when
        # setting this parameter does not appear in the command string.
        self.clear: Parameter = self.add_parameter(
            "clear",
            label=f"Measurement {meas_nr} clear statistics",
            set_cmd=f"MEASurement{meas_nr}:CLEar",
            docstring="Clears/reset measurement.",
        )
        """Clears/reset measurement."""

        self.event_count: Parameter = self.add_parameter(
            "event_count",
            label=f"Measurement {meas_nr} number of events",
            get_cmd=f"MEASurement{meas_nr}:RESult:EVTCount?",
            get_parser=int,
            docstring="Number of measurement results in the long-term measurement.",
        )
        """Number of measurement results in the long-term measurement."""

        self.result_avg: Parameter = self.add_parameter(
            "result_avg",
            label=f"Measurement {meas_nr} averages",
            get_cmd=f"MEASurement{meas_nr}:RESult:AVG?",
            get_parser=float,
            docstring="Average of the long-term measurement results.",
        )
        """Average of the long-term measurement results."""
# Short alias bound to the same class object as
# RohdeSchwarzRTO1000ScopeMeasurement.
# NOTE(review): presumably kept for backwards compatibility with an older
# class name - confirm.
ScopeMeasurement = RohdeSchwarzRTO1000ScopeMeasurement
class RohdeSchwarzRTO1000ScopeChannel(InstrumentChannel):
    """
    Class to hold an input channel of the scope.

    Exposes: state, coupling, ground, scale, range, position, offset,
    invert, bandwidth, impedance, overload.
    """

    def __init__(
        self,
        parent: Instrument,
        name: str,
        channum: int,
        **kwargs: "Unpack[InstrumentBaseKWArgs]",
    ) -> None:
        """
        Args:
            parent: The instrument to which the channel is attached
            name: The name of the channel
            channum: The number of the channel in question. Must match the
                actual number as used by the instrument (1..4)
            **kwargs: Forwarded to base class.
        """

        if channum not in (1, 2, 3, 4):
            raise ValueError("Invalid channel number! Must be 1, 2, 3, or 4.")

        self.channum = channum
        super().__init__(parent, name, **kwargs)

        # Common prefixes of every SCPI command and every label below.
        scpi = f"CHANnel{channum}"
        tag = f"Channel {channum}"

        self.state: Parameter = self.add_parameter(
            "state",
            label=f"{tag} state",
            set_cmd=f"{scpi}:STATE {{}}",
            get_cmd=f"{scpi}:STATe?",
            vals=vals.Enum("ON", "OFF"),
            docstring="Switches the channel on or off",
        )
        """Switches the channel on or off"""

        coupling_doc = (
            "Selects the connection of the channel signal. "
            "DC: 50 Ohm, DCLimit 1 MOhm, AC: Con. through DC capacitor"
        )
        self.coupling: Parameter = self.add_parameter(
            "coupling",
            label=f"{tag} coupling",
            set_cmd=f"{scpi}:COUPling {{}}",
            get_cmd=f"{scpi}:COUPling?",
            vals=vals.Enum("DC", "DCLimit", "AC"),
            docstring=coupling_doc,
        )
        """
        Selects the connection of the channel signal.
        DC: 50 Ohm, DCLimit 1 MOhm, AC: Con. through DC capacitor
        """

        self.ground: Parameter = self.add_parameter(
            "ground",
            label=f"{tag} ground",
            set_cmd=f"{scpi}:GND {{}}",
            get_cmd=f"{scpi}:GND?",
            vals=vals.Enum("ON", "OFF"),
            docstring="Connects/disconnects the signal to/from the ground.",
        )
        """Connects/disconnects the signal to/from the ground."""

        # NB (WilliamHPNielsen): This parameter depends on other parameters and
        # should be dynamically updated accordingly. Cf. p 1178 of the manual
        self.scale: Parameter = self.add_parameter(
            "scale",
            label=f"{tag} Y scale",
            unit="V/div",
            set_cmd=self._set_scale,
            get_cmd=f"{scpi}:SCALe?",
            get_parser=float,
        )
        """Parameter scale"""

        self.range: Parameter = self.add_parameter(
            "range",
            label=f"{tag} Y range",
            unit="V",
            set_cmd=self._set_range,
            get_cmd=f"{scpi}:RANGe?",
            get_parser=float,
        )
        """Parameter range"""

        # TODO (WilliamHPNielsen): would it be better to recast this in terms
        # of Volts?
        position_doc = (
            "Positive values move the waveform up, negative values move it down."
        )
        self.position: Parameter = self.add_parameter(
            "position",
            label=f"{tag} vert. pos.",
            unit="div",
            set_cmd=f"{scpi}:POSition {{}}",
            get_cmd=f"{scpi}:POSition?",
            get_parser=float,
            vals=vals.Numbers(-5, 5),
            docstring=position_doc,
        )
        """Positive values move the waveform up, negative values move it down."""

        self.offset: Parameter = self.add_parameter(
            "offset",
            label=f"{tag} offset",
            unit="V",
            set_cmd=f"{scpi}:OFFSet {{}}",
            get_cmd=f"{scpi}:OFFSet?",
            get_parser=float,
        )
        """Parameter offset"""

        self.invert: Parameter = self.add_parameter(
            "invert",
            label=f"{tag} inverted",
            set_cmd=f"{scpi}:INVert {{}}",
            get_cmd=f"{scpi}:INVert?",
            vals=vals.Enum("ON", "OFF"),
        )
        """Parameter invert"""

        # TODO (WilliamHPNielsen): This parameter should be dynamically
        # validated since 800 MHz BW is only available for 50 Ohm coupling
        self.bandwidth: Parameter = self.add_parameter(
            "bandwidth",
            label=f"{tag} bandwidth",
            set_cmd=f"{scpi}:BANDwidth {{}}",
            get_cmd=f"{scpi}:BANDwidth?",
            vals=vals.Enum("FULL", "B800", "B200", "B20"),
        )
        """Parameter bandwidth"""

        impedance_doc = (
            "Sets the impedance of the channel for power calculations and "
            "measurements."
        )
        self.impedance: Parameter = self.add_parameter(
            "impedance",
            label=f"{tag} impedance",
            unit="Ohm",
            set_cmd=f"{scpi}:IMPedance {{}}",
            get_cmd=f"{scpi}:IMPedance?",
            vals=vals.Ints(1, 100000),
            docstring=impedance_doc,
        )
        """Sets the impedance of the channel for power calculations and measurements."""

        self.overload: Parameter = self.add_parameter(
            "overload",
            label=f"{tag} overload",
            get_cmd=f"{scpi}:OVERload?",
        )
        """Parameter overload"""

        arithmetics_map = {"AVERAGE": "AVER", "OFF": "OFF", "ENVELOPE": "ENV"}
        self.arithmetics: Parameter = self.add_parameter(
            "arithmetics",
            label=f"{tag} arithmetics",
            get_cmd=f"{scpi}:ARIThmetics?",
            set_cmd=f"{scpi}:ARIThmetics {{}}",
            val_mapping=arithmetics_map,
        )
        """Parameter arithmetics"""

        self.trace: ScopeTrace = self.add_parameter(
            "trace", parameter_class=ScopeTrace, channum=self.channum
        )
        """Parameter trace"""

        self._trace_ready = False

    # Specialised/interlinked set/getters

    def _set_range(self, value: float) -> None:
        """Write the vertical range and mirror it into the scale cache."""
        per_division = value / 10
        self.scale.cache.set(per_division)
        command = f"CHANnel{self.channum}:RANGe {value}"
        self._parent.write(command)

    def _set_scale(self, value: float) -> None:
        """Write the vertical scale and mirror it into the range cache."""
        full_range = value * 10
        self.range.cache.set(full_range)
        command = f"CHANnel{self.channum}:SCALe {value}"
        self._parent.write(command)
# Short alias bound to the same class object as
# RohdeSchwarzRTO1000ScopeChannel.
# NOTE(review): presumably kept for backwards compatibility with an older
# class name - confirm.
ScopeChannel = RohdeSchwarzRTO1000ScopeChannel
class RohdeSchwarzRTO1000(VisaInstrument):
    """
    QCoDeS Instrument driver for the
    Rohde-Schwarz RTO1000 series oscilloscopes.

    The input channels are available as submodules ``ch1`` .. ``ch<N>``
    (``N`` is the last digit of the model string) and the measurements as
    ``meas1`` .. ``meas8``.
    """

    default_timeout = 5.0
    default_terminator = "\n"

    # Maps the user-facing trigger source names onto the numeric suffix of
    # the ``TRIGger1:LEVel<n>`` command; 5 is the external trigger input.
    _trigger_level_suffix = {"CH1": 1, "CH2": 2, "CH3": 3, "CH4": 4, "EXT": 5}

    def __init__(
        self,
        name: str,
        address: str,
        *,
        model: str | None = None,
        HD: bool = True,
        **kwargs: "Unpack[VisaInstrumentKWArgs]",
    ) -> None:
        """
        Args:
            name: name of the instrument
            address: VISA resource address
            model: The instrument model. For newer firmware versions,
                this can be auto-detected
            HD: Does the unit have the High Definition Option (allowing
                16 bit vertical resolution)
            **kwargs: kwargs are forwarded to base class.

        Raises:
            RuntimeError: If the firmware version is missing from the IDN.
            ValueError: If the firmware is older than 3.65 and no ``model``
                was given.
        """
        super().__init__(name=name, address=address, **kwargs)

        # With firmware versions earlier than 3.65, it seems that the
        # model number can NOT be queried from the instrument
        # (at least fails with RTO1024, fw 2.52.1.1), so in that case
        # the user must provide the model manually.
        firmware_version_str = self.get_idn()["firmware"]
        if firmware_version_str is None:
            raise RuntimeError("Could not determine firmware version of RTO1000.")
        firmware_version = version.parse(firmware_version_str)

        if firmware_version < version.parse("3"):
            log.warning(
                "Old firmware version detected. This driver may "
                "not be compatible. Please upgrade your firmware."
            )

        if firmware_version >= version.parse("3.65"):
            # strip just in case there is a newline character at the end
            self.model = self.ask("DIAGnostic:SERVice:WFAModel?").strip()
            if model is not None and model != self.model:
                warnings.warn(
                    "The model number provided by the user "
                    "does not match the instrument's response."
                    " I am going to assume that this oscilloscope "
                    f"is a model {self.model}",
                    stacklevel=2,
                )
        elif model is None:
            raise ValueError(
                "No model number provided. Please provide "
                'a model number (eg. "RTO1024").'
            )
        else:
            self.model = model

        self.HD = HD

        # Now assign model-specific values
        self.num_chans = int(self.model[-1])
        self.num_meas = 8

        self._horisontal_divs = int(self.ask("TIMebase:DIVisions?"))

        self.display: Parameter = self.add_parameter(
            "display",
            label="Display state",
            set_cmd="SYSTem:DISPlay:UPDate {}",
            val_mapping={"remote": 0, "view": 1},
        )
        """Parameter display"""

        # Triggering

        self.trigger_display: Parameter = self.add_parameter(
            "trigger_display",
            label="Trigger display state",
            set_cmd="DISPlay:TRIGger:LINes {}",
            get_cmd="DISPlay:TRIGger:LINes?",
            val_mapping={"ON": 1, "OFF": 0},
        )
        """Parameter trigger_display"""

        # TODO: (WilliamHPNielsen) There are more available trigger
        # settings than implemented here. See p. 1261 of the manual
        # here we just use trigger1, which is the A-trigger

        self.trigger_source: Parameter = self.add_parameter(
            "trigger_source",
            label="Trigger source",
            set_cmd="TRIGger1:SOURce {}",
            get_cmd="TRIGger1:SOURce?",
            val_mapping={
                "CH1": "CHAN1",
                "CH2": "CHAN2",
                "CH3": "CHAN3",
                "CH4": "CHAN4",
                "EXT": "EXT",
            },
        )
        """Parameter trigger_source"""

        # The query must address the same node as the set command; it
        # previously asked for the trigger source instead of the mode.
        self.trigger_mode: Parameter = self.add_parameter(
            "trigger_mode",
            label="Trigger mode",
            set_cmd="TRIGger:MODE {}",
            get_cmd="TRIGger:MODE?",
            vals=vals.Enum("AUTO", "NORMAL", "FREERUN"),
            docstring="Sets the trigger mode which determines"
            " the behaviour of the instrument if no"
            " trigger occurs.\n"
            "Options: AUTO, NORMAL, FREERUN.",
            unit="none",
        )
        """
        Sets the trigger mode which determines the behaviour of the
        instrument if no trigger occurs.
        Options: AUTO, NORMAL, FREERUN.
        """

        self.trigger_type: Parameter = self.add_parameter(
            "trigger_type",
            label="Trigger type",
            set_cmd="TRIGger1:TYPE {}",
            get_cmd="TRIGger1:TYPE?",
            val_mapping={
                "EDGE": "EDGE",
                "GLITCH": "GLIT",
                "WIDTH": "WIDT",
                "RUNT": "RUNT",
                "WINDOW": "WIND",
                "TIMEOUT": "TIM",
                "INTERVAL": "INT",
                "SLEWRATE": "SLEW",
                "DATATOCLOCK": "DAT",
                "STATE": "STAT",
                "PATTERN": "PATT",
                "ANEDGE": "ANED",
                "SERPATTERN": "SERP",
                "NFC": "NFC",
                "TV": "TV",
                "CDR": "CDR",
            },
        )
        """Parameter trigger_type"""
        # See manual p. 1262 for an explanation of trigger types

        self.trigger_level: Parameter = self.add_parameter(
            "trigger_level",
            label="Trigger level",
            set_cmd=self._set_trigger_level,
            get_cmd=self._get_trigger_level,
        )
        """Parameter trigger_level"""

        self.trigger_edge_slope: Parameter = self.add_parameter(
            "trigger_edge_slope",
            label="Edge trigger slope",
            set_cmd="TRIGger1:EDGE:SLOPe {}",
            get_cmd="TRIGger1:EDGE:SLOPe?",
            vals=vals.Enum("POS", "NEG", "EITH"),
        )
        """Parameter trigger_edge_slope"""

        # Horizontal settings

        self.timebase_scale: Parameter = self.add_parameter(
            "timebase_scale",
            label="Timebase scale",
            set_cmd=self._set_timebase_scale,
            get_cmd="TIMebase:SCALe?",
            unit="s/div",
            get_parser=float,
            vals=vals.Numbers(25e-12, 10000),
        )
        """Parameter timebase_scale"""

        self.timebase_range: Parameter = self.add_parameter(
            "timebase_range",
            label="Timebase range",
            set_cmd=self._set_timebase_range,
            get_cmd="TIMebase:RANGe?",
            unit="s",
            get_parser=float,
            vals=vals.Numbers(250e-12, 100e3),
        )
        """Parameter timebase_range"""

        self.timebase_position: Parameter = self.add_parameter(
            "timebase_position",
            label="Horizontal position",
            set_cmd=self._set_timebase_position,
            get_cmd="TIMEbase:HORizontal:POSition?",
            get_parser=float,
            unit="s",
            vals=vals.Numbers(-100e24, 100e24),
        )
        """Parameter timebase_position"""

        # Acquisition

        # I couldn't find a way to query the run mode, so we manually keep
        # track of it. It is very important when getting the trace to make
        # sense of completed_acquisitions.
        self.run_mode: Parameter = self.add_parameter(
            "run_mode",
            label="Run/acquisition mode of the scope",
            get_cmd=None,
            set_cmd=None,
        )
        """Parameter run_mode"""

        self.run_mode("RUN CONT")

        self.num_acquisitions: Parameter = self.add_parameter(
            "num_acquisitions",
            label="Number of single acquisitions to perform",
            get_cmd="ACQuire:COUNt?",
            set_cmd="ACQuire:COUNt {}",
            vals=vals.Ints(1, 16777215),
            get_parser=int,
        )
        """Parameter num_acquisitions"""

        self.completed_acquisitions: Parameter = self.add_parameter(
            "completed_acquisitions",
            label="Number of completed acquisitions",
            get_cmd="ACQuire:CURRent?",
            get_parser=int,
        )
        """Parameter completed_acquisitions"""

        self.sampling_rate: Parameter = self.add_parameter(
            "sampling_rate",
            label="Sample rate",
            docstring="Sample rate as returned by ACQuire:POINts:ARATe?.",
            unit="Sa/s",
            get_cmd="ACQuire:POINts:ARATe" + "?",
            get_parser=int,
        )
        """Sample rate as returned by ACQuire:POINts:ARATe?."""

        self.acquisition_sample_rate: Parameter = self.add_parameter(
            "acquisition_sample_rate",
            label="Acquisition sample rate",
            unit="Sa/s",
            docstring="recorded waveform samples per second",
            get_cmd="ACQuire:SRATe" + "?",
            set_cmd="ACQuire:SRATe " + " {:.2f}",
            vals=vals.Numbers(2, 20e12),
            get_parser=float,
        )
        """recorded waveform samples per second"""

        # Data

        self.dataformat: Parameter = self.add_parameter(
            "dataformat",
            label="Export data format",
            set_cmd="FORMat:DATA {}",
            get_cmd="FORMat:DATA?",
            vals=vals.Enum("ASC,0", "REAL,32", "INT,8", "INT,16"),
        )
        """Parameter dataformat"""

        # High definition mode (might not be available on all instruments)

        if HD:
            self.high_definition_state: Parameter = self.add_parameter(
                "high_definition_state",
                label="High definition (16 bit) state",
                set_cmd=self._set_hd_mode,
                get_cmd="HDEFinition:STAte?",
                val_mapping=create_on_off_val_mapping(on_val=1, off_val=0),
                docstring="Sets the filter bandwidth for the"
                " high definition mode.\n"
                "ON: high definition mode, up to 16"
                " bit digital resolution\n"
                "Options: ON, OFF\n\n"
                "Warning/Bug: By opening the HD "
                "acquisition menu on the scope, "
                'this value will be set to "ON".',
            )
            """
            Sets the filter bandwidth for the high definition mode.
            ON: high definition mode, up to 16 bit digital resolution
            Options: ON, OFF

            Warning/Bug: By opening the HD acquisition menu on the scope,
            this value will be set to "ON".
            """

            self.high_definition_bandwidth: Parameter = self.add_parameter(
                "high_definition_bandwidth",
                label="High definition mode bandwidth",
                set_cmd="HDEFinition:BWIDth {}",
                get_cmd="HDEFinition:BWIDth?",
                unit="Hz",
                get_parser=float,
                vals=vals.Numbers(1e4, 1e9),
            )
            """Parameter high_definition_bandwidth"""

        self.error_count: Parameter = self.add_parameter(
            "error_count",
            label="Number of errors in the error stack",
            get_cmd="SYSTem:ERRor:COUNt?",
            unit="#",
            get_parser=int,
        )
        """Parameter error_count"""

        self.error_next: Parameter = self.add_parameter(
            "error_next",
            label="Next error from the error stack",
            get_cmd="SYSTem:ERRor:NEXT?",
            get_parser=str,
        )
        """Parameter error_next"""

        # Add the channels to the instrument
        for ch in range(1, self.num_chans + 1):
            chan = RohdeSchwarzRTO1000ScopeChannel(self, f"channel{ch}", ch)
            self.add_submodule(f"ch{ch}", chan)

        for measId in range(1, self.num_meas + 1):
            measCh = RohdeSchwarzRTO1000ScopeMeasurement(
                self, f"measurement{measId}", measId
            )
            self.add_submodule(f"meas{measId}", measCh)

        self.add_function("stop", call_cmd="STOP")
        self.add_function("reset", call_cmd="*RST")
        self.opc: Parameter = self.add_parameter("opc", get_cmd="*OPC?")
        """Parameter opc"""
        self.stop_opc: Parameter = self.add_parameter("stop_opc", get_cmd="STOP;*OPC?")
        """Parameter stop_opc"""
        self.status_operation: Parameter = self.add_parameter(
            "status_operation", get_cmd="STATus:OPERation:CONDition?", get_parser=int
        )
        """Parameter status_operation"""
        self.add_function("run_continues", call_cmd="RUNContinous")
        # starts the shutdown of the system
        self.add_function("system_shutdown", call_cmd="SYSTem:EXIT")

        self.connect_message()

    def run_cont(self) -> None:
        """
        Set the instrument in 'RUN CONT' mode
        """
        self.write("RUN")
        self.run_mode.set("RUN CONT")

    def run_single(self) -> None:
        """
        Set the instrument in 'RUN Nx SINGLE' mode
        """
        self.write("SINGLE")
        self.run_mode.set("RUN Nx SINGLE")

    def is_triggered(self) -> bool:
        """
        Return True when bit 3 (mask 0b01000) of the operation status
        condition register is clear.
        """
        wait_trigger_mask = 0b01000
        return not (self.status_operation() & wait_trigger_mask)

    def is_running(self) -> bool:
        """
        Return True when bit 4 (mask 0b10000) of the operation status
        condition register is set.
        """
        measuring_mask = 0b10000
        return bool(self.status_operation() & measuring_mask)

    def is_acquiring(self) -> bool:
        """
        Return True when the scope is both triggered and running.
        """
        return self.is_triggered() and self.is_running()

    # Specialised set/get functions

    def _set_hd_mode(self, value: int) -> None:
        """
        Set/unset the high def mode
        """
        self._make_traces_not_ready()
        self.write(f"HDEFinition:STAte {value}")

    def _set_timebase_range(self, value: float) -> None:
        """
        Set the full range of the timebase
        """
        self._make_traces_not_ready()
        # keep the cached scale consistent with the new range
        self.timebase_scale.cache.set(value / self._horisontal_divs)

        self.write(f"TIMebase:RANGe {value}")

    def _set_timebase_scale(self, value: float) -> None:
        """
        Set the length of one horizontal division.
        """
        self._make_traces_not_ready()
        # keep the cached range consistent with the new scale
        self.timebase_range.cache.set(value * self._horisontal_divs)

        self.write(f"TIMebase:SCALe {value}")

    def _set_timebase_position(self, value: float) -> None:
        """
        Set the horizontal position.
        """
        self._make_traces_not_ready()
        self.write(f"TIMEbase:HORizontal:POSition {value}")

    def _make_traces_not_ready(self) -> None:
        """
        Make the scope traces be not ready.

        Only the channels that were actually added are touched, so models
        with fewer than four channels are handled as well.
        """
        for submodule in self.submodules.values():
            if isinstance(submodule, RohdeSchwarzRTO1000ScopeChannel):
                submodule.trace._trace_ready = False

    def _set_trigger_level(self, value: float) -> None:
        """
        Set the trigger level on the currently used trigger source
        channel.

        Raises:
            ValueError: If the source is a channel and ``value`` lies outside
                that channel's range around its offset.
        """
        # We use get and not get_latest because we don't trust users to
        # not touch the front panel of an oscilloscope.
        source = self._trigger_level_suffix[self.trigger_source.get()]
        if source != 5:
            submodule = self.submodules[f"ch{source}"]
            assert isinstance(submodule, InstrumentChannel)
            v_range = submodule.range()
            offset = submodule.offset()

            if (value < -v_range / 2 + offset) or (value > v_range / 2 + offset):
                raise ValueError("Trigger level outside channel range.")

        self.write(f"TRIGger1:LEVel{source} {value}")

    def _get_trigger_level(self) -> float:
        """
        Get the trigger level from the currently used trigger source
        """
        # we use get and not get_latest because we don't trust users to
        # not touch the front panel of an oscilloscope
        source = self._trigger_level_suffix[self.trigger_source.get()]

        val = self.ask(f"TRIGger1:LEVel{source}?")

        return float(val.strip())
class RTO1000(RohdeSchwarzRTO1000):
    """
    Alias of RohdeSchwarzRTO1000 provided for backwards compatibility.
    """