Source code for qcodes.instrument_drivers.oxford.MercuryiPS_VISA

from __future__ import annotations

import logging
import time
from functools import partial
from typing import TYPE_CHECKING, cast

import numpy as np
import numpy.typing as npt
from packaging import version

from qcodes.instrument import (
    InstrumentBaseKWArgs,
    InstrumentChannel,
    VisaInstrument,
    VisaInstrumentKWArgs,
)
from qcodes.math_utils import FieldVector

if TYPE_CHECKING:
    from collections.abc import Callable

    from typing_extensions import Unpack

    from qcodes.parameters import Parameter

log = logging.getLogger(__name__)
visalog = logging.getLogger('qcodes.instrument.visa')


def _response_preparser(bare_resp: str) -> str:
    """
    Pre-parse response from the instrument
    """
    return bare_resp.replace(':', '')


def _signal_parser(our_scaling: float, response: str) -> float:
    """
    Parse a response string into a correct SI value.

    Args:
        our_scaling: Whatever scale we might need to apply to get from
            e.g. A/min to A/s.
        response: What comes back from instrument.ask
    """

    # there might be a scale before the unit. We only want to deal in SI
    # units, so we translate the scale
    scale_to_factor = {'n': 1e-9, 'u': 1e-6, 'm': 1e-3,
                       'k': 1e3, 'M': 1e6}

    numchars = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '.', '-']

    response = _response_preparser(response)
    digits = "".join(d for d in response if d in numchars)
    scale_and_unit = response[len(digits) :]
    if scale_and_unit == "":
        their_scaling: float = 1
    elif scale_and_unit[0] in scale_to_factor.keys():
        their_scaling = scale_to_factor[scale_and_unit[0]]
    else:
        their_scaling = 1

    return float(digits)*their_scaling*our_scaling


[docs] class OxfordMercuryWorkerPS(InstrumentChannel): """ Class to hold a worker power supply for the Oxford MercuryiPS """ def __init__( self, parent: VisaInstrument, name: str, UID: str, **kwargs: Unpack[InstrumentBaseKWArgs], ) -> None: """ Args: parent: The Instrument instance of the MercuryiPS name: The 'colloquial' name of the PS UID: The UID as used internally by the MercuryiPS, e.g. 'GRPX' **kwargs: Forwarded to base class. """ if ':' in UID: raise ValueError('Invalid UID. Must be axis group name or device ' 'name, e.g. "GRPX" or "PSU.M1"') super().__init__(parent, name, **kwargs) self.uid = UID # The firmware update from 2.5 -> 2.6 changed the command # syntax slightly if version.parse(self.root_instrument.firmware) >= version.parse("2.6"): self.psu_string = "SPSU" else: self.psu_string = "PSU" self.voltage: Parameter = self.add_parameter( "voltage", label="Output voltage", get_cmd=partial(self._param_getter, "SIG:VOLT"), unit="V", get_parser=partial(_signal_parser, 1), ) """Parameter voltage""" self.current: Parameter = self.add_parameter( "current", label="Output current", get_cmd=partial(self._param_getter, "SIG:CURR"), unit="A", get_parser=partial(_signal_parser, 1), ) """Parameter current""" self.current_persistent: Parameter = self.add_parameter( "current_persistent", label="Output persistent current", get_cmd=partial(self._param_getter, "SIG:PCUR"), unit="A", get_parser=partial(_signal_parser, 1), ) """Parameter current_persistent""" self.current_target: Parameter = self.add_parameter( "current_target", label="Target current", get_cmd=partial(self._param_getter, "SIG:CSET"), unit="A", get_parser=partial(_signal_parser, 1), ) """Parameter current_target""" self.field_target: Parameter = self.add_parameter( "field_target", label="Target field", get_cmd=partial(self._param_getter, "SIG:FSET"), set_cmd=partial(self._param_setter, "SIG:FSET"), unit="T", get_parser=partial(_signal_parser, 1), ) """Parameter field_target""" # NB: The current ramp rate follows the field ramp rate # (converted via the ATOB param) self.current_ramp_rate: Parameter = self.add_parameter( "current_ramp_rate", label="Ramp rate (current)", unit="A/s", get_cmd=partial(self._param_getter, "SIG:RCST"), get_parser=partial(_signal_parser, 1 / 60), ) """Parameter current_ramp_rate""" self.field_ramp_rate: Parameter = self.add_parameter( "field_ramp_rate", label="Ramp rate (field)", unit="T/s", set_cmd=partial(self._param_setter, "SIG:RFST"), get_cmd=partial(self._param_getter, "SIG:RFST"), get_parser=partial(_signal_parser, 1 / 60), set_parser=lambda x: x * 60, ) """Parameter field_ramp_rate""" self.field: Parameter = self.add_parameter( "field", label="Field strength", unit="T", get_cmd=partial(self._param_getter, "SIG:FLD"), get_parser=partial(_signal_parser, 1), ) """Parameter field""" self.field_persistent: Parameter = self.add_parameter( "field_persistent", label="Persistent field strength", unit="T", get_cmd=partial(self._param_getter, "SIG:PFLD"), get_parser=partial(_signal_parser, 1), ) """Parameter field_persistent""" self.ATOB: Parameter = self.add_parameter( "ATOB", label="Current to field ratio", unit="A/T", get_cmd=partial(self._param_getter, "ATOB"), get_parser=partial(_signal_parser, 1), set_cmd=partial(self._param_setter, "ATOB"), ) """Parameter ATOB""" self.ramp_status: Parameter = self.add_parameter( "ramp_status", label="Ramp status", get_cmd=partial(self._param_getter, "ACTN"), set_cmd=self._ramp_status_setter, get_parser=_response_preparser, val_mapping={ "HOLD": "HOLD", "TO SET": "RTOS", "CLAMP": "CLMP", "TO ZERO": "RTOZ", }, ) """Parameter ramp_status"""
[docs] def ramp_to_target(self) -> None: """ Unconditionally ramp this PS to its target """ status = self.ramp_status() if status == 'CLAMP': self.ramp_status('HOLD') self.ramp_status('TO SET')
def _ramp_status_setter(self, cmd: str) -> None: status_now = self.ramp_status() if status_now == 'CLAMP' and cmd == 'RTOS': raise ValueError(f'Error in ramping unit {self.uid}: ' 'Can not ramp to target value; power supply is ' 'clamped. Unclamp first by setting ramp status ' 'to HOLD.') else: partial(self._param_setter, 'ACTN')(cmd) def _param_getter(self, get_cmd: str) -> str: """ General getter function for parameters Args: get_cmd: raw string for the command, e.g. 'SIG:VOLT' Returns: The response. Cf. MercuryiPS.ask for how much is returned """ dressed_cmd = f"READ:DEV:{self.uid}:{self.psu_string}:{get_cmd}" resp = self._parent.ask(dressed_cmd) return resp def _param_setter(self, set_cmd: str, value: float | str) -> None: """ General setter function for parameters Args: set_cmd: raw string for the command, e.g. 'SIG:FSET' value: Value to set """ dressed_cmd = f"SET:DEV:{self.uid}:{self.psu_string}:{set_cmd}:{value}" # the instrument always very verbosely responds # the return value of `ask` # holds the value reported back by the instrument self._parent.ask(dressed_cmd)
# TODO: we could use the opportunity to check that we did set/achieve # the intended value MercuryWorkerPS = OxfordMercuryWorkerPS """ Alias for backwards compatibility """
[docs] class OxfordMercuryiPS(VisaInstrument): """ Driver class for the QCoDeS Oxford Instruments MercuryiPS magnet power supply """ default_terminator = "\n" def __init__( self, name: str, address: str, *, field_limits: Callable[[float, float, float], bool] | None = None, **kwargs: Unpack[VisaInstrumentKWArgs], ) -> None: """ Args: name: The name to give this instrument internally in QCoDeS address: The VISA resource of the instrument. Note that a socket connection to port 7020 must be made field_limits: A function describing the allowed field range (T). The function shall take (x, y, z) as an input and return a boolean describing whether that field value is acceptable. **kwargs: kwargs are forwarded to base class. """ if field_limits is not None and not(callable(field_limits)): raise ValueError('Got wrong type of field_limits. Must be a ' 'function from (x, y, z) -> Bool. Received ' f'{type(field_limits)} instead.') pyvisa_sim_file = kwargs.get("pyvisa_sim_file", None) # ensure that a socket is used unless we are in simulation mode if ( not address.endswith("SOCKET") and not address.endswith("@sim") and not pyvisa_sim_file ): raise ValueError( "Incorrect VISA resource name. Must be of type " "TCPIP0::XXX.XXX.XXX.XXX::7020::SOCKET." ) super().__init__(name, address, **kwargs) self.firmware = self.IDN()['firmware'] # TODO: Query instrument to ensure which PSUs are actually present for grp in ['GRPX', 'GRPY', 'GRPZ']: psu_name = grp psu = OxfordMercuryWorkerPS(self, psu_name, grp) self.add_submodule(psu_name, psu) self._field_limits = (field_limits if field_limits else lambda x, y, z: True) self._target_vector = FieldVector(x=self.GRPX.field(), y=self.GRPY.field(), z=self.GRPZ.field()) for coord, unit in zip( ['x', 'y', 'z', 'r', 'theta', 'phi', 'rho'], ['T', 'T', 'T', 'T', 'degrees', 'degrees', 'T']): self.add_parameter(name=f'{coord}_target', label=f'{coord.upper()} target field', unit=unit, get_cmd=partial(self._get_component, coord), set_cmd=partial(self._set_target, coord)) self.add_parameter(name=f'{coord}_measured', label=f'{coord.upper()} measured field', unit=unit, get_cmd=partial(self._get_measured, [coord])) self.add_parameter(name=f'{coord}_ramp', label=f'{coord.upper()} ramp field', unit=unit, docstring='A safe ramp for each coordinate', get_cmd=partial(self._get_component, coord), set_cmd=partial(self._set_target_and_ramp, coord, 'safe')) if coord in ['r', 'theta', 'phi', 'rho']: self.add_parameter(name=f'{coord}_simulramp', label=f'{coord.upper()} ramp field', unit=unit, docstring='A simultaneous blocking ramp for' ' a combined coordinate', get_cmd=partial(self._get_component, coord), set_cmd=partial(self._set_target_and_ramp, coord, 'simul_block')) # FieldVector-valued parameters # self.field_target: Parameter = self.add_parameter( name="field_target", label="target field", unit="T", get_cmd=self._get_target_field, set_cmd=self._set_target_field, ) """Parameter field_target""" self.field_measured: Parameter = self.add_parameter( name="field_measured", label="measured field", unit="T", get_cmd=self._get_field, ) """Parameter field_measured""" self.field_ramp_rate: Parameter = self.add_parameter( name="field_ramp_rate", label="ramp rate", unit="T/s", get_cmd=self._get_ramp_rate, set_cmd=self._set_ramp_rate, ) """Parameter field_ramp_rate""" self.connect_message() def _get_component(self, coordinate: str) -> float: return self._target_vector.get_components(coordinate)[0] def _get_target_field(self) -> FieldVector: return FieldVector( **{ coord: self._get_component(coord) for coord in 'xyz' } ) def _get_ramp_rate(self) -> FieldVector: return FieldVector( x=self.GRPX.field_ramp_rate(), y=self.GRPY.field_ramp_rate(), z=self.GRPZ.field_ramp_rate(), ) def _set_ramp_rate(self, rate: FieldVector) -> None: self.GRPX.field_ramp_rate(rate.x) self.GRPY.field_ramp_rate(rate.y) self.GRPZ.field_ramp_rate(rate.z) def _get_measured(self, coordinates: list[str]) -> float | list[float]: """ Get the measured value of a coordinate. Measures all three fields and computes whatever coordinate we asked for. """ meas_field = FieldVector(x=self.GRPX.field(), y=self.GRPY.field(), z=self.GRPZ.field()) if len(coordinates) == 1: return meas_field.get_components(*coordinates)[0] else: return meas_field.get_components(*coordinates) def _get_field(self) -> FieldVector: return FieldVector( x=self.x_measured(), y=self.y_measured(), z=self.z_measured() ) def _set_target(self, coordinate: str, target: float) -> None: """ The function to set a target value for a coordinate, i.e. the set_cmd for the XXX_target parameters """ # first validate the new target valid_vec = FieldVector() valid_vec.copy(self._target_vector) valid_vec.set_component(**{coordinate: target}) components = valid_vec.get_components('x', 'y', 'z') if not self._field_limits(*components): raise ValueError(f'Cannot set {coordinate} target to {target}, ' 'that would violate the field_limits. ') # update our internal target cache self._target_vector.set_component(**{coordinate: target}) # actually assign the target on the workers cartesian_targ = self._target_vector.get_components('x', 'y', 'z') for targ, worker in zip(cartesian_targ, self.submodules.values()): if not isinstance(worker, OxfordMercuryWorkerPS): raise RuntimeError(f"Expected a MercuryWorkerPS but got " f"{type(worker)}") worker.field_target(targ) def _set_target_field(self, field: FieldVector) -> None: for coord in 'xyz': self._set_target(coord, field[coord])
[docs] def get_idn(self) -> dict[str, str | None]: """ Parse the raw non-SCPI compliant IDN string into an IDN dict Returns: The normal IDN dict """ raw_idn_string = self.ask('*IDN?') resps = raw_idn_string.split(':') idn_dict: dict[str, str | None] = { "model": resps[2], "vendor": resps[1], "serial": resps[3], "firmware": resps[4], } return idn_dict
def _ramp_simultaneously(self) -> None: """ Ramp all three fields to their target simultaneously at their given ramp rates. NOTE: there is NO guarantee that this does not take you out of your safe region. Use with care. """ for worker in self.submodules.values(): if not isinstance(worker, OxfordMercuryWorkerPS): raise RuntimeError(f"Expected a MercuryWorkerPS but got " f"{type(worker)}") worker.ramp_to_target() def _ramp_simultaneously_blocking(self) -> None: """ Ramp all three fields to their target simultaneously at their given ramp rates. NOTE: there is NO guarantee that this does not take you out of your safe region. Use with care. This function is BLOCKING. """ self._ramp_simultaneously() for worker in self.submodules.values(): if not isinstance(worker, OxfordMercuryWorkerPS): raise RuntimeError(f"Expected a MercuryWorkerPS but got " f"{type(worker)}") # wait for the ramp to finish, we don't care about the order while worker.ramp_status() == 'TO SET': time.sleep(0.1) self.update_field() def _ramp_safely(self) -> None: """ Ramp all three fields to their target using the 'first-down-then-up' sequential ramping procedure. This function is BLOCKING. """ meas_vals: npt.NDArray[np.floating] = np.array( self._get_measured(["x", "y", "z"]) ) targ_vals: npt.NDArray[np.floating] = np.array( self._target_vector.get_components("x", "y", "z") ) order = np.argsort(np.abs(targ_vals - meas_vals)) for worker in np.array(list(self.submodules.values()))[order]: worker.ramp_to_target() # now just wait for the ramp to finish # (unless we are testing) if self.visabackend == 'sim': pass else: while worker.ramp_status() == 'TO SET': time.sleep(0.1) self.update_field()
[docs] def update_field(self) -> None: """ Update all the field components. """ coords = ['x', 'y', 'z', 'r', 'theta', 'phi', 'rho'] _ = self._get_field() [getattr(self, f'{i}_measured').get() for i in coords]
[docs] def is_ramping(self) -> bool: """ Returns True if any axis has a ramp status that is either 'TO SET' or 'TO ZERO' """ ramping_statuus = ['TO SET', 'TO ZERO'] is_x_ramping = self.GRPX.ramp_status() in ramping_statuus is_y_ramping = self.GRPY.ramp_status() in ramping_statuus is_z_ramping = self.GRPZ.ramp_status() in ramping_statuus return is_x_ramping or is_y_ramping or is_z_ramping
[docs] def set_new_field_limits(self, limit_func: Callable[[float, float, float], bool]) -> None: """ Assign a new field limit function to the driver Args: limit_func: must be a function mapping (Bx, By, Bz) -> True/False where True means that the field is INSIDE the allowed region """ # first check that the current target is allowed if not limit_func(*self._target_vector.get_components('x', 'y', 'z')): raise ValueError('Can not assign new limit function; present ' 'target is illegal. Please change the target ' 'and try again.') self._field_limits = limit_func
[docs] def ramp(self, mode: str = "safe") -> None: """ Ramp the fields to their present target value Args: mode: how to ramp, either 'simul', 'simul-block' or 'safe'. In 'simul' and 'simul-block' mode, the fields are ramping simultaneously in a non-blocking mode and blocking mode, respectively. There is no safety check that the safe zone is not exceeded. In 'safe' mode, the fields are ramped one-by-one in a blocking way that ensures that the total field stays within the safe region (provided that this region is convex). """ if mode not in ['simul', 'safe', 'simul_block']: raise ValueError('Invalid ramp mode. Please provide either "simul"' ', "safe" or "simul_block".') meas_vals = self._get_measured(['x', 'y', 'z']) # we asked for three coordinates, so we know that we got a list meas_vals = cast(list[float], meas_vals) for cur, worker in zip(meas_vals, self.submodules.values()): if not isinstance(worker, OxfordMercuryWorkerPS): raise RuntimeError(f"Expected a MercuryWorkerPS but got " f"{type(worker)}") if worker.field_target() != cur: if worker.field_ramp_rate() == 0: raise ValueError(f'Can not ramp {worker}; ramp rate set to' ' zero!') # then the actual ramp {'simul': self._ramp_simultaneously, 'safe': self._ramp_safely, 'simul_block': self._ramp_simultaneously_blocking}[mode]()
def _set_target_and_ramp(self, coordinate: str, mode: str, target: float) -> None: """Convenient method to combine setting target and ramping""" self._set_target(coordinate, target) self.ramp(mode)
[docs] def ask(self, cmd: str) -> str: """ Since Oxford Instruments implement their own version of a SCPI-like language, we implement our own reader. Note that this command is used for getting and setting (asking and writing) alike. Args: cmd: the command to send to the instrument """ visalog.debug(f"Writing to instrument {self.name}: {cmd}") resp = self.visa_handle.query(cmd) visalog.debug(f"Got instrument response: {resp}") if 'INVALID' in resp: log.error(f'Invalid command. Got response: {resp}') base_resp = resp # if the command was not invalid, it can either be a SET or a READ # SET: elif resp.endswith('VALID'): base_resp = resp.split(':')[-2] # READ: else: # For "normal" commands only (e.g. '*IDN?' is excepted): # the response of a valid command echoes back said command, # thus we remove that part base_cmd = cmd.replace('READ:', '') base_resp = resp.replace(f'STAT:{base_cmd}', '') return base_resp
MercuryiPS = OxfordMercuryiPS """Alias for backwards compatibility"""