# QCoDeS driver for QDac using channels
import logging
import time
from functools import partial
from typing import TYPE_CHECKING, Any
import pyvisa
import pyvisa.constants
from pyvisa.resources.serial import SerialInstrument
from qcodes import validators as vals
from qcodes.instrument import (
ChannelList,
Instrument,
InstrumentBaseKWArgs,
InstrumentChannel,
VisaInstrument,
VisaInstrumentKWArgs,
)
from qcodes.parameters import MultiChannelInstrumentParameter, ParamRawDataType
if TYPE_CHECKING:
from collections.abc import Sequence
from typing_extensions import Unpack
from qcodes.parameters import Parameter
log = logging.getLogger(__name__)
[docs]
class QDevQDacChannel(InstrumentChannel):
"""
A single output channel of the QDac.
Exposes chan.v, chan.vrange, chan.slope, chan.i, chan.irange
"""
_CHANNEL_VALIDATION = vals.Numbers(1, 48)
def __init__(
self,
parent: Instrument,
name: str,
channum: int,
**kwargs: "Unpack[InstrumentBaseKWArgs]",
):
"""
Args:
parent: The instrument to which the channel is
attached.
name: The name of the channel
channum: The number of the channel in question (1-48)
**kwargs: Forwarded to base class.
"""
super().__init__(parent, name, **kwargs)
# Validate the channel
self._CHANNEL_VALIDATION.validate(channum)
# Add the parameters
self.v: Parameter = self.add_parameter(
"v",
label=f"Channel {channum} voltage",
unit="V",
set_cmd=partial(self._parent._set_voltage, channum),
get_cmd=partial(self._parent._get_voltage, channum),
get_parser=float,
vals=vals.Numbers(-10, 10),
)
"""Parameter v"""
self.vrange: Parameter = self.add_parameter(
"vrange",
label=f"Channel {channum} atten.",
set_cmd=partial(self._parent._set_vrange, channum),
get_cmd=partial(self._parent._get_vrange, channum),
vals=vals.Enum(0, 1),
)
"""Parameter vrange"""
self.i: Parameter = self.add_parameter(
"i",
label=f"Channel {channum} current",
get_cmd=f"get {channum}",
unit="A",
get_parser=self._parent._current_parser,
)
"""Parameter i"""
self.irange: Parameter = self.add_parameter(
"irange",
label=f"Channel {channum} irange",
set_cmd=f"cur {channum} {{}}",
get_cmd=f"cur {channum}",
get_parser=int,
)
"""Parameter irange"""
self.slope: Parameter = self.add_parameter(
"slope",
label=f"Channel {channum} slope",
unit="V/s",
set_cmd=partial(self._parent._setslope, channum),
get_cmd=partial(self._parent._getslope, channum),
vals=vals.MultiType(vals.Enum("Inf"), vals.Numbers(1e-3, 100)),
)
"""Parameter slope"""
self.sync: Parameter = self.add_parameter(
"sync",
label=f"Channel {channum} sync output",
set_cmd=partial(self._parent._setsync, channum),
get_cmd=partial(self._parent._getsync, channum),
vals=vals.Ints(0, 5),
)
"""Parameter sync"""
self.sync_delay: Parameter = self.add_parameter(
name="sync_delay",
label=f"Channel {channum} sync pulse delay",
unit="s",
get_cmd=None,
set_cmd=None,
initial_value=0,
)
"""Parameter sync_delay"""
self.sync_duration: Parameter = self.add_parameter(
name="sync_duration",
label=f"Channel {channum} sync pulse duration",
unit="s",
get_cmd=None,
set_cmd=None,
initial_value=0.01,
)
"""Parameter sync_duration"""
[docs]
def snapshot_base(
self,
update: bool | None = False,
params_to_skip_update: "Sequence[str] | None" = None,
) -> dict[Any, Any]:
update_currents = self._parent._update_currents and update
if update and not self._parent._get_status_performed:
self._parent._update_cache(readcurrents=update_currents)
# call get_status rather than getting the status individually for
# each parameter. This is only done if _get_status_performed is False
# this is used to signal that the parent has already called it and
# no need to repeat.
if params_to_skip_update is None:
params_to_skip_update = ("v", "i", "irange", "vrange")
snap = super().snapshot_base(
update=update, params_to_skip_update=params_to_skip_update
)
return snap
QDacChannel = QDevQDacChannel
class QDevQDacMultiChannelParameter(MultiChannelInstrumentParameter):
"""
The class to be returned by __getattr__ of the ChannelList. Here customised
for fast multi-readout of voltages.
"""
def __init__(
self,
channels: "Sequence[InstrumentChannel]",
param_name: str,
*args: Any,
**kwargs: Any,
):
super().__init__(channels, param_name, *args, **kwargs)
def get_raw(self) -> tuple[ParamRawDataType, ...]:
"""
Return a tuple containing the data from each of the channels in the
list.
"""
# For voltages, we can do something slightly faster than the naive
# approach
if self._param_name == "v":
qdac = self._channels[0]._parent
qdac._update_cache(readcurrents=False)
output = tuple(
chan.parameters[self._param_name].get_latest()
for chan in self._channels
)
else:
output = tuple(
chan.parameters[self._param_name].get() for chan in self._channels
)
return output
QDacMultiChannelParameter = QDevQDacMultiChannelParameter
[docs]
class QDevQDac(VisaInstrument):
"""
Channelised driver for the QDev digital-analog converter QDac
Based on "DAC_commands_v_13.pdf"
Tested with Software Version: 0.170202
The driver assumes that the instrument is ALWAYS in verbose mode OFF
"""
# set nonzero value (seconds) to accept older status when reading settings
max_status_age = 1
def __init__(
self,
name: str,
address: str,
num_chans: int = 48,
update_currents: bool = True,
**kwargs: "Unpack[VisaInstrumentKWArgs]",
):
"""
Instantiates the instrument.
Args:
name: The instrument name used by qcodes
address: The VISA name of the resource
num_chans: Number of channels to assign. Default: 48
update_currents: Whether to query all channels for their
current current value on startup. Default: True.
**kwargs: kwargs are forwarded to base class.
Returns:
QDac object
"""
super().__init__(name, address, **kwargs)
self._output_n_lines = 50
handle = self.visa_handle
assert isinstance(handle, SerialInstrument)
self._get_status_performed = False
# This is the baud rate on power-up. It can be changed later but
# you must start out with this value.
handle.baud_rate = 460800
handle.parity = pyvisa.constants.Parity(0)
handle.data_bits = 8
self.set_terminator("\n")
# TODO: do we want a method for write termination too?
handle.write_termination = "\n"
# TODO: do we need a query delay for robust operation?
self._write_response = ""
if self._get_firmware_version() < 0.170202:
raise RuntimeError("""
Obsolete QDAC Software version detected.
QCoDeS only supports version 0.170202 or newer.
Contact rikke.lutge@nbi.ku.dk for an update.
""")
self.num_chans = num_chans
# Assigned slopes. Entries will eventually be [chan, slope]
self._slopes: list[tuple[int, str | float]] = []
# Function generators (used in _set_voltage)
self._fgs = set(range(1, 9))
self._assigned_fgs: dict[int, int] = {} # {chan: fg}
# Sync channels
self._syncoutputs: list[tuple[int, int]] = [] # Entries: [chan, syncchannel]
self.chan_range = range(1, 1 + self.num_chans)
self.channel_validator = vals.Ints(1, self.num_chans)
channels = ChannelList(
self,
"Channels",
QDevQDacChannel,
snapshotable=False,
multichan_paramclass=QDevQDacMultiChannelParameter,
)
for i in self.chan_range:
channel = QDevQDacChannel(self, f"chan{i:02}", i)
channels.append(channel)
# Should raise valueerror if name is invalid (silently fails now)
self.add_submodule(f"ch{i:02}", channel)
self.add_submodule("channels", channels.to_channel_tuple())
for board in range(6):
for sensor in range(3):
label = f"Board {board}, Temperature {sensor}"
self.add_parameter(
name=f"temp{board}_{sensor}",
label=label,
unit="C",
get_cmd=f"tem {board} {sensor}",
get_parser=self._num_verbose,
)
self.cal: Parameter = self.add_parameter(
name="cal", set_cmd="cal {}", vals=self.channel_validator
)
"""Parameter cal"""
# TO-DO: maybe it's too dangerous to have this settable.
# And perhaps ON is a better verbose mode default?
self.verbose: Parameter = self.add_parameter(
name="verbose", set_cmd="ver {}", val_mapping={True: 1, False: 0}
)
"""Parameter verbose"""
# Initialise the instrument, all channels DC (unbind func. generators)
for chan in self.chan_range:
# Note: this call does NOT change the voltage on the channel
self.write(f"wav {chan} 0 1 0")
self.verbose.set(False)
self.connect_message()
log.info("[*] Querying all channels for voltages and currents...")
self.version = self._update_cache(readcurrents=update_currents)
self._update_currents = update_currents
log.info("[+] Done")
[docs]
def snapshot_base(
self,
update: bool | None = False,
params_to_skip_update: "Sequence[str] | None" = None,
) -> dict[Any, Any]:
update_currents = self._update_currents and update is True
if update:
self._update_cache(readcurrents=update_currents)
self._get_status_performed = True
# call get_status rather than getting the status individually for
# each parameter. We set _get_status_performed to True
# to indicate that each update channel does not need to call this
# function as opposed to when snapshot is called on an individual
# channel
snap = super().snapshot_base(
update=update, params_to_skip_update=params_to_skip_update
)
self._get_status_performed = False
return snap
#########################
# Channel gets/sets
#########################
def _set_voltage(self, chan: int, v_set: float) -> None:
"""
set_cmd for the chXX_v parameter
Args:
chan: The 1-indexed channel number
v_set: The target voltage
If a finite slope has been assigned, we assign a function generator to
ramp the voltage.
"""
channel = self.channels[chan - 1]
slopechans = [sl[0] for sl in self._slopes]
if chan in slopechans:
slope = next(sl[1] for sl in self._slopes if sl[0] == chan)
# find and assign fg
fg = min(self._fgs.difference(set(self._assigned_fgs.values())))
self._assigned_fgs[chan] = fg
# We need .get and not get_latest in case a ramp was interrupted
v_start = channel.v.get()
time = abs(v_set - v_start) / slope
log.info(f"Slope: {slope}, time: {time}")
# Attenuation compensation and syncing
# happen inside _rampvoltage
self._rampvoltage(chan, fg, v_start, v_set, time)
else:
v_dac = QDevQDac._get_v_dac_from_v_exp(channel, v_set)
# set the mode back to DC in case it had been changed
# and then set the voltage
self.write(f"wav {chan} 0 0 0;set {chan} {v_dac:.6f}")
def _get_voltage(self, chan: int) -> float:
"""
get_cmd for the chXX_v parameter
Args:
chan: The 1-indexed channel number
"""
self._update_cache(readcurrents=False)
return self.channels[chan - 1].v.cache()
# In order to get conversions right let us define a vocabulary:
# v_exp: is the voltage including the attenuation.This is the value
# we want to store in the cache and the value we interact with as a
# qcodes user.
# v_dac: this is the voltage generated by the dac and handled by the VISA
# commands.
# Then we have the general relationship`v_exp = v_dac * attenuation`,
@staticmethod
def _get_attenuation(channel: QDevQDacChannel) -> float:
return 0.1 if channel.vrange.cache() == 1 else 1.0
@staticmethod
def _get_v_dac_from_v_exp(channel: QDevQDacChannel, v_exp: float) -> float:
return v_exp / QDevQDac._get_attenuation(channel)
@staticmethod
def _get_v_exp_from_v_dac(channel: QDevQDacChannel, v_dac: float) -> float:
return v_dac * QDevQDac._get_attenuation(channel)
def _set_vrange(self, chan: int, switchint: int) -> None:
"""
set_cmd for the chXX_vrange parameter
The switchint is an integer. 1 means attenuation ON.
Since the vrange is actually a 20 dB attenuator (amplitude factor 0.1)
immediately applied to the channel output, we must update the voltage
parameter accordingly
"""
self.write(f"vol {chan} {switchint}")
# setting v_range preserves v_dac but changes v_exp, see comment above
# for definitions.
channel = self.channels[chan - 1]
if channel.vrange.cache() != switchint:
v_dac = QDevQDac._get_v_dac_from_v_exp(channel, channel.v.cache())
channel.vrange.cache.set(switchint)
self._update_v_validator(channel, switchint)
channel.v.cache.set(QDevQDac._get_v_exp_from_v_dac(channel, v_dac))
def _get_vrange(self, chan: int) -> float:
"""
get_cmd for the chXX_vrange parameter
Args:
chan: The 1-indexed channel number
"""
self._update_cache(readcurrents=False)
return self.channels[chan - 1].vrange.cache()
def _num_verbose(self, s: str) -> float:
"""
Turn a return value from the QDac into a number.
If the QDac is in verbose mode, this involves stripping off the
value descriptor.
"""
if self.verbose.get_latest():
s = s.split(": ")[-1]
return float(s)
def _current_parser(self, s: str) -> float:
"""
Parser for chXX_i parameter
"""
return 1e-6 * self._num_verbose(s)
def _update_cache(self, readcurrents: bool = False) -> str:
r"""
Function to query the instrument and get the status of all channels,
e.g. voltage (``v``), voltage range (``vrange``), and current range (``irange``)
parameters of all the channels.
Takes a while to finish.
The `status` call generates 51 lines of output. Send the command and
read the first one, which is the software version line
the full output looks like:
Software Version: 0.160218\r\n
Channel\tOut V\t\tVoltage range\tCurrent range\n
\n
8\t 0.000000\t\tX 1\t\tpA\n
7\t 0.000000\t\tX 1\t\tpA\n
... (all 48 channels like this in a somewhat peculiar order)
(no termination afterward besides the \n ending the last channel)
returns a list of dicts [{v, vrange, irange}]
NOTE - channels are 1-based, but the return is a list, so of course
0-based, ie chan1 is out[0]
"""
def validate_version(version_line: str) -> str:
if version_line.startswith("Software Version: "):
version = version_line.strip().split(": ")[1]
else:
self._wait_and_clear()
raise ValueError("unrecognized version line: " + version_line)
return version
def validate_header(header_line: str) -> None:
headers = header_line.lower().strip("\r\n").split("\t")
expected_headers = [
"channel",
"out v",
"",
"voltage range",
"current range",
]
if headers != expected_headers:
raise ValueError("unrecognized header line: " + header_line)
def parse_line(line: str) -> tuple[int, int, int, float]:
i_range_trans = {"hi cur": 1, "lo cur": 0}
v_range_trans = {"X 1": 0, "X 0.1": 1}
chan_str, v_str, _, v_range_str, _, i_range_str = line.split("\t")
chan = int(chan_str)
v_dac = float(v_str)
v_range = v_range_trans[v_range_str.strip()]
i_range = i_range_trans[i_range_str.strip()]
return chan, i_range, v_range, v_dac
version = validate_version(self.ask("status"))
validate_header(self.read())
chans_left = set(self.chan_range)
while chans_left:
line = self.read().strip()
if not line:
continue
chan, i_range, v_range, v_dac = parse_line(line)
channel = self.channels[chan - 1]
channel.vrange.cache.set(v_range)
self._update_v_validator(channel, v_range)
channel.irange.cache.set(i_range)
channel.v.cache.set(QDevQDac._get_v_exp_from_v_dac(channel, v_dac))
chans_left.remove(chan)
if readcurrents:
self._read_currents()
return version
def _read_currents(self) -> None:
for chan in range(1, self.num_chans + 1):
param = self.channels[chan - 1].i
_ = param.get()
@staticmethod
def _update_v_validator(channel: QDevQDacChannel, v_range: int) -> None:
range = (-10.01, 10.01) if v_range == 0 else (-1.001, 1.001)
channel.v.vals = vals.Numbers(*range)
def _setsync(self, chan: int, sync: int) -> None:
"""
set_cmd for the chXX_sync parameter.
Args:
chan (int): The channel number (1-48)
sync (int): The associated sync output. 0 means 'unassign'
"""
if chan not in range(1, 49):
raise ValueError("Channel number must be 1-48.")
if sync == 0:
# try to remove the sync from internal bookkeeping
try:
sc = self._syncoutputs
to_remove = next(sc.index(syn) for syn in sc if syn[0] == chan)
self._syncoutputs.remove(sc[to_remove])
except IndexError:
pass
# free the previously assigned sync
oldsync = self.channels[chan - 1].sync.get_latest()
if oldsync is not None:
self.write(f"syn {oldsync} 0 0 0")
return
if sync in [syn[1] for syn in self._syncoutputs]:
oldchan = next(syn[0] for syn in self._syncoutputs if syn[1] == sync)
self._syncoutputs.remove((oldchan, sync))
if chan in [syn[0] for syn in self._syncoutputs]:
oldsyn = next(syn[1] for syn in self._syncoutputs if syn[0] == chan)
self._syncoutputs[self._syncoutputs.index((chan, oldsyn))] = (chan, sync)
return
self._syncoutputs.append((chan, sync))
return
def _getsync(self, chan: int) -> float:
"""
get_cmd of the chXX_sync parameter
"""
if chan in [syn[0] for syn in self._syncoutputs]:
sync = next(syn[1] for syn in self._syncoutputs if syn[0] == chan)
return sync
else:
return 0
def _setslope(self, chan: int, slope: float | str) -> None:
"""
set_cmd for the chXX_slope parameter, the maximum slope of a channel.
Args:
chan: The channel number (1-48)
slope: The slope in V/s. Write 'Inf' to allow
arbitrary small rise times.
"""
if chan not in range(1, 49):
raise ValueError("Channel number must be 1-48.")
if slope == "Inf":
self.write(f"wav {chan} 0 0 0")
# Now clear the assigned slope and function generator (if possible)
try:
self._assigned_fgs.pop(chan)
except KeyError:
pass
# Remove a sync output, if one was assigned
syncchans = [syn[0] for syn in self._syncoutputs]
if chan in syncchans:
self.channels[chan - 1].sync.set(0)
try:
sls = self._slopes
to_remove = next(sls.index(sl) for sl in sls if sl[0] == chan)
self._slopes.remove(sls[to_remove])
return
# If the value was already 'Inf', the channel was not
# in the list and nothing happens
except IndexError:
return
if chan in [sl[0] for sl in self._slopes]:
oldslope = next(sl[1] for sl in self._slopes if sl[0] == chan)
self._slopes[self._slopes.index((chan, oldslope))] = (chan, slope)
return
if len(self._slopes) >= 8:
rampchans = ", ".join(str(c[0]) for c in self._slopes)
raise ValueError(
"Can not assign finite slope to more than "
"8 channels. Assign 'Inf' to at least one of "
f"the following channels: {rampchans}"
)
self._slopes.append((chan, slope))
return
def _getslope(self, chan: int) -> str | float:
"""
get_cmd of the chXX_slope parameter
"""
if chan in [sl[0] for sl in self._slopes]:
slope = next(sl[1] for sl in self._slopes if sl[0] == chan)
return slope
else:
return "Inf"
[docs]
def printslopes(self) -> None:
"""
Print the finite slopes assigned to channels
"""
for sl in self._slopes:
print(f"Channel {sl[0]}, slope: {sl[1]} (V/s)")
def _rampvoltage(
self, chan: int, fg: int, v_start: float, setvoltage: float, ramptime: float
) -> None:
"""
Smoothly ramp the voltage of a channel by the means of a function
generator. Helper function used by _set_voltage.
Args:
chan: The channel number (counting from 1)
fg: The function generator (counting from 1)
v_start: The starting voltage
setvoltage: The voltage to ramp to
ramptime: The ramp time in seconds.
"""
# Crazy stuff happens if the period is too small, e.g. the channel
# can jump to its max voltage
if ramptime <= 0.002:
ramptime = 0
log.warning(
"Cancelled a ramp with a ramptime of "
f"{ramptime} s. Voltage not changed."
)
offset = v_start
amplitude = setvoltage - v_start
if self.channels[chan - 1].vrange.get_latest() == 1:
offset *= 10
amplitude *= 10
chanmssg = f"wav {chan} {fg} {amplitude} {offset}"
if chan in [syn[0] for syn in self._syncoutputs]:
sync = next(syn[1] for syn in self._syncoutputs if syn[0] == chan)
sync_duration = 1000 * self.channels[chan - 1].sync_duration.get()
sync_delay = 1000 * self.channels[chan - 1].sync_delay.get()
self.write(f"syn {sync} {fg} {sync_delay} {sync_duration}")
typedict = {"SINE": 1, "SQUARE": 2, "RAMP": 3}
typeval = typedict["RAMP"]
dutyval = 100
# s -> ms
periodval = ramptime * 1e3
repval = 1
funmssg = f"fun {fg} {typeval} {periodval} {dutyval} {repval}"
self.write(chanmssg)
self.write(funmssg)
[docs]
def write(self, cmd: str) -> None:
"""
QDac always returns something even from set commands, even when
verbose mode is off, so we'll override write to take this out
if you want to use this response, we put it in self._write_response
(but only for the very last write call)
In this method we expect to read one termination char per command. As
commands are concatenated by `;` we count the number of concatenated
commands as count(';') + 1 e.g. 'wav 1 1 1 0;fun 2 1 100 1 1' is two
commands. Note that only the response of the last command will be
available in `_write_response`
"""
log.debug(f"Writing to instrument {self.name}: {cmd}")
self.visa_handle.write(cmd)
for _ in range(cmd.count(";") + 1):
self._write_response = self.visa_handle.read()
[docs]
def read(self) -> str:
return self.visa_handle.read()
def _wait_and_clear(self, delay: float = 0.5) -> None:
time.sleep(delay)
self.visa_handle.clear()
[docs]
def connect_message(
self, idn_param: str = "IDN", begin_time: float | None = None
) -> None:
"""
Override of the standard Instrument class connect_message.
Usually, the response to `*IDN?` is printed. Here, the
software version is printed.
"""
self.visa_handle.write("status")
log.info(f"Connected to QDac on {self._address}, {self.visa_handle.read()}")
# take care of the rest of the output
for _ in range(self._output_n_lines):
self.visa_handle.read()
def _get_firmware_version(self) -> float:
self.write("status")
FW_str = self._write_response
FW_version = float(FW_str.replace("Software Version: ", ""))
for _ in range(self._output_n_lines):
self.read()
return FW_version
[docs]
def print_overview(self, update_currents: bool = False) -> None:
"""
Pretty-prints the status of the QDac
"""
self._update_cache(readcurrents=update_currents)
paramstoget = [["i", "v"], ["irange", "vrange"]]
printdict = {
"i": "Current",
"v": "Voltage",
"vrange": "Voltage range",
"irange": "Current range",
}
returnmap = {
"vrange": {1: "-1 V to 1 V", 0: "-10 V to 10 V"},
"irange": {0: "0 to 1 muA", 1: "0 to 100 muA"},
}
# Print the channels
for ii in range(self.num_chans):
line = f"Channel {ii+1} \n"
line += " "
for pp in paramstoget[0]:
param = getattr(self.channels[ii], pp)
line += printdict[pp]
line += f": {param.get_latest()}"
line += f" ({param.unit})"
line += ". "
line += "\n "
for pp in paramstoget[1]:
param = getattr(self.channels[ii], pp)
line += printdict[pp]
value = param.get_latest()
line += f": {returnmap[pp][value]}"
line += ". "
print(line)
class QDac(QDevQDac):
"""
Backwards compatibility alias for QDevQDac driver
"""
pass