import re
from collections import namedtuple
from typing import TYPE_CHECKING, cast
import numpy as np
from typing_extensions import TypedDict, Unpack, deprecated
from qcodes.instrument import InstrumentBaseKWArgs, InstrumentChannel
from qcodes.utils import QCoDeSDeprecationWarning
from . import constants
from .constants import ChannelName, ChNr, MeasurementStatus, ModuleKind, SlotNr
from .message_builder import MessageBuilder
if TYPE_CHECKING:
import qcodes.instrument_drivers.Keysight.keysightb1500
_FMTResponse = namedtuple("_FMTResponse", "value status channel type")
class MeasurementNotTaken(Exception):
pass
def fmt_response_base_parser(raw_data_val: str) -> _FMTResponse:
"""
Parse the response from SPA for `FMT 1,0` format into a named tuple
with names, value (value of the data), status (Normal or with compliance
error such as C, T, V), channel (channel number of the output data such
as CH1,CH2), type (current 'I' or voltage 'V'). This parser is tested
for FMT1,0 and FMT1,1 response.
Args:
raw_data_val: Unparsed (raw) data for the instrument.
"""
values_separator = ","
data_val = []
data_status = []
data_channel = []
data_datatype = []
for str_value in raw_data_val.split(values_separator):
status = str_value[0]
channel_id = constants.ChannelName[str_value[1]].value
datatype = str_value[2]
value = float(str_value[3:])
data_val.append(value)
data_status.append(status)
data_channel.append(channel_id)
data_datatype.append(datatype)
data = _FMTResponse(data_val, data_status, data_channel, data_datatype)
return data
def parse_module_query_response(response: str) -> dict[SlotNr, str]:
"""
Extract installed module information from the given string and return the
information as a dictionary.
Args:
response: Response str to `UNT? 0` query.
Returns:
Dictionary from slot numbers to model name strings.
"""
pattern = r";?(?P<model>\w+),(?P<revision>\d+)"
moduleinfo = re.findall(pattern, response)
return {
SlotNr(slot_nr): model
for slot_nr, (model, rev) in enumerate(moduleinfo, start=1)
if model != "0"
}
# pattern to match dcv experiment
_pattern_lrn = re.compile(
r"(?P<status_dc>\w{1,3})(?P<chnr_dc>\w),(?P<voltage_dc>\d{1,3}.\d{1,4});"
r"(?P<status_ac>\w{1,3})(?P<chnr_ac>\w),(?P<voltage_ac>\d{1,3}.\d{1,4});"
r"(?P<status_fc>\w{1,2})(?P<chnr_fc>\w),(?P<frequency>\d{1,6}.\d{1,4})"
)
def parse_dcv_measurement_response(response: str) -> dict[str, str | float]:
"""
Extract status, channel number, value and accompanying metadata from
the string and return them as a dictionary.
Args:
response: Response str to lrn_query For the MFCMU.
"""
match = re.match(_pattern_lrn, response)
if match is None:
raise ValueError(f"{response!r} didn't match {_pattern_lrn!r} pattern")
dd = match.groupdict()
d = cast(dict[str, str | float], dd)
return d
# Pattern to match the spot measurement response against
_pattern = re.compile(
r"((?P<status>\w)(?P<channel>\w)(?P<dtype>\w))?"
r"(?P<value>[+-]\d{1,3}\.\d{3,6}E[+-]\d{2})"
)
class SpotResponse(TypedDict):
value: float
status: MeasurementStatus
channel: ChannelName
dtype: str
def parse_spot_measurement_response(response: str) -> SpotResponse:
"""
Extract measured value and accompanying metadata from the string
and return them as a dictionary.
Args:
response: Response str to spot measurement query.
Returns:
Dictionary with measured value and associated metadata (e.g.
timestamp, channel number, etc.)
"""
match = re.match(_pattern, response)
if match is None:
raise ValueError(f"{response!r} didn't match {_pattern!r} pattern")
dd = match.groupdict()
d = SpotResponse(
value=_convert_to_nan_if_dummy_value(float(dd["value"])),
status=MeasurementStatus[dd["status"]],
channel=ChannelName[dd["channel"]],
dtype=dd["dtype"],
)
return d
_DCORRResponse = namedtuple("_DCORRResponse", "mode primary secondary")
def parse_dcorr_query_response(response: str) -> _DCORRResponse:
"""
Parse string response of ``DCORR?`` `command into a named tuple of
:class:`constants.DCORR.Mode` and primary and secondary reference or
calibration values.
"""
mode, primary, secondary = response.split(",")
return _DCORRResponse(
mode=constants.DCORR.Mode(int(mode)),
primary=float(primary),
secondary=float(secondary),
)
def fixed_negative_float(response: str) -> float:
"""
Keysight sometimes responds for ex. '-0.-1' as an output when you input
'-0.1'. This function can convert such strings also to float.
"""
if len(response.split(".")) > 2:
raise ValueError("String must of format `a` or `a.b`")
parts = response.split(".")
number = parts[0]
decimal = parts[1] if len(parts) > 1 else "0"
decimal = decimal.replace("-", "")
output = ".".join([number, decimal])
return float(output)
_dcorr_labels_units_map = {
constants.DCORR.Mode.Cp_G: dict(
primary=dict(label="Cp", unit="F"), secondary=dict(label="G", unit="S")
),
constants.DCORR.Mode.Ls_Rs: dict(
primary=dict(label="Ls", unit="H"), secondary=dict(label="Rs", unit="Ω")
),
}
def format_dcorr_response(r: _DCORRResponse) -> str:
"""
Format a given response tuple ``_DCORRResponse`` from
``DCORR?`` command as a human-readable string.
"""
labels_units = _dcorr_labels_units_map[r.mode]
primary = labels_units["primary"]
secondary = labels_units["secondary"]
result_str = (
f"Mode: {r.mode.name}, "
f"Primary {primary['label']}: {r.primary} {primary['unit']}, "
f"Secondary {secondary['label']}: {r.secondary} {secondary['unit']}"
)
return result_str
def get_name_label_unit_of_impedance_model(
mode: constants.IMP.MeasurementMode,
) -> tuple[tuple[str, str], tuple[str, str], tuple[str, str]]:
params = mode.name.split("_")
param1 = params[0]
param2 = "_".join(params[1:])
label = (constants.IMP.Name[param1].value, constants.IMP.Name[param2].value)
unit = (constants.IMP.Unit[param1].value, constants.IMP.Unit[param2].value)
name = (label[0].lower().replace(" ", "_"), label[1].lower().replace(" ", "_"))
return name, label, unit
# TODO notes:
# - [ ] Instead of generating a Qcodes InstrumentChannel for each **module**,
# it might make more sense to generate one for each **channel**
def get_measurement_summary(status_array: np.ndarray) -> str:
unique_error_statuses = np.unique(status_array[status_array != "N"])
if len(unique_error_statuses) > 0:
summary = " ".join(
constants.MeasurementStatus[err] for err in unique_error_statuses
)
else:
summary = constants.MeasurementStatus["N"]
return summary
def convert_dummy_val_to_nan(param: _FMTResponse) -> None:
"""
Converts dummy value to NaN. Instrument may output dummy value (
199.999E+99) if measurement data is over the measurement range. Or the
sweep measurement was aborted by the automatic stop function or power
compliance. Or if any abort condition is detected. Dummy data
199.999E+99 will be returned for the data after abort."
Args:
param: This must be of type named tuple _FMTResponse.
"""
for index, value in enumerate(param.value):
param.value[index] = _convert_to_nan_if_dummy_value(param.value[index])
def _convert_to_nan_if_dummy_value(value: float) -> float:
return float("nan") if value > 1e99 else value
[docs]
class KeysightB1500Module(InstrumentChannel):
"""Base class for all modules of B1500 Parameter Analyzer
When subclassing,
- set ``MODULE_KIND`` attribute to the correct module kind
:class:`~.constants.ModuleKind` that the module is.
- populate ``channels`` attribute according to the number of
channels that the module has.
Args:
parent: Mainframe B1500 instance that this module belongs to
name: Name of the instrument instance to create. If `None`
(Default), then the name is autogenerated from the instrument
class.
slot_nr: Slot number of this module (not channel number)
"""
MODULE_KIND: ModuleKind
def __init__(
self,
parent: "qcodes.instrument_drivers.Keysight.keysightb1500.KeysightB1500",
name: str | None,
slot_nr: int,
**kwargs: Unpack[InstrumentBaseKWArgs],
):
# self.channels will be populated in the concrete module subclasses
# because channel count is module specific
self.channels: tuple[ChNr, ...]
self.slot_nr = SlotNr(slot_nr)
if name is None:
number = len(parent.by_kind[self.MODULE_KIND]) + 1
name = self.MODULE_KIND.lower() + str(number)
super().__init__(parent=parent, name=name, **kwargs)
# Response parsing functions as static methods for user convenience
parse_spot_measurement_response = parse_spot_measurement_response
parse_module_query_response = parse_module_query_response
[docs]
def enable_outputs(self) -> None:
"""
Enables all outputs of this module by closing the output relays of its
channels.
"""
# TODO This always enables all outputs of a module, which is maybe not
# desirable. (Also check the TODO item at the top about
# InstrumentChannel per Channel instead of per Module.
msg = MessageBuilder().cn(self.channels).message
self.write(msg)
[docs]
def disable_outputs(self) -> None:
"""
Disables all outputs of this module by opening the output relays of its
channels.
"""
# TODO See enable_output TODO item
msg = MessageBuilder().cl(self.channels).message
self.write(msg)
[docs]
def is_enabled(self) -> bool:
"""
Check if channels of this module are enabled.
Returns:
`True` if *all* channels of this module are enabled. `False`,
otherwise.
"""
# TODO If a module has multiple channels, and only one is enabled, then
# this will return false, which is probably not desirable.
# Also check the TODO item at the top about InstrumentChannel per
# Channel instead of per Module.
msg = MessageBuilder().lrn_query(constants.LRN.Type.OUTPUT_SWITCH).message
response = self.ask(msg)
activated_channels = re.sub(r"[^,\d]", "", response).split(",")
is_enabled = set(self.channels).issubset(
int(x) for x in activated_channels if x != ""
)
return is_enabled
[docs]
def clear_timer_count(self) -> None:
"""
This command clears the timer count. This command is effective for
all measurement modes, regardless of the TSC setting. This command
is not effective for the 4 byte binary data output format
(FMT3 and FMT4).
"""
self.root_instrument.clear_timer_count(chnum=self.channels)
@deprecated("Use KeysightB1500Module", category=QCoDeSDeprecationWarning, stacklevel=2)
class B1500Module(KeysightB1500Module):
pass
class StatusMixin:
def __init__(self) -> None:
self.names = tuple(["param1", "param2"])
def status_summary(self) -> dict[str, str]:
return_dict: dict[str, str] = {}
for name_index, name in enumerate(self.names):
param_data: _FMTResponse = getattr(self, f"param{name_index+1}")
status_array = param_data.status
if status_array is None:
self_full_name = getattr(self, "full_name", "this")
raise MeasurementNotTaken(
f"First run sweep measurement with {self_full_name} "
f"parameter to obtain the data; then it will be possible "
f"to obtain status summary for that data."
)
summary = get_measurement_summary(status_array)
return_dict[name] = summary
return return_dict