Source code for qcodes.instrument_drivers.tektronix.TPS2012

import binascii
import logging
from functools import partial
from typing import Any

import numpy as np
from pyvisa.errors import VisaIOError
from typing_extensions import TypedDict, Unpack

from qcodes import validators as vals
from qcodes.instrument import (
    ChannelList,
    InstrumentBaseKWArgs,
    InstrumentChannel,
    VisaInstrument,
    VisaInstrumentKWArgs,
)
from qcodes.parameters import ArrayParameter, Parameter, ParamRawDataType

log = logging.getLogger(__name__)


class TraceNotReady(Exception):
    pass


class OutputDict(TypedDict):
    no_of_bytes: int
    no_of_bits: int
    encoding: str
    binary_format: str
    byte_order: str
    no_of_points: int
    waveform_ID: str
    point_format: str
    x_incr: float
    x_zero: float
    x_unit: str
    y_multiplier: float
    y_zero: float
    y_offset: float
    y_unit: str


class ScopeArray(ArrayParameter):
    def __init__(
        self,
        name: str,
        instrument: "TektronixTPS2012Channel",
        channel: int,
        **kwargs: Any,
    ):
        super().__init__(
            name=name,
            shape=(2500,),
            label="Voltage",
            unit="V ",
            setpoint_names=("Time",),
            setpoint_labels=("Time",),
            setpoint_units=("s",),
            docstring="holds an array from scope",
            instrument=instrument,
            **kwargs,
        )
        self.channel = channel

    def calc_set_points(self) -> tuple[np.ndarray, int]:
        assert isinstance(self.instrument, TektronixTPS2012Channel)
        message = self.instrument.ask("WFMPre?")
        preamble = self._preambleparser(message)
        xstart = preamble["x_zero"]
        xinc = preamble["x_incr"]
        no_of_points = preamble["no_of_points"]
        xdata = np.linspace(xstart, no_of_points * xinc + xstart, no_of_points)
        return xdata, no_of_points

    def prepare_curvedata(self) -> None:
        """
        Prepare the scope for returning curve data
        """
        # To calculate set points, we must have the full preamble
        # For the instrument to return the full preamble, the channel
        # in question must be displayed
        assert isinstance(self.instrument, TektronixTPS2012Channel)
        assert isinstance(self.root_instrument, TektronixTPS2012)
        self.instrument.parameters["state"].set("ON")
        self.root_instrument.data_source(f"CH{self.channel}")

        xdata, no_of_points = self.calc_set_points()
        self.setpoints = (tuple(xdata),)
        self.shape = (no_of_points,)

        self.root_instrument.trace_ready = True

    def get_raw(self) -> ParamRawDataType:
        assert isinstance(self.root_instrument, TektronixTPS2012)
        if not self.root_instrument.trace_ready:
            raise TraceNotReady(
                "Please run prepare_curvedata to prepare "
                "the scope for giving a trace."
            )
        message = self._curveasker(self.channel)
        _, ydata, _ = self._curveparameterparser(message)
        # Due to the limitations in the current api the below solution
        # to change setpoints does nothing because the setpoints have
        # already been copied to the dataset when get is called.

        # self.setpoints = (tuple(xdata),)
        # self.shape = (npoints,)
        return ydata

    def _curveasker(self, ch: int) -> str:
        assert isinstance(self.instrument, TektronixTPS2012Channel)
        self.instrument.write(f"DATa:SOURce CH{ch}")
        message = self.instrument.ask("WAVFrm?")
        self.instrument.write("*WAI")
        return message

    @staticmethod
    def _binaryparser(curve: str) -> np.ndarray:
        """
        Helper function for parsing the curve data

        Args:
            curve: the return value of 'CURVe?' when
              DATa:ENCdg is set to RPBinary.
              Note: The header and final newline character
              must be removed.

        Returns:
            The curve in units where the digitisation range
            is mapped to (-32768, 32767).

        """
        # TODO: Add support for data width = 1 mode?
        output = np.zeros(int(len(curve) / 2))  # data width 2
        # output = np.zeros(int(len(curve)))  # data width 1
        for ii, _ in enumerate(output):
            # casting FTWs
            temp_1 = curve[2 * ii : 2 * ii + 1].encode("latin-1")  # data width 2
            temp_2 = binascii.b2a_hex(temp_1)
            temp_3 = (int(temp_2, 16) - 128) * 256  # data width 2 (1)
            output[ii] = temp_3
        return output

    @staticmethod
    def _preambleparser(response: str) -> OutputDict:
        """
        Parser function for the curve preamble

        Args:
            response: The response of WFMPre?

        Returns:
            A dictionary containing the following keys:
              no_of_bytes, no_of_bits, encoding, binary_format,
              byte_order, no_of_points, waveform_ID, point_format,
              x_incr, x_zero, x_unit, y_multiplier, y_zero, y_offset, y_unit

        """
        response_list = response.split(";")

        outdict: OutputDict = {
            "no_of_bytes": int(response_list[0]),
            "no_of_bits": int(response_list[1]),
            "encoding": response_list[2],
            "binary_format": response_list[3],
            "byte_order": response_list[4],
            "no_of_points": int(response_list[5]),
            "waveform_ID": response_list[6],
            "point_format": response_list[7],
            "x_incr": float(response_list[8]),
            # outdict['point_offset'] = response_list[9]  # Always zero
            "x_zero": float(response_list[10]),
            "x_unit": response_list[11],
            "y_multiplier": float(response_list[12]),
            "y_zero": float(response_list[13]),
            "y_offset": float(response_list[14]),
            "y_unit": response_list[15],
        }
        return outdict

    def _curveparameterparser(
        self, waveform: str
    ) -> tuple[np.ndarray, np.ndarray, int]:
        """
        The parser for the curve parameter. Note that WAVFrm? is equivalent
        to WFMPre?; CURVe?

        Args:
            waveform: The return value of WAVFrm?

        Returns:
            Two numpy arrays with the time axis in units
            of s and curve values in units of V; (time, voltages) and
            the number of points as an integer

        """
        fulldata = waveform.split(";")
        preamblestr = ";".join(fulldata[:16])
        curvestr = ";".join(fulldata[16:])

        preamble = self._preambleparser(preamblestr)
        # the raw curve data starts with a header containing the char #
        # followed by on digit giving the number of digits in the len of the
        # array in bytes
        # and the length of the array. I.e. the string #45000 is 5000 bytes
        # represented by 4 digits.
        total_number_of_bytes = preamble["no_of_bytes"] * preamble["no_of_points"]
        raw_data_offset = 2 + len(str(total_number_of_bytes))
        curvestr = curvestr[raw_data_offset:-1]
        rawcurve = self._binaryparser(curvestr)

        yoff = preamble["y_offset"]
        yoff -= 2**15  # data width 2
        ymult = preamble["y_multiplier"]
        ydata = ymult * (rawcurve - yoff)
        assert len(ydata) == preamble["no_of_points"]
        xstart = preamble["x_zero"]
        xinc = preamble["x_incr"]
        xdata = np.linspace(xstart, len(ydata) * xinc + xstart, len(ydata))
        return xdata, ydata, preamble["no_of_points"]


[docs] class TektronixTPS2012Channel(InstrumentChannel): def __init__( self, parent: "TektronixTPS2012", name: str, channel: int, **kwargs: Unpack[InstrumentBaseKWArgs], ): super().__init__(parent, name, **kwargs) self.scale: Parameter = self.add_parameter( "scale", label=f"Channel {channel} Scale", unit="V/div", get_cmd=f"CH{channel}:SCAle?", set_cmd="CH{}:SCAle {}".format(channel, "{}"), get_parser=float, ) """Parameter scale""" self.position: Parameter = self.add_parameter( "position", label=f"Channel {channel} Position", unit="div", get_cmd=f"CH{channel}:POSition?", set_cmd="CH{}:POSition {}".format(channel, "{}"), get_parser=float, ) """Parameter position""" self.curvedata: ScopeArray = self.add_parameter( "curvedata", channel=channel, parameter_class=ScopeArray, ) """Parameter curvedata""" self.state: Parameter = self.add_parameter( "state", label=f"Channel {channel} display state", set_cmd="SELect:CH{} {}".format(channel, "{}"), get_cmd=partial(self._get_state, channel), val_mapping={"ON": 1, "OFF": 0}, vals=vals.Enum("ON", "OFF"), ) """Parameter state""" def _get_state(self, ch: int) -> int: """ get_cmd for the chX_state parameter """ # 'SELect?' returns a ';'-separated string of 0s and 1s # denoting state display state of ch1, ch2, ?, ?, ? # (maybe ch1, ch2, math, ref1, ref2 ..?) selected = list(map(int, self.ask("SELect?").split(";"))) state = selected[ch - 1] return state
TPS2012Channel = TektronixTPS2012Channel
[docs] class TektronixTPS2012(VisaInstrument): """ This is the QCoDeS driver for the Tektronix 2012B oscilloscope. """ default_timeout = 20 def __init__( self, name: str, address: str, **kwargs: Unpack[VisaInstrumentKWArgs], ): """ Initialises the TPS2012. Args: name: Name of the instrument used by QCoDeS address: Instrument address as used by VISA **kwargs: kwargs are forwarded to base class. """ super().__init__(name, address, **kwargs) self.connect_message() # Scope trace boolean self.trace_ready = False # functions self.add_function( "force_trigger", call_cmd="TRIGger FORce", docstring="Force trigger event" ) self.add_function( "run", call_cmd="ACQuire:STATE RUN", docstring="Start acquisition" ) self.add_function( "stop", call_cmd="ACQuire:STATE STOP", docstring="Stop acquisition" ) # general parameters self.trigger_type: Parameter = self.add_parameter( "trigger_type", label="Type of the trigger", get_cmd="TRIGger:MAIn:TYPe?", set_cmd="TRIGger:MAIn:TYPe {}", vals=vals.Enum("EDGE", "VIDEO", "PULSE"), ) """Parameter trigger_type""" self.trigger_source: Parameter = self.add_parameter( "trigger_source", label="Source for the trigger", get_cmd="TRIGger:MAIn:EDGE:SOURce?", set_cmd="TRIGger:MAIn:EDGE:SOURce {}", vals=vals.Enum("CH1", "CH2"), ) """Parameter trigger_source""" self.trigger_edge_slope: Parameter = self.add_parameter( "trigger_edge_slope", label="Slope for edge trigger", get_cmd="TRIGger:MAIn:EDGE:SLOpe?", set_cmd="TRIGger:MAIn:EDGE:SLOpe {}", vals=vals.Enum("FALL", "RISE"), ) """Parameter trigger_edge_slope""" self.trigger_level: Parameter = self.add_parameter( "trigger_level", label="Trigger level", unit="V", get_cmd="TRIGger:MAIn:LEVel?", set_cmd="TRIGger:MAIn:LEVel {}", vals=vals.Numbers(), ) """Parameter trigger_level""" self.data_source: Parameter = self.add_parameter( "data_source", label="Data source", get_cmd="DATa:SOUrce?", set_cmd="DATa:SOURce {}", vals=vals.Enum("CH1", "CH2"), ) """Parameter data_source""" self.horizontal_scale: Parameter = self.add_parameter( "horizontal_scale", label="Horizontal scale", unit="s", get_cmd="HORizontal:SCAle?", set_cmd=self._set_timescale, get_parser=float, vals=vals.Enum( 5e-9, 10e-9, 25e-9, 50e-9, 100e-9, 250e-9, 500e-9, 1e-6, 2.5e-6, 5e-6, 10e-6, 25e-6, 50e-6, 100e-6, 250e-6, 500e-6, 1e-3, 2.5e-3, 5e-3, 10e-3, 25e-3, 50e-3, 100e-3, 250e-3, 500e-3, 1, 2.5, 5, 10, 25, 50, ), ) """Parameter horizontal_scale""" # channel-specific parameters channels = ChannelList( self, "ScopeChannels", TektronixTPS2012Channel, snapshotable=False ) for ch_num in range(1, 3): ch_name = f"ch{ch_num}" channel = TektronixTPS2012Channel(self, ch_name, ch_num) channels.append(channel) self.add_submodule(ch_name, channel) self.add_submodule("channels", channels.to_channel_tuple()) # Necessary settings for parsing the binary curve data self.visa_handle.encoding = "latin-1" log.info("Set VISA encoding to latin-1") self.write("DATa:ENCdg RPBinary") log.info("Set TPS2012 data encoding to RPBinary (Positive Integer Binary)") self.write("DATa:WIDTh 2") log.info("Set TPS2012 data width to 2") # Note: using data width 2 has been tested to not add # significantly to transfer times. The maximal length # of an array in one transfer is 2500 points. def _set_timescale(self, scale: float) -> None: """ set_cmd for the horizontal_scale """ self.trace_ready = False self.write(f"HORizontal:SCAle {scale}") ################################################## # METHODS FOR THE USER # ##################################################
[docs] def clear_message_queue(self, verbose: bool = False) -> None: """ Function to clear up (flush) the VISA message queue of the AWG instrument. Reads all messages in the queue. Args: verbose: If True, the read messages are printed. Default: False. """ original_timeout = self.visa_handle.timeout self.visa_handle.timeout = 1000 # 1 second as VISA counts in ms gotexception = False while not gotexception: try: message = self.visa_handle.read() if verbose: print(message) except VisaIOError: gotexception = True self.visa_handle.timeout = original_timeout
class TPS2012(TektronixTPS2012): """ Deprecated alias for ``TektronixTPS2012`` """ pass