"""
Driver for the Keithley S46 RF switch
"""
import re
from itertools import product
from typing import TYPE_CHECKING, Any, ClassVar
from qcodes.instrument import (
Instrument,
VisaInstrument,
VisaInstrumentKWArgs,
)
from qcodes.parameters import Parameter, ParamRawDataType
if TYPE_CHECKING:
from typing_extensions import Unpack
[docs]
class KeithleyS46LockAcquisitionError(Exception):
pass
[docs]
class KeithleyS46RelayLock:
"""
The S46 either has six pole or a four pole relays. For example, channels
'A1' to 'A6' are all on relay 'A'. However, channels 'R1' to 'R8' are all on
individual relays.
Only one channel per relay may be closed at any given time to prevent
degradation of RF performance and even switch damage. See page 2-11
of the manual. To enforce this, a lock mechanism has been implemented.
"""
def __init__(self, relay_name: str):
self.relay_name = relay_name
self._locked_by: int | None = None
[docs]
def acquire(self, channel_number: int) -> None:
"""
Request a lock acquisition
"""
if self._locked_by is not None and self._locked_by != channel_number:
raise KeithleyS46LockAcquisitionError(
f"Relay {self.relay_name} is already in use by channel "
f"{self._locked_by}"
)
else:
self._locked_by = channel_number
[docs]
def release(self, channel_number: int) -> None:
"""
Release a lock.
"""
if self._locked_by == channel_number:
self._locked_by = None
class S46Parameter(Parameter):
"""
A parameter class for S46 channels. We do not use the QCoDeS
InstrumentChannel class because our channel has one state parameter,
which can either be "open" or "close".
Args:
name
instrument
channel_number
lock: Acquire the lock when closing and release when opening
"""
def __init__(
self,
name: str,
instrument: Instrument | None,
channel_number: int,
lock: KeithleyS46RelayLock,
**kwargs: Any,
):
super().__init__(name, instrument=instrument, **kwargs)
self._lock = lock
self._channel_number = channel_number
if self._get(get_cached=True) == "close":
try:
self._lock.acquire(self._channel_number)
except KeithleyS46LockAcquisitionError as e:
raise RuntimeError(
"The driver is initialized from an undesirable instrument "
"state where more then one channel on a single relay is "
"closed. It is advised to power cycle the instrument. "
"Refusing to initialize driver!"
) from e
def _get(self, get_cached: bool) -> str:
assert isinstance(self.instrument, KeithleyS46)
closed_channels = self.instrument.closed_channels.get_latest()
if not get_cached or closed_channels is None:
closed_channels = self.instrument.closed_channels.get()
return "close" if self.name in closed_channels else "open"
def get_raw(self) -> ParamRawDataType:
return self._get(get_cached=False)
def set_raw(self, value: ParamRawDataType) -> None:
if value == "close":
self._lock.acquire(self._channel_number)
elif value == "open":
self._lock.release(self._channel_number)
if self.instrument is None:
raise RuntimeError(
"Cannot set the value on a parameter "
"that is not attached to an instrument."
)
self.instrument.write(f":{value} (@{self._channel_number})")
def is_closed(self) -> bool:
"""
Returns: True if channels is closed, False otherwise.
"""
return self.get() == "close"
@property
def channel_number(self) -> int:
return self._channel_number
[docs]
class KeithleyS46(VisaInstrument):
relay_names: list[str] = ["A", "B", "C", "D"] + [f"R{j}" for j in range(1, 9)]
# Make a dictionary where keys are channel aliases (e.g. 'A1', 'B3', etc)
# and values are corresponding channel numbers.
channel_numbers: ClassVar[dict[str, int]] = {
f"{a}{b}": count + 1
for count, (a, b) in enumerate(product(["A", "B", "C", "D"], range(1, 7)))
}
channel_numbers.update({f"R{i}": i + 24 for i in range(1, 9)})
# Make a reverse dict for efficient alias lookup given a channel number
aliases: ClassVar[dict[int, str]] = {v: k for k, v in channel_numbers.items()}
default_terminator = "\n"
def __init__(
self,
name: str,
address: str,
**kwargs: "Unpack[VisaInstrumentKWArgs]",
):
super().__init__(name, address, **kwargs)
try:
self.closed_channels: Parameter = self.add_parameter(
"closed_channels",
get_cmd=":CLOS?",
get_parser=self._get_closed_channels_parser,
)
"""Parameter closed_channels"""
self._available_channels: list[str] = []
for relay_name, channel_count in zip(
KeithleyS46.relay_names, self.relay_layout
):
relay_lock = KeithleyS46RelayLock(relay_name)
for channel_index in range(1, channel_count + 1):
# E.g. For channel 'B2', channel_index is 2
if channel_count > 1:
alias = f"{relay_name}{channel_index}"
else:
alias = relay_name # For channels R1 to R8, we have one
# channel per relay. Channel alias = relay name
self.add_parameter(
alias,
channel_number=KeithleyS46.channel_numbers[alias],
lock=relay_lock,
parameter_class=S46Parameter,
)
self._available_channels.append(alias)
except RuntimeError as err:
# If we error on undesirable state we want to make sure
# we also close the visa connection
self.close()
raise err
@staticmethod
def _get_closed_channels_parser(reply: str) -> list[str]:
"""
The SCPI command ":CLOS ?" returns a reply in the form
"(@1,9)", if channels 1 and 9 are closed. Return a list of
strings, representing the aliases of the closed channels
"""
closed_channels_str = re.findall(r"\d+", reply)
return [KeithleyS46.aliases[int(i)] for i in closed_channels_str]
[docs]
def open_all_channels(self) -> None:
for channel_name in self.closed_channels():
self.parameters[channel_name].set("open")
@property
def relay_layout(self) -> list[int]:
"""
The relay layout tells us how many channels we have per relay. Note
that we can have zero channels per relay.
"""
return [int(i) for i in self.ask(":CONF:CPOL?").split(",")]
@property
def available_channels(self) -> list[str]:
return self._available_channels