"""
QCoDeS driver for the MSO/DPO5000/B, DPO7000/C,
DPO70000/B/C/D/DX/SX, DSA70000/B/C/D, and
MSO70000/C/DX Series Digital Oscilloscopes
"""
import textwrap
import time
from functools import partial
from typing import TYPE_CHECKING, Any, ClassVar, cast
import numpy as np
from typing_extensions import Unpack, deprecated
from qcodes.instrument import (
ChannelList,
Instrument,
InstrumentBase,
InstrumentBaseKWArgs,
InstrumentChannel,
VisaInstrument,
VisaInstrumentKWArgs,
)
from qcodes.parameters import (
Parameter,
ParameterWithSetpoints,
create_on_off_val_mapping,
)
from qcodes.utils import QCoDeSDeprecationWarning
from qcodes.validators import Arrays, Enum
if TYPE_CHECKING:
from collections.abc import Callable
def strip_quotes(string: str) -> str:
    """
    Remove double-quote characters from both ends of ``string``.

    This helper is used as a ``get_parser`` for various parameters in this
    driver. Only leading and trailing ``"`` characters are removed; quote
    characters in the interior of the string are left in place.
    """
    unquoted = string.strip('"')
    return unquoted
[docs]
class TektronixDPOModeError(Exception):
    """
    Raised when an action is attempted while the instrument is in a
    mode that does not permit that action.
    """


ModeError = TektronixDPOModeError
"""
Alias of ``TektronixDPOModeError``, kept for backwards compatibility.
"""
class TektronixDPO7000xx(VisaInstrument):
    """
    QCoDeS driver for the MSO/DPO5000/B, DPO7000/C,
    DPO70000/B/C/D/DX/SX, DSA70000/B/C/D, and
    MSO70000/C/DX Series Digital Oscilloscopes

    On construction the driver registers these submodules: ``horizontal``,
    ``data``, ``waveform``, one ``measurement<N>`` per measurement slot
    (also collected in the ``measurement`` channel list), ``statistics``,
    one ``channel<N>`` per input channel (also collected in the ``channel``
    channel list), ``trigger`` and ``delayed_trigger``.
    """

    number_of_channels = 4
    # The number of available measurements does not change.
    number_of_measurements = 8
    default_terminator = "\n"

    def __init__(
        self, name: str, address: str, **kwargs: Unpack[VisaInstrumentKWArgs]
    ) -> None:
        """
        Args:
            name: Name of the instrument instance.
            address: VISA resource address of the oscilloscope.
            **kwargs: Forwarded unchanged to ``VisaInstrument.__init__``.
        """
        super().__init__(name, address, **kwargs)

        self.add_submodule("horizontal", TektronixDPOHorizontal(self, "horizontal"))
        self.add_submodule("data", TektronixDPOData(self, "data"))
        self.add_submodule("waveform", TektronixDPOWaveformFormat(self, "waveform"))

        measurement_list = ChannelList(self, "measurement", TektronixDPOMeasurement)
        # Measurement slots are numbered 1..number_of_measurements inclusive.
        # The upper bound needs "+ 1" (as in the channel loop below);
        # without it the last measurement slot was never created.
        for measurement_number in range(1, self.number_of_measurements + 1):
            measurement_name = f"measurement{measurement_number}"
            measurement_module = TektronixDPOMeasurement(
                self, measurement_name, measurement_number
            )
            self.add_submodule(measurement_name, measurement_module)
            measurement_list.append(measurement_module)
        self.add_submodule("measurement", measurement_list)

        self.add_submodule(
            "statistics", TektronixDPOMeasurementStatistics(self, "statistics")
        )

        channel_list = ChannelList(self, "channel", TektronixDPOChannel)
        for channel_number in range(1, self.number_of_channels + 1):
            channel_name = f"channel{channel_number}"
            channel_module = TektronixDPOChannel(
                self,
                channel_name,
                channel_number,
            )
            self.add_submodule(channel_name, channel_module)
            channel_list.append(channel_module)
        self.add_submodule("channel", channel_list)

        self.add_submodule("trigger", TektronixDPOTrigger(self, "trigger"))
        self.add_submodule(
            "delayed_trigger",
            TektronixDPOTrigger(self, "delayed_trigger", delayed_trigger=True),
        )

        self.connect_message()

    def ask_raw(self, cmd: str) -> str:
        """
        Write ``cmd`` to the instrument and return its response.

        Sometimes the instrument returns non-ASCII characters in response
        strings, so the response is read with the encoding manually set
        to latin-1.

        Args:
            cmd: The query string to send.

        Returns:
            The response string as read from the VISA handle.
        """
        self.visa_log.debug(f"Querying: {cmd}")
        self.visa_handle.write(cmd)
        response = self.visa_handle.read(encoding="latin-1")
        self.visa_log.debug(f"Response: {response}")
        return response
[docs]
class TektronixDPOData(InstrumentChannel):
    """
    Submodule holding the settings of the data source for the "CURVE?"
    query, which is the query used to retrieve waveform data: the source
    identifier, the encoding, and the start/stop indices of the buffer
    window to retrieve.
    """

    def __init__(
        self, parent: InstrumentBase, name: str, **kwargs: Unpack[InstrumentBaseKWArgs]
    ) -> None:
        super().__init__(parent, name, **kwargs)

        # Data can be retrieved from an arbitrary window of the buffer,
        # delimited by a start index and a stop index.
        self.start_index: Parameter = self.add_parameter(
            "start_index",
            set_cmd="DATa:STARt {}",
            get_cmd="DATa:STARt?",
            get_parser=int,
        )
        """Parameter start_index: first buffer index to retrieve"""

        self.stop_index: Parameter = self.add_parameter(
            "stop_index",
            set_cmd="DATa:STOP {}",
            get_cmd="DATa:STOP?",
            get_parser=int,
        )
        """Parameter stop_index: last buffer index to retrieve"""

        self.source: Parameter = self.add_parameter(
            "source",
            set_cmd="DATa:SOU {}",
            get_cmd="DATa:SOU?",
            vals=Enum(*TektronixDPOWaveform.valid_identifiers),
        )
        """Parameter source: identifier of the waveform to retrieve"""

        encoding_names = (
            "ASCIi",
            "FAStest",
            "RIBinary",
            "RPBinary",
            "FPBinary",
            "SRIbinary",
            "SRPbinary",
            "SFPbinary",
        )
        encoding_help = textwrap.dedent(
            """
            For a detailed explanation of the
            set arguments, please consult the
            programmers manual at page 263/264.
            http://download.tek.com/manual/077001022.pdf
            """
        )
        self.encoding: Parameter = self.add_parameter(
            "encoding",
            set_cmd="DATa:ENCdg {}",
            get_cmd="DATa:ENCdg?",
            get_parser=strip_quotes,
            vals=Enum(*encoding_names),
            docstring=encoding_help,
        )
        """
        Parameter encoding

        The set arguments are explained in detail in the programmers
        manual, page 263/264:
        http://download.tek.com/manual/077001022.pdf
        """
[docs]
class TektronixDPOChannel(InstrumentChannel):
    """
    The main channel module for the oscilloscope. Its parameters reflect
    the waveforms as they are displayed on the instrument display.
    """

    def __init__(
        self,
        parent: Instrument | InstrumentChannel,
        name: str,
        channel_number: int,
        **kwargs: Unpack[InstrumentBaseKWArgs],
    ) -> None:
        """
        Args:
            parent: The instrument or channel this module is attached to.
            name: Name of this submodule.
            channel_number: Number used to build the ``CH<n>`` prefix of
                every command sent by this channel.
            **kwargs: Forwarded unchanged to ``InstrumentChannel.__init__``.
        """
        super().__init__(parent, name, **kwargs)

        ident = f"CH{channel_number}"
        self._identifier = ident

        self.add_submodule("waveform", TektronixDPOWaveform(self, "waveform", ident))

        self.scale: Parameter = self.add_parameter(
            "scale",
            unit="V/div",
            set_cmd=f"{ident}:SCA {{}}",
            get_cmd=f"{ident}:SCA?",
            get_parser=float,
        )
        """Parameter scale"""

        self.offset: Parameter = self.add_parameter(
            "offset",
            unit="V",
            set_cmd=f"{ident}:OFFS {{}}",
            get_cmd=f"{ident}:OFFS?",
            get_parser=float,
        )
        """Parameter offset"""

        self.position: Parameter = self.add_parameter(
            "position",
            unit="V",
            set_cmd=f"{ident}:POS {{}}",
            get_cmd=f"{ident}:POS?",
            get_parser=float,
        )
        """Parameter position"""

        self.termination: Parameter = self.add_parameter(
            "termination",
            unit="Ohm",
            set_cmd=f"{ident}:TER {{}}",
            get_cmd=f"{ident}:TER?",
            get_parser=float,
            vals=Enum(50, 1e6),
        )
        """Parameter termination"""

        self.analog_to_digital_threshold: Parameter = self.add_parameter(
            "analog_to_digital_threshold",
            unit="V",
            set_cmd=f"{ident}:THRESH {{}}",
            get_cmd=f"{ident}:THRESH?",
            get_parser=float,
        )
        """Parameter analog_to_digital_threshold"""

        self.termination_voltage: Parameter = self.add_parameter(
            "termination_voltage",
            unit="V",
            set_cmd=f"{ident}:VTERm:BIAS {{}}",
            get_cmd=f"{ident}:VTERm:BIAS?",
            get_parser=float,
        )
        """Parameter termination_voltage"""

    def set_trace_length(self, value: int) -> None:
        """
        Set the trace length used when retrieving data
        through the 'waveform' interface.

        The data start index is set to 1 and the data stop index to
        ``value`` on the root instrument's ``data`` submodule.

        Args:
            value: The requested number of samples in the trace

        Raises:
            ValueError: If ``value`` exceeds the current record length.
        """
        root = self.root_instrument
        if value > root.horizontal.record_length():
            raise ValueError(
                "Cannot set a trace length which is larger than "
                "the record length. Please switch to manual mode "
                "and adjust the record length first"
            )

        data_module = root.data
        data_module.start_index(1)
        data_module.stop_index(value)

    def set_trace_time(self, value: float) -> None:
        """
        Set the trace length from a duration: the current sample rate is
        multiplied by ``value``, truncated to an integer sample count, and
        passed to ``set_trace_length``.

        Args:
            value: The time over which a trace is desired.
        """
        rate = self.root_instrument.horizontal.sample_rate()
        self.set_trace_length(int(rate * value))
[docs]
class TektronixDPOHorizontal(InstrumentChannel):
    """
    Submodule controlling the horizontal axis of the scope.
    """

    def __init__(
        self,
        parent: Instrument | InstrumentChannel,
        name: str,
        **kwargs: Unpack[InstrumentBaseKWArgs],
    ) -> None:
        super().__init__(parent, name, **kwargs)

        mode_help = textwrap.dedent(
            """
            Auto mode attempts to keep record length
            constant as you change the time per division
            setting. Record length is read only.
            Constant mode attempts to keep sample rate
            constant as you change the time per division
            setting. Record length is read only.
            Manual mode lets you change sample mode and
            record length. Time per division or Horizontal
            scale is read only.
            """
        )
        self.mode: Parameter = self.add_parameter(
            "mode",
            set_cmd="HORizontal:MODE {}",
            get_cmd="HORizontal:MODE?",
            get_parser=str.lower,
            vals=Enum("auto", "constant", "manual"),
            docstring=mode_help,
        )
        """
        Horizontal mode: one of "auto", "constant" or "manual".

        Auto mode attempts to keep record length constant as you change
        the time per division setting. Record length is read only.
        Constant mode attempts to keep sample rate constant as you change
        the time per division setting. Record length is read only.
        Manual mode lets you change sample mode and record length. Time
        per division or Horizontal scale is read only.
        """

        self.unit: Parameter = self.add_parameter(
            "unit",
            get_cmd="HORizontal:MAIn:UNIts?",
            get_parser=strip_quotes,
        )
        """Parameter unit"""

        # Setting goes through a method so the horizontal mode can be
        # checked before the command is written.
        self.record_length: Parameter = self.add_parameter(
            "record_length",
            set_cmd=self._set_record_length,
            get_cmd="HORizontal:MODE:RECOrdlength?",
            get_parser=float,
        )
        """Parameter record_length"""

        # The unit strings of the next two parameters are built from the
        # horizontal unit as reported by the instrument at construction.
        self.sample_rate: Parameter = self.add_parameter(
            "sample_rate",
            unit=f"sample/{self.unit()}",
            set_cmd="HORizontal:MODE:SAMPLERate {}",
            get_cmd="HORizontal:MODE:SAMPLERate?",
            get_parser=float,
        )
        """Parameter sample_rate"""

        self.scale: Parameter = self.add_parameter(
            "scale",
            unit=f"{self.unit()}/div",
            set_cmd=self._set_scale,
            get_cmd="HORizontal:MODE:SCAle?",
            get_parser=float,
        )
        """Parameter scale"""

        position_help = textwrap.dedent(
            """
            The horizontal position relative to a
            received trigger. E.g. a value of '10'
            sets the trigger position of the waveform
            such that 10% of the display is to the
            left of the trigger position.
            """
        )
        self.position: Parameter = self.add_parameter(
            "position",
            unit="%",
            set_cmd="HORizontal:POSition {}",
            get_cmd="HORizontal:POSition?",
            get_parser=float,
            docstring=position_help,
        )
        """
        The horizontal position relative to a received trigger. E.g. a
        value of '10' sets the trigger position of the waveform such that
        10% of the display is to the left of the trigger position.
        """

        roll_help = textwrap.dedent(
            """
            Use Roll Mode when you want to view data at
            very slow sweep speeds.
            """
        )
        self.roll: Parameter = self.add_parameter(
            "roll",
            set_cmd="HORizontal:ROLL {}",
            get_cmd="HORizontal:ROLL?",
            vals=Enum("Auto", "On", "Off"),
            docstring=roll_help,
        )
        """
        Use Roll Mode when you want to view data at very slow sweep speeds.
        """

    def _set_record_length(self, value: int) -> None:
        """
        Write a new record length to the instrument.

        Raises:
            TektronixDPOModeError: If the horizontal mode is not "manual".
        """
        in_manual_mode = self.mode() == "manual"
        if not in_manual_mode:
            raise TektronixDPOModeError(
                "The record length can only be changed in manual mode"
            )

        self.write(f"HORizontal:MODE:RECOrdlength {value}")

    def _set_scale(self, value: float) -> None:
        """
        Write a new horizontal scale to the instrument.

        Raises:
            TektronixDPOModeError: If the horizontal mode is "manual".
        """
        in_manual_mode = self.mode() == "manual"
        if in_manual_mode:
            raise TektronixDPOModeError("The scale cannot be changed in manual mode")

        self.write(f"HORizontal:MODE:SCAle {value}")
[docs]
class TektronixDPOTrigger(InstrumentChannel):
    """
    Submodule for trigger setup.

    You can trigger with the A (Main) trigger system alone
    or combine the A (Main) trigger with the B (Delayed) trigger
    to trigger on sequential events. When using sequential
    triggering, the A trigger event arms the trigger system, and
    the B trigger event triggers the instrument when the B
    trigger conditions are met.

    A and B triggers can (and typically do) have separate sources.
    The B trigger condition is based on a time delay or a specified
    number of events.

    See page 75, Using A (Main) and B (Delayed) triggers.
    https://download.tek.com/manual/MSO70000C-DX-DPO70000C-DX-MSO-DPO7000C-MSO-DPO5000B-Oscilloscope-Quick-Start-User-Manual-071298006.pdf
    """

    def __init__(
        self,
        parent: Instrument,
        name: str,
        delayed_trigger: bool = False,
        **kwargs: Unpack[InstrumentBaseKWArgs],
    ) -> None:
        """
        Args:
            parent: The instrument this submodule is attached to.
            name: Name of this submodule.
            delayed_trigger: If True, the commands address the "B"
                trigger; otherwise they address the "A" trigger.
            **kwargs: Forwarded unchanged to ``InstrumentChannel.__init__``.
        """
        super().__init__(parent, name, **kwargs)
        self._identifier = "B" if delayed_trigger else "A"

        # The A trigger accepts additional trigger types on top of the
        # ones shared with the B trigger.
        trigger_types = ["edge", "logic", "pulse"]
        if self._identifier == "A":
            trigger_types.extend(
                ["video", "i2c", "can", "spi", "communication", "serial", "rs232"]
            )

        self.type: Parameter = self.add_parameter(
            "type",
            get_cmd=f"TRIGger:{self._identifier}:TYPE?",
            set_cmd=self._trigger_type,
            vals=Enum(*trigger_types),
            get_parser=str.lower,
        )
        """Parameter type"""

        edge_couplings = ["ac", "dc", "hfrej", "lfrej", "noiserej"]
        if self._identifier == "B":
            edge_couplings.append("atrigger")

        self.edge_coupling: Parameter = self.add_parameter(
            "edge_coupling",
            get_cmd=f"TRIGger:{self._identifier}:EDGE:COUPling?",
            set_cmd=f"TRIGger:{self._identifier}:EDGE:COUPling {{}}",
            vals=Enum(*edge_couplings),
            get_parser=str.lower,
        )
        """Parameter edge_coupling"""

        self.edge_slope: Parameter = self.add_parameter(
            "edge_slope",
            get_cmd=f"TRIGger:{self._identifier}:EDGE:SLOpe?",
            set_cmd=f"TRIGger:{self._identifier}:EDGE:SLOpe {{}}",
            vals=Enum("rise", "fall", "either"),
            get_parser=str.lower,
        )
        """Parameter edge_slope"""

        # Channels are numbered 1..number_of_channels inclusive, so the
        # upper bound needs "+ 1"; without it the last channel was not
        # accepted as a trigger source.
        trigger_sources = [
            f"CH{i}" for i in range(1, TektronixDPO7000xx.number_of_channels + 1)
        ]
        trigger_sources.extend([f"D{i}" for i in range(0, 16)])
        if self._identifier == "A":
            trigger_sources.append("line")

        self.source: Parameter = self.add_parameter(
            "source",
            get_cmd=f"TRIGger:{self._identifier}:EDGE:SOUrce?",
            set_cmd=f"TRIGger:{self._identifier}:EDGE:SOUrce {{}}",
            vals=Enum(*trigger_sources),
        )
        """Parameter source"""

    def _trigger_type(self, value: str) -> None:
        """
        Write the trigger type to the instrument.

        Args:
            value: The requested trigger type.

        Raises:
            NotImplementedError: If ``value`` is anything other than "edge".
        """
        if value != "edge":
            raise NotImplementedError(
                "We currently only support the 'edge' trigger type"
            )
        self.write(f"TRIGger:{self._identifier}:TYPE {value}")
[docs]
@deprecated(
    "TekronixDPOTrigger is deprecated use TektronixDPOTrigger",
    category=QCoDeSDeprecationWarning,
)
class TekronixDPOTrigger(TektronixDPOTrigger):
    """
    Deprecated, misspelled name of ``TektronixDPOTrigger``, retained only
    for backwards compatibility. It adds nothing to the class it inherits
    from.
    """
[docs]
class TektronixDPOMeasurementParameter(Parameter):
    """
    Parameter for a single measurement type on a measurement channel.

    Besides the instantaneous value of the measurement, it can also return
    some statistics (mean, max, min, standard deviation). The accumulation
    time over which these statistics are gathered can be controlled through
    the 'time_constant' parameter on the submodule
    'TektronixDPOMeasurementStatistics'. That submodule also has the method
    'reset' to reset the values over which the statistics are gathered.
    """

    def _get(self, metric: str) -> float:
        """
        Query one metric of this measurement and return it as a float.

        If the channel's last known measurement type differs from this
        parameter's name, the type is set first. The measurement state is
        then set to 1 and the channel's adjustment time is waited out
        before the query is sent.

        Args:
            metric: Final field of the ``MEASUrement:MEAS<n>:<metric>?`` query.
        """
        channel = cast(TektronixDPOMeasurement, self.instrument)

        type_is_stale = channel.type.get_latest() != self.name
        if type_is_stale:
            channel.type(self.name)

        channel.state(1)
        channel.wait_adjustment_time()

        query = f"MEASUrement:MEAS{channel.measurement_number}:{metric}?"
        return float(channel.ask(query))

    def mean(self) -> float:
        """Return the "MEAN" metric of this measurement."""
        return self._get("MEAN")

    def max(self) -> float:
        """Return the "MAX" metric of this measurement."""
        return self._get("MAX")

    def min(self) -> float:
        """Return the "MINI" metric of this measurement."""
        return self._get("MINI")

    def stdev(self) -> float:
        """Return the "STDdev" metric of this measurement."""
        return self._get("STDdev")

    def get_raw(self) -> float:
        """Return the "VALue" metric of this measurement."""
        return self._get("VALue")

    def set_raw(self, value: Any) -> None:
        """Always raises ``ValueError``: a measurement cannot be set."""
        raise ValueError("A measurement cannot be set")
[docs]
class TektronixDPOMeasurement(InstrumentChannel):
    """
    The measurement submodule. It exposes the state, type and two sources
    of one numbered measurement, plus one measurement parameter for every
    entry in ``measurements``.
    """

    # It was found by trial and error that adjusting the measurement type
    # and source takes some time to reflect properly on the value of the
    # measurement. Wait a minimum of this many seconds after setting the
    # measurement type/source before calling the measurement value SCPI
    # command.
    _minimum_adjustment_time = 0.1

    # (parameter name, unit) for every supported measurement type.
    measurements: ClassVar[list[tuple[str, str]]] = [
        ("amplitude", "V"),
        ("area", "Vs"),
        ("burst", "s"),
        ("carea", "Vs"),
        ("cmean", "V"),
        ("crms", "V"),
        ("delay", "s"),
        ("distduty", "%"),
        ("extinctdb", "dB"),
        ("extinctpct", "%"),
        ("extinctratio", ""),
        ("eyeheight", "V"),
        ("eyewidth", "s"),
        ("fall", "s"),
        ("frequency", "Hz"),
        ("high", "V"),
        ("hits", "hits"),
        ("low", "V"),
        ("maximum", "V"),
        ("mean", "V"),
        ("median", "V"),
        ("minimum", "V"),
        ("ncross", "s"),
        ("nduty", "%"),
        ("novershoot", "%"),
        ("nwidth", "s"),
        ("pbase", "V"),
        ("pcross", "s"),
        ("pctcross", "%"),
        ("pduty", "%"),
        ("peakhits", "hits"),
        ("period", "s"),
        ("phase", "°"),
        ("pk2pk", "V"),
        ("pkpkjitter", "s"),
        ("pkpknoise", "V"),
        ("povershoot", "%"),
        ("ptop", "V"),
        ("pwidth", "s"),
        ("qfactor", ""),
        ("rise", "s"),
        ("rms", "V"),
        ("rmsjitter", "s"),
        ("rmsnoise", "V"),
        ("sigma1", "%"),
        ("sigma2", "%"),
        ("sigma3", "%"),
        ("sixsigmajit", "s"),
        ("snratio", ""),
        ("stddev", "V"),
        ("undefined", ""),
        ("waveforms", "wfms"),
    ]

    def __init__(
        self,
        parent: Instrument,
        name: str,
        measurement_number: int,
        **kwargs: Unpack[InstrumentBaseKWArgs],
    ) -> None:
        """
        Args:
            parent: The instrument this submodule is attached to.
            name: Name of this submodule.
            measurement_number: Number used to build the
                ``MEASUrement:MEAS<n>`` prefix of every command.
            **kwargs: Forwarded unchanged to ``InstrumentChannel.__init__``.
        """
        super().__init__(parent, name, **kwargs)

        self._measurement_number = measurement_number
        # Timestamp of the most recent type/source adjustment.
        self._adjustment_time = time.perf_counter()

        prefix = f"MEASUrement:MEAS{measurement_number}"

        self.state: Parameter = self.add_parameter(
            "state",
            set_cmd=f"{prefix}:STATe {{}}",
            get_cmd=f"{prefix}:STATe?",
            val_mapping=create_on_off_val_mapping(on_val="1", off_val="0"),
        )
        """Parameter state"""

        type_names = [entry[0] for entry in self.measurements]
        type_help = textwrap.dedent(
            "Please see page 566-569 of the programmers manual "
            "for a detailed description of these arguments. "
            "http://download.tek.com/manual/077001022.pdf"
        )
        self.type: Parameter = self.add_parameter(
            "type",
            set_cmd=self._set_measurement_type,
            get_cmd=f"{prefix}:TYPe?",
            get_parser=str.lower,
            vals=Enum(*type_names),
            docstring=type_help,
        )
        """Parameter type"""

        # One measurement parameter per supported measurement type.
        for type_name, type_unit in self.measurements:
            self.add_parameter(
                name=type_name,
                unit=type_unit,
                parameter_class=TektronixDPOMeasurementParameter,
            )

        for source_index in (1, 2):
            self.add_parameter(
                f"source{source_index}",
                set_cmd=partial(self._set_source, source_index),
                get_cmd=f"{prefix}:SOUrce{source_index}?",
                vals=Enum(*TektronixDPOWaveform.valid_identifiers, "HISTogram"),
            )

    @property
    def measurement_number(self) -> int:
        """The measurement number given at construction."""
        return self._measurement_number

    def _set_measurement_type(self, value: str) -> None:
        """Record the adjustment time, then write the measurement type."""
        self._adjustment_time = time.perf_counter()
        command = f"MEASUrement:MEAS{self._measurement_number}:TYPe {value}"
        self.write(command)

    def _set_source(self, source_number: int, value: str) -> None:
        """
        Record the adjustment time, then write ``value`` as the source
        with index ``source_number``.
        """
        self._adjustment_time = time.perf_counter()
        command = (
            f"MEASUrement:MEAS{self._measurement_number}:SOUrce{source_number} "
            f"{value}"
        )
        self.write(command)

    def wait_adjustment_time(self) -> None:
        """
        Sleep until the minimum time after the last adjustment of the
        measurement source or type has elapsed. Returns immediately when
        that time has already passed.
        """
        elapsed = time.perf_counter() - self._adjustment_time
        if elapsed >= self._minimum_adjustment_time:
            return
        time.sleep(self._minimum_adjustment_time - elapsed)
[docs]
class TektronixDPOMeasurementStatistics(InstrumentChannel):
    """
    Submodule for the measurement statistics settings: the statistics
    mode, the time constant (weighting) of the statistical accumulations,
    and a method to reset the statistics count.
    """

    def __init__(
        self, parent: InstrumentBase, name: str, **kwargs: Unpack[InstrumentBaseKWArgs]
    ) -> None:
        super().__init__(parent=parent, name=name, **kwargs)

        mode_help = textwrap.dedent(
            "This command controls the operation and display of measurement "
            "statistics. "
            "1. OFF turns off all measurements. This is the default value "
            "2. ALL turns on statistics and displays all statistics for "
            "each measurement. "
            "3. VALUEMean turns on statistics and displays the value and the "
            "mean (μ) of each measurement. "
            "4. MINMax turns on statistics and displays the min and max of "
            "each measurement. "
            "5. MEANSTDdev turns on statistics and displays the mean and "
            "standard deviation of each measurement."
        )
        self.mode: Parameter = self.add_parameter(
            "mode",
            set_cmd="MEASUrement:STATIstics:MODe {}",
            get_cmd="MEASUrement:STATIstics:MODe?",
            vals=Enum("OFF", "ALL", "VALUEMean", "MINMax", "MEANSTDdev"),
            docstring=mode_help,
        )
        """Parameter mode"""

        time_constant_help = textwrap.dedent(
            "This command sets or queries the time constant for mean and "
            "standard deviation statistical accumulations, which is equivalent "
            "to selecting Measurement Setup from the Measure menu, clicking "
            "the Statistics button and entering the desired Weight n= value."
        )
        self.time_constant: Parameter = self.add_parameter(
            "time_constant",
            set_cmd="MEASUrement:STATIstics:WEIghting {}",
            get_cmd="MEASUrement:STATIstics:WEIghting?",
            get_parser=int,
            docstring=time_constant_help,
        )
        """Parameter time_constant"""

    def reset(self) -> None:
        """Send the command that resets the measurement statistics count."""
        reset_command = "MEASUrement:STATIstics:COUNt RESEt"
        self.write(reset_command)