"""
This is a driver for the Stahl power supplies
"""
import logging
import re
from collections import OrderedDict
from collections.abc import Callable, Iterable
from functools import partial
from typing import TYPE_CHECKING, Any
import numpy as np
from pyvisa.resources.serial import SerialInstrument
from pyvisa.resources.tcpip import TCPIPSocket
from qcodes.instrument import (
ChannelList,
InstrumentBaseKWArgs,
InstrumentChannel,
VisaInstrument,
VisaInstrumentKWArgs,
)
from qcodes.validators import Numbers
if TYPE_CHECKING:
from typing_extensions import Unpack
from qcodes.parameters import Parameter
# Use a module-scoped logger rather than the root logger so that records
# emitted through it are attributed to this driver and can be filtered or
# configured independently of the rest of the application.
logger = logging.getLogger(__name__)
def chain(*functions: Callable[..., Any]) -> Callable[..., Any]:
    """
    Compose callables left to right into a single callable.

    The positional arguments of the returned callable are passed to the
    first function. The output of each function is then piped to the input
    of the next one: an iterable result is unpacked into positional
    arguments, whereas a non-iterable result, a ``str`` or a ``bytes``
    object is passed on as one single argument.

    Args:
        *functions: The callables to apply, in order.

    Returns:
        A callable returning the result of the last function. When no
        functions are given it returns its own arguments as a tuple.

    Example:
        >>> def f():
        ...     return "1.2"
        >>> chain(f, float)()  # return 1.2 as float
        1.2
    """

    def make_iter(args: Any) -> Iterable[Any]:
        # Text and binary strings are iterable, but they represent a single
        # value here; unpacking them would hand the next function one
        # character (or one integer byte value) per positional argument.
        if isinstance(args, (str, bytes)) or not isinstance(args, Iterable):
            return (args,)
        return args

    def inner(*args: Any) -> Any:
        result: Any = args
        for fun in functions:
            result = fun(*make_iter(result))
        return result

    return inner
class StahlChannel(InstrumentChannel):
    # ASCII ACK (0x06): the reply compared against after a voltage set command.
    acknowledge_reply = chr(6)

    def __init__(
        self,
        parent: VisaInstrument,
        name: str,
        channel_number: int,
        **kwargs: "Unpack[InstrumentBaseKWArgs]",
    ):
        """
        Create a single output channel of a Stahl source.

        Registers the ``voltage``, ``current`` and ``is_locked`` parameters.

        Args:
            parent: The instrument this channel belongs to.
            name: The name of the channel.
            channel_number: The number of the channel; it is zero-padded to
                two digits when building the commands.
            **kwargs: Forwarded to the base class.
        """
        super().__init__(parent, name, **kwargs)

        self._channel_number = channel_number
        self._channel_string = f"{channel_number:02d}"

        # Every command starts with the identifier of the parent instrument.
        command_prefix = self.parent.identifier
        voltage_limit = self.parent.voltage_range

        # A number in which either "." or "," may be the decimal separator.
        number_pattern = r"[+\-]?(?:[.,]\d+|\d+(?:[.,]\d*)?)(?:[eE][-+]?\d+)?"
        voltage_reply = re.compile(f"^({number_pattern})[ ]?V$")
        current_reply = re.compile(f"^({number_pattern})[ ]?mA$")
        # float() only understands ".", so decimal commas are replaced first.
        use_decimal_point = partial(re.sub, ",", ".")

        def milliamps_to_amps(reading: str) -> float:
            # The instrument replies in mA whereas the parameter unit is A.
            return float(reading) / 1000

        self.voltage: Parameter = self.add_parameter(
            "voltage",
            get_cmd=f"{command_prefix} U{self._channel_string}",
            get_parser=chain(voltage_reply.findall, use_decimal_point, float),
            set_cmd=self._set_voltage,
            unit="V",
            vals=Numbers(-voltage_limit, voltage_limit),
        )
        """Parameter voltage"""

        self.current: Parameter = self.add_parameter(
            "current",
            get_cmd=f"{command_prefix} I{self._channel_string}",
            get_parser=chain(
                current_reply.findall, use_decimal_point, milliamps_to_amps
            ),
            unit="A",
        )
        """Parameter current"""

        self.is_locked: Parameter = self.add_parameter(
            "is_locked", get_cmd=self._get_lock_status
        )
        """Parameter is_locked"""

    def _set_voltage(self, voltage: float) -> None:
        """
        Send the requested voltage to the instrument as a fraction of the
        full output span and warn if the reply is not an acknowledge.

        Args:
            voltage: The voltage to set.
        """
        full_scale = self.parent.voltage_range
        # The instrument takes a value between 0 and 1, where 0 is the
        # maximum negative voltage and 1 is the maximum positive voltage.
        fraction = np.interp(voltage, full_scale * np.array([-1, 1]), [0, 1])

        command = f"{self.parent.identifier} CH{self._channel_string} {fraction:.5f}"
        reply = self.ask(command)
        if reply == self.acknowledge_reply:
            return

        self.log.warning(
            f"Command {command} did not produce an acknowledge reply\n response was: {reply}"
        )

    def _get_lock_status(self) -> bool:
        """
        Query whether this channel is locked.

        A lock occurs when an output is overloaded.

        Return:
            lock_status: True when locked
        """
        lock_bytes = self.parent.visa_handle.query_binary_values(
            f"{self.parent.identifier} LOCK", datatype="B", header_fmt="empty"
        )

        # One reply byte is read per group of four channels; the position
        # within the group selects the character of its binary representation.
        group, position = divmod(self._channel_number - 1, 4)
        bits = format(lock_bytes[group], "b")
        # NOTE(review): format(..., "b") drops leading zeros, so this indexing
        # presumes the byte always renders with enough digits - confirm
        # against the instrument manual.
        return bits[position + 1] == "1"
class Stahl(VisaInstrument):
    # Carriage return is the default message terminator for this driver.
    default_terminator = "\r"

    def __init__(
        self,
        name: str,
        address: str,
        **kwargs: "Unpack[VisaInstrumentKWArgs]",
    ):
        """
        Stahl driver.

        Args:
            name: instrument name
            address: A serial port or TCP/IP VISA address
            **kwargs: forwarded to base class

        Raises:
            TypeError: If the VISA handle is neither a serial instrument
                nor a TCP/IP socket.

        The TCP/IP scenario can be used when the Stahl is connected to
        a different computer, for example a Raspberry Pi running Linux,
        and exposed using something like the following script:

        ::

            #!/bin/sh
            DEVICE=/dev/ttyUSB0
            PORT=8088
            echo Listening...
            while socat $DEVICE,echo=0,b115200,raw tcp-listen:$PORT,reuseaddr,nodelay; do
                echo Restarting...
            done

        In this case the VISA address would be: ``"TCPIP0::hostname::8088::SOCKET"``
        """
        super().__init__(name, address, **kwargs)

        handle = self.visa_handle
        # A TCP/IP socket (connection to a remote serial device) is used as
        # is; a local serial port is set to 115200 baud. Anything else is
        # rejected.
        if not isinstance(handle, TCPIPSocket):
            if not isinstance(handle, SerialInstrument):
                raise TypeError(
                    "VisaHandle must be either a 'SerialInstrument' or a 'TCPIPSocket'"
                )
            handle.baud_rate = 115200

        # Every field parsed from the IDN reply becomes an instance attribute.
        idn_fields = self.parse_idn_string(self.ask("IDN"))
        for attribute, value in idn_fields.items():
            setattr(self, attribute, value)

        # Each channel is reachable both as its own submodule ("channel<N>")
        # and through the "channel" list.
        channel_list = ChannelList(self, "channel", StahlChannel, snapshotable=False)
        for number in range(1, self.n_channels + 1):
            channel_name = f"channel{number}"
            channel = StahlChannel(self, channel_name, number)
            self.add_submodule(channel_name, channel)
            channel_list.append(channel)
        self.add_submodule("channel", channel_list)

        temperature_reply = re.compile("^TEMP (.*)°C$")
        self.temperature: Parameter = self.add_parameter(
            "temperature",
            get_cmd=f"{self.identifier} TEMP",
            get_parser=chain(temperature_reply.findall, float),
            unit="C",
        )
        """Parameter temperature"""

        self.connect_message()

    def ask_raw(self, cmd: str) -> str:
        """
        Write ``cmd`` to the instrument and return its reply.

        Sometimes the instrument returns non-ascii characters in response
        strings, so the reply is explicitly decoded as latin-1.
        """
        self.visa_log.debug(f"Querying: {cmd}")
        self.visa_handle.write(cmd)
        reply = self.visa_handle.read(encoding="latin-1")
        self.visa_log.debug(f"Response: {reply}")
        return reply

    @staticmethod
    def parse_idn_string(idn_string: str) -> dict[str, Any]:
        """
        Split the reply to the IDN query into its individual fields.

        Return:
            dict: The dict contains the following keys "model",
            "serial_number", "voltage_range","n_channels", "output_type"

        Raises:
            RuntimeError: If the reply does not have the expected format.
        """
        match = re.search(r"(HV|BS)(\d{3}) (\d{3}) (\d{2}) ([buqsm])", idn_string)
        if match is None:
            raise RuntimeError(
                "Unexpected instrument response. Perhaps the model of the "
                "instrument does not match the drivers expectation or a "
                "firmware upgrade has taken place. Please get in touch "
                "with a QCoDeS core developer"
            )

        model, serial_number, voltage_range, n_channels, type_code = match.groups()
        output_types = {
            "b": "bipolar",
            "u": "unipolar",
            "q": "quadrupole",
            "s": "steerer",
            "m": "bipolar milivolt",
        }
        return {
            "model": model,
            "serial_number": serial_number,
            "voltage_range": float(voltage_range),
            "n_channels": int(n_channels),
            "output_type": output_types.get(type_code),
        }

    def get_idn(self) -> dict[str, str | None]:
        """
        Build the identification dict from the parsed instrument info.

        The Stahl sends an uncommon IDN string which does not include a
        firmware version, hence "firmware" is always None.
        """
        return dict(
            vendor="Stahl",
            model=self.model,
            serial=self.serial_number,
            firmware=None,
        )

    @property
    def identifier(self) -> str:
        """Model followed by serial number; used as prefix of the commands."""
        return f"{self.model}{self.serial_number}"