from enum import IntFlag
from itertools import takewhile
from typing import (
TYPE_CHECKING,
Any,
Literal,
SupportsBytes,
SupportsIndex,
TextIO,
cast,
)
from qcodes.instrument import (
ChannelList,
InstrumentBaseKWArgs,
InstrumentChannel,
VisaInstrument,
VisaInstrumentKWArgs,
)
from qcodes.parameters import Group, GroupParameter, Parameter
from qcodes.validators import Enum, Numbers
if TYPE_CHECKING:
from collections.abc import Iterable
from typing_extensions import Buffer, Self, Unpack
def _read_curve_file(curve_file: TextIO) -> dict[Any, Any]:
"""
    Read a curve file with extension .330.

    The file format of this file is shown in test_lakeshore_file_parser.py
    in the test module.

    The output is a dictionary with the keys "metadata" and "data". The
    metadata dictionary contains the first n lines of the curve file, which
    are in the format "item: value". The data dictionary contains the actual
    curve data.
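
    As a rough illustration (the values below are made up), a parsed file
    could look like::

        {
            "metadata": {"Sensor Model": "XY-1234", "Serial Number": "X12345",
                         "Data Format": "4 (Log Ohms/Kelvin)", ...},
            "data": {"No.": (1.0, 2.0, ...), "Units": (3.5, 3.4, ...),
                     "Temperature (K)": (300.0, 290.0, ...)},
        }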
"""
def split_data_line(line: str, parser: type = str) -> list[Any]:
return [parser(i) for i in line.split(" ") if i != ""]
def strip(strings: "Iterable[str]") -> tuple[str, ...]:
return tuple(s.strip() for s in strings)
lines = iter(curve_file.readlines())
# Meta data lines contain a colon
metadata_lines = takewhile(lambda s: ":" in s, lines)
# Data from the file is collected in the following dict
file_data: dict[str, dict[str, Any]] = dict()
# Capture meta data
parsed_lines = [strip(line.split(":")) for line in metadata_lines]
file_data["metadata"] = {key: value for key, value in parsed_lines}
# After meta data we have a data header
header_items = strip(split_data_line(next(lines)))
# After that we have the curve data
data: list[list[float]] = [
split_data_line(line, parser=float) for line in lines if line.strip() != ""
]
file_data["data"] = dict(zip(header_items, zip(*data)))
return file_data
def _get_sanitize_data(file_data: dict[Any, Any]) -> dict[Any, Any]:
"""
    The data as found in the curve files is formatted slightly differently
    from the dictionary expected by the 'upload_curve' method of the driver.
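
    As a sketch (hypothetical values), given file data whose metadata contains
    ``"Data Format": "4 (Log Ohms/Kelvin)"`` and whose data dictionary is::

        {"No.": (1.0, 2.0), "Units": (3.5, 3.4), "Temperature (K)": (300.0, 290.0)}

    this function returns::

        {"log Ohm": (3.5, 3.4), "Temperature (K)": (300.0, 290.0)}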
"""
data_dict = dict(file_data["data"])
# We do not need the index column
del data_dict["No."]
# Rename the 'Units' column to the appropriate name
# Look up under the 'Data Format' entry to find what units we have
data_format = file_data["metadata"]["Data Format"]
# This is a string in the form '4 (Log Ohms/Kelvin)'
data_format_int = int(data_format.split()[0])
correct_name = LakeshoreModel325Curve.valid_sensor_units[data_format_int - 1]
# Rename the column
data_dict[correct_name] = data_dict["Units"]
del data_dict["Units"]
return data_dict
class LakeshoreModel325Status(IntFlag):
"""
IntFlag that defines status codes for Lakeshore Model 325
"""
    sensor_units_overrange = 128
sensor_units_zero = 64
temp_overrange = 32
temp_underrange = 16
invalid_reading = 1
    # We reimplement from_bytes and to_bytes in order to fix docstrings that are
    # incorrectly formatted; this in turn enables us to build docs with warnings
    # treated as errors. This can be removed for Python versions where
    # https://github.com/python/cpython/pull/117847 is merged.
@classmethod
def from_bytes(
cls,
bytes: "Iterable[SupportsIndex] | SupportsBytes | Buffer",
byteorder: Literal["big", "little"] = "big",
*,
signed: bool = False,
) -> "Self":
"""
Return the integer represented by the given array of bytes.
Args:
bytes: Holds the array of bytes to convert. The argument must either
support the buffer protocol or be an iterable object producing bytes.
Bytes and bytearray are examples of built-in objects that support the
buffer protocol.
byteorder: The byte order used to represent the integer. If byteorder is 'big',
the most significant byte is at the beginning of the byte array. If
byteorder is 'little', the most significant byte is at the end of the
byte array. To request the native byte order of the host system, use
`sys.byteorder` as the byte order value. Default is to use 'big'.
            signed: Indicates whether two's complement is used to represent the integer.
"""
return super().from_bytes(bytes, byteorder, signed=signed)
def to_bytes(
self,
length: SupportsIndex = 1,
byteorder: Literal["little", "big"] = "big",
*,
signed: bool = False,
) -> bytes:
"""
Return an array of bytes representing an integer.
Args:
length: Length of bytes object to use. An OverflowError is raised if the
integer is not representable with the given number of bytes. Default
is length 1.
            byteorder: The byte order used to represent the integer. If byteorder is 'big',
                the most significant byte is at the beginning of the byte array. If
                byteorder is 'little', the most significant byte is at the end of the
                byte array. To request the native byte order of the host system, use
                `sys.byteorder` as the byte order value. Default is to use 'big'.
            signed: Determines whether two's complement is used to represent the integer.
If signed is False and a negative integer is given, an OverflowError
is raised.
"""
return super().to_bytes(length, byteorder, signed=signed)
class LakeshoreModel325Curve(InstrumentChannel):
"""
An InstrumentChannel representing a curve on a Lakeshore Model 325
"""
valid_sensor_units = ("mV", "V", "Ohm", "log Ohm")
temperature_key = "Temperature (K)"
def __init__(
self,
parent: "LakeshoreModel325",
index: int,
**kwargs: "Unpack[InstrumentBaseKWArgs]",
) -> None:
self._index = index
name = f"curve_{index}"
super().__init__(parent, name, **kwargs)
self.serial_number: GroupParameter = self.add_parameter(
"serial_number", parameter_class=GroupParameter
)
"""Parameter serial_number"""
self.format: GroupParameter = self.add_parameter(
"format",
val_mapping={
f"{unt}/K": i + 1 for i, unt in enumerate(self.valid_sensor_units)
},
parameter_class=GroupParameter,
)
"""Parameter format"""
self.limit_value: GroupParameter = self.add_parameter(
"limit_value", parameter_class=GroupParameter
)
"""Parameter limit_value"""
self.coefficient: GroupParameter = self.add_parameter(
"coefficient",
val_mapping={"negative": 1, "positive": 2},
parameter_class=GroupParameter,
)
"""Parameter coefficient"""
self.curve_name: GroupParameter = self.add_parameter(
"curve_name", parameter_class=GroupParameter
)
"""Parameter curve_name"""
Group(
[
self.curve_name,
self.serial_number,
self.format,
self.limit_value,
self.coefficient,
],
set_cmd=f"CRVHDR {self._index}, {{curve_name}}, "
f"{{serial_number}}, {{format}}, {{limit_value}}, "
f"{{coefficient}}",
get_cmd=f"CRVHDR? {self._index}",
)
def get_data(self) -> dict[Any, Any]:
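        """
        Read the breakpoints of this curve from the instrument via ``CRVPT?``
        and return them as a dictionary with the temperature values under the
        key ``"Temperature (K)"`` and the sensor values under the unit taken
        from the ``format`` parameter (for example ``"V"`` when the format is
        ``"V/K"``).
        """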
curve = [
float(a)
            for point_index in range(1, 201)  # the instrument stores up to 200 breakpoints
for a in self.ask(f"CRVPT? {self._index}, {point_index}").split(",")
]
d = {self.temperature_key: curve[1::2]}
sensor_unit = self.format().split("/")[0]
d[sensor_unit] = curve[::2]
return d
@classmethod
def validate_datadict(cls, data_dict: dict[Any, Any]) -> str:
"""
        A data dict has two keys, one of which is 'Temperature (K)'. The other
        key names the unit in which the curve is defined and must be one of:
        'mV', 'V', 'Ohm' or 'log Ohm'.
        This method validates this and returns the sensor unit encountered in
        the data dict.
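
        For example (illustrative values), ``{"Temperature (K)": [300.0, 290.0],
        "V": [0.5, 0.6]}`` is valid and would return ``"V"``.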
"""
if cls.temperature_key not in data_dict:
raise ValueError(
f"At least {cls.temperature_key} needed in the data dictionary"
)
sensor_units = [i for i in data_dict.keys() if i != cls.temperature_key]
if len(sensor_units) != 1:
raise ValueError(
"Data dictionary should have one other key, other then "
"'Temperature (K)'"
)
sensor_unit = sensor_units[0]
if sensor_unit not in cls.valid_sensor_units:
raise ValueError(
f"Sensor unit {sensor_unit} invalid. This needs to be one of "
f"{', '.join(cls.valid_sensor_units)}"
)
data_size = len(data_dict[cls.temperature_key])
if data_size != len(data_dict[sensor_unit]) or data_size > 200:
raise ValueError(
"The length of the temperature axis should be "
"the same as the length of the sensor axis and "
"should not exceed 200 in size"
)
return sensor_unit
def set_data(
self, data_dict: dict[Any, Any], sensor_unit: str | None = None
) -> None:
"""
        Set the curve data according to the values found in the dictionary.
Args:
data_dict (dict): See `validate_datadict` to see the format of this
dictionary
sensor_unit (str): If None, the data dict is validated and the
units are extracted.
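
        As an illustration, point 1 of curve 21 with sensor value 0.5 and
        temperature 300.0 K would be written as ``CRVPT 21, 1, 0.500, 300.000``.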
"""
if sensor_unit is None:
sensor_unit = self.validate_datadict(data_dict)
temperature_values = data_dict[self.temperature_key]
sensor_values = data_dict[sensor_unit]
for value_index, (temperature_value, sensor_value) in enumerate(
zip(temperature_values, sensor_values)
):
cmd_str = (
f"CRVPT {self._index}, {value_index + 1}, "
f"{sensor_value:3.3f}, {temperature_value:3.3f}"
)
self.write(cmd_str)
class LakeshoreModel325Sensor(InstrumentChannel):
"""
InstrumentChannel for a single sensor of a Lakeshore Model 325.
    Args:
        parent (LakeshoreModel325): The instrument this sensor belongs to
        name (str): Name of the channel
        inp (str): Either "A" or "B"
"""
def __init__(
self,
parent: "LakeshoreModel325",
name: str,
inp: str,
**kwargs: "Unpack[InstrumentBaseKWArgs]",
) -> None:
if inp not in ["A", "B"]:
raise ValueError("Please either specify input 'A' or 'B'")
        super().__init__(parent, name, **kwargs)
self._input = inp
self.temperature: Parameter = self.add_parameter(
"temperature",
get_cmd=f"KRDG? {self._input}",
get_parser=float,
label="Temperature",
unit="K",
)
"""Parameter temperature"""
self.status: Parameter = self.add_parameter(
"status",
get_cmd=f"RDGST? {self._input}",
get_parser=lambda status: self.decode_sensor_status(int(status)),
label="Sensor_Status",
)
"""Parameter status"""
self.type: GroupParameter = self.add_parameter(
"type",
val_mapping={
"Silicon diode": 0,
"GaAlAs diode": 1,
"100 Ohm platinum/250": 2,
"100 Ohm platinum/500": 3,
"1000 Ohm platinum": 4,
"NTC RTD": 5,
"Thermocouple 25mV": 6,
"Thermocouple 50 mV": 7,
"2.5 V, 1 mA": 8,
"7.5 V, 1 mA": 9,
},
parameter_class=GroupParameter,
)
"""Parameter type"""
self.compensation: GroupParameter = self.add_parameter(
"compensation", vals=Enum(0, 1), parameter_class=GroupParameter
)
"""Parameter compensation"""
Group(
[self.type, self.compensation],
set_cmd=f"INTYPE {self._input}, {{type}}, {{compensation}}",
get_cmd=f"INTYPE? {self._input}",
)
self.curve_index: Parameter = self.add_parameter(
"curve_index",
set_cmd=f"INCRV {self._input}, {{}}",
get_cmd=f"INCRV? {self._input}",
get_parser=int,
vals=Numbers(min_value=1, max_value=35),
)
"""Parameter curve_index"""
@staticmethod
def decode_sensor_status(sum_of_codes: int) -> str:
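        """
        Decode the integer returned by the ``RDGST?`` query into a
        human-readable, comma-separated status string.

        As a worked example (flag values taken from this class): a reading of
        ``48 = 32 + 16`` decodes to ``"temp overrange, temp underrange"``,
        while ``0`` returns ``"OK"``.
        """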
total_status = LakeshoreModel325Status(sum_of_codes)
if sum_of_codes == 0:
return "OK"
status_messages = [
st.name.replace("_", " ")
for st in LakeshoreModel325Status
if st in total_status and st.name is not None
]
return ", ".join(status_messages)
@property
def curve(self) -> LakeshoreModel325Curve:
parent = cast(LakeshoreModel325, self.parent)
return LakeshoreModel325Curve(parent, self.curve_index())
class LakeshoreModel325Heater(InstrumentChannel):
def __init__(
self,
parent: "LakeshoreModel325",
name: str,
loop: int,
**kwargs: "Unpack[InstrumentBaseKWArgs]",
) -> None:
"""
InstrumentChannel for heater control on a Lakeshore Model 325.
Args:
parent: The instrument this heater belongs to
name: Name of the Channel
loop: Either 1 or 2
**kwargs: Forwarded to baseclass.
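
        A usage sketch (values are illustrative)::

            heater = lakeshore.heater_1
            heater.control_mode("Manual PID")
            heater.P(50)
            heater.I(20)
            heater.D(0)
            heater.output_range("Low (2.5W)")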
"""
if loop not in [1, 2]:
raise ValueError("Please either specify loop 1 or 2")
super().__init__(parent, name, **kwargs)
self._loop = loop
self.control_mode: Parameter = self.add_parameter(
"control_mode",
get_cmd=f"CMODE? {self._loop}",
set_cmd=f"CMODE {self._loop},{{}}",
val_mapping={
"Manual PID": "1",
"Zone": "2",
"Open Loop": "3",
"AutoTune PID": "4",
"AutoTune PI": "5",
"AutoTune P": "6",
},
)
"""Parameter control_mode"""
self.input_channel: GroupParameter = self.add_parameter(
"input_channel", vals=Enum("A", "B"), parameter_class=GroupParameter
)
"""Parameter input_channel"""
self.unit: GroupParameter = self.add_parameter(
"unit",
val_mapping={"Kelvin": "1", "Celsius": "2", "Sensor Units": "3"},
parameter_class=GroupParameter,
)
"""Parameter unit"""
self.powerup_enable: GroupParameter = self.add_parameter(
"powerup_enable",
val_mapping={True: 1, False: 0},
parameter_class=GroupParameter,
)
"""Parameter powerup_enable"""
self.output_metric: GroupParameter = self.add_parameter(
"output_metric",
val_mapping={
"current": "1",
"power": "2",
},
parameter_class=GroupParameter,
)
"""Parameter output_metric"""
Group(
[self.input_channel, self.unit, self.powerup_enable, self.output_metric],
set_cmd=f"CSET {self._loop}, {{input_channel}}, {{unit}}, "
f"{{powerup_enable}}, {{output_metric}}",
get_cmd=f"CSET? {self._loop}",
)
self.P: GroupParameter = self.add_parameter(
"P", vals=Numbers(0, 1000), get_parser=float, parameter_class=GroupParameter
)
"""Parameter P"""
self.I: GroupParameter = self.add_parameter(
"I", vals=Numbers(0, 1000), get_parser=float, parameter_class=GroupParameter
)
"""Parameter I"""
self.D: GroupParameter = self.add_parameter(
"D", vals=Numbers(0, 1000), get_parser=float, parameter_class=GroupParameter
)
"""Parameter D"""
Group(
[self.P, self.I, self.D],
set_cmd=f"PID {self._loop}, {{P}}, {{I}}, {{D}}",
get_cmd=f"PID? {self._loop}",
)
if self._loop == 1:
valid_output_ranges = Enum(0, 1, 2)
else:
valid_output_ranges = Enum(0, 1)
self.output_range: Parameter = self.add_parameter(
"output_range",
vals=valid_output_ranges,
set_cmd=f"RANGE {self._loop}, {{}}",
get_cmd=f"RANGE? {self._loop}",
val_mapping={"Off": "0", "Low (2.5W)": "1", "High (25W)": "2"},
)
"""Parameter output_range"""
self.setpoint: Parameter = self.add_parameter(
"setpoint",
vals=Numbers(0, 400),
get_parser=float,
set_cmd=f"SETP {self._loop}, {{}}",
get_cmd=f"SETP? {self._loop}",
)
"""Parameter setpoint"""
self.ramp_state: GroupParameter = self.add_parameter(
"ramp_state", vals=Enum(0, 1), parameter_class=GroupParameter
)
"""Parameter ramp_state"""
self.ramp_rate: GroupParameter = self.add_parameter(
"ramp_rate",
vals=Numbers(0, 100 / 60 * 1e3),
unit="mK/s",
parameter_class=GroupParameter,
            get_parser=lambda v: float(v) / 60 * 1e3,  # instrument reports K/min; convert to mK/s
            set_parser=lambda v: v * 60 * 1e-3,  # convert mK/s back to K/min
)
"""Parameter ramp_rate"""
Group(
[self.ramp_state, self.ramp_rate],
set_cmd=f"RAMP {self._loop}, {{ramp_state}}, {{ramp_rate}}",
get_cmd=f"RAMP? {self._loop}",
)
self.is_ramping: Parameter = self.add_parameter(
"is_ramping", get_cmd=f"RAMPST? {self._loop}"
)
"""Parameter is_ramping"""
self.resistance: Parameter = self.add_parameter(
"resistance",
get_cmd=f"HTRRES? {self._loop}",
set_cmd=f"HTRRES {self._loop}, {{}}",
val_mapping={
25: 1,
50: 2,
},
label="Resistance",
unit="Ohm",
)
"""Parameter resistance"""
self.heater_output: Parameter = self.add_parameter(
"heater_output",
get_cmd=f"HTR? {self._loop}",
get_parser=float,
label="Heater Output",
unit="%",
)
"""Parameter heater_output"""
class LakeshoreModel325(VisaInstrument):
"""
QCoDeS driver for Lakeshore Model 325 Temperature Controller.
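
    A minimal usage sketch (the VISA address below is an assumption and will
    depend on your setup)::

        lakeshore = LakeshoreModel325("lakeshore", "GPIB0::12::INSTR")
        print(lakeshore.sensor_A.temperature())  # temperature of input A, in K
        lakeshore.heater_1.setpoint(4.2)         # loop 1 setpoint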
"""
default_terminator = "\r\n"
def __init__(
self, name: str, address: str, **kwargs: "Unpack[VisaInstrumentKWArgs]"
) -> None:
super().__init__(name, address, **kwargs)
sensors = ChannelList(
self, "sensor", LakeshoreModel325Sensor, snapshotable=False
)
for inp in ["A", "B"]:
sensor = LakeshoreModel325Sensor(self, f"sensor_{inp}", inp)
sensors.append(sensor)
self.add_submodule(f"sensor_{inp}", sensor)
self.add_submodule("sensor", sensors.to_channel_tuple())
heaters = ChannelList(
self, "heater", LakeshoreModel325Heater, snapshotable=False
)
for loop in [1, 2]:
heater = LakeshoreModel325Heater(self, f"heater_{loop}", loop)
heaters.append(heater)
self.add_submodule(f"heater_{loop}", heater)
self.add_submodule("heater", heaters.to_channel_tuple())
curves = ChannelList(self, "curve", LakeshoreModel325Curve, snapshotable=False)
        # The instrument holds curves 1..35 (21..35 are user-defined curves)
        for curve_index in range(1, 36):
curve = LakeshoreModel325Curve(self, curve_index)
curves.append(curve)
self.add_submodule("curve", curves)
self.connect_message()
def upload_curve(
self, index: int, name: str, serial_number: str, data_dict: dict[Any, Any]
) -> None:
"""
Upload a curve to the given index
Args:
index: The index to upload the curve to. We can only use
indices reserved for user defined curves, 21-35
name: Name of the curve
serial_number: Serial number of the curve
data_dict: A dictionary containing the curve data
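
        Example (illustrative values, assuming ``lakeshore`` is a connected
        instance of this driver)::

            data_dict = {"Temperature (K)": [300.0, 290.0], "V": [0.5, 0.6]}
            lakeshore.upload_curve(21, "my_curve", "SN1234", data_dict)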
"""
if index not in range(21, 36):
raise ValueError("index value should be between 21 and 35")
sensor_unit = LakeshoreModel325Curve.validate_datadict(data_dict)
curve = self.curve[index - 1]
curve.curve_name(name)
curve.serial_number(serial_number)
curve.format(f"{sensor_unit}/K")
curve.set_data(data_dict, sensor_unit=sensor_unit)
def upload_curve_from_file(self, index: int, file_path: str) -> None:
"""
Upload a curve from a curve file. Note that we only support
curve files with extension .330
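
        Example (the file path is illustrative)::

            lakeshore.upload_curve_from_file(21, "my_sensor_curve.330")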
"""
if not file_path.endswith(".330"):
raise ValueError("Only curve files with extension .330 are supported")
with open(file_path) as curve_file:
file_data = _read_curve_file(curve_file)
data_dict = _get_sanitize_data(file_data)
name = file_data["metadata"]["Sensor Model"]
serial_number = file_data["metadata"]["Serial Number"]
self.upload_curve(index, name, serial_number, data_dict)