import configparser
import logging
import re
from functools import partial
from time import sleep
from typing import Any, Optional, Union
from qcodes.instrument import IPInstrument
from qcodes.validators import Enum, Ints, Numbers
[docs]
class OxfordTriton(IPInstrument):
    r"""
    Triton Driver

    Args:
        name: name of the cryostat.
        address: IP-address of the fridge computer. Defaults to None.
        port: port of the oxford program running on the fridge computer.
            The relevant port can be found in the manual for the fridge
            or looked up on the fridge computer. Defaults to None.
        terminator: Defaults to '\r\n'
        tmpfile: path to an exported windows registry file from the
            registry path:
            `[HKEY_CURRENT_USER\Software\Oxford Instruments\Triton System Control\Thermometry]`
            It is used to extract the names of the available temperature
            channels. Defaults to None, in which case no file is read.
        timeout: Defaults to 20.

    Status: beta-version.

    Todo:
        fetch registry directly from fridge-computer
    """
def __init__(
    self,
    name: str,
    address: Optional[str] = None,
    port: Optional[int] = None,
    terminator: str = "\r\n",
    tmpfile: Optional[str] = None,
    timeout: float = 20,
    **kwargs: Any,
):
    """
    Connect to the fridge and register all parameters of the driver.

    Args:
        name: name of the instrument, forwarded to ``IPInstrument``.
        address: forwarded to ``IPInstrument``.
        port: forwarded to ``IPInstrument``.
        terminator: forwarded to ``IPInstrument``.
        tmpfile: optional path to an exported registry file; when given,
            it is parsed by ``_get_temp_channel_names`` to fill
            ``chan_temp_names``.
        timeout: forwarded to ``IPInstrument``.
        **kwargs: forwarded to ``IPInstrument``.
    """
    super().__init__(
        name,
        address=address,
        port=port,
        terminator=terminator,
        timeout=timeout,
        **kwargs,
    )
    self._heater_range_auto = False
    self._heater_range_temp = [0.03, 0.1, 0.3, 1, 12, 40]
    # Allowed values of the ``pid_range`` parameter (see its validator).
    self._heater_range_curr = [0.316, 1, 3.16, 10, 31.6, 100]
    # Initial guess for the PID control channel; it is verified (and
    # replaced if necessary) by ``_get_control_channel`` on first use.
    self._control_channel = 5
    self.pump_label_dict = {"TURB1": "Turbo 1", "COMP": "Compressor"}

    # --- system information -------------------------------------------
    self.add_parameter(
        name="time",
        label="System Time",
        get_cmd="READ:SYS:TIME",
        get_parser=self._parse_time,
    )
    self.add_parameter(
        name="action",
        label="Current action",
        get_cmd="READ:SYS:DR:ACTN",
        get_parser=self._parse_action,
    )
    self.add_parameter(
        name="status",
        label="Status",
        get_cmd="READ:SYS:DR:STATUS",
        get_parser=self._parse_status,
    )

    # --- temperature control loop -------------------------------------
    self.add_parameter(
        name="pid_control_channel",
        label="PID control channel",
        get_cmd=self._get_control_channel,
        set_cmd=self._set_control_channel,
        vals=Ints(1, 16),
    )
    self.add_parameter(
        name="pid_mode",
        label="PID Mode",
        get_cmd=partial(self._get_control_param, "MODE"),
        set_cmd=partial(self._set_control_param, "MODE"),
        val_mapping={"on": "ON", "off": "OFF"},
    )
    self.add_parameter(
        name="pid_ramp",
        label="PID ramp enabled",
        get_cmd=partial(self._get_control_param, "RAMP:ENAB"),
        set_cmd=partial(self._set_control_param, "RAMP:ENAB"),
        val_mapping={"on": "ON", "off": "OFF"},
    )
    self.add_parameter(
        name="pid_setpoint",
        label="PID temperature setpoint",
        unit="K",
        get_cmd=partial(self._get_control_param, "TSET"),
        set_cmd=partial(self._set_control_param, "TSET"),
    )
    self.add_parameter(
        name="pid_p",
        label="PID proportionality",
        get_cmd=partial(self._get_control_param, "P"),
        set_cmd=partial(self._set_control_param, "P"),
        vals=Numbers(0, 1e3),
    )
    self.add_parameter(
        name="pid_i",
        label="PID integral",
        get_cmd=partial(self._get_control_param, "I"),
        set_cmd=partial(self._set_control_param, "I"),
        vals=Numbers(0, 1e3),
    )
    self.add_parameter(
        name="pid_d",
        label="PID derivative",
        get_cmd=partial(self._get_control_param, "D"),
        set_cmd=partial(self._set_control_param, "D"),
        vals=Numbers(0, 1e3),
    )
    self.add_parameter(
        name="pid_rate",
        label="PID ramp rate",
        unit="K/min",
        get_cmd=partial(self._get_control_param, "RAMP:RATE"),
        set_cmd=partial(self._set_control_param, "RAMP:RATE"),
    )
    self.add_parameter(
        name="pid_range",
        label="PID heater range",
        # TODO: The units in the software are mA, how to
        # do this correctly?
        unit="mA",
        get_cmd=partial(self._get_control_param, "RANGE"),
        set_cmd=partial(self._set_control_param, "RANGE"),
        vals=Enum(*self._heater_range_curr),
    )

    # --- vector magnet ------------------------------------------------
    self.add_parameter(
        name="magnet_status",
        label="Magnet status",
        unit="",
        get_cmd=partial(self._get_control_B_param, "ACTN"),
    )
    self.add_parameter(
        name="magnet_sweeprate",
        label="Magnet sweep rate",
        unit="T/min",
        get_cmd=partial(self._get_control_B_param, "RVST:RATE"),
        set_cmd=partial(self._set_control_magnet_sweeprate_param),
    )
    self.add_parameter(
        name="magnet_sweeprate_insta",
        label="Instantaneous magnet sweep rate",
        unit="T/min",
        get_cmd=partial(self._get_control_B_param, "RFST"),
    )
    self.add_parameter(
        name="B",
        label="Magnetic field",
        unit="T",
        get_cmd=partial(self._get_control_B_param, "VECT"),
    )
    self.add_parameter(
        name="Bx",
        label="Magnetic field x-component",
        unit="T",
        get_cmd=partial(self._get_control_Bcomp_param, "VECTBx"),
        set_cmd=partial(self._set_control_Bx_param),
    )
    self.add_parameter(
        name="By",
        label="Magnetic field y-component",
        unit="T",
        get_cmd=partial(self._get_control_Bcomp_param, "VECTBy"),
        set_cmd=partial(self._set_control_By_param),
    )
    self.add_parameter(
        name="Bz",
        label="Magnetic field z-component",
        unit="T",
        get_cmd=partial(self._get_control_Bcomp_param, "VECTBz"),
        set_cmd=partial(self._set_control_Bz_param),
    )
    self.add_parameter(
        name="magnet_sweep_time",
        label="Magnet sweep time",
        # The driver multiplies this value by 60 to obtain seconds, so it
        # is a duration in minutes, not a rate.
        unit="min",
        get_cmd=partial(self._get_control_B_param, "RVST:TIME"),
    )

    # --- pumps and channels -------------------------------------------
    self.add_parameter(
        name="turb1_speed",
        label=self.pump_label_dict["TURB1"] + " speed",
        unit="Hz",
        get_cmd="READ:DEV:TURB1:PUMP:SIG:SPD",
        get_parser=self._get_parser_pump_speed,
    )
    self._add_pump_state()
    self._add_temp_state()
    self.chan_alias: dict[str, str] = {}
    self.chan_temp_names: dict[str, dict[str, Optional[str]]] = {}
    if tmpfile is not None:
        self._get_temp_channel_names(tmpfile)
    self._get_temp_channels()
    self._get_pressure_channels()
    # Named channels are optional extras: a failure here is logged and
    # must not prevent the instrument from being created.
    try:
        self._get_named_channels()
    except Exception:
        logging.warning("Ignored an error in _get_named_channels\n", exc_info=True)
    self.connect_message()
[docs]
def set_B(self, x: float, y: float, z: float, s: float) -> None:
if 0 < s <= 0.2:
self.write(
"SET:SYS:VRM:COO:CART:RVST:MODE:RATE:RATE:"
+ str(s)
+ ":VSET:["
+ str(x)
+ " "
+ str(y)
+ " "
+ str(z)
+ "]\r\n"
)
self.write("SET:SYS:VRM:ACTN:RTOS\r\n")
t_wait = self.magnet_sweep_time() * 60 + 10
print("Please wait " + str(t_wait) + " seconds for the field sweep...")
sleep(t_wait)
else:
print("Warning: set magnet sweep rate in range (0 , 0.2] T/min")
def _get_control_B_param(
self, param: str
) -> Optional[Union[float, str, list[float]]]:
cmd = f"READ:SYS:VRM:{param}"
return self._get_response_value(self.ask(cmd))
def _get_control_Bcomp_param(
self, param: str
) -> Optional[Union[float, str, list[float]]]:
cmd = f"READ:SYS:VRM:{param}"
return self._get_response_value(self.ask(cmd[:-2]) + cmd[-2:])
def _get_response(self, msg: str) -> str:
return msg.split(":")[-1]
def _get_response_value(self, msg: str) -> Optional[Union[float, str, list[float]]]:
msg = self._get_response(msg)
if msg.endswith("NOT_FOUND"):
return None
elif msg.endswith("IDLE"):
return "IDLE"
elif msg.endswith("RTOS"):
return "RTOS"
elif msg.endswith("Bx"):
return float(re.findall(r"[-+]?\d*\.\d+|\d+", msg)[0])
elif msg.endswith("By"):
return float(re.findall(r"[-+]?\d*\.\d+|\d+", msg)[1])
elif msg.endswith("Bz"):
return float(re.findall(r"[-+]?\d*\.\d+|\d+", msg)[2])
elif len(re.findall(r"[-+]?\d*\.\d+|\d+", msg)) > 1:
return [
float(re.findall(r"[-+]?\d*\.\d+|\d+", msg)[0]),
float(re.findall(r"[-+]?\d*\.\d+|\d+", msg)[1]),
float(re.findall(r"[-+]?\d*\.\d+|\d+", msg)[2]),
]
try:
return float(re.findall(r"[-+]?\d*\.\d+|\d+", msg)[0])
except Exception:
return msg
[docs]
def get_idn(self) -> dict[str, Optional[str]]:
"""Return the Instrument Identifier Message"""
idstr = self.ask("*IDN?")
idparts = [p.strip() for p in idstr.split(":", 4)][1:]
return dict(zip(("vendor", "model", "serial", "firmware"), idparts))
def _get_control_channel(self, force_get: bool = False) -> int:
# verify current channel
if self._control_channel and not force_get:
tempval = self.ask(f"READ:DEV:T{self._control_channel}:TEMP:LOOP:MODE")
if not tempval.endswith("NOT_FOUND"):
return self._control_channel
# either _control_channel is not set or wrong
for i in range(1, 17):
tempval = self.ask(f"READ:DEV:T{i}:TEMP:LOOP:MODE")
if not tempval.endswith("NOT_FOUND"):
self._control_channel = i
break
return self._control_channel
def _set_control_channel(self, channel: int) -> None:
self._control_channel = channel
self.write(f"SET:DEV:T{self._get_control_channel()}:TEMP:LOOP:HTR:H1")
def _get_control_param(
self, param: str
) -> Optional[Union[float, str, list[float]]]:
chan = self._get_control_channel()
cmd = f"READ:DEV:T{chan}:TEMP:LOOP:{param}"
return self._get_response_value(self.ask(cmd))
def _set_control_param(self, param: str, value: float) -> None:
chan = self._get_control_channel()
cmd = f"SET:DEV:T{chan}:TEMP:LOOP:{param}:{value}"
self.write(cmd)
def _set_control_magnet_sweeprate_param(self, s: float) -> None:
if 0 < s <= 0.2:
x = round(self.Bx(), 4)
y = round(self.By(), 4)
z = round(self.Bz(), 4)
self.write(
"SET:SYS:VRM:COO:CART:RVST:MODE:RATE:RATE:"
+ str(s)
+ ":VSET:["
+ str(x)
+ " "
+ str(y)
+ " "
+ str(z)
+ "]\r\n"
)
else:
print(
"Warning: set sweeprate in range (0 , 0.2] T/min, not setting sweeprate"
)
def _set_control_Bx_param(self, x: float) -> None:
s = self.magnet_sweeprate()
y = round(self.By(), 4)
z = round(self.Bz(), 4)
self.write(
"SET:SYS:VRM:COO:CART:RVST:MODE:RATE:RATE:"
+ str(s)
+ ":VSET:["
+ str(x)
+ " "
+ str(y)
+ " "
+ str(z)
+ "]\r\n"
)
self.write("SET:SYS:VRM:ACTN:RTOS\r\n")
# just to give an time estimate, +10s for overhead
t_wait = self.magnet_sweep_time() * 60 + 10
print("Please wait " + str(t_wait) + " seconds for the field sweep...")
while self.magnet_status() != "IDLE":
pass
def _set_control_By_param(self, y: float) -> None:
s = self.magnet_sweeprate()
x = round(self.Bx(), 4)
z = round(self.Bz(), 4)
self.write(
"SET:SYS:VRM:COO:CART:RVST:MODE:RATE:RATE:"
+ str(s)
+ ":VSET:["
+ str(x)
+ " "
+ str(y)
+ " "
+ str(z)
+ "]\r\n"
)
self.write("SET:SYS:VRM:ACTN:RTOS\r\n")
# just to give an time estimate, +10s for overhead
t_wait = self.magnet_sweep_time() * 60 + 10
print("Please wait " + str(t_wait) + " seconds for the field sweep...")
while self.magnet_status() != "IDLE":
pass
def _set_control_Bz_param(self, z: float) -> None:
s = self.magnet_sweeprate()
x = round(self.Bx(), 4)
y = round(self.By(), 4)
self.write(
"SET:SYS:VRM:COO:CART:RVST:MODE:RATE:RATE:"
+ str(s)
+ ":VSET:["
+ str(x)
+ " "
+ str(y)
+ " "
+ str(z)
+ "]\r\n"
)
self.write("SET:SYS:VRM:ACTN:RTOS\r\n")
# just to give an time estimate, +10s for overhead
t_wait = self.magnet_sweep_time() * 60 + 10
print("Please wait " + str(t_wait) + " seconds for the field sweep...")
while self.magnet_status() != "IDLE":
pass
def _get_named_channels(self) -> None:
allchans_str = self.ask("READ:SYS:DR:CHAN")
allchans = allchans_str.replace("STAT:SYS:DR:CHAN:", "", 1).split(":")
for ch in allchans:
msg = f"READ:SYS:DR:CHAN:{ch}"
rep = self.ask(msg)
if "INVALID" not in rep and "NONE" not in rep:
alias, chan = rep.split(":")[-2:]
self.chan_alias[alias] = chan
self.add_parameter(
name=alias,
unit="K",
get_cmd=f"READ:DEV:{chan}:TEMP:SIG:TEMP",
get_parser=self._parse_temp,
)
def _get_pressure_channels(self) -> None:
chan_pressure_list = []
for i in range(1, 7):
chan = f"P{i}"
chan_pressure_list.append(chan)
self.add_parameter(
name=chan,
unit="bar",
get_cmd=f"READ:DEV:{chan}:PRES:SIG:PRES",
get_parser=self._parse_pres,
)
self.chan_pressure = set(chan_pressure_list)
def _get_temp_channel_names(self, file: str) -> None:
config = configparser.ConfigParser()
with open(file, encoding="utf16") as f:
next(f)
config.read_file(f)
for section in config.sections():
options = config.options(section)
namestr = '"m_lpszname"'
if namestr in options:
chan_number = int(section.split("\\")[-1].split("[")[-1]) + 1
# the names used in the register file are base 0 but the api and the gui
# uses base one names so add one
chan = "T" + str(chan_number)
name = config.get(section, '"m_lpszname"').strip('"')
self.chan_temp_names[chan] = {"name": name, "value": None}
def _get_temp_channels(self) -> None:
chan_temps_list = []
for i in range(1, 17):
chan = f"T{i}"
chan_temps_list.append(chan)
self.add_parameter(
name=chan,
unit="K",
get_cmd=f"READ:DEV:{chan}:TEMP:SIG:TEMP",
get_parser=self._parse_temp,
)
self.chan_temps = set(chan_temps_list)
def _parse_action(self, msg: str) -> str:
"""Parse message and return action as a string
Args:
msg: message string
Returns
action: string describing the action
"""
action = msg[17:]
if action == "PCL":
action = "Precooling"
elif action == "EPCL":
action = "Empty precool loop"
elif action == "COND":
action = "Condensing"
elif action == "NONE":
if self.MC.get() < 2:
action = "Circulating"
else:
action = "Idle"
elif action == "COLL":
action = "Collecting mixture"
else:
action = "Unknown"
return action
def _parse_status(self, msg: str) -> str:
return msg[19:]
def _parse_time(self, msg: str) -> str:
return msg[14:]
def _parse_temp(self, msg: str) -> Optional[float]:
if "NOT_FOUND" in msg:
return None
return float(msg.split("SIG:TEMP:")[-1].strip("K"))
def _parse_pres(self, msg: str) -> Optional[float]:
if "NOT_FOUND" in msg:
return None
return float(msg.split("SIG:PRES:")[-1].strip("mB")) * 1e3
def _recv(self) -> str:
    """Return the base-class ``_recv`` result with trailing whitespace removed."""
    received = super()._recv()
    return received.rstrip()
def _add_pump_state(self) -> None:
self.pumps = set(self.pump_label_dict.keys())
for pump in self.pumps:
self.add_parameter(
name=pump.lower() + "_state",
label=self.pump_label_dict[pump] + " state",
get_cmd=f"READ:DEV:{pump}:PUMP:SIG:STATE",
get_parser=partial(self._get_parser_state, "STATE"),
set_cmd=partial(self._set_pump_state, pump),
val_mapping={"on": "ON", "off": "OFF"},
)
def _set_pump_state(self, pump: str, state: str) -> None:
self.write(f"SET:DEV:{pump}:PUMP:SIG:STATE:{state}")
def _get_parser_pump_speed(self, msg: str) -> Optional[float]:
if "NOT_FOUND" in msg:
return None
return float(msg.split("SPD:")[-1].strip("Hz"))
def _add_temp_state(self) -> None:
for i in range(1, 17):
chan = f"T{i}"
self.add_parameter(
name=chan + "_state",
label=f"Temperature ch{i} state",
get_cmd=f"READ:DEV:{chan}:TEMP:MEAS:ENAB",
get_parser=partial(self._get_parser_state, "ENAB"),
set_cmd=partial(self._set_temp_state, chan),
val_mapping={"on": "ON", "off": "OFF"},
)
def _set_temp_state(self, chan: str, state: str) -> None:
self.write(f"SET:DEV:{chan}:TEMP:MEAS:ENAB:{state}")
def _get_parser_state(self, key: str, msg: str) -> Optional[str]:
if "NOT_FOUND" in msg:
return None
return msg.split(f"{key}:")[-1]
# ``Triton`` is a second name bound to the same class object as
# ``OxfordTriton``.
Triton = OxfordTriton
"""Alias for backwards compatibility"""