from __future__ import annotations
import itertools
import logging
import time
from collections.abc import Callable, Mapping, Sequence
from contextlib import ExitStack
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal, cast, overload
import numpy as np
from opentelemetry import trace
from tqdm.auto import tqdm
from typing_extensions import TypedDict
from qcodes import config
from qcodes.dataset.descriptions.detect_shapes import detect_shape_of_measurement
from qcodes.dataset.dond.do_nd_utils import (
BreakConditionInterrupt,
_handle_plotting,
_register_actions,
_register_parameters,
_set_write_period,
catch_interrupts,
)
from qcodes.dataset.measurements import Measurement
from qcodes.dataset.threading import (
SequentialParamsCaller,
ThreadPoolParamsCaller,
process_params_meas,
)
from qcodes.parameters import ParameterBase
from .sweeps import AbstractSweep, TogetherSweep
LOG = logging.getLogger(__name__)
if TYPE_CHECKING:
from qcodes.dataset.descriptions.versioning.rundescribertypes import Shapes
from qcodes.dataset.dond.do_nd_utils import (
ActionsT,
AxesTupleListWithDataSet,
BreakConditionT,
MultiAxesTupleListWithDataSet,
ParamMeasT,
)
from qcodes.dataset.experiment_container import Experiment
SweepVarType = Any
TRACER = trace.get_tracer(__name__)
class ParameterGroup(TypedDict):
params: tuple[ParamMeasT, ...]
meas_name: str
@dataclass
class ParameterSetEvent:
parameter: ParameterBase
new_value: SweepVarType
should_set: bool
delay: float
actions: ActionsT
get_after_set: bool
class _Sweeper:
def __init__(
self,
sweeps: Sequence[AbstractSweep | TogetherSweep],
additional_setpoints: Sequence[ParameterBase],
):
self._additional_setpoints = additional_setpoints
self._sweeps = sweeps
self._setpoints = self._make_setpoints_tuples()
self._setpoints_dict = self._make_setpoints_dict()
self._shape = self._make_shape(sweeps, additional_setpoints)
self._iter_index = 0
@property
def setpoints_dict(self) -> dict[str, list[Any]]:
return self._setpoints_dict
def _make_setpoints_tuples(
self,
) -> tuple[tuple[tuple[SweepVarType, ...] | SweepVarType, ...], ...]:
sweeps = tuple(sweep.get_setpoints() for sweep in self._sweeps)
return cast(
tuple[tuple[tuple[SweepVarType, ...] | SweepVarType, ...], ...],
tuple(itertools.product(*sweeps)),
)
def _make_single_point_setpoints_dict(self, index: int) -> dict[str, SweepVarType]:
setpoint_dict = {}
values = self._setpoints[index]
for sweep, subvalues in zip(self._sweeps, values):
if isinstance(sweep, TogetherSweep):
for individual_sweep, value in zip(sweep.sweeps, subvalues):
setpoint_dict[individual_sweep.param.full_name] = value
else:
setpoint_dict[sweep.param.full_name] = subvalues
return setpoint_dict
def _make_setpoints_dict(self) -> dict[str, list[Any]]:
setpoint_dict: dict[str, list[SweepVarType]] = {}
for sweep in self._sweeps:
if isinstance(sweep, TogetherSweep):
for individual_sweep in sweep.sweeps:
setpoint_dict[individual_sweep.param.full_name] = []
else:
setpoint_dict[sweep.param.full_name] = []
for setpoint_tuples in self._setpoints:
for sweep, values in zip(self._sweeps, setpoint_tuples):
if isinstance(sweep, TogetherSweep):
for individual_sweep, individual_value in zip(sweep.sweeps, values):
setpoint_dict[individual_sweep.param.full_name].append(
individual_value
)
else:
setpoint_dict[sweep.param.full_name].append(values)
return setpoint_dict
@property
def all_sweeps(self) -> tuple[AbstractSweep, ...]:
sweeps: list[AbstractSweep] = []
for sweep in self._sweeps:
if isinstance(sweep, TogetherSweep):
sweeps.extend(sweep.sweeps)
else:
sweeps.append(sweep)
return tuple(sweeps)
@property
def all_setpoint_params(self) -> tuple[ParameterBase, ...]:
return tuple(sweep.param for sweep in self.all_sweeps) + tuple(
self._additional_setpoints
)
@property
def sweep_groupes(self) -> tuple[tuple[ParameterBase, ...], ...]:
"""
These are all the combinations of setpoints we consider
valid for setpoints in a dataset. A dataset must depend on
at least one parameter from each dimension of the dond.
For dimensions that uses TogetherSweep they may depend
on one or more of the parameters of that sweep.
"""
param_tuple_list: list[tuple[ParameterBase, ...]] = []
for sweep in self._sweeps:
if isinstance(sweep, TogetherSweep):
param_tuple_list.append(
tuple(sub_sweep.param for sub_sweep in sweep.sweeps)
)
else:
param_tuple_list.append((sweep.param,))
param_tuple_list.extend(
[(setpoint,) for setpoint in self._additional_setpoints]
)
# in param_tuple_list there is a tuple of possible setpoints for each
# dim in the dond. For regular sweeps this is a 1 tuple but for
# a TogetherSweep it is of length of the number of parameters.
# now we expand to a list of setpoints in a TogetherSweep
# to a list of all possible combinations of these.
expanded_param_tuples = tuple(
tuple(
itertools.chain.from_iterable(
itertools.combinations(param_tuple, j + 1)
for j in range(len(param_tuple))
)
)
for param_tuple in param_tuple_list
)
# next we generate all valid combinations of picking one parameter from each
# dimension in the setpoints.
setpoint_combinations = itertools.product(*expanded_param_tuples)
setpoint_combinations_expanded = tuple(
tuple(itertools.chain.from_iterable(setpoint_combination))
for setpoint_combination in setpoint_combinations
)
return setpoint_combinations_expanded
@staticmethod
def _make_shape(
sweeps: Sequence[AbstractSweep | TogetherSweep],
addtional_setpoints: Sequence[ParameterBase],
) -> tuple[int, ...]:
loop_shape = tuple(sweep.num_points for sweep in sweeps) + tuple(
1 for _ in addtional_setpoints
)
return loop_shape
@property
def shape(self) -> tuple[int, ...]:
return self._shape
def __getitem__(self, index: int) -> tuple[ParameterSetEvent, ...]:
setpoints = self._make_single_point_setpoints_dict(index)
if index == 0:
previous_setpoints: dict[str, SweepVarType | None] = {}
for key in setpoints.keys():
previous_setpoints[key] = None
else:
previous_setpoints = self._make_single_point_setpoints_dict(index - 1)
parameter_set_events = []
for sweep in self.all_sweeps:
new_value = setpoints[sweep.param.full_name]
old_value = previous_setpoints[sweep.param.full_name]
if old_value is None or old_value != new_value:
should_set = True
else:
should_set = False
event = ParameterSetEvent(
new_value=new_value,
parameter=sweep.param,
should_set=should_set,
delay=sweep.delay,
actions=sweep.post_actions,
get_after_set=sweep.get_after_set,
)
parameter_set_events.append(event)
return tuple(parameter_set_events)
def __len__(self) -> int:
return int(np.prod(self.shape))
def __iter__(self) -> _Sweeper:
return self
def __next__(self) -> tuple[ParameterSetEvent, ...]:
if self._iter_index < len(self):
return_val = self[self._iter_index]
self._iter_index += 1
return return_val
else:
raise StopIteration
class _Measurements:
def __init__(
self,
sweeper: _Sweeper,
measurement_name: str | Sequence[str],
params_meas: Sequence[ParamMeasT | Sequence[ParamMeasT]],
enter_actions: ActionsT,
exit_actions: ActionsT,
experiments: Experiment | Sequence[Experiment] | None,
write_period: float | None,
log_info: str | None,
dataset_dependencies: Mapping[str, Sequence[ParamMeasT]] | None = None,
):
self._sweeper = sweeper
self._enter_actions = enter_actions
self._exit_actions = exit_actions
self._write_period = write_period
self._log_info = log_info
if log_info is not None:
self._extra_log_info = log_info
else:
self._extra_log_info = "Using 'qcodes.dataset.dond'"
(
self._measured_all,
grouped_parameters,
self._measured_parameters,
) = self._extract_parameters_by_type_and_group(params_meas)
self._shapes = self._get_shapes()
if dataset_dependencies and len(grouped_parameters) > 1:
raise ValueError(
"Measured parameters have been grouped both in input "
"and given in dataset dependencies. This is not supported, "
"group measurement parameters either in input or in "
"dataset dependencies."
)
if dataset_dependencies is None:
self._groups = self._create_groups_from_grouped_parameters(
grouped_parameters, experiments, measurement_name
)
elif dataset_dependencies:
_validate_dataset_dependencies_and_names(
dataset_dependencies, measurement_name
)
dataset_dependencies_split = self._split_dateset_dependencies(
dataset_dependencies
)
self._groups = self._create_groups_from_dataset_dependencies(
dataset_dependencies_split,
self._measured_parameters,
experiments,
measurement_name,
)
@property
def measured_all(self) -> tuple[ParamMeasT, ...]:
return self._measured_all
@property
def measured_parameters(self) -> tuple[ParameterBase, ...]:
return self._measured_parameters
@property
def shapes(self) -> Shapes | None:
return self._shapes
@property
def groups(self) -> tuple[_SweepMeasGroup, ...]:
return self._groups
@staticmethod
def _create_measurement_names(
measurement_name: str | Sequence[str], n_names_required: int
) -> tuple[str, ...]:
if isinstance(measurement_name, str):
return (measurement_name,) * n_names_required
else:
if len(measurement_name) != n_names_required:
raise ValueError(
f"Got {len(measurement_name)} measurement names "
f"but should create {n_names_required} dataset(s)."
)
return tuple(measurement_name)
@staticmethod
def _extract_parameters_by_type_and_group(
params_meas: Sequence[ParamMeasT | Sequence[ParamMeasT]],
) -> tuple[
tuple[ParamMeasT, ...],
tuple[tuple[ParamMeasT, ...], ...],
tuple[ParameterBase, ...],
]:
measured_parameters: list[ParameterBase] = []
measured_all: list[ParamMeasT] = []
single_group: list[ParamMeasT] = []
multi_group: list[tuple[ParamMeasT, ...]] = []
grouped_parameters: tuple[tuple[ParamMeasT, ...], ...]
for param in params_meas:
if not isinstance(param, Sequence):
single_group.append(param)
measured_all.append(param)
if isinstance(param, ParameterBase):
measured_parameters.append(param)
elif not isinstance(param, str):
multi_group.append(tuple(param))
for nested_param in param:
measured_all.append(nested_param)
if isinstance(nested_param, ParameterBase):
measured_parameters.append(nested_param)
if single_group and multi_group:
raise ValueError(
f"Got both grouped and non grouped "
f"parameters to measure in "
f"{params_meas}. This is not supported."
)
if single_group:
grouped_parameters = (tuple(single_group),)
elif multi_group:
grouped_parameters = tuple(multi_group)
else:
raise ValueError("No parameters to measure supplied")
return tuple(measured_all), grouped_parameters, tuple(measured_parameters)
def _get_shapes(self) -> Shapes | None:
try:
shapes: Shapes | None = detect_shape_of_measurement(
self.measured_parameters, self._sweeper.shape
)
LOG.debug("Detected shapes to be %s", shapes)
except TypeError:
LOG.exception(
f"Could not detect shape of {self.measured_parameters} "
f"falling back to unknown shape."
)
shapes = None
return shapes
@staticmethod
def _get_experiments(
experiments: Experiment | Sequence[Experiment] | None,
n_experiments_required: int,
) -> Sequence[Experiment | None]:
if not isinstance(experiments, Sequence):
experiments_internal: Sequence[Experiment | None] = [
experiments
] * n_experiments_required
else:
experiments_internal = experiments
if len(experiments_internal) != n_experiments_required:
raise ValueError(
f"Inconsistent number of "
f"datasets and experiments, "
f"got {n_experiments_required} and {len(experiments_internal)}."
)
return experiments_internal
def _create_groups_from_grouped_parameters(
self,
grouped_parameters: tuple[tuple[ParamMeasT, ...], ...],
experiments: Experiment | Sequence[Experiment] | None,
meas_names: str | Sequence[str],
) -> tuple[_SweepMeasGroup, ...]:
setpoints = self._sweeper.all_setpoint_params
groups = []
m_group: Sequence[ParamMeasT]
experiments_internal = self._get_experiments(
experiments, len(grouped_parameters)
)
measurement_names = self._create_measurement_names(
meas_names, len(grouped_parameters)
)
for m_group, experiment, meas_name in zip(
grouped_parameters,
experiments_internal,
measurement_names,
):
meas_ctx = self._create_measurement_ctx_manager(
experiment, meas_name, setpoints, tuple(m_group)
)
s_m_group = _SweepMeasGroup(setpoints, tuple(m_group), meas_ctx)
groups.append(s_m_group)
return tuple(groups)
def _create_groups_from_dataset_dependencies(
self,
dataset_dependencies: dict[
str, tuple[Sequence[ParameterBase], Sequence[ParamMeasT]]
],
all_measured_parameters: tuple[ParameterBase, ...],
experiments: Experiment | Sequence[Experiment] | None,
meas_names: str | Sequence[str],
) -> tuple[_SweepMeasGroup, ...]:
potential_setpoint_groups = self._sweeper.sweep_groupes
experiments_internal = self._get_experiments(
experiments, len(dataset_dependencies)
)
if meas_names == "":
meas_names = tuple(dataset_dependencies.keys())
measurement_names = self._create_measurement_names(
meas_names, len(dataset_dependencies)
)
all_dataset_dependencies_meas_parameters = tuple(
itertools.chain.from_iterable(
output[1] for output in dataset_dependencies.values()
)
)
for meas_param in all_measured_parameters:
if meas_param not in all_dataset_dependencies_meas_parameters:
raise ValueError(
f"Parameter {meas_param} is measured but not added "
f"to any dataset in dataset_dependencies."
)
groups = []
for experiment, meas_name in zip(
experiments_internal,
measurement_names,
):
(sp_group, m_group) = dataset_dependencies[meas_name]
if tuple(sp_group) not in potential_setpoint_groups:
raise ValueError(
f"dataset_dependencies contains {sp_group} "
f"which is not among the expected groups of setpoints "
f"{potential_setpoint_groups}"
)
LOG.info(
f"creating context manager for setpoints"
f" {sp_group} and measurement parameters {m_group}"
)
meas_ctx = self._create_measurement_ctx_manager(
experiment, meas_name, tuple(sp_group), tuple(m_group)
)
s_m_group = _SweepMeasGroup(tuple(sp_group), tuple(m_group), meas_ctx)
groups.append(s_m_group)
return tuple(groups)
def _create_measurement_ctx_manager(
self,
experiment: Experiment | None,
measurement_name: str,
sweep_parameters: Sequence[ParameterBase],
measure_parameters: Sequence[ParamMeasT],
) -> Measurement:
meas = Measurement(name=measurement_name, exp=experiment)
_register_parameters(meas, sweep_parameters)
_register_parameters(
meas,
measure_parameters,
setpoints=sweep_parameters,
shapes=self.shapes,
)
meas._extra_log_info = self._extra_log_info
_set_write_period(meas, self._write_period)
_register_actions(meas, self._enter_actions, self._exit_actions)
return meas
def _split_dateset_dependencies(
self,
dataset_dependencies: Mapping[str, Sequence[ParamMeasT]],
) -> dict[str, tuple[Sequence[ParameterBase], Sequence[ParamMeasT]]]:
# split measured parameters from setpoint parameters using param_meas
dataset_dependencies_split: dict[
str, tuple[Sequence[ParameterBase], Sequence[ParamMeasT]]
] = {}
for name, dataset_parameters in dataset_dependencies.items():
meas_parameters = tuple(
param for param in dataset_parameters if param in self.measured_all
)
setpoint_parameters = cast(
Sequence[ParameterBase],
tuple(
param
for param in dataset_parameters
if param not in self.measured_all
),
)
dataset_dependencies_split[name] = (
setpoint_parameters,
meas_parameters,
)
return dataset_dependencies_split
# idealy we would want this to be frozen but then postinit
# cannot calculate all the parameters
# https://stackoverflow.com/questions/53756788/
@dataclass(frozen=False)
class _SweepMeasGroup:
sweep_parameters: tuple[ParameterBase, ...]
measure_parameters: tuple[ParamMeasT, ...]
measurement_cxt: Measurement
def __post_init__(self) -> None:
meas_parameters = tuple(
a for a in self.measure_parameters if isinstance(a, ParameterBase)
)
self._parameters = self.sweep_parameters + meas_parameters
@property
def parameters(self) -> tuple[ParameterBase, ...]:
return self._parameters
@overload
def dond(
*params: AbstractSweep | TogetherSweep | ParamMeasT | Sequence[ParamMeasT],
write_period: float | None = None,
measurement_name: str | Sequence[str] = "",
exp: Experiment | Sequence[Experiment] | None = None,
enter_actions: ActionsT = (),
exit_actions: ActionsT = (),
do_plot: bool | None = None,
show_progress: bool | None = None,
use_threads: bool | None = None,
additional_setpoints: Sequence[ParameterBase] = tuple(),
log_info: str | None = None,
break_condition: BreakConditionT | None = None,
dataset_dependencies: Mapping[str, Sequence[ParamMeasT]] | None = None,
in_memory_cache: bool | None = None,
squeeze: Literal[False],
) -> MultiAxesTupleListWithDataSet: ...
@overload
def dond(
*params: AbstractSweep | TogetherSweep | ParamMeasT | Sequence[ParamMeasT],
write_period: float | None = None,
measurement_name: str | Sequence[str] = "",
exp: Experiment | Sequence[Experiment] | None = None,
enter_actions: ActionsT = (),
exit_actions: ActionsT = (),
do_plot: bool | None = None,
show_progress: bool | None = None,
use_threads: bool | None = None,
additional_setpoints: Sequence[ParameterBase] = tuple(),
log_info: str | None = None,
break_condition: BreakConditionT | None = None,
dataset_dependencies: Mapping[str, Sequence[ParamMeasT]] | None = None,
in_memory_cache: bool | None = None,
squeeze: Literal[True],
) -> AxesTupleListWithDataSet | MultiAxesTupleListWithDataSet: ...
@overload
def dond(
*params: AbstractSweep | TogetherSweep | ParamMeasT | Sequence[ParamMeasT],
write_period: float | None = None,
measurement_name: str | Sequence[str] = "",
exp: Experiment | Sequence[Experiment] | None = None,
enter_actions: ActionsT = (),
exit_actions: ActionsT = (),
do_plot: bool | None = None,
show_progress: bool | None = None,
use_threads: bool | None = None,
additional_setpoints: Sequence[ParameterBase] = tuple(),
log_info: str | None = None,
break_condition: BreakConditionT | None = None,
dataset_dependencies: Mapping[str, Sequence[ParamMeasT]] | None = None,
in_memory_cache: bool | None = None,
squeeze: bool = True,
) -> AxesTupleListWithDataSet | MultiAxesTupleListWithDataSet: ...
[docs]
@TRACER.start_as_current_span("qcodes.dataset.dond")
def dond(
*params: AbstractSweep | TogetherSweep | ParamMeasT | Sequence[ParamMeasT],
write_period: float | None = None,
measurement_name: str | Sequence[str] = "",
exp: Experiment | Sequence[Experiment] | None = None,
enter_actions: ActionsT = (),
exit_actions: ActionsT = (),
do_plot: bool | None = None,
show_progress: bool | None = None,
use_threads: bool | None = None,
additional_setpoints: Sequence[ParameterBase] = tuple(),
log_info: str | None = None,
break_condition: BreakConditionT | None = None,
dataset_dependencies: Mapping[str, Sequence[ParamMeasT]] | None = None,
in_memory_cache: bool | None = None,
squeeze: bool = True,
) -> AxesTupleListWithDataSet | MultiAxesTupleListWithDataSet:
"""
Perform n-dimentional scan from slowest (first) to the fastest (last), to
measure m measurement parameters. The dimensions should be specified
as sweep objects, and after them the parameters to measure should be passed.
Args:
params: Instances of n sweep classes and m measurement parameters,
e.g. if linear sweep is considered:
.. code-block::
LinSweep(param_set_1, start_1, stop_1, num_points_1, delay_1), ...,
LinSweep(param_set_n, start_n, stop_n, num_points_n, delay_n),
param_meas_1, param_meas_2, ..., param_meas_m
If multiple DataSets creation is needed, measurement parameters should
be grouped, so one dataset will be created for each group. e.g.:
.. code-block::
LinSweep(param_set_1, start_1, stop_1, num_points_1, delay_1), ...,
LinSweep(param_set_n, start_n, stop_n, num_points_n, delay_n),
[param_meas_1, param_meas_2], ..., [param_meas_m]
If you want to sweep multiple parameters together.
.. code-block::
TogetherSweep(LinSweep(param_set_1, start_1, stop_1, num_points, delay_1),
LinSweep(param_set_2, start_2, stop_2, num_points, delay_2))
param_meas_1, param_meas_2, ..., param_meas_m
write_period: The time after which the data is actually written to the
database.
measurement_name: Name(s) of the measurement. This will be passed down to
the dataset produced by the measurement. If not given, a default
value of 'results' is used for the dataset. If more than one is
given, each dataset will have an individual name.
exp: The experiment to use for this measurement. If you create multiple
measurements using groups you may also supply multiple experiments.
enter_actions: A list of functions taking no arguments that will be
called before the measurements start.
exit_actions: A list of functions taking no arguments that will be
called after the measurements ends.
do_plot: should png and pdf versions of the images be saved and plots
are shown after the run. If None the setting will be read from
``qcodesrc.json``
show_progress: should a progress bar be displayed during the
measurement. If None the setting will be read from ``qcodesrc.json``
use_threads: If True, measurements from each instrument will be done on
separate threads. If you are measuring from several instruments
this may give a significant speedup.
additional_setpoints: A list of setpoint parameters to be registered in
the measurement but not scanned/swept-over.
log_info: Message that is logged during the measurement. If None a default
message is used.
break_condition: Callable that takes no arguments. If returned True,
measurement is interrupted.
dataset_dependencies: Optionally describe that measured datasets only depend
on a subset of the setpoint parameters. Given as a mapping from
measurement names to Sequence of Parameters. Note that a dataset must
depend on at least one parameter from each dimension but can depend
on one or more parameters from a dimension sweeped with a TogetherSweep.
in_memory_cache:
Should a cache of the data be kept available in memory for faster
plotting and exporting. Useful to disable if the data is very large
in order to save on memory consumption.
If ``None``, the value for this will be read from ``qcodesrc.json`` config file.
squeeze: If True, will return a tuple of QCoDeS DataSet, Matplotlib axis,
Matplotlib colorbar if only one group of measurements was performed
and a tuple of tuples of these if more than one group of measurements
was performed. If False, will always return a tuple where the first
member is a tuple of QCoDeS DataSet(s) and the second member is a tuple
of Matplotlib axis(es) and the third member is a tuple of Matplotlib
colorbar(s).
Returns:
A tuple of QCoDeS DataSet, Matplotlib axis, Matplotlib colorbar. If
more than one group of measurement parameters is supplied, the output
will be a tuple of tuple(QCoDeS DataSet), tuple(Matplotlib axis),
tuple(Matplotlib colorbar), in which each element of each sub-tuple
belongs to one group, and the order of elements is the order of
the supplied groups.
"""
if do_plot is None:
do_plot = cast(bool, config.dataset.dond_plot)
if show_progress is None:
show_progress = config.dataset.dond_show_progress
sweep_instances, params_meas = _parse_dond_arguments(*params)
sweeper = _Sweeper(sweep_instances, additional_setpoints)
measurements = _Measurements(
sweeper,
measurement_name,
params_meas,
enter_actions,
exit_actions,
exp,
write_period,
log_info,
dataset_dependencies,
)
LOG.info(
"Starting a doNd with scan with\n setpoints: %s,\n measuring: %s",
sweeper.all_setpoint_params,
measurements.measured_all,
)
LOG.debug(
"dond has been grouped into the following datasets:\n%s",
measurements.groups,
)
datasets = []
plots_axes = []
plots_colorbar = []
if use_threads is None:
use_threads = config.dataset.use_threads
params_meas_caller = (
ThreadPoolParamsCaller(*measurements.measured_all)
if use_threads
else SequentialParamsCaller(*measurements.measured_all)
)
datasavers = []
interrupted: Callable[ # noqa E731
[], KeyboardInterrupt | BreakConditionInterrupt | None
] = lambda: None
try:
with (
catch_interrupts() as interrupted,
ExitStack() as stack,
params_meas_caller as call_params_meas,
):
datasavers = [
stack.enter_context(
group.measurement_cxt.run(in_memory_cache=in_memory_cache)
)
for group in measurements.groups
]
additional_setpoints_data = process_params_meas(additional_setpoints)
for set_events in tqdm(sweeper, disable=not show_progress):
LOG.debug("Processing set events: %s", set_events)
results: dict[ParameterBase, Any] = {}
for set_event in set_events:
if set_event.should_set:
set_event.parameter(set_event.new_value)
for act in set_event.actions:
act()
time.sleep(set_event.delay)
if set_event.get_after_set:
results[set_event.parameter] = set_event.parameter()
else:
results[set_event.parameter] = set_event.new_value
meas_value_pair = call_params_meas()
for meas_param, value in meas_value_pair:
results[meas_param] = value
for datasaver, group in zip(datasavers, measurements.groups):
filtered_results_list = [
(param, value)
for param, value in results.items()
if param in group.parameters
]
datasaver.add_result(
*filtered_results_list,
*additional_setpoints_data,
)
if callable(break_condition):
if break_condition():
raise BreakConditionInterrupt("Break condition was met.")
finally:
for datasaver in datasavers:
ds, plot_axis, plot_color = _handle_plotting(
datasaver.dataset, do_plot, interrupted()
)
datasets.append(ds)
plots_axes.append(plot_axis)
plots_colorbar.append(plot_color)
if len(measurements.groups) == 1 and squeeze is True:
return datasets[0], plots_axes[0], plots_colorbar[0]
else:
return tuple(datasets), tuple(plots_axes), tuple(plots_colorbar)
def _validate_dataset_dependencies_and_names(
dataset_dependencies: Mapping[str, Sequence[ParamMeasT]] | None,
measurement_name: str | Sequence[str],
) -> None:
if dataset_dependencies is not None and measurement_name != "":
if isinstance(measurement_name, str):
raise ValueError(
"Creating multiple datasets but only one measurement name given."
)
if set(dataset_dependencies.keys()) != set(measurement_name):
raise ValueError(
f"Inconsistent measurement names: measurement_name "
f"contains {measurement_name} "
f"but dataset_dependencies contains "
f"{tuple(dataset_dependencies.keys())}."
)
def _parse_dond_arguments(
*params: AbstractSweep | TogetherSweep | ParamMeasT | Sequence[ParamMeasT],
) -> tuple[
list[AbstractSweep | TogetherSweep], list[ParamMeasT | Sequence[ParamMeasT]]
]:
"""
Parse supplied arguments into sweep objects and measurement parameters
and their callables.
"""
sweep_instances: list[AbstractSweep | TogetherSweep] = []
params_meas: list[ParamMeasT | Sequence[ParamMeasT]] = []
for par in params:
if isinstance(par, AbstractSweep):
sweep_instances.append(par)
elif isinstance(par, TogetherSweep):
sweep_instances.append(par)
else:
params_meas.append(par)
return sweep_instances, params_meas