Source code for qcodes.instrument_drivers.HP.HP_8753D

import logging
from functools import partial
from typing import TYPE_CHECKING

import numpy as np

import qcodes.validators as vals
from qcodes.instrument import VisaInstrument, VisaInstrumentKWArgs
from qcodes.parameters import ArrayParameter, ParamRawDataType

if TYPE_CHECKING:
    from typing_extensions import Unpack

    from qcodes.parameters import Parameter

log = logging.getLogger(__name__)

_unit_map = {
    "Log mag": "dB",
    "Phase": "degree",
    "Delay": "",
    "Smith chart": "dim. less",
    "Polar": "dim. less",
    "Lin mag": "dim. less",
    "Real": "",
    "Imaginary": "",
    "SWR": "dim. less",
}


def HPIntParser(value: str) -> int:
    """
    Small custom parser for ints

    Args:
        value: the VISA return string using exponential notation

    """
    return int(float(value))


[docs] class TraceNotReady(Exception): pass
class HP8753DTrace(ArrayParameter): """ Class to hold a trace from the HP8753D Although the trace can have two values per frequency, this class only returns the first value """ def __init__(self, name: str, instrument: "HP8753D"): super().__init__( name=name, shape=(1,), # is overwritten by prepare_trace label="", # is overwritten by prepare_trace unit="", # is overwritten by prepare_trace setpoint_names=("Frequency",), setpoint_labels=("Frequency",), setpoint_units=("Hz",), snapshot_get=False, instrument=instrument, ) def prepare_trace(self) -> None: """ Update setpoints, units and labels """ # we don't trust users to keep their fingers off the front panel, # so we query the instrument for all values assert isinstance(self.instrument, HP8753D) fstart = self.instrument.start_freq() fstop = self.instrument.stop_freq() npts = self.instrument.trace_points() sps = np.linspace(fstart, fstop, npts) self.setpoints = (tuple(sps),) self.shape = (len(sps),) self.label = self.instrument.s_parameter() self.unit = _unit_map[self.instrument.display_format()] self.instrument._traceready = True def get_raw(self) -> ParamRawDataType: """ Return the trace """ inst = self.instrument assert isinstance(inst, HP8753D) if not inst._traceready: raise TraceNotReady("Trace not ready. Please run prepare_trace.") inst.write("FORM2") # 32-bit floating point numbers inst.write("OUTPFORM") inst.visa_handle.read_termination = "" raw_resp = inst.visa_handle.read_raw() inst.visa_handle.read_termination = "\n" first_points = b"" # 4 bytes header, 4 bytes per point, value1's and value2's # are intertwined like: val1_001, val2_001, val1_002, val2_002... for n in range((len(raw_resp) - 4) // 4): first_points += raw_resp[4:][2 * n * 4 : (2 * n + 1) * 4] dt = np.dtype(">f") trace1 = np.frombuffer(first_points, dtype=dt) return trace1 class HP8753D(VisaInstrument): """ QCoDeS driver for the Hewlett Packard 8753D Network Analyzer. """ default_terminator = "\n" def __init__( self, name: str, address: str, **kwargs: "Unpack[VisaInstrumentKWArgs]", ) -> None: super().__init__(name, address, **kwargs) self.start_freq: Parameter = self.add_parameter( "start_freq", label="Sweep start frequency", unit="Hz", set_cmd=partial(self.invalidate_trace, "STAR {} HZ"), get_cmd="STAR?", get_parser=float, vals=vals.Numbers(30000, 6000000000), ) """Parameter start_freq""" self.stop_freq: Parameter = self.add_parameter( "stop_freq", label="Sweep stop frequency", unit="Hz", set_cmd=partial(self.invalidate_trace, "STOP {} HZ"), get_cmd="STOP?", get_parser=float, vals=vals.Numbers(30000, 6000000000), ) """Parameter stop_freq""" self.averaging: Parameter = self.add_parameter( "averaging", label="Averaging state", set_cmd="AVERO{}", get_cmd="AVERO?", val_mapping={"ON": 1, "OFF": 0}, ) """Parameter averaging""" self.number_of_averages: Parameter = self.add_parameter( "number_of_averages", label="Number of averages", set_cmd="AVERFACT{}", get_cmd="AVERFACT?", get_parser=HPIntParser, vals=vals.Ints(0, 999), ) """Parameter number_of_averages""" self.trace_points: Parameter = self.add_parameter( "trace_points", label="Number of points in trace", set_cmd=partial(self.invalidate_trace, "POIN{}"), get_cmd="POIN?", get_parser=HPIntParser, vals=vals.Enum(3, 11, 26, 51, 101, 201, 401, 801, 1601), ) """Parameter trace_points""" self.sweep_time: Parameter = self.add_parameter( "sweep_time", label="Sweep time", set_cmd="SWET{}", get_cmd="SWET?", unit="s", get_parser=float, vals=vals.Numbers(0.01, 86400), ) """Parameter sweep_time""" self.output_power: Parameter = self.add_parameter( "output_power", label="Output power", unit="dBm", set_cmd="POWE{}", get_cmd="POWE?", get_parser=float, vals=vals.Numbers(-85, 20), ) """Parameter output_power""" self.s_parameter: Parameter = self.add_parameter( "s_parameter", label="S-parameter", set_cmd=self._s_parameter_setter, get_cmd=self._s_parameter_getter, ) """Parameter s_parameter""" # DISPLAY / Y SCALE PARAMETERS self.display_format: Parameter = self.add_parameter( "display_format", label="Display format", set_cmd=self._display_format_setter, get_cmd=self._display_format_getter, ) """Parameter display_format""" # TODO: update this on startup and via display format self.display_reference: Parameter = self.add_parameter( "display_reference", label="Display reference level", unit=None, # will be set by display_format get_cmd="REFV?", set_cmd="REFV{}", get_parser=float, vals=vals.Numbers(-500, 500), ) """Parameter display_reference""" self.display_scale: Parameter = self.add_parameter( "display_scale", label="Display scale", unit=None, # will be set by display_format get_cmd="SCAL?", set_cmd="SCAL{}", get_parser=float, vals=vals.Numbers(-500, 500), ) """Parameter display_scale""" self.trace: HP8753DTrace = self.add_parameter( name="trace", parameter_class=HP8753DTrace ) """Parameter trace""" # Startup self.startup() self.connect_message() def reset(self) -> None: """ Resets the instrument to factory default state """ # use OPC to make sure we wait for operation to finish self.ask("OPC?;PRES") def run_continously(self) -> None: """ Set the instrument in run continously mode """ self.write("CONT") def run_N_times(self, N: int) -> None: """ Run N sweeps and then hold. We wait for a response before returning """ st = self.sweep_time.get_latest() if N not in range(1, 1000): raise ValueError( f"Can not run {N} times." + " please select a number from 1-999." ) # set a longer timeout, to not timeout during the sweep new_timeout = st * N + 2 with self.timeout.set_to(new_timeout): log.debug( f"Making {N} blocking sweeps." f" Setting VISA timeout to {new_timeout} s." ) self.ask(f"OPC?;NUMG{N}") def invalidate_trace(self, cmd: str, value: float | int | str) -> None: """ Wrapper for set_cmds that make the trace not ready """ self._traceready = False self.write(cmd.format(value)) def startup(self) -> None: self._traceready = False self.display_format(self.display_format()) def _s_parameter_setter(self, param: str) -> None: """ set_cmd for the s_parameter parameter """ if param not in ["S11", "S12", "S21", "S22"]: raise ValueError("Cannot set s-parameter to {}") # the trace labels changes self._traceready = False self.write(param) def _s_parameter_getter(self) -> str: """ get_cmd for the s_parameter parameter """ cmd = "" for cmd in ["S11?", "S12?", "S21?", "S22?"]: resp = self.ask(cmd) if resp in ["1", "1\n"]: break return cmd.replace("?", "") def _display_format_setter(self, fmt: str) -> None: """ set_cmd for the display_format parameter """ val_mapping = { "Log mag": "LOGM", "Phase": "PHAS", "Delay": "DELA", "Smith chart": "SMIC", "Polar": "POLA", "Lin mag": "LINM", "Real": "REAL", "Imaginary": "IMAG", "SWR": "SWR", } if fmt not in val_mapping.keys(): raise ValueError(f"Cannot set display_format to {fmt}.") self._traceready = False self.display_reference.unit = _unit_map[fmt] self.display_scale.unit = _unit_map[fmt] self.write(val_mapping[fmt]) def _display_format_getter(self) -> str: """ get_cmd for the display_format parameter """ val_mapping = { "LOGM": "Log mag", "PHAS": "Phase", "DELA": "Delay", "SMIC": "Smith chart", "POLA": "Polar", "LINM": "Lin mag", "REAL": "Real", "IMAG": "Imaginary", "SWR": "SWR", } cmd = "" # keep asking until we find the currently used format for cmd in val_mapping.keys(): resp = self.ask(f"{cmd}?") if resp in ["1", "1\n"]: break return val_mapping[cmd]