# Source code for qcodes.instrument_drivers.oxford.triton

import configparser
import logging
import re
from functools import partial
from time import sleep
from typing import TYPE_CHECKING

from qcodes.instrument import InstrumentBaseKWArgs, IPInstrument
from qcodes.validators import Enum, Ints, Numbers

if TYPE_CHECKING:
    from typing_extensions import Unpack

    from qcodes.parameters import Parameter


class OxfordTriton(IPInstrument):
    r"""
    Triton Driver

    Args:
        name: name of the cryostat.
        address: IP-address of the fridge computer. Defaults to None.
        port: port of the oxford program running on the fridge computer.
            The relevant port can be found in the manual for the fridge
            or looked up on the fridge computer. Defaults to None.
        terminator: Defaults to '\r\n'
        tmpfile: an exported windows registry file from the registry
            path:
            `[HKEY_CURRENT_USER\Software\Oxford Instruments\Triton System Control\Thermometry]`
            and is used to extract the available temperature channels.
        timeout: Defaults to 20.
        **kwargs: Forwarded to base class.

    Status: beta-version.

    Todo:
        fetch registry directly from fridge-computer

    """

    # One signed number: decimal or integer, with an optional exponent.
    # The alternation is grouped so the sign applies to integers as well.
    _NUMBER_RE = re.compile(r"[-+]?(?:\d*\.\d+|\d+)(?:[eE][-+]?\d+)?")

    # Suffix appended by ``_get_control_Bcomp_param`` -> index of the field
    # component inside the vector reply.
    _COMPONENT_INDEX = {"Bx": 0, "By": 1, "Bz": 2}

    # Pause between two status polls while waiting for a field sweep.
    _FIELD_POLL_INTERVAL_S = 0.1

    def __init__(
        self,
        name: str,
        address: str | None = None,
        port: int | None = None,
        terminator: str = "\r\n",
        tmpfile: str | None = None,
        timeout: float = 20,
        **kwargs: "Unpack[InstrumentBaseKWArgs]",
    ):
        super().__init__(
            name,
            address=address,
            port=port,
            terminator=terminator,
            timeout=timeout,
            **kwargs,
        )

        self._heater_range_auto = False
        self._heater_range_temp = [0.03, 0.1, 0.3, 1, 12, 40]
        self._heater_range_curr = [0.316, 1, 3.16, 10, 31.6, 100]
        self._control_channel = 5
        self.pump_label_dict = {"TURB1": "Turbo 1", "COMP": "Compressor"}

        self.time: Parameter = self.add_parameter(
            name="time",
            label="System Time",
            get_cmd="READ:SYS:TIME",
            get_parser=self._parse_time,
        )
        """Parameter time"""

        self.action: Parameter = self.add_parameter(
            name="action",
            label="Current action",
            get_cmd="READ:SYS:DR:ACTN",
            get_parser=self._parse_action,
        )
        """Parameter action"""

        self.status: Parameter = self.add_parameter(
            name="status",
            label="Status",
            get_cmd="READ:SYS:DR:STATUS",
            get_parser=self._parse_status,
        )
        """Parameter status"""

        self.pid_control_channel: Parameter = self.add_parameter(
            name="pid_control_channel",
            label="PID control channel",
            get_cmd=self._get_control_channel,
            set_cmd=self._set_control_channel,
            vals=Ints(1, 16),
        )
        """Parameter pid_control_channel"""

        self.pid_mode: Parameter = self.add_parameter(
            name="pid_mode",
            label="PID Mode",
            get_cmd=partial(self._get_control_param, "MODE"),
            set_cmd=partial(self._set_control_param, "MODE"),
            val_mapping={"on": "ON", "off": "OFF"},
        )
        """Parameter pid_mode"""

        self.pid_ramp: Parameter = self.add_parameter(
            name="pid_ramp",
            label="PID ramp enabled",
            get_cmd=partial(self._get_control_param, "RAMP:ENAB"),
            set_cmd=partial(self._set_control_param, "RAMP:ENAB"),
            val_mapping={"on": "ON", "off": "OFF"},
        )
        """Parameter pid_ramp"""

        self.pid_setpoint: Parameter = self.add_parameter(
            name="pid_setpoint",
            label="PID temperature setpoint",
            unit="K",
            get_cmd=partial(self._get_control_param, "TSET"),
            set_cmd=partial(self._set_control_param, "TSET"),
        )
        """Parameter pid_setpoint"""

        self.pid_p: Parameter = self.add_parameter(
            name="pid_p",
            label="PID proportionality",
            get_cmd=partial(self._get_control_param, "P"),
            set_cmd=partial(self._set_control_param, "P"),
            vals=Numbers(0, 1e3),
        )
        """Parameter pid_p"""

        self.pid_i: Parameter = self.add_parameter(
            name="pid_i",
            label="PID intergral",
            get_cmd=partial(self._get_control_param, "I"),
            set_cmd=partial(self._set_control_param, "I"),
            vals=Numbers(0, 1e3),
        )
        """Parameter pid_i"""

        self.pid_d: Parameter = self.add_parameter(
            name="pid_d",
            label="PID derivative",
            get_cmd=partial(self._get_control_param, "D"),
            set_cmd=partial(self._set_control_param, "D"),
            vals=Numbers(0, 1e3),
        )
        """Parameter pid_d"""

        self.pid_rate: Parameter = self.add_parameter(
            name="pid_rate",
            label="PID ramp rate",
            unit="K/min",
            get_cmd=partial(self._get_control_param, "RAMP:RATE"),
            set_cmd=partial(self._set_control_param, "RAMP:RATE"),
        )
        """Parameter pid_rate"""

        self.pid_range: Parameter = self.add_parameter(
            name="pid_range",
            label="PID heater range",
            # TODO: The units in the software are mA, how to
            # do this correctly?
            unit="mA",
            get_cmd=partial(self._get_control_param, "RANGE"),
            set_cmd=partial(self._set_control_param, "RANGE"),
            vals=Enum(*self._heater_range_curr),
        )
        """Parameter pid_range"""

        self.magnet_status: Parameter = self.add_parameter(
            name="magnet_status",
            label="Magnet status",
            unit="",
            get_cmd=partial(self._get_control_B_param, "ACTN"),
        )
        """Parameter magnet_status"""

        self.magnet_sweeprate: Parameter = self.add_parameter(
            name="magnet_sweeprate",
            label="Magnet sweep rate",
            unit="T/min",
            get_cmd=partial(self._get_control_B_param, "RVST:RATE"),
            set_cmd=self._set_control_magnet_sweeprate_param,
        )
        """Parameter magnet_sweeprate"""

        self.magnet_sweeprate_insta: Parameter = self.add_parameter(
            name="magnet_sweeprate_insta",
            label="Instantaneous magnet sweep rate",
            unit="T/min",
            get_cmd=partial(self._get_control_B_param, "RFST"),
        )
        """Parameter magnet_sweeprate_insta"""

        self.B: Parameter = self.add_parameter(
            name="B",
            label="Magnetic field",
            unit="T",
            get_cmd=partial(self._get_control_B_param, "VECT"),
        )
        """Parameter B"""

        self.Bx: Parameter = self.add_parameter(
            name="Bx",
            label="Magnetic field x-component",
            unit="T",
            get_cmd=partial(self._get_control_Bcomp_param, "VECTBx"),
            set_cmd=self._set_control_Bx_param,
        )
        """Parameter Bx"""

        self.By: Parameter = self.add_parameter(
            name="By",
            label="Magnetic field y-component",
            unit="T",
            get_cmd=partial(self._get_control_Bcomp_param, "VECTBy"),
            set_cmd=self._set_control_By_param,
        )
        """Parameter By"""

        self.Bz: Parameter = self.add_parameter(
            name="Bz",
            label="Magnetic field z-component",
            unit="T",
            get_cmd=partial(self._get_control_Bcomp_param, "VECTBz"),
            set_cmd=self._set_control_Bz_param,
        )
        """Parameter Bz"""

        self.magnet_sweep_time: Parameter = self.add_parameter(
            name="magnet_sweep_time",
            label="Magnet sweep time",
            # This value is multiplied by 60 to obtain seconds wherever the
            # class uses it, so it is a duration in minutes, not a rate.
            unit="min",
            get_cmd=partial(self._get_control_B_param, "RVST:TIME"),
        )
        """Parameter magnet_sweep_time"""

        self.turb1_speed: Parameter = self.add_parameter(
            name="turb1_speed",
            label=self.pump_label_dict["TURB1"] + " speed",
            unit="Hz",
            get_cmd="READ:DEV:TURB1:PUMP:SIG:SPD",
            get_parser=self._get_parser_pump_speed,
        )
        """Parameter turb1_speed"""

        self._add_pump_state()
        self._add_temp_state()

        self.chan_alias: dict[str, str] = {}
        self.chan_temp_names: dict[str, dict[str, str | None]] = {}
        if tmpfile is not None:
            self._get_temp_channel_names(tmpfile)
        self._get_temp_channels()
        self._get_pressure_channels()

        # Named channels are a convenience only; a failure here must not
        # prevent the instrument from being constructed.
        try:
            self._get_named_channels()
        except Exception:
            logging.warning("Ignored an error in _get_named_channels\n", exc_info=True)

        self.connect_message()

    def set_B(self, x: float, y: float, z: float, s: float) -> None:
        """
        Sweep the magnet to the Cartesian field ``(x, y, z)`` at rate ``s``.

        The call sleeps for the sweep time reported by the instrument plus
        ten seconds. If ``s`` is outside ``(0, 0.2]`` nothing is sent and a
        warning is printed instead.

        Args:
            x: target x-component (the field parameters use unit T).
            y: target y-component.
            z: target z-component.
            s: sweep rate (the sweep-rate parameter uses unit T/min).

        """
        if 0 < s <= 0.2:
            self._write_field_setpoint(s, x, y, z)
            self.write("SET:SYS:VRM:ACTN:RTOS\r\n")
            t_wait = self.magnet_sweep_time() * 60 + 10
            print("Please wait " + str(t_wait) + " seconds for the field sweep...")
            sleep(t_wait)
        else:
            print("Warning: set magnet sweep rate in range (0 , 0.2] T/min")

    def _get_control_B_param(self, param: str) -> float | str | list[float] | None:
        """Query ``READ:SYS:VRM:<param>`` and return the parsed reply."""
        cmd = f"READ:SYS:VRM:{param}"
        return self._get_response_value(self.ask(cmd))

    def _get_control_Bcomp_param(self, param: str) -> float | str | list[float] | None:
        """
        Query a single field component.

        ``param`` ends in a two-character component tag (``Bx``/``By``/``Bz``).
        The tag is cut off before the command is sent and appended to the
        reply, so that ``_get_response_value`` picks the matching component.
        """
        cmd = f"READ:SYS:VRM:{param}"
        return self._get_response_value(self.ask(cmd[:-2]) + cmd[-2:])

    def _get_response(self, msg: str) -> str:
        """Return the text after the last ``:`` of ``msg``."""
        return msg.split(":")[-1]

    def _get_response_value(self, msg: str) -> float | str | list[float] | None:
        """
        Convert the value part of a reply into a Python object.

        Returns:
            * ``None`` if the value ends with ``NOT_FOUND``;
            * ``"IDLE"`` / ``"RTOS"`` if it ends with that word;
            * one float if it ends with ``Bx``/``By``/``Bz`` (the matching
              component of the numbers found);
            * a list of up to three floats if more than one number is found;
            * a single float if exactly one number is found;
            * the value string unchanged if it contains no number.

        Raises:
            IndexError: if a component is requested but the reply holds
                fewer numbers than the component index requires.

        """
        msg = self._get_response(msg)
        if msg.endswith("NOT_FOUND"):
            return None
        for keyword in ("IDLE", "RTOS"):
            if msg.endswith(keyword):
                return keyword

        # Evaluate the pattern once and reuse the matches in every branch.
        numbers = self._NUMBER_RE.findall(msg)

        for suffix, index in self._COMPONENT_INDEX.items():
            if msg.endswith(suffix):
                return float(numbers[index])

        if len(numbers) > 1:
            return [float(number) for number in numbers[:3]]
        if numbers:
            return float(numbers[0])
        return msg

    def get_idn(self) -> dict[str, str | None]:
        """Return the Instrument Identifier Message"""
        idstr = self.ask("*IDN?")
        # The first ':'-separated field is dropped; the next four are the
        # identification fields.
        idparts = [p.strip() for p in idstr.split(":", 4)][1:]
        return dict(zip(("vendor", "model", "serial", "firmware"), idparts))

    def _get_control_channel(self, force_get: bool = False) -> int:
        """
        Return the temperature channel that owns the control loop.

        The cached channel is verified first unless ``force_get`` is true.
        If it is unset or the instrument answers ``NOT_FOUND``, channels
        1..16 are probed and the first one with a loop is cached.
        """
        # verify current channel
        if self._control_channel and not force_get:
            tempval = self.ask(f"READ:DEV:T{self._control_channel}:TEMP:LOOP:MODE")
            if not tempval.endswith("NOT_FOUND"):
                return self._control_channel

        # either _control_channel is not set or wrong
        for i in range(1, 17):
            tempval = self.ask(f"READ:DEV:T{i}:TEMP:LOOP:MODE")
            if not tempval.endswith("NOT_FOUND"):
                self._control_channel = i
                break
        return self._control_channel

    def _set_control_channel(self, channel: int) -> None:
        """Cache ``channel`` and send the ``LOOP:HTR:H1`` command for it."""
        self._control_channel = channel
        self.write(f"SET:DEV:T{self._get_control_channel()}:TEMP:LOOP:HTR:H1")

    def _get_control_param(self, param: str) -> float | str | list[float] | None:
        """Read loop parameter ``param`` of the current control channel."""
        chan = self._get_control_channel()
        cmd = f"READ:DEV:T{chan}:TEMP:LOOP:{param}"
        return self._get_response_value(self.ask(cmd))

    def _set_control_param(self, param: str, value: float) -> None:
        """Write loop parameter ``param`` of the current control channel."""
        chan = self._get_control_channel()
        cmd = f"SET:DEV:T{chan}:TEMP:LOOP:{param}:{value}"
        self.write(cmd)

    def _write_field_setpoint(self, rate: float, x: float, y: float, z: float) -> None:
        """Send the Cartesian rate/target command without starting a sweep."""
        self.write(
            "SET:SYS:VRM:COO:CART:RVST:MODE:RATE:RATE:"
            + str(rate)
            + ":VSET:["
            + str(x)
            + " "
            + str(y)
            + " "
            + str(z)
            + "]\r\n"
        )

    def _sweep_to_field(self, rate: float, x: float, y: float, z: float) -> None:
        """Program the target, start the sweep and block until ``IDLE``."""
        self._write_field_setpoint(rate, x, y, z)
        self.write("SET:SYS:VRM:ACTN:RTOS\r\n")
        # just to give an time estimate, +10s for overhead
        t_wait = self.magnet_sweep_time() * 60 + 10
        print("Please wait " + str(t_wait) + " seconds for the field sweep...")
        while self.magnet_status() != "IDLE":
            # Pause briefly so the controller is not polled back-to-back.
            sleep(self._FIELD_POLL_INTERVAL_S)

    def _set_control_magnet_sweeprate_param(self, s: float) -> None:
        """
        Set the sweep rate while keeping the present field as the target.

        Rates outside ``(0, 0.2]`` are not sent; a warning is printed.
        """
        if 0 < s <= 0.2:
            x = round(self.Bx(), 4)
            y = round(self.By(), 4)
            z = round(self.Bz(), 4)
            self._write_field_setpoint(s, x, y, z)
        else:
            print(
                "Warning: set sweeprate in range (0 , 0.2] T/min, not setting sweeprate"
            )

    def _set_control_Bx_param(self, x: float) -> None:
        """Sweep to a new x-component, keeping the current y and z."""
        s = self.magnet_sweeprate()
        y = round(self.By(), 4)
        z = round(self.Bz(), 4)
        self._sweep_to_field(s, x, y, z)

    def _set_control_By_param(self, y: float) -> None:
        """Sweep to a new y-component, keeping the current x and z."""
        s = self.magnet_sweeprate()
        x = round(self.Bx(), 4)
        z = round(self.Bz(), 4)
        self._sweep_to_field(s, x, y, z)

    def _set_control_Bz_param(self, z: float) -> None:
        """Sweep to a new z-component, keeping the current x and y."""
        s = self.magnet_sweeprate()
        x = round(self.Bx(), 4)
        y = round(self.By(), 4)
        self._sweep_to_field(s, x, y, z)

    def _get_named_channels(self) -> None:
        """
        Add one temperature parameter per alias reported by the instrument.

        Replies containing ``INVALID`` or ``NONE`` are skipped. The
        alias-to-channel mapping is stored in ``chan_alias``.
        """
        allchans_str = self.ask("READ:SYS:DR:CHAN")
        allchans = allchans_str.replace("STAT:SYS:DR:CHAN:", "", 1).split(":")
        for ch in allchans:
            msg = f"READ:SYS:DR:CHAN:{ch}"
            rep = self.ask(msg)
            if "INVALID" not in rep and "NONE" not in rep:
                # The last two ':'-separated fields are alias and channel.
                alias, chan = rep.split(":")[-2:]
                self.chan_alias[alias] = chan
                self.add_parameter(
                    name=alias,
                    unit="K",
                    get_cmd=f"READ:DEV:{chan}:TEMP:SIG:TEMP",
                    get_parser=self._parse_temp,
                )

    def _get_pressure_channels(self) -> None:
        """Add parameters ``P1``..``P6`` and record them in ``chan_pressure``."""
        chan_pressure_list = []
        for i in range(1, 7):
            chan = f"P{i}"
            chan_pressure_list.append(chan)
            self.add_parameter(
                name=chan,
                unit="bar",
                get_cmd=f"READ:DEV:{chan}:PRES:SIG:PRES",
                get_parser=self._parse_pres,
            )
        self.chan_pressure = set(chan_pressure_list)

    def _get_temp_channel_names(self, file: str) -> None:
        """
        Fill ``chan_temp_names`` from an exported registry file.

        The file is read as UTF-16, its first line is skipped, and the rest
        is parsed with ``configparser``. Every section that has a
        ``"m_lpszname"`` option contributes one channel name.
        """
        config = configparser.ConfigParser()
        with open(file, encoding="utf16") as f:
            next(f)
            config.read_file(f)

        for section in config.sections():
            options = config.options(section)
            namestr = '"m_lpszname"'
            if namestr in options:
                chan_number = int(re.findall(r"\d+", section)[-1]) + 1
                # the names used in the register file are base 0 but the api
                # and the gui uses base one names so add one
                chan = "T" + str(chan_number)
                name = config.get(section, '"m_lpszname"').strip('"')
                self.chan_temp_names[chan] = {"name": name, "value": None}

    def _get_temp_channels(self) -> None:
        """Add parameters ``T1``..``T16`` and record them in ``chan_temps``."""
        chan_temps_list = []
        for i in range(1, 17):
            chan = f"T{i}"
            chan_temps_list.append(chan)
            self.add_parameter(
                name=chan,
                unit="K",
                get_cmd=f"READ:DEV:{chan}:TEMP:SIG:TEMP",
                get_parser=self._parse_temp,
            )
        self.chan_temps = set(chan_temps_list)

    def _parse_action(self, msg: str) -> str:
        """
        Parse message and return action as a string

        Args:
            msg: message string

        Returns
            action: string describing the action

        """
        descriptions = {
            "PCL": "Precooling",
            "EPCL": "Empty precool loop",
            "COND": "Condensing",
            "COLL": "Collecting mixture",
        }
        # The first 17 characters are treated as the reply header.
        action = msg[17:]
        if action == "NONE":
            # With no action reported, the ``MC`` reading decides.
            return "Circulating" if self.MC.get() < 2 else "Idle"
        return descriptions.get(action, "Unknown")

    def _parse_status(self, msg: str) -> str:
        """Return ``msg`` without its first 19 characters."""
        return msg[19:]

    def _parse_time(self, msg: str) -> str:
        """Return ``msg`` without its first 14 characters."""
        return msg[14:]

    def _parse_temp(self, msg: str) -> float | None:
        """Return the temperature in ``msg``, or None on ``NOT_FOUND``."""
        if "NOT_FOUND" in msg:
            return None
        return float(msg.split("SIG:TEMP:")[-1].strip("K"))

    def _parse_pres(self, msg: str) -> float | None:
        """Return the pressure in ``msg`` times 1e3, or None on ``NOT_FOUND``."""
        if "NOT_FOUND" in msg:
            return None
        # NOTE(review): the characters 'm' and 'B' are stripped and the number
        # is multiplied by 1e3, while the pressure parameters declare unit
        # "bar" - confirm the intended conversion against the instrument.
        return float(msg.split("SIG:PRES:")[-1].strip("mB")) * 1e3

    def _recv(self) -> str:
        """Receive a reply and strip trailing whitespace."""
        return super()._recv().rstrip()

    def _add_pump_state(self) -> None:
        """Add an on/off ``<pump>_state`` parameter per ``pump_label_dict`` key."""
        self.pumps = set(self.pump_label_dict.keys())
        for pump in self.pumps:
            self.add_parameter(
                name=pump.lower() + "_state",
                label=self.pump_label_dict[pump] + " state",
                get_cmd=f"READ:DEV:{pump}:PUMP:SIG:STATE",
                get_parser=partial(self._get_parser_state, "STATE"),
                set_cmd=partial(self._set_pump_state, pump),
                val_mapping={"on": "ON", "off": "OFF"},
            )

    def _set_pump_state(self, pump: str, state: str) -> None:
        """Send the state command for ``pump``."""
        self.write(f"SET:DEV:{pump}:PUMP:SIG:STATE:{state}")

    def _get_parser_pump_speed(self, msg: str) -> float | None:
        """Return the pump speed in ``msg``, or None on ``NOT_FOUND``."""
        if "NOT_FOUND" in msg:
            return None
        return float(msg.split("SPD:")[-1].strip("Hz"))

    def _add_temp_state(self) -> None:
        """Add an on/off ``T<i>_state`` parameter for channels 1..16."""
        for i in range(1, 17):
            chan = f"T{i}"
            self.add_parameter(
                name=chan + "_state",
                label=f"Temperature ch{i} state",
                get_cmd=f"READ:DEV:{chan}:TEMP:MEAS:ENAB",
                get_parser=partial(self._get_parser_state, "ENAB"),
                set_cmd=partial(self._set_temp_state, chan),
                val_mapping={"on": "ON", "off": "OFF"},
            )

    def _set_temp_state(self, chan: str, state: str) -> None:
        """Send the measurement-enable command for ``chan``."""
        self.write(f"SET:DEV:{chan}:TEMP:MEAS:ENAB:{state}")

    def _get_parser_state(self, key: str, msg: str) -> str | None:
        """Return the text after the last ``<key>:``, or None on ``NOT_FOUND``."""
        if "NOT_FOUND" in msg:
            return None
        return msg.split(f"{key}:")[-1]
# Legacy name bound to the same class object as ``OxfordTriton``.
Triton = OxfordTriton
"""Alias for backwards compatibility"""