from enum import IntFlag
from itertools import takewhile
from typing import (
TYPE_CHECKING,
Any,
TextIO,
)
from qcodes.instrument import (
ChannelList,
InstrumentBaseKWArgs,
InstrumentChannel,
VisaInstrument,
VisaInstrumentKWArgs,
)
from qcodes.parameters import Group, GroupParameter, Parameter
from qcodes.validators import Enum, Numbers
if TYPE_CHECKING:
from collections.abc import Iterable
from typing_extensions import Unpack
from qcodes.instrument.channel import ChannelTuple
def _read_curve_file(curve_file: TextIO) -> dict[Any, Any]:
"""
Read a curve file with extension .330
The file format of this file is shown in test_lakeshore_file_parser.py
in the test module
The output is a dictionary with keys: "metadata" and "data".
The metadata dictionary contains the first n lines of the curve file which
are in the format "item: value". The data dictionary contains the actual
curve data.
"""
def split_data_line(line: str, parser: type = str) -> list[Any]:
return [parser(i) for i in line.split(" ") if i != ""]
def strip(strings: "Iterable[str]") -> tuple[str, ...]:
return tuple(s.strip() for s in strings)
lines = iter(curve_file.readlines())
# Meta data lines contain a colon
metadata_lines = takewhile(lambda s: ":" in s, lines)
# Data from the file is collected in the following dict
file_data: dict[str, dict[str, Any]] = dict()
# Capture meta data
parsed_lines = [strip(line.split(":")) for line in metadata_lines]
file_data["metadata"] = {key: value for key, value in parsed_lines}
# After meta data we have a data header
header_items = strip(split_data_line(next(lines)))
# After that we have the curve data
data: list[list[float]] = [
split_data_line(line, parser=float) for line in lines if line.strip() != ""
]
file_data["data"] = dict(zip(header_items, zip(*data)))
return file_data
def _get_sanitize_data(file_data: dict[Any, Any]) -> dict[Any, Any]:
"""
Data as found in the curve files are slightly different from
the dictionary as expected by the 'upload_curve' method of the
driver
"""
data_dict = dict(file_data["data"])
# We do not need the index column
del data_dict["No."]
# Rename the 'Units' column to the appropriate name
# Look up under the 'Data Format' entry to find what units we have
data_format = file_data["metadata"]["Data Format"]
# This is a string in the form '4 (Log Ohms/Kelvin)'
data_format_int = int(data_format.split()[0])
correct_name = LakeshoreModel325Curve.valid_sensor_units[data_format_int - 1]
# Rename the column
data_dict[correct_name] = data_dict["Units"]
del data_dict["Units"]
return data_dict
[docs]
class LakeshoreModel325Status(IntFlag):
"""
IntFlag that defines status codes for Lakeshore Model 325
"""
sensor_units_overrang = 128
sensor_units_zero = 64
temp_overrange = 32
temp_underrange = 16
invalid_reading = 1
[docs]
class LakeshoreModel325Curve(InstrumentChannel):
"""
An InstrumentChannel representing a curve on a Lakeshore Model 325
"""
valid_sensor_units = ("mV", "V", "Ohm", "log Ohm")
temperature_key = "Temperature (K)"
def __init__(
self,
parent: "LakeshoreModel325",
index: int,
**kwargs: "Unpack[InstrumentBaseKWArgs]",
) -> None:
self._index = index
name = f"curve_{index}"
super().__init__(parent, name, **kwargs)
self.serial_number: GroupParameter = self.add_parameter(
"serial_number", parameter_class=GroupParameter
)
"""Parameter serial_number"""
self.format: GroupParameter = self.add_parameter(
"format",
val_mapping={
f"{unt}/K": i + 1 for i, unt in enumerate(self.valid_sensor_units)
},
parameter_class=GroupParameter,
)
"""Parameter format"""
self.limit_value: GroupParameter = self.add_parameter(
"limit_value", parameter_class=GroupParameter
)
"""Parameter limit_value"""
self.coefficient: GroupParameter = self.add_parameter(
"coefficient",
val_mapping={"negative": 1, "positive": 2},
parameter_class=GroupParameter,
)
"""Parameter coefficient"""
self.curve_name: GroupParameter = self.add_parameter(
"curve_name", parameter_class=GroupParameter
)
"""Parameter curve_name"""
Group(
[
self.curve_name,
self.serial_number,
self.format,
self.limit_value,
self.coefficient,
],
set_cmd=f"CRVHDR {self._index}, {{curve_name}}, "
f"{{serial_number}}, {{format}}, {{limit_value}}, "
f"{{coefficient}}",
get_cmd=f"CRVHDR? {self._index}",
)
[docs]
def get_data(self) -> dict[Any, Any]:
curve = [
float(a)
for point_index in range(1, 200)
for a in self.ask(f"CRVPT? {self._index}, {point_index}").split(",")
]
d = {self.temperature_key: curve[1::2]}
sensor_unit = self.format().split("/")[0]
d[sensor_unit] = curve[::2]
return d
[docs]
@classmethod
def validate_datadict(cls, data_dict: dict[Any, Any]) -> str:
"""
A data dict has two keys, one of which is 'Temperature (K)'. The other
contains the units in which the curve is defined and must be one of:
'mV', 'V', 'Ohm' or 'log Ohm'
This method validates this and returns the sensor unit encountered in
the data dict
"""
if cls.temperature_key not in data_dict:
raise ValueError(
f"At least {cls.temperature_key} needed in the data dictionary"
)
sensor_units = [i for i in data_dict.keys() if i != cls.temperature_key]
if len(sensor_units) != 1:
raise ValueError(
"Data dictionary should have one other key, other then "
"'Temperature (K)'"
)
sensor_unit = sensor_units[0]
if sensor_unit not in cls.valid_sensor_units:
raise ValueError(
f"Sensor unit {sensor_unit} invalid. This needs to be one of "
f"{', '.join(cls.valid_sensor_units)}"
)
data_size = len(data_dict[cls.temperature_key])
if data_size != len(data_dict[sensor_unit]) or data_size > 200:
raise ValueError(
"The length of the temperature axis should be "
"the same as the length of the sensor axis and "
"should not exceed 200 in size"
)
return sensor_unit
[docs]
def set_data(
self, data_dict: dict[Any, Any], sensor_unit: str | None = None
) -> None:
"""
Set the curve data according to the values found the the dictionary.
Args:
data_dict (dict): See `validate_datadict` to see the format of this
dictionary
sensor_unit (str): If None, the data dict is validated and the
units are extracted.
"""
if sensor_unit is None:
sensor_unit = self.validate_datadict(data_dict)
temperature_values = data_dict[self.temperature_key]
sensor_values = data_dict[sensor_unit]
for value_index, (temperature_value, sensor_value) in enumerate(
zip(temperature_values, sensor_values)
):
cmd_str = (
f"CRVPT {self._index}, {value_index + 1}, "
f"{sensor_value:3.3f}, {temperature_value:3.3f}"
)
self.write(cmd_str)
[docs]
class LakeshoreModel325Sensor(InstrumentChannel["LakeshoreModel325"]):
"""
InstrumentChannel for a single sensor of a Lakeshore Model 325.
Args:
parent (LakeshoreModel325): The instrument this heater belongs to
name (str)
inp (str): Either "A" or "B"
"""
def __init__(
self,
parent: "LakeshoreModel325",
name: str,
inp: str,
**kwargs: "Unpack[InstrumentBaseKWArgs]",
) -> None:
if inp not in ["A", "B"]:
raise ValueError("Please either specify input 'A' or 'B'")
super().__init__(parent, name)
self._input = inp
self.temperature: Parameter = self.add_parameter(
"temperature",
get_cmd=f"KRDG? {self._input}",
get_parser=float,
label="Temperature",
unit="K",
)
"""Parameter temperature"""
self.status: Parameter = self.add_parameter(
"status",
get_cmd=f"RDGST? {self._input}",
get_parser=lambda status: self.decode_sensor_status(int(status)),
label="Sensor_Status",
)
"""Parameter status"""
self.type: GroupParameter = self.add_parameter(
"type",
val_mapping={
"Silicon diode": 0,
"GaAlAs diode": 1,
"100 Ohm platinum/250": 2,
"100 Ohm platinum/500": 3,
"1000 Ohm platinum": 4,
"NTC RTD": 5,
"Thermocouple 25mV": 6,
"Thermocouple 50 mV": 7,
"2.5 V, 1 mA": 8,
"7.5 V, 1 mA": 9,
},
parameter_class=GroupParameter,
)
"""Parameter type"""
self.compensation: GroupParameter = self.add_parameter(
"compensation", vals=Enum(0, 1), parameter_class=GroupParameter
)
"""Parameter compensation"""
Group(
[self.type, self.compensation],
set_cmd=f"INTYPE {self._input}, {{type}}, {{compensation}}",
get_cmd=f"INTYPE? {self._input}",
)
self.curve_index: Parameter = self.add_parameter(
"curve_index",
set_cmd=f"INCRV {self._input}, {{}}",
get_cmd=f"INCRV? {self._input}",
get_parser=int,
vals=Numbers(min_value=1, max_value=35),
)
"""Parameter curve_index"""
[docs]
@staticmethod
def decode_sensor_status(sum_of_codes: int) -> str:
total_status = LakeshoreModel325Status(sum_of_codes)
if sum_of_codes == 0:
return "OK"
status_messages = [
st.name.replace("_", " ")
for st in LakeshoreModel325Status
if st in total_status and st.name is not None
]
return ", ".join(status_messages)
@property
def curve(self) -> LakeshoreModel325Curve:
return LakeshoreModel325Curve(self.parent, self.curve_index())
[docs]
class LakeshoreModel325Heater(InstrumentChannel):
def __init__(
self,
parent: "LakeshoreModel325",
name: str,
loop: int,
**kwargs: "Unpack[InstrumentBaseKWArgs]",
) -> None:
"""
InstrumentChannel for heater control on a Lakeshore Model 325.
Args:
parent: The instrument this heater belongs to
name: Name of the Channel
loop: Either 1 or 2
**kwargs: Forwarded to baseclass.
"""
if loop not in [1, 2]:
raise ValueError("Please either specify loop 1 or 2")
super().__init__(parent, name, **kwargs)
self._loop = loop
self.control_mode: Parameter = self.add_parameter(
"control_mode",
get_cmd=f"CMODE? {self._loop}",
set_cmd=f"CMODE {self._loop},{{}}",
val_mapping={
"Manual PID": "1",
"Zone": "2",
"Open Loop": "3",
"AutoTune PID": "4",
"AutoTune PI": "5",
"AutoTune P": "6",
},
)
"""Parameter control_mode"""
self.input_channel: GroupParameter = self.add_parameter(
"input_channel", vals=Enum("A", "B"), parameter_class=GroupParameter
)
"""Parameter input_channel"""
self.unit: GroupParameter = self.add_parameter(
"unit",
val_mapping={"Kelvin": "1", "Celsius": "2", "Sensor Units": "3"},
parameter_class=GroupParameter,
)
"""Parameter unit"""
self.powerup_enable: GroupParameter = self.add_parameter(
"powerup_enable",
val_mapping={True: 1, False: 0},
parameter_class=GroupParameter,
)
"""Parameter powerup_enable"""
self.output_metric: GroupParameter = self.add_parameter(
"output_metric",
val_mapping={
"current": "1",
"power": "2",
},
parameter_class=GroupParameter,
)
"""Parameter output_metric"""
Group(
[self.input_channel, self.unit, self.powerup_enable, self.output_metric],
set_cmd=f"CSET {self._loop}, {{input_channel}}, {{unit}}, "
f"{{powerup_enable}}, {{output_metric}}",
get_cmd=f"CSET? {self._loop}",
)
self.P: GroupParameter = self.add_parameter(
"P", vals=Numbers(0, 1000), get_parser=float, parameter_class=GroupParameter
)
"""Parameter P"""
self.I: GroupParameter = self.add_parameter(
"I", vals=Numbers(0, 1000), get_parser=float, parameter_class=GroupParameter
)
"""Parameter I"""
self.D: GroupParameter = self.add_parameter(
"D", vals=Numbers(0, 1000), get_parser=float, parameter_class=GroupParameter
)
"""Parameter D"""
Group(
[self.P, self.I, self.D],
set_cmd=f"PID {self._loop}, {{P}}, {{I}}, {{D}}",
get_cmd=f"PID? {self._loop}",
)
if self._loop == 1:
valid_output_ranges = Enum(0, 1, 2)
else:
valid_output_ranges = Enum(0, 1)
self.output_range: Parameter = self.add_parameter(
"output_range",
vals=valid_output_ranges,
set_cmd=f"RANGE {self._loop}, {{}}",
get_cmd=f"RANGE? {self._loop}",
val_mapping={"Off": "0", "Low (2.5W)": "1", "High (25W)": "2"},
)
"""Parameter output_range"""
self.setpoint: Parameter = self.add_parameter(
"setpoint",
vals=Numbers(0, 400),
get_parser=float,
set_cmd=f"SETP {self._loop}, {{}}",
get_cmd=f"SETP? {self._loop}",
)
"""Parameter setpoint"""
self.ramp_state: GroupParameter = self.add_parameter(
"ramp_state", vals=Enum(0, 1), parameter_class=GroupParameter
)
"""Parameter ramp_state"""
self.ramp_rate: GroupParameter = self.add_parameter(
"ramp_rate",
vals=Numbers(0, 100 / 60 * 1e3),
unit="mK/s",
parameter_class=GroupParameter,
get_parser=lambda v: float(v) / 60 * 1e3, # We get values in K/min,
set_parser=lambda v: v * 60 * 1e-3, # Convert to K/min
)
"""Parameter ramp_rate"""
Group(
[self.ramp_state, self.ramp_rate],
set_cmd=f"RAMP {self._loop}, {{ramp_state}}, {{ramp_rate}}",
get_cmd=f"RAMP? {self._loop}",
)
self.is_ramping: Parameter = self.add_parameter(
"is_ramping", get_cmd=f"RAMPST? {self._loop}"
)
"""Parameter is_ramping"""
self.resistance: Parameter = self.add_parameter(
"resistance",
get_cmd=f"HTRRES? {self._loop}",
set_cmd=f"HTRRES {self._loop}, {{}}",
val_mapping={
25: 1,
50: 2,
},
label="Resistance",
unit="Ohm",
)
"""Parameter resistance"""
self.heater_output: Parameter = self.add_parameter(
"heater_output",
get_cmd=f"HTR? {self._loop}",
get_parser=float,
label="Heater Output",
unit="%",
)
"""Parameter heater_output"""
[docs]
class LakeshoreModel325(VisaInstrument):
"""
QCoDeS driver for Lakeshore Model 325 Temperature Controller.
"""
default_terminator = "\r\n"
def __init__(
self, name: str, address: str, **kwargs: "Unpack[VisaInstrumentKWArgs]"
) -> None:
super().__init__(name, address, **kwargs)
sensors = ChannelList(
self,
"sensor",
LakeshoreModel325Sensor,
snapshotable=False,
)
self.sensor_A: LakeshoreModel325Sensor = self.add_submodule(
"sensor_A", LakeshoreModel325Sensor(self, "sensor_A", "A")
)
"""Sensor A"""
sensors.append(self.sensor_A)
self.sensor_B: LakeshoreModel325Sensor = self.add_submodule(
"sensor_B", LakeshoreModel325Sensor(self, "sensor_B", "B")
)
"""Sensor B"""
sensors.append(self.sensor_B)
self.sensor: ChannelTuple[LakeshoreModel325Sensor] = self.add_submodule(
"sensor", sensors.to_channel_tuple()
)
"""ChannelTuple of sensors"""
heaters = ChannelList(
self, "heater", LakeshoreModel325Heater, snapshotable=False
)
self.heater_1: LakeshoreModel325Heater = self.add_submodule(
"heater_1", LakeshoreModel325Heater(self, "heater_1", 1)
)
"""Heater 1"""
heaters.append(self.heater_1)
self.heater_2: LakeshoreModel325Heater = self.add_submodule(
"heater_2", LakeshoreModel325Heater(self, "heater_2", 2)
)
"""Heater 2"""
heaters.append(self.heater_2)
self.heater: ChannelTuple[LakeshoreModel325Heater] = self.add_submodule(
"heater", heaters.to_channel_tuple()
)
"""ChannelTuple of heaters"""
curves = ChannelList(self, "curve", LakeshoreModel325Curve, snapshotable=False)
for curve_index in range(1, 35):
curve = LakeshoreModel325Curve(self, curve_index)
curves.append(curve)
self.curve: ChannelList[LakeshoreModel325Curve] = self.add_submodule(
"curve", curves
)
"""ChannelList of curves"""
self.connect_message()
[docs]
def upload_curve(
self, index: int, name: str, serial_number: str, data_dict: dict[Any, Any]
) -> None:
"""
Upload a curve to the given index
Args:
index: The index to upload the curve to. We can only use
indices reserved for user defined curves, 21-35
name: Name of the curve
serial_number: Serial number of the curve
data_dict: A dictionary containing the curve data
"""
if index not in range(21, 36):
raise ValueError("index value should be between 21 and 35")
sensor_unit = LakeshoreModel325Curve.validate_datadict(data_dict)
curve = self.curve[index - 1]
curve.curve_name(name)
curve.serial_number(serial_number)
curve.format(f"{sensor_unit}/K")
curve.set_data(data_dict, sensor_unit=sensor_unit)
[docs]
def upload_curve_from_file(self, index: int, file_path: str) -> None:
"""
Upload a curve from a curve file. Note that we only support
curve files with extension .330
"""
if not file_path.endswith(".330"):
raise ValueError("Only curve files with extension .330 are supported")
with open(file_path) as curve_file:
file_data = _read_curve_file(curve_file)
data_dict = _get_sanitize_data(file_data)
name = file_data["metadata"]["Sensor Model"]
serial_number = file_data["metadata"]["Serial Number"]
self.upload_curve(index, name, serial_number, data_dict)