Source code for qcodes.station

"""
Station objects - collect all the equipment you use to do an experiment.
"""

from __future__ import annotations

import importlib
import inspect
import itertools
import json
import logging
import os
import pkgutil
import warnings
from collections import deque
from contextlib import suppress
from copy import copy, deepcopy
from functools import partial
from io import StringIO
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    AnyStr,
    ClassVar,
    NoReturn,
    Union,
    cast,
    overload,
)

import jsonschema
import jsonschema.exceptions

import qcodes
import qcodes.instrument_drivers
from qcodes import validators
from qcodes.instrument.base import Instrument, InstrumentBase
from qcodes.instrument.channel import ChannelTuple
from qcodes.metadatable import Metadatable, MetadatableWithName
from qcodes.monitor.monitor import Monitor
from qcodes.parameters import (
    DelegateParameter,
    ManualParameter,
    Parameter,
    ParameterBase,
)
from qcodes.utils import (
    DelegateAttributes,
    checked_getattr_indexed,
    get_qcodes_path,
    get_qcodes_user_path,
)

if TYPE_CHECKING:
    from collections.abc import Iterable, Sequence
    from pathlib import Path
    from types import ModuleType

log = logging.getLogger(__name__)

PARAMETER_ATTRIBUTES = ['label', 'unit', 'scale', 'inter_delay', 'post_delay',
                        'step', 'offset']

SCHEMA_TEMPLATE_PATH = os.path.join(
    get_qcodes_path('dist', 'schemas'),
    'station-template.schema.json')
SCHEMA_PATH = get_qcodes_user_path('schemas', 'station.schema.json')
STATION_YAML_EXT = '*.station.yaml'


[docs] def get_config_enable_forced_reconnect() -> bool: return qcodes.config["station"]["enable_forced_reconnect"]
[docs] def get_config_default_folder() -> str | None: return qcodes.config["station"]["default_folder"]
[docs] def get_config_default_file() -> str | None: return qcodes.config["station"]["default_file"]
[docs] def get_config_use_monitor() -> str | None: return qcodes.config["station"]["use_monitor"]
ChannelOrInstrumentBase = Union[InstrumentBase, ChannelTuple]
[docs] class ValidationWarning(Warning): """Replacement for jsonschema.error.ValidationError as warning.""" pass
[docs] class StationConfig(dict[Any, Any]):
[docs] def snapshot(self, update: bool = True) -> StationConfig: return self
[docs] class Station(Metadatable, DelegateAttributes): """ A representation of the entire physical setup. Lists all the connected Components and the current default measurement (a list of actions). Args: *components: components to add immediately to the Station. Can be added later via ``self.add_component``. config_file: Path to YAML files to load the station config from. - If only one yaml file needed to be loaded, it should be passed as a string, e.g., '~/station.yaml' - If more than one yaml file needed, they should be supplied as a sequence of strings, e.g. ['~/station1.yaml', '~/station2.yaml'] use_monitor: Should the QCoDeS monitor be activated for this station. default: Is this station the default? update_snapshot: Immediately update the snapshot of each component as it is added to the Station. """ default: Station | None = None "Class attribute to store the default station." delegate_attr_dicts: ClassVar[list[str]] = ["components"] """ A list of names (strings) of dictionaries which are (or will be) attributes of ``self``, whose keys should be treated as attributes of ``self``. """ config: StationConfig | None = None """ A user dict representing the YAML file that the station was loaded from""" def __init__( self, *components: MetadatableWithName, config_file: str | Sequence[str] | None = None, use_monitor: bool | None = None, default: bool = True, update_snapshot: bool = True, **kwargs: Any, ) -> None: super().__init__(**kwargs) # when a new station is defined, store it in a class variable # so it becomes the globally accessible default station. # You can still have multiple stations defined, but to use # other than the default one you must specify it explicitly. # If for some reason you want this new Station NOT to be the # default, just specify default=False if default: Station.default = self self.components: dict[str, MetadatableWithName] = {} for item in components: self.add_component(item, update_snapshot=update_snapshot) self.use_monitor = use_monitor self._added_methods: list[str] = [] self._monitor_parameters: list[Parameter] = [] if config_file is None: self.config_file = [] elif isinstance(config_file, str): self.config_file = [config_file, ] else: self.config_file = list(config_file) self.load_config_files(*self.config_file)
[docs] def snapshot_base( self, update: bool | None = True, params_to_skip_update: Sequence[str] | None = None, ) -> dict[Any, Any]: """ State of the station as a JSON-compatible dictionary (everything that the custom JSON encoder class :class:`.NumpyJSONEncoder` supports). Note: If the station contains an instrument that has already been closed, not only will it not be snapshotted, it will also be removed from the station during the execution of this function. Args: update: If ``True``, update the state by querying the all the children: f.ex. instruments, parameters, components, etc. If None only update if the state is known to be invalid. If ``False``, just use the latest values in memory and never update the state. params_to_skip_update: Not used. Returns: dict: Base snapshot. """ snap: dict[str, Any] = { 'instruments': {}, 'parameters': {}, 'components': {}, 'config': self.config, } components_to_remove = [] for name, itm in self.components.items(): if isinstance(itm, Instrument): # instruments can be closed during the lifetime of the # station object, hence this 'if' allows to avoid # snapshotting instruments that are already closed if Instrument.is_valid(itm): snap['instruments'][name] = itm.snapshot(update=update) else: components_to_remove.append(name) elif isinstance(itm, (Parameter, ManualParameter )): if not itm.snapshot_exclude: snap['parameters'][name] = itm.snapshot(update=update) else: snap['components'][name] = itm.snapshot(update=update) for c in components_to_remove: self.remove_component(c) return snap
[docs] def add_component( self, component: MetadatableWithName, name: str | None = None, update_snapshot: bool = True, ) -> str: """ Record one component as part of this Station. Args: component: Components to add to the Station. name: Name of the component. update_snapshot: Immediately update the snapshot of each component as it is added to the Station. Returns: str: The name assigned this component, which may have been changed to make it unique among previously added components. """ try: if not (isinstance(component, Parameter) and component.snapshot_exclude): component.snapshot(update=update_snapshot) except Exception: pass if name is None: name = getattr(component, "name", f"component{len(self.components)}") namestr = str(name) if namestr in self.components.keys(): raise RuntimeError( f'Cannot add component "{namestr}", because a ' 'component of that name is already registered to the station') self.components[namestr] = component return namestr
[docs] def remove_component(self, name: str) -> MetadatableWithName | None: """ Remove a component with a given name from this Station. Args: name: Name of the component. Returns: The component that has been removed (this behavior is the same as for Python dictionaries). Raises: KeyError: If a component with the given name is not part of this station. """ try: return self.components.pop(name) except KeyError as e: if name in str(e): raise KeyError(f'Component {name} is not part of the station') else: raise e
[docs] def get_component(self, full_name: str) -> MetadatableWithName: """ Get a (sub)component with a given name from this Station. The name may be of a component that is a sub-component of another component, e.g. a parameter on an instrument, an instrument module or an top-level instrument. Args: full_name: Name of the component. Returns: The component with the given name. Raises: KeyError: If a component with the given name is not part of this station. """ def find_component( potential_top_level_name: str, remaining_name_parts: list[str] ) -> tuple[MetadatableWithName, list[str]]: toplevel_component = self.components.get(potential_top_level_name, None) if toplevel_component is not None: return toplevel_component, remaining_name_parts else: if len(remaining_name_parts) < 1: raise KeyError(f"Component {full_name} is not part of the station") extra = remaining_name_parts.pop() new_potential_name = f"{potential_top_level_name}_{extra}" return find_component(new_potential_name, remaining_name_parts) name_parts = full_name.split("_") name_parts.reverse() potential_top_level_name = name_parts.pop() toplevel_component, remaining_name_parts = find_component( potential_top_level_name, name_parts ) if len(remaining_name_parts) == 0: return toplevel_component remaining_name_parts.reverse() remaining_name = "_".join(remaining_name_parts) if isinstance(toplevel_component, InstrumentBase): component = toplevel_component.get_component(remaining_name) else: raise KeyError( f"Found component {toplevel_component} but this has no " f"sub-component {remaining_name}." ) return component
# station['someitem'] and station.someitem are both # shortcuts to station.components['someitem'] # (assuming 'someitem' doesn't have another meaning in Station)
[docs] def __getitem__(self, key: str) -> Metadatable: """Shortcut to components dictionary.""" return self.components[key]
[docs] def close_all_registered_instruments(self) -> None: """ Closes all instruments that are registered to this `Station` object by calling the :meth:`.base.Instrument.close`-method on each one. The instruments will be removed from the station and from the QCoDeS monitor. """ for c in tuple(self.components.values()): if isinstance(c, Instrument): self.close_and_remove_instrument(c)
@staticmethod def _get_config_file_path(filename: str | None = None) -> str | None: """ Methods to get complete path of a provided file. If not able to find path then returns None. """ filename = filename or get_config_default_file() if filename is None: return None search_list = [filename] if (not os.path.isabs(filename) and get_config_default_folder() is not None): config_folder = cast(str, get_config_default_folder()) search_list += [os.path.join(config_folder, filename)] for p in search_list: if os.path.isfile(p): return p return None
[docs] def load_config_file(self, filename: str | None = None) -> None: """ Loads a configuration from a YAML file. If `filename` is not specified the default file name from the qcodes configuration will be used. Loading of a configuration will update the snapshot of the station and make the instruments described in the config file available for instantiation with the :meth:`load_instrument` method. Additionally the shortcut methods ``load_<instrument_name>`` will be updated. """ path = self._get_config_file_path(filename) if path is None: if filename is not None: raise FileNotFoundError(path) else: if get_config_default_file() is not None: log.warning( 'Could not load default config for Station: \n' f'File {get_config_default_file()} not found. \n' 'You can change the default config file in ' '`qcodesrc.json`.') return with open(path) as f: self.load_config(f)
[docs] def load_config_files(self, *filenames: str ) -> None: """ Loads configuration from multiple YAML files after merging them into one. If `filenames` are not specified the default file name from the qcodes configuration will be used. Loading of configuration will update the snapshot of the station and make the instruments described in the config files available for instantiation with the :meth:`load_instrument` method. Additionally the shortcut methods ``load_<instrument_name>`` will be updated. """ if len(filenames) == 0: self.load_config_file() else: paths = list() for filename in filenames: assert isinstance(filename, str) path = self._get_config_file_path(filename) if path is None and filename is not None: raise FileNotFoundError(path) paths.append(path) yamls = _merge_yamls(*paths) self.load_config(yamls)
[docs] def load_config(self, config: str | IO[AnyStr]) -> None: """ Loads a configuration from a supplied string or file/stream handle. The string or file/stream is expected to be YAML formatted Loading of a configuration will update the snapshot of the station and make the instruments described in the config file available for instantiation with the :meth:`load_instrument` method. Additionally the shortcut methods ``load_<instrument_name>`` will be updated. """ def update_station_configuration_snapshot() -> None: self.config = StationConfig(self._config) def update_load_instrument_methods() -> None: # create shortcut methods to instantiate instruments via # `load_<instrument_name>()` so that autocompletion can be used # first remove methods that have been added by a previous # :meth:`load_config_file` call while len(self._added_methods): delattr(self, self._added_methods.pop()) # add shortcut methods for instrument_name in self._instrument_config.keys(): method_name = f'load_{instrument_name}' if method_name.isidentifier(): setattr(self, method_name, partial( self.load_instrument, identifier=instrument_name)) self._added_methods.append(method_name) else: log.warning(f'Invalid identifier: ' f'for the instrument {instrument_name} no ' f'lazy loading method {method_name} could ' 'be created in the Station.') # Load template schema, and thereby don't fail on instruments that are # not included in the user schema. import ruamel.yaml # lazy import yaml = ruamel.yaml.YAML().load(config) with open(SCHEMA_TEMPLATE_PATH) as f: schema = json.load(f) try: jsonschema.validate(yaml, schema) except jsonschema.exceptions.ValidationError as e: message = str(e) warnings.warn(message, ValidationWarning) self._config = yaml self._instrument_config = self._config['instruments'] update_station_configuration_snapshot() update_load_instrument_methods()
[docs] def close_and_remove_instrument(self, instrument: Instrument | str) -> None: """ Safely close instrument and remove from station and monitor list. """ # remove parameters related to this instrument from the # monitor list if isinstance(instrument, str): instrument = Instrument.find_instrument(instrument) self._monitor_parameters = [v for v in self._monitor_parameters if v.root_instrument is not instrument] # remove instrument from station snapshot self.remove_component(instrument.name) # del will remove weakref and close the instrument instrument.close() del instrument
[docs] def load_instrument(self, identifier: str, revive_instance: bool = False, **kwargs: Any) -> Instrument: """ Creates an :class:`~.Instrument` instance as described by the loaded configuration file. Args: identifier: The identifying string that is looked up in the yaml configuration file, which identifies the instrument to be added. revive_instance: If ``True``, try to return an instrument with the specified name instead of closing it and creating a new one. **kwargs: Additional keyword arguments that get passed on to the ``__init__``-method of the instrument to be added. """ # try to revive the instrument if revive_instance and Instrument.exist(identifier): return Instrument.find_instrument(identifier) # load file # try to reload file on every call. This makes script execution a # little slower but makes the overall workflow more convenient. self.load_config_files(*self.config_file) # load from config if identifier not in self._instrument_config.keys(): raise RuntimeError(f'Instrument {identifier} not found in ' 'instrument config file') instr_cfg = self._instrument_config[identifier] # TODO: add validation of config for better verbose errors: # check if instrument is already defined and close connection if instr_cfg.get('enable_forced_reconnect', get_config_enable_forced_reconnect()): with suppress(KeyError): self.close_and_remove_instrument(identifier) # instantiate instrument init_kwargs = instr_cfg.get('init', {}) # somebody might have a empty init section in the config init_kwargs = {} if init_kwargs is None else init_kwargs if 'address' in instr_cfg: init_kwargs['address'] = instr_cfg['address'] if 'port' in instr_cfg: init_kwargs['port'] = instr_cfg['port'] # make explicitly passed arguments override the ones from the config # file. # We are mutating the dict below # so make a copy to ensure that any changes # does not leek into the station config object # specifically we may be passing non picklable # instrument instances via kwargs instr_kwargs = deepcopy(init_kwargs) instr_kwargs.update(kwargs) name = instr_kwargs.pop('name', identifier) module_name = ".".join(instr_cfg["type"].split(".")[:-1]) instr_class_name = instr_cfg["type"].split(".")[-1] module = importlib.import_module(module_name) instr_class = getattr(module, instr_class_name) instr = instr_class(name=name, **instr_kwargs) def resolve_instrument_identifier( instrument: ChannelOrInstrumentBase, identifier: str ) -> ChannelOrInstrumentBase: """ Get the instrument, channel or channel_list described by a nested string. E.g: 'dac.ch1' will return the instance of ch1, 'dac.channels[0]' returns the first item of the channels property. """ levels = identifier.split(".") level = levels[0] try: for level in levels: instrument = checked_getattr_indexed( instrument, level, (InstrumentBase, ChannelTuple) ) except TypeError: raise RuntimeError( f'Cannot resolve `{level}` in {identifier} to an ' f'instrument/channel for base instrument ' f'{instrument!r}.') return instrument def resolve_parameter_identifier( instrument: ChannelOrInstrumentBase, identifier: str ) -> ParameterBase: parts = identifier.split('.') if len(parts) > 1: instrument = resolve_instrument_identifier( instrument, '.'.join(parts[:-1])) try: return checked_getattr_indexed(instrument, parts[-1], ParameterBase) except TypeError: raise RuntimeError( f'Cannot resolve parameter identifier `{identifier}` to ' f'a parameter on instrument {instrument!r}.') def setup_parameter_from_dict( parameter: ParameterBase, options: dict[str, Any] ) -> None: for attr, val in options.items(): if attr in PARAMETER_ATTRIBUTES: # set the attributes of the parameter, that map 1 to 1 setattr(parameter, attr, val) # extra attributes that need parsing elif attr == 'limits': lower, upper = val parameter.vals = validators.Numbers(lower, upper) elif attr == 'monitor' and val is True: if isinstance(parameter, Parameter): self._monitor_parameters.append(parameter) else: raise RuntimeError(f"Trying to add {parameter} to " f"monitored parameters. But it's " f"not a Parameter but a" f" {type(parameter)}") elif attr == 'alias': setattr(parameter.instrument, val, parameter) elif attr == 'initial_value': # skip value attribute so that it gets set last # when everything else has been set up pass else: log.warning( f"Attribute {attr} not recognized when " f'instantiating parameter "{parameter.name}"' ) if "initial_value" in options: parameter.set(options["initial_value"]) def add_parameter_from_dict( instr: InstrumentBase, name: str, options: dict[str, Any] ) -> None: # keep the original dictionary intact for snapshot options = copy(options) param_type: type = ParameterBase kwargs = {} if 'source' in options: param_type = DelegateParameter kwargs['source'] = resolve_parameter_identifier( instr.root_instrument, options['source']) options.pop('source') instr.add_parameter(name, param_type, **kwargs) setup_parameter_from_dict(instr.parameters[name], options) def update_monitor() -> None: if ((self.use_monitor is None and get_config_use_monitor()) or self.use_monitor): # restart Monitor Monitor(*self._monitor_parameters) for name, options in instr_cfg.get('parameters', {}).items(): parameter = resolve_parameter_identifier(instr, name) setup_parameter_from_dict(parameter, options) for name, options in instr_cfg.get('add_parameters', {}).items(): parts = name.split('.') local_instr = ( instr if len(parts) < 2 else resolve_instrument_identifier(instr, '.'.join(parts[:-1]))) if isinstance(local_instr, ChannelTuple): raise RuntimeError("A parameter cannot be added to an ChannelTuple") add_parameter_from_dict(local_instr, parts[-1], options) self.add_component(instr) update_monitor() return instr
@overload def load_all_instruments( self, only_names: None, only_types: Iterable[str], ) -> tuple[str, ...]: ... @overload def load_all_instruments( self, only_names: Iterable[str], only_types: None, ) -> tuple[str, ...]: ... @overload def load_all_instruments( self, only_names: None, only_types: None, ) -> tuple[str, ...]: ... @overload def load_all_instruments( self, only_names: Iterable[str], only_types: Iterable[str], ) -> NoReturn: ...
[docs] def load_all_instruments( self, only_names: Iterable[str] | None = None, only_types: Iterable[str] | None = None, ) -> tuple[str, ...]: """ Load all instruments specified in the loaded YAML station configuration. Optionally, the instruments to be loaded can be filtered by their names or types, use ``only_names`` and ``only_types`` arguments for that. It is an error to supply both ``only_names`` and ``only_types``. Args: only_names: List of instrument names to load from the config. If left as None, then all instruments are loaded. only_types: List of instrument types e.g. the class names of the instruments to load. If left as None, then all instruments are loaded. Returns: The names of the loaded instruments """ config = self.config if config is None: raise ValueError("Station has no config") instrument_names_to_load = set() if only_names is None and only_types is None: instrument_names_to_load = set(config["instruments"].keys()) elif only_types is None and only_names is not None: instrument_names_to_load = set(only_names) elif only_types is not None and only_names is None: for inst_name, inst_dict in config["instruments"].items(): if "driver" in inst_dict.keys(): # fallback for old format where type was used # together with the driver key. inst_type = inst_dict["type"] else: inst_type = inst_dict["type"].split(".")[-1] if inst_type in only_types: instrument_names_to_load.add(inst_name) else: raise ValueError( "It is an error to supply both ``only_names`` " "and ``only_types`` arguments." ) for instrument in instrument_names_to_load: self.load_instrument(instrument) return tuple(instrument_names_to_load)
[docs] def update_config_schema( additional_instrument_modules: list[ModuleType] | None = None, ) -> None: """Update the json schema file 'station.schema.json'. Args: additional_instrument_modules: python modules that contain :class:`qcodes.instrument.base.InstrumentBase` definitions (and subclasses thereof) to be included as values for instrument definition in the station definition yaml files. """ def instrument_names_from_module(module: ModuleType) -> tuple[str, ...]: submodules = list(pkgutil.walk_packages(module.__path__, module.__name__ + ".")) res = set() for s in submodules: try: ms = inspect.getmembers( importlib.import_module(s.name), inspect.isclass) except Exception: ms = [] new_members = [ f"{instr[1].__module__}.{instr[1].__name__}" for instr in ms if (issubclass(instr[1], InstrumentBase) and instr[1].__module__.startswith(module.__name__)) ] res.update(new_members) return tuple(res) def update_schema_file( template_path: str, output_path: str, instrument_names: tuple[str, ...] ) -> None: with open(template_path, 'r+') as f: data = json.load(f) data['definitions']['instruments']['enum'] = instrument_names if os.path.exists(output_path): os.remove(output_path) os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, 'w') as f: json.dump(data, f, indent=4) additional_instrument_modules = additional_instrument_modules or [] instrument_modules: set[ModuleType] = set( [qcodes.instrument_drivers] + additional_instrument_modules ) instrument_names = tuple( itertools.chain.from_iterable( instrument_names_from_module(m) for m in instrument_modules ) ) update_schema_file( template_path=SCHEMA_TEMPLATE_PATH, output_path=SCHEMA_PATH, instrument_names=instrument_names, )
def _merge_yamls(*yamls: str | Path) -> str: """ Merge multiple station yamls files into one and stores it in the memory. Args: yamls: string or Path to yaml files separated by comma. Returns: Full yaml file stored in the memory. Returns an empty string if no files are given. """ import ruamel.yaml # lazy import if len(yamls) == 0: return "" if len(yamls) == 1: with open(yamls[0]) as file: content = file.read() return content top_key = "instruments" yaml = ruamel.yaml.YAML() deq: deque[Any] = deque() # Load the yaml files and add to deque in reverse entry order for filepath in yamls[::-1]: with open(filepath) as file_pointer: deq.append(yaml.load(file_pointer)) data1 = None # Add the top key entries from filepath n to filepath n-1 to # ... filepath 1. while len(deq) > 1: data2, data1 = deq[0], deq[1] for entry in data2[top_key]: if entry not in data1[top_key].keys(): data1[top_key].update({entry: data2[top_key][entry]}) else: raise KeyError( f"duplicate key `{entry}` detected among files:" f"{ ','.join(map(str, yamls))}" ) deq.popleft() assert data1 is not None with StringIO() as merged_yaml_stream: yaml.dump(data1, merged_yaml_stream) merged_yaml = merged_yaml_stream.getvalue() return merged_yaml