from __future__ import annotations
import logging
import re
import time
import warnings
from collections import namedtuple
from typing import TYPE_CHECKING
import numpy as np
from packaging import version
from qcodes import validators as vals
from qcodes.instrument import (
ChannelList,
InstrumentBaseKWArgs,
InstrumentChannel,
VisaInstrument,
VisaInstrumentKWArgs,
)
from qcodes.parameters import ArrayParameter, ParamRawDataType
if TYPE_CHECKING:
from typing_extensions import Unpack
from qcodes.parameters import Parameter
log = logging.getLogger(__name__)
class RigolDS4000TraceNotReady(Exception):
pass
class ScopeArray(ArrayParameter):
def __init__(
self,
name: str,
instrument: RigolDS4000Channel,
channel: int,
raw: bool = False,
):
super().__init__(
name=name,
shape=(1400,),
label="Voltage",
unit="V",
setpoint_names=("Time",),
setpoint_labels=("Time",),
setpoint_units=("s",),
docstring="holds an array from scope",
instrument=instrument,
)
self.channel = channel
self.raw = raw
self.max_read_step = 50
self.trace_ready = False
def prepare_curvedata(self) -> None:
"""
Prepare the scope for returning curve data
"""
assert isinstance(self.instrument, RigolDS4000Channel)
if self.raw:
self.instrument.write(":STOP") # Stop acquisition
self.instrument.write(":WAVeform:MODE RAW") # Set RAW mode
else:
self.instrument.write(":WAVeform:MODE NORM") # Set normal mode
self.get_preamble()
p = self.preamble
# Generate time axis data
xdata = np.linspace(p.xorigin, p.xorigin + p.xincrement * p.points, p.points)
self.setpoints = (tuple(xdata),)
self.shape = (p.points,)
self.trace_ready = True
def get_raw(self) -> ParamRawDataType:
assert isinstance(self.instrument, RigolDS4000Channel)
assert isinstance(self.root_instrument, RigolDS4000)
if not self.trace_ready:
raise RigolDS4000TraceNotReady(
"Please run prepare_curvedata to prepare "
"the scope for giving a trace."
)
else:
self.trace_ready = False
# Set the data type for waveforms to "BYTE"
self.instrument.write(":WAVeform:FORMat BYTE")
# Set read channel
self.instrument.write(f":WAVeform:SOURce CHAN{self.channel}")
data_bin = bytearray()
if self.raw:
log.info("Readout of raw waveform started, %g points", self.shape[0])
# Ask for the right number of points
self.instrument.write(f":WAVeform:POINts {self.shape[0]}")
# Resets the waveform data reading
self.instrument.write(":WAVeform:RESet")
# Starts the waveform data reading
self.instrument.write(":WAVeform:BEGin")
for i in range(self.max_read_step):
status = self.instrument.ask(":WAVeform:STATus?").split(",")[0]
# Ask and retrieve waveform data
# It uses .read_raw() to get a byte
# string since our data is binary
self.instrument.write(":WAVeform:DATA?")
data_chunk = self.root_instrument.visa_handle.read_raw()
data_chuck = self._validate_strip_block(data_chunk)
data_bin.extend(data_chuck)
if status == "IDLE":
self.instrument.write(":WAVeform:END")
break
else:
# Wait some time to have the buffer re-filled
time.sleep(0.3)
log.info(
"chucks read: %d, last chuck points: %g, total read size: %g",
i,
len(data_chuck),
len(data_bin),
)
else:
raise ValueError("Communication error")
else:
# Ask and retrieve waveform data
# It uses .read_raw() to get a byte string since our data is binary
log.info("Readout of display waveform started, %d points", self.shape[0])
self.instrument.write(":WAVeform:DATA?") # Query data
data_chunk = self.root_instrument.visa_handle.read_raw()
data_bin.extend(self._validate_strip_block(data_chunk))
log.info("Readout ended, total read size: %g", len(data_bin))
log.info("Data conversion")
# Convert data to byte array
data_raw = np.frombuffer(data_bin, dtype=np.uint8).astype(float)
# Convert byte array to real data
p = self.preamble
data = (data_raw - p.yreference - p.yorigin) * p.yincrement
log.info("Data conversion done")
return data
@staticmethod
def _validate_strip_block(block: bytes) -> bytes:
"""
Given a block of raw data from the instrument, validate and
then strip the header with
size information. Raise ValueError if the sizes don't match.
Args:
block: The data block
Returns:
The stripped data
"""
# Validate header
header = block[:11].decode("ascii")
match = re.match(r"#9(\d{9})", header)
if match:
size = int(match[1])
block_nh = block[11:] # Strip header
block_nh = block_nh.strip() # Strip \n
if size == len(block_nh):
return block_nh
raise ValueError("Malformed data")
def get_preamble(self) -> None:
assert isinstance(self.instrument, RigolDS4000Channel)
preamble_nt = namedtuple(
"preamble_nt",
[
"format",
"mode",
"points",
"count",
"xincrement",
"xorigin",
"xreference",
"yincrement",
"yorigin",
"yreference",
],
)
def conv(x: str) -> float:
return int(x) if x.isdigit() else float(x)
preamble_raw = self.instrument.ask(":WAVeform:PREamble?")
preamble_num = [conv(x) for x in preamble_raw.strip().split(",")]
self.preamble = preamble_nt(*preamble_num)
[docs]
class RigolDS4000Channel(InstrumentChannel):
def __init__(
self,
parent: RigolDS4000,
name: str,
channel: int,
**kwargs: Unpack[InstrumentBaseKWArgs],
):
super().__init__(parent, name, **kwargs)
self.amplitude: Parameter = self.add_parameter(
"amplitude", get_cmd=f":MEASure:VAMP? chan{channel}", get_parser=float
)
"""Parameter amplitude"""
self.vertical_scale: Parameter = self.add_parameter(
"vertical_scale",
get_cmd=f":CHANnel{channel}:SCALe?",
set_cmd=":CHANnel{}:SCALe {}".format(channel, "{}"),
get_parser=float,
)
"""Parameter vertical_scale"""
# Return the waveform displayed on the screen
self.curvedata: ScopeArray = self.add_parameter(
"curvedata", channel=channel, parameter_class=ScopeArray, raw=False
)
"""Parameter curvedata"""
# Return the waveform in the internal memory
self.curvedata_raw: ScopeArray = self.add_parameter(
"curvedata_raw", channel=channel, parameter_class=ScopeArray, raw=True
)
"""Parameter curvedata_raw"""
[docs]
class RigolDS4000(VisaInstrument):
"""
This is the QCoDeS driver for the Rigol DS4000 series oscilloscopes.
"""
default_timeout = 20
def __init__(
self,
name: str,
address: str,
**kwargs: Unpack[VisaInstrumentKWArgs],
):
"""
Initialises the DS4000.
Args:
name: Name of the instrument used by QCoDeS
address: Instrument address as used by VISA
**kwargs: kwargs are forwarded to base class.
"""
# Init VisaInstrument. device_clear MUST NOT be issued, otherwise communications hangs
# due a bug in firmware
kwargs["device_clear"] = False
super().__init__(name, address, **kwargs)
self.connect_message()
self._check_firmware_version()
# functions
self.add_function("run", call_cmd=":RUN", docstring="Start acquisition")
self.add_function("stop", call_cmd=":STOP", docstring="Stop acquisition")
self.add_function(
"single", call_cmd=":SINGle", docstring="Single trace acquisition"
)
self.add_function(
"force_trigger", call_cmd="TFORce", docstring="Force trigger event"
)
self.add_function(
"auto_scale", call_cmd=":AUToscale", docstring="Perform autoscale"
)
# general parameters
self.trigger_type: Parameter = self.add_parameter(
"trigger_type",
label="Type of the trigger",
get_cmd=":TRIGger:MODE?",
set_cmd=":TRIGger:MODE {}",
vals=vals.Enum(
"EDGE",
"PULS",
"RUNT",
"NEDG",
"SLOP",
"VID",
"PATT",
"RS232",
"IIC",
"SPI",
"CAN",
"FLEX",
"USB",
),
)
"""Parameter trigger_type"""
self.trigger_mode: Parameter = self.add_parameter(
"trigger_mode",
label="Mode of the trigger",
get_cmd=":TRIGger:SWEep?",
set_cmd=":TRIGger:SWEep {}",
vals=vals.Enum("AUTO", "NORM", "SING"),
)
"""Parameter trigger_mode"""
self.time_base: Parameter = self.add_parameter(
"time_base",
label="Horizontal time base",
get_cmd=":TIMebase:MAIN:SCALe?",
set_cmd=":TIMebase:MAIN:SCALe {}",
get_parser=float,
unit="s/div",
)
"""Parameter time_base"""
self.sample_point_count: Parameter = self.add_parameter(
"sample_point_count",
label="Number of the waveform points",
get_cmd=":WAVeform:POINts?",
set_cmd=":WAVeform:POINts {}",
get_parser=int,
vals=vals.Ints(min_value=1),
)
"""Parameter sample_point_count"""
self.enable_auto_scale: Parameter = self.add_parameter(
"enable_auto_scale",
label="Enable or disable autoscale",
get_cmd=":SYSTem:AUToscale?",
set_cmd=":SYSTem:AUToscale {}",
get_parser=bool,
vals=vals.Bool(),
)
"""Parameter enable_auto_scale"""
channels = ChannelList(self, "Channels", RigolDS4000Channel, snapshotable=False)
for channel_number in range(1, 5):
channel = RigolDS4000Channel(self, f"ch{channel_number}", channel_number)
channels.append(channel)
self.add_submodule("channels", channels.to_channel_tuple())
def _check_firmware_version(self) -> None:
# Require version 00.02.03
idn = self.get_idn()
verstr = idn["firmware"]
if verstr is None:
raise RuntimeError("Could not determine firmware version of DS4000.")
ver = version.parse(verstr)
if ver < version.parse("00.02.03"):
warnings.warn(
"Firmware version should be at least 00.02.03,"
"data transfer may not work correctly"
)