import logging
from functools import partial
from typing import TYPE_CHECKING, Any, ClassVar, cast
from qcodes import validators as vals
from qcodes.instrument import (
ChannelList,
InstrumentBaseKWArgs,
InstrumentChannel,
VisaInstrument,
VisaInstrumentKWArgs,
)
from qcodes.utils import partial_with_docstring
if TYPE_CHECKING:
from typing_extensions import Unpack
from qcodes.parameters import Parameter
log = logging.getLogger(__name__)
[docs]
class RigolDG1062Burst(InstrumentChannel):
"""
Burst commands for the DG1062. We make a separate channel for these to
group burst commands together.
"""
def __init__(
self,
parent: "RigolDG1062",
name: str,
channel: int,
**kwargs: "Unpack[InstrumentBaseKWArgs]",
):
super().__init__(parent, name, **kwargs)
self.channel = channel
self.on: Parameter = self.add_parameter(
"on",
get_cmd=f":SOUR{channel}:BURS?",
set_cmd=f":SOUR{channel}:BURS {{}}",
vals=vals.Enum(0, 1, "ON", "OFF"),
)
"""Parameter on"""
self.polarity: Parameter = self.add_parameter(
"polarity",
get_cmd=f":SOUR{channel}:BURS:GATE:POL?",
set_cmd=f":SOUR{channel}:BURS:GATE:POL {{}}",
vals=vals.Enum("NORM", "INV"),
)
"""Parameter polarity"""
self.period: Parameter = self.add_parameter(
"period",
get_cmd=f":SOUR{channel}:BURS:INT:PER?",
set_cmd=f":SOUR{channel}:BURS:INT:PER {{}}",
vals=vals.MultiType(
vals.Numbers(min_value=3e-6, max_value=500), vals.Enum("MIN", "MAX")
),
)
"""Parameter period"""
self.mode: Parameter = self.add_parameter(
"mode",
get_cmd=f":SOUR{channel}:BURS:MODE?",
set_cmd=f":SOUR{channel}:BURS:MODE {{}}",
vals=vals.Enum("TRIG", "INF", "GAT"),
)
"""Parameter mode"""
self.ncycles: Parameter = self.add_parameter(
"ncycles",
get_cmd=f":SOUR{channel}:BURS:NCYC?",
set_cmd=f":SOUR{channel}:BURS:NCYC {{}}",
vals=vals.Numbers(min_value=1, max_value=500000),
)
"""Parameter ncycles"""
self.phase: Parameter = self.add_parameter(
"phase",
get_cmd=f":SOUR{channel}:BURS:PHAS?",
set_cmd=f":SOUR{channel}:BURS:PHAS {{}}",
vals=vals.Numbers(min_value=0, max_value=360),
)
"""Parameter phase"""
self.time_delay: Parameter = self.add_parameter(
"time_delay",
get_cmd=f":SOUR{channel}:BURS:TDEL?",
set_cmd=f":SOUR{channel}:BURS:TDEL {{}}",
vals=vals.Numbers(min_value=0),
)
"""Parameter time_delay"""
self.trigger_slope: Parameter = self.add_parameter(
"trigger_slope",
get_cmd=f":SOUR{channel}:BURS:TRIG:SLOP?",
set_cmd=f":SOUR{channel}:BURS:TRIG:SLOP {{}}",
vals=vals.Enum("POS", "NEG"),
)
"""Parameter trigger_slope"""
self.source: Parameter = self.add_parameter(
"source",
get_cmd=f":SOUR{channel}:BURS:TRIG:SOUR?",
set_cmd=f":SOUR{channel}:BURS:TRIG:SOUR {{}}",
vals=vals.Enum("INT", "EXT", "MAN"),
)
"""Parameter source"""
self.idle: Parameter = self.add_parameter(
"idle",
get_cmd=f":SOUR{channel}:BURST:IDLE?",
set_cmd=f":SOUR{channel}:BURST:IDLE {{}}",
vals=vals.MultiType(
vals.Enum("FPT", "TOP", "BOTTOM", "CENTER"),
vals.Numbers(), # DIY
),
)
"""Parameter idle"""
[docs]
def trigger(self) -> None:
"""
Send a software trigger to the instrument. This only works if the
trigger source is set to manual.
"""
self.parent.write_raw(f":SOUR{self.channel}:BURS:TRIG")
[docs]
class RigolDG1062Channel(InstrumentChannel):
min_impedance = 1
max_impedance = 10000
waveform_params: ClassVar[dict[str, tuple[str, ...]]] = {
waveform: ("freq", "ampl", "offset", "phase")
for waveform in ["HARM", "NOIS", "RAMP", "SIN", "SQU", "TRI", "USER", "PULS"]
}
waveform_params["DC"] = ("freq", "ampl", "offset")
waveform_params["ARB"] = ("sample_rate", "ampl", "offset")
"""
Responses from the machine don't always match
the name to set the function, hence a translater
"""
waveform_translate: ClassVar[dict[str, str]] = {
"HARM": "HARM",
"NOISE": "NOIS",
"RAMP": "RAMP",
"SIN": "SIN",
"SQU": "SQU",
"TRI": "TRI",
"USER": "USER",
"PULSE": "PULS",
}
waveforms: ClassVar[tuple[str, ...]] = tuple(waveform_params.keys())
def __init__(
self,
parent: "RigolDG1062",
name: str,
channel: int,
**kwargs: "Unpack[InstrumentBaseKWArgs]",
):
"""
Args:
parent: The instrument this channel belongs to
name: Name of the channel.
channel: Number of the channel.
**kwargs: Forwarded to base class.
"""
super().__init__(parent, name, **kwargs)
self.channel = channel
for param, unit in [
("freq", "Hz"),
("ampl", "V"),
("offset", "V"),
("phase", "deg"),
("sample_rate", "1/s"),
]:
self.add_parameter(
param,
unit=unit,
get_cmd=partial(self._get_waveform_param, param),
set_cmd=partial(self._set_waveform_param, param),
)
self.waveform: Parameter = self.add_parameter(
"waveform", get_cmd=partial(self._get_waveform_param, "waveform")
)
"""Parameter waveform"""
self.impedance: Parameter = self.add_parameter(
"impedance",
get_cmd=f":OUTPUT{channel}:IMP?",
set_cmd=f":OUTPUT{channel}:IMP {{}}",
unit="Ohm",
vals=vals.MultiType(
vals.Ints(
min_value=RigolDG1062Channel.min_impedance,
max_value=RigolDG1062Channel.max_impedance,
),
vals.Enum("INF", "MIN", "MAX", "HighZ"),
),
get_parser=(
lambda value: "HighZ"
if float(value) > RigolDG1062Channel.max_impedance
else float(value)
),
set_parser=lambda value: "INF" if value == "HighZ" else value,
)
"""Parameter impedance"""
self.sync: Parameter = self.add_parameter(
"sync",
get_cmd=f":OUTPUT{channel}:SYNC?",
set_cmd=f"OUTPUT{channel}:SYNC {{}}",
vals=vals.Enum(0, 1, "ON", "OFF"),
)
"""Parameter sync"""
self.polarity: Parameter = self.add_parameter(
"polarity",
get_cmd=f":OUTPUT{channel}:GAT:POL?",
set_cmd=f":OUTPUT{channel}:GAT:POL {{}}",
vals=vals.OnOff(),
val_mapping={1: "POSITIVE", 0: "NEGATIVE"},
)
"""Parameter polarity"""
self.state: Parameter = self.add_parameter(
"state",
get_cmd=f"OUTPUT{channel}:STATE?",
set_cmd=f"OUTPUT{channel}:STATE {{}}",
)
"""Parameter state"""
self.duty_cycle: Parameter = self.add_parameter(
"duty_cycle",
get_cmd=self._get_duty_cycle,
set_cmd=self._set_duty_cycle,
unit="%",
vals=vals.Numbers(min_value=1, max_value=99),
docstring=(
"This functions reads/sets the duty "
"cycle for a square and pulse wave "
"since these inherit a duty cycle.\n"
"For other waveforms it will give "
"the user an error"
),
)
"""
This functions reads/sets the duty cycle for a square and
pulse wave since these inherit a duty cycle.
For other waveforms it will give the user an error
"""
burst = RigolDG1062Burst(cast(RigolDG1062, self.parent), "burst", self.channel)
self.add_submodule("burst", burst)
# We want to be able to do the following:
# >>> help(gd.channels[0].sin)
# >>> gd.channels[0].sin(freq=2E3, ampl=1.0, offset=0, phase=0)
# We do not use add_function as it is more cumbersome to use.
for waveform in self.waveforms:
f = partial_with_docstring(
self.apply,
docstring="Args: " + ", ".join(self.waveform_params[waveform]),
waveform=waveform,
)
setattr(self, waveform.lower(), f)
# Retrieve current waveform from device
self.waveform()
[docs]
def apply(self, **kwargs: Any) -> None:
"""
Public interface to apply a waveform on the channel
Example:
>>> gd = RigolDG1062("gd", "TCPIP0::169.254.187.99::inst0::INSTR")
>>> gd.channels[0].apply(waveform="SIN", freq=1E3, ampl=1.0, offset=0, phase=0)
Valid waveforms are: HARM, NOIS, RAMP, SIN, SQU, TRI, USER, DC, ARB
To find the correct arguments of each waveform we can e.g. do:
>>> help(gd.channels[0].sin)
Notice the lower case when accessing the waveform through convenience
functions.
If not kwargs are given a dictionary with the current waveform
parameters are returned.
"""
self._set_waveform_params(**kwargs)
def _get_waveform_param(self, param: str) -> float:
"""
Get a parameter of the current waveform. Valid param names are
dependent on the waveform type (e.g. "DC" does not have a "phase")
"""
params_dict = self._get_waveform_params()
return params_dict.get(param, None)
def _get_waveform_params(self) -> dict[Any, Any]:
"""
Get all the parameters of the current waveform and
"""
def to_float(string: str) -> float | str:
try:
return float(string)
except ValueError:
return string
waveform_str = self.parent.ask_raw(f":SOUR{self.channel}:APPL?")
parts = waveform_str.strip('"').split(",")
current_waveform = self.waveform_translate[parts[0]]
param_vals: list[str | float] = [current_waveform]
param_vals += [to_float(i) for i in parts[1:]]
param_names = ["waveform"] + list(self.waveform_params[current_waveform])
params_dict = dict(zip(param_names, param_vals))
return params_dict
def _set_waveform_param(self, param: str, value: float) -> None:
"""
Set a particular waveform param to the given value.
"""
params_dict = self._get_waveform_params()
if param in params_dict:
params_dict[param] = value
else:
log.warning(f"Warning, unable to set '{param}' for the current waveform")
return
return self._set_waveform_params(**params_dict)
def _set_waveform_params(self, **params_dict: int | float) -> None:
"""
Apply a waveform with values given in a dictionary.
"""
if "waveform" not in params_dict:
raise ValueError("At least 'waveform' argument needed")
waveform = str(params_dict["waveform"])
if waveform not in self.waveform_params:
raise ValueError(
f"Unknown waveform '{waveform}'. Options are "
f"{self.waveform_params.keys()}"
)
param_names = self.waveform_params[waveform]
if not set(param_names).issubset(params_dict.keys()):
raise ValueError(
f"Waveform {waveform} needs at least parameters {param_names}"
)
string = f":SOUR{self.channel}:APPL:{waveform} "
values = [f"{params_dict[param]:7e}" for param in param_names]
string += ",".join(values)
self.parent.write_raw(string)
def _get_duty_cycle(self) -> float:
"""
Reads the duty cycle after checking waveform
"""
wf = self.waveform()
if wf in ["PULS", "SQU"]:
duty_cycle = self.parent.ask_raw(f":SOUR{self.channel}:FUNC:{wf}:DCYC?")
else:
raise ValueError(
f"Current function does not contain duty cycle. Current function: {wf}"
)
return duty_cycle
def _set_duty_cycle(self, duty_cycle: float) -> None:
"""
Sets the duty cycle after checking waveform
"""
wf = self.waveform()
if wf in ["PULS", "SQU"]:
self.parent.write_raw(f":SOUR{self.channel}:FUNC:{wf}:DCYC {duty_cycle}")
else:
raise ValueError(
f"Current function does not have duty cycle"
f" hence can not set. Current function: {wf}"
)
[docs]
class RigolDG1062(VisaInstrument):
"""
Instrument driver for the Rigol DG1062
"""
waveforms = RigolDG1062Channel.waveforms
default_terminator = "\n"
def __init__(
self, name: str, address: str, **kwargs: "Unpack[VisaInstrumentKWArgs]"
):
super().__init__(name, address, **kwargs)
channels = ChannelList(self, "channel", RigolDG1062Channel, snapshotable=False)
for ch_num in [1, 2]:
ch_name = f"ch{ch_num}"
channel = RigolDG1062Channel(self, ch_name, ch_num)
channels.append(channel)
self.add_submodule(ch_name, channel)
self.add_submodule("channels", channels.to_channel_tuple())
self.connect_message()