Source code for qcodes.instrument_drivers.cryomagnetics._cryomagnetics4g

from __future__ import annotations

import re
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING

from pyvisa import VisaIOError

from qcodes.instrument import VisaInstrument, VisaInstrumentKWArgs
from qcodes.validators import Enum, Numbers

if TYPE_CHECKING:
    from typing_extensions import Unpack

    from qcodes.parameters import Parameter


[docs] @dataclass class CryomagneticsOperatingState: ramping: bool = False holding: bool = False standby: bool = False quench_condition_present: bool = False power_module_failure: bool = False
[docs] def can_start_ramping(self) -> bool: required_checks = [ "ramping", "quench_condition_present", "power_module_failure", ] return all(not getattr(self, field) for field in required_checks)
[docs] class Cryomagnetics4GException(Exception): pass
[docs] class Cryomagnetics4GWarning(Warning): pass
[docs] class CryomagneticsModel4G(VisaInstrument): """ Driver for the Cryomagnetics Model 4G superconducting magnet power supply. This driver provides an interface to control and communicate with the Cryomagnetics Model 4G superconducting magnet power supply using the VISA protocol. It allows setting and reading the magnetic field, ramp rate, and various other parameters of the instrument. Args: name: The name of the instrument instance. address: The VISA resource name of the instrument. max_current_limits: A dictionary specifying the maximum current limits and rates for each range. The keys are the range indices, and the values are tuples containing the upper current limit and maximum rate for that range. coil_constant: The coil constant of the magnet in Tesla per Amp. **kwargs: Forwarded to base class. """ KG_TO_TESLA: float = 0.1 # Constant for unit conversion default_terminator = "\n" def __init__( self, name: str, address: str, max_current_limits: dict[int, tuple[float, float]], coil_constant: float, **kwargs: Unpack[VisaInstrumentKWArgs], ): super().__init__(name, address, **kwargs) self.coil_constant = coil_constant self.max_current_limits = max_current_limits # Initialize rate manager based on hypothetical hardware specific limits # Initialize rate manager based on hypothetical hardware specific limits self._initialize_max_current_limits() # Adding parameters self.units: Parameter = self.add_parameter( name="units", set_cmd="UNITS {}", get_cmd="UNITS?", get_parser=str, vals=Enum("A", "kG", "T"), docstring="Field Units", ) """Field Units""" self.ramping_state_check_interval: Parameter = self.add_parameter( "ramping_state_check_interval", initial_value=0.05, unit="s", vals=Numbers(0, 10), set_cmd=None, ) """Parameter ramping_state_check_interval""" self.field: Parameter = self.add_parameter( name="field", unit="T", set_cmd=self.set_field, get_cmd=self._get_field, get_parser=float, vals=Numbers(-9.001, 9.001), docstring="Magnetic Field in Tesla", ) """Magnetic Field in Tesla""" self.rate: Parameter = self.add_parameter( name="rate", unit="T/min", get_cmd=self._get_rate, set_cmd=self._set_rate, get_parser=float, docstring="Rate for magnetic field T/min", ) """Rate for magnetic field T/min""" self.Vmag: Parameter = self.add_parameter( name="Vmag", unit="V", get_cmd="VMAG?", get_parser=float, vals=Numbers(-10, 10), docstring="Magnet sense voltage", ) """Magnet sense voltage""" self.Vout: Parameter = self.add_parameter( name="Vout", unit="V", get_cmd="VOUT?", get_parser=float, vals=Numbers(-12.8, 12.8), docstring="Magnet output voltage", ) """Magnet output voltage""" self.Iout: Parameter = self.add_parameter( name="Iout", unit="A", get_cmd="IOUT?", get_parser=float, docstring="Magnet output field/current", ) """Magnet output field/current""" # Set to remote mode self.operating_mode() # Set units to tesla by default self.units("T") self.connect_message()
[docs] def quenched_state_reset(self) -> None: """ Resets the device's quenched state. """ self.write("QRESET")
[docs] def operating_mode(self, remote: bool = True) -> None: """ Sets the device's operating mode to either remote or local. Args: remote: If True, sets to remote mode, otherwise sets to local mode. """ if remote: self.write("REMOTE") else: self.write("LOCAL")
[docs] def zero_current(self) -> None: """ Sets the device current to zero. """ self.write("SWEEP ZERO")
[docs] def reset(self) -> None: """ Resets the device to its default settings. """ self.write("*RST")
[docs] def magnet_operating_state(self) -> CryomagneticsOperatingState: """ Retrieves the current operating state of the magnet. Returns: CryomagneticsOperatingState: An object representing the current operating state of the magnet. Raises: Cryomagnetics4GException: If the magnet is in a state that prevents ramping, such as quench condition, power module failure, or already ramping. The operating state is determined by querying the status byte (`*STB?`) of the instrument. The status byte is interpreted as follows: - Bit 0: Holding (not ramping) - Bit 1: Ramping - Bit 2: Standby - Bit 4: Quench condition present - Bit 8: Power module failure If the magnet is in a state that prevents ramping (quench condition, power module failure, or already ramping), an exception is raised with an appropriate error message. The error message is also logged using the instrument's logger. If the magnet is in a valid state for ramping, a CryomagneticsOperatingState object is returned, representing the current operating state of the magnet. """ status_byte = int(self.ask("*STB?")) operating_state = CryomagneticsOperatingState( holding=not bool(status_byte & 1) and not bool(status_byte & 2), ramping=bool(status_byte & 1), standby=bool(status_byte & 2), quench_condition_present=bool(status_byte & 4), power_module_failure=bool(status_byte & 8), ) if operating_state.quench_condition_present: error_message = "Cannot ramp due to quench condition." self.log.error(error_message) # Log the error message raise Cryomagnetics4GException(error_message) if operating_state.power_module_failure: error_message = "Cannot ramp due to power module failure." self.log.error(error_message) # Log the error message raise Cryomagnetics4GException(error_message) if operating_state.ramping: error_message = "Cannot ramp as the power supply is already ramping." self.log.error(error_message) # Log the error message raise Cryomagnetics4GException(error_message) return operating_state
[docs] def set_field(self, field_setpoint: float, block: bool = True) -> None: """ Sets the magnetic field strength in Tesla using ULIM, LLIM, and SWEEP commands. Args: field_setpoint: The desired magnetic field strength in Tesla. block: If True, the method will block until the field reaches the setpoint. Raises: Cryo4GException: If the power supply is not in a state where it can start ramping. """ # Convert field setpoint to kG for the instrument field_setpoint_kg = field_setpoint * 10 # Determine sweep direction based on setpoint and current field current_field = self._get_field() self.log.debug(f"Current field: {current_field}, Setpoint: {field_setpoint_kg}") if abs(field_setpoint_kg - current_field) < 1e-4: # Already at the setpoint, no need to sweep self.log.info(f"Magnetic field is already set to {field_setpoint}T") return # Check if we can start ramping try: state = self.magnet_operating_state() except Cryomagnetics4GException as e: self.log.error(f"Cannot set field: {e}") # Log the specific error return if state.can_start_ramping(): if field_setpoint_kg < current_field: sweep_direction = "DOWN" self.write(f"LLIM {field_setpoint_kg}") else: sweep_direction = "UP" self.write(f"ULIM {field_setpoint_kg}") self.log.debug(f"Sweeping {sweep_direction} to {field_setpoint_kg}") self.write(f"SWEEP {sweep_direction}") # Check if we want to block if not block: self.log.warning("Magnetic field is ramping but not currently blocked!") return # Otherwise, wait until no longer ramping self.log.debug( f"Starting blocking ramp of {self.name} to {field_setpoint} T" ) exit_state = self.wait_while_ramping(field_setpoint) self.log.debug("Finished blocking ramp") # If we are now holding, it was successful if not exit_state.holding: msg = "_set_field({}) failed with state: {}" raise Cryomagnetics4GException(msg.format(field_setpoint, exit_state))
[docs] def wait_while_ramping( self, value: float, threshold: float = 1e-5 ) -> CryomagneticsOperatingState: """Waits while the magnet is ramping, checking the status byte instead of field value.""" while True: status_byte = int(self.ask("*STB?")) if not bool(status_byte & 1): # Check if ramping bit is clear break self._sleep(self.ramping_state_check_interval()) self.write("SWEEP PAUSE") self._sleep(1.0) return self.magnet_operating_state()
def _sleep(self, t: float) -> None: """ Sleep for a number of seconds t. If we are or using the PyVISA 'sim' backend, omit this """ simmode = getattr(self, "visabackend", False) == "sim" if simmode: return else: time.sleep(t) def _get_field(self) -> float: current_value = self.ask("IMAG?") # Define a regular expression to match the floating point number and the unit match = re.match( r"^([-+]?[0-9]*\.?[0-9]+)\s*([a-zA-Z]+)$", current_value.strip() ) if not match: raise ValueError(f"Invalid format for measurement: '{current_value}'") raw_value, unit = match.groups() # Convert the numeric part to float try: numeric_value = float(raw_value) except ValueError: raise ValueError(f"Unable to convert '{raw_value}' to float") # Validate the unit part if unit != "kG": raise ValueError(f"Unexpected unit '{unit}'. Expected 'kG'") if self.units() == "A": raise ValueError( "Current units are set to Amperes (A). Cannot retrieve magnetic field in these units." ) # Return value in Tesla, only converting if necessary if self.units() == "T": return numeric_value * self.KG_TO_TESLA else: return numeric_value def _get_rate(self) -> float: """ Get the current ramp rate in Tesla per minute. """ # Get the rate from the instrument in Amps per second rate_amps_per_sec = float(self.ask("RATE?")) # Convert to Tesla per minute rate_tesla_per_min = rate_amps_per_sec * 60 / self.coil_constant return rate_tesla_per_min def _set_rate(self, rate_tesla_per_min: float) -> None: """ Set the ramp rate in Tesla per minute. """ # Convert from Tesla per minute to Amps per second rate_amps_per_sec = rate_tesla_per_min * self.coil_constant / 60 # Find the appropriate range and set the rate current_field = self._get_field() # Get current field in Tesla current_in_amps = current_field * self.coil_constant # Convert to Amps # (Implement a more efficient lookup method here if needed) for range_index, (upper_limit, max_rate) in self.max_current_limits.items(): if current_in_amps <= upper_limit: actual_rate = min( rate_amps_per_sec, max_rate ) # Ensure rate doesn't exceed maximum self.write(f"RATE {range_index} {actual_rate}") return raise ValueError("Current field is outside of defined rate ranges") def _initialize_max_current_limits(self) -> None: """ Initialize the instrument with the provided current limits and rates. """ for range_index, (upper_limit, max_rate) in self.max_current_limits.items(): self.write(f"RANGE {range_index} {upper_limit}") self.write(f"RATE {range_index} {max_rate}")
[docs] def write_raw(self, cmd: str) -> None: try: super().write_raw(cmd) except VisaIOError as err: # The ami communication has found to be unstable # so we retry the communication here msg = f"Got VisaIOError while writing {cmd} to instrument." if self._RETRY_WRITE_ASK: msg += f" Will retry in {self._RETRY_TIME} sec." self.log.exception(msg) if self._RETRY_WRITE_ASK: time.sleep(self._RETRY_TIME) self.device_clear() super().write_raw(cmd) else: raise err
[docs] def ask_raw(self, cmd: str) -> str: try: result = super().ask_raw(cmd) except VisaIOError as err: # The communication has found to be unstable # so we retry the communication here msg = f"Got VisaIOError while asking the instrument: {cmd}" if self._RETRY_WRITE_ASK: msg += f" Will retry in {self._RETRY_TIME} sec." self.log.exception(msg) if self._RETRY_WRITE_ASK: time.sleep(self._RETRY_TIME) self.device_clear() result = super().ask_raw(cmd) else: raise err return result