Source code for qcodes.dataset.data_set_protocol

from __future__ import annotations

import logging
import os
import warnings
from collections.abc import Callable, Mapping, Sequence
from enum import Enum
from importlib.metadata import entry_points
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Literal,
    Protocol,
    runtime_checkable,
)

import numpy as np

from qcodes.dataset.descriptions.dependencies import InterDependencies_
from qcodes.dataset.descriptions.param_spec import ParamSpec, ParamSpecBase
from qcodes.dataset.export_config import (
    DataExportType,
    get_data_export_name_elements,
    get_data_export_path,
    get_data_export_prefix,
    get_data_export_type,
)

from .descriptions.versioning.converters import new_to_old
from .exporters.export_to_csv import dataframe_to_csv
from .exporters.export_to_xarray import xarray_to_h5netcdf_with_complex_numbers
from .sqlite.queries import raw_time_to_str_time

if TYPE_CHECKING:
    from typing import TypeAlias

    import pandas as pd
    import xarray as xr

    from qcodes.dataset.descriptions.rundescriber import RunDescriber
    from qcodes.dataset.descriptions.versioning.rundescribertypes import Shapes
    from qcodes.dataset.linked_datasets.links import Link
    from qcodes.parameters import ParameterBase

    from .data_set_cache import DataSetCache
    from .exporters.export_info import ExportInfo

# for unknown reason entrypoints registered in pyproct.toml shows up
# twice here convert to set to ensure no duplication.
_EXPORT_CALLBACKS = set(entry_points(group="qcodes.dataset.on_export"))

array_like_types = (tuple, list, np.ndarray)
scalar_res_types: TypeAlias = (
    str | complex | np.integer | np.floating | np.complexfloating
)
values_type: TypeAlias = scalar_res_types | np.ndarray | Sequence[scalar_res_types]
res_type: TypeAlias = "tuple[ParameterBase | str, values_type]"
setpoints_type: TypeAlias = "Sequence[str | ParameterBase]"
SPECS: TypeAlias = list[ParamSpec]
# Transition period type: SpecsOrInterDeps. We will allow both as input to
# the DataSet constructor for a while, then deprecate SPECS and finally remove
# the ParamSpec class
SpecsOrInterDeps: TypeAlias = SPECS | InterDependencies_
ParameterData: TypeAlias = dict[str, dict[str, np.ndarray]]

LOG = logging.getLogger(__name__)


class CompletedError(RuntimeError):
    pass


[docs] @runtime_checkable class DataSetProtocol(Protocol): # the "persistent traits" are the attributes/properties of the DataSet # that are NOT tied to the representation of the DataSet in any particular # database persistent_traits: tuple[str, ...] = ( "name", "guid", "number_of_results", "exp_name", "sample_name", "completed", "snapshot", "run_timestamp_raw", "description", "completed_timestamp_raw", "metadata", "parent_dataset_links", "captured_run_id", "captured_counter", )
[docs] def prepare(
self, *, snapshot: Mapping[Any, Any], interdeps: InterDependencies_, shapes: Shapes | None = None, parent_datasets: Sequence[Mapping[Any, Any]] = (), write_in_background: bool = False, ) -> None: ... @property def pristine(self) -> bool: ... @property def running(self) -> bool: ... @property def completed(self) -> bool: ...
[docs] def mark_completed(self) -> None: ...
# dataset attributes @property def run_id(self) -> int: ... @property def captured_run_id(self) -> int: ... @property def counter(self) -> int: ... @property def captured_counter(self) -> int: ... @property def guid(self) -> str: ... @property def number_of_results(self) -> int: ... @property def name(self) -> str: ... @property def exp_name(self) -> str: ... @property def exp_id(self) -> int: ... @property def sample_name(self) -> str: ...
[docs] def run_timestamp(self, fmt: str = "%Y-%m-%d %H:%M:%S") -> str | None: ...
@property def run_timestamp_raw(self) -> float | None: ...
[docs] def completed_timestamp(self, fmt: str = "%Y-%m-%d %H:%M:%S") -> str | None: ...
@property def completed_timestamp_raw(self) -> float | None: ... # snapshot and metadata @property def snapshot(self) -> dict[str, Any] | None: ...
[docs] def add_snapshot(self, snapshot: str, overwrite: bool = False) -> None: ...
@property def _snapshot_raw(self) -> str | None: ...
[docs] def add_metadata(self, tag: str, metadata: Any) -> None: ...
@property def metadata(self) -> dict[str, Any]: ... @property def path_to_db(self) -> str | None: ... # dataset description and links @property def paramspecs(self) -> dict[str, ParamSpec]: ... @property def description(self) -> RunDescriber: ... @property def parent_dataset_links(self) -> list[Link]: ... # data related members
[docs] def export(
self, export_type: DataExportType | str | None = None, path: Path | str | None = None, prefix: str | None = None, automatic_export: bool = False, ) -> None: ... @property def export_info(self) -> ExportInfo: ... @property def cache(self) -> DataSetCache[DataSetProtocol]: ...
[docs] def get_parameter_data(
self, *params: str | ParamSpec | ParameterBase, start: int | None = None, end: int | None = None, callback: Callable[[float], None] | None = None, ) -> ParameterData: ...
[docs] def get_parameters(self) -> SPECS: # used by plottr ...
@property def dependent_parameters(self) -> tuple[ParamSpecBase, ...]: ... # exporters to other in memory formats
[docs] def to_xarray_dataarray_dict(
self, *params: str | ParamSpec | ParameterBase, start: int | None = None, end: int | None = None, use_multi_index: Literal["auto", "always", "never"] = "auto", ) -> dict[str, xr.DataArray]: ...
[docs] def to_xarray_dataset(
self, *params: str | ParamSpec | ParameterBase, start: int | None = None, end: int | None = None, use_multi_index: Literal["auto", "always", "never"] = "auto", ) -> xr.Dataset: ...
[docs] def to_pandas_dataframe_dict(
self, *params: str | ParamSpec | ParameterBase, start: int | None = None, end: int | None = None, ) -> dict[str, pd.DataFrame]: ...
[docs] def to_pandas_dataframe(
self, *params: str | ParamSpec | ParameterBase, start: int | None = None, end: int | None = None, ) -> pd.DataFrame: ... # private members called by various other parts or the api def _enqueue_results( self, result_dict: Mapping[ParamSpecBase, np.ndarray] ) -> None: ... def _flush_data_to_database(self, block: bool = False) -> None: ... @property def _parameters(self) -> str | None: ... def _set_export_info(self, export_info: ExportInfo) -> None: ... def __len__(self) -> int: ...
[docs] def the_same_dataset_as(self, other: DataSetProtocol) -> bool: ...
class BaseDataSet(DataSetProtocol, Protocol): # shared methods between all implementations of the dataset def the_same_dataset_as(self, other: DataSetProtocol) -> bool: """ Check if two datasets correspond to the same run by comparing all their persistent traits. Note that this method does not compare the data itself. This function raises if the GUIDs match but anything else doesn't Args: other: the dataset to compare self to """ if not isinstance(other, DataSetProtocol): return False guids_match = self.guid == other.guid # note that the guid is in itself a persistent trait of the DataSet. # We therefore do not need to handle the case of guids not equal # but all persistent traits equal, as this is not possible. # Thus, if all persistent traits are the same we can safely return True for attr in self.persistent_traits: if getattr(self, attr) != getattr(other, attr): if guids_match: raise RuntimeError( "Critical inconsistency detected! " "The two datasets have the same GUID, " f'but their "{attr}" differ.' ) return False return True def get_parameters(self) -> SPECS: old_interdeps = new_to_old(self.description.interdeps) return list(old_interdeps.paramspecs) def export( self, export_type: DataExportType | str | None = None, path: str | Path | None = None, prefix: str | None = None, automatic_export: bool = False, ) -> None: """Export data to disk with file name `{prefix}{name_elements}.{ext}`. Name elements are names of dataset object attributes that are taken from the dataset and inserted into the name of the export file, for example if name elements are ``["captured_run_id", "guid"]``, then the file name will be `{prefix}{captured_run_id}_{guid}.{ext}`. Values for the export type, path, export_name_elements and prefix can also be set in the "dataset" section of qcodes config. Args: export_type: Data export type, e.g. "netcdf" or ``DataExportType.NETCDF``, defaults to a value set in qcodes config path: Export path, defaults to value set in config prefix: File prefix, e.g. ``qcodes_``, defaults to value set in config. automatic_export: Is this export automatic? Raises: ValueError: If the export data type is not specified or unknown, raise an error """ if isinstance(path, str): path = Path(path) parsed_export_type = get_data_export_type(export_type) if parsed_export_type is None and export_type is None: raise ValueError( "No data export type specified. Please set the export data type " "by using ``qcodes.dataset.export_config.set_data_export_type`` or " "give an explicit export_type when calling ``dataset.export`` manually." ) elif parsed_export_type is None: raise ValueError( f"Export type {export_type} is unknown. Export type " f"should be a member of the `DataExportType` enum" ) export_path = self._export_data( export_type=parsed_export_type, path=path, prefix=prefix, automatic_export=automatic_export, ) export_info = self.export_info if export_path is not None: export_info.export_paths[parsed_export_type.value] = os.path.abspath( export_path ) self._set_export_info(export_info) def _export_data( self, export_type: DataExportType, path: Path | None = None, prefix: str | None = None, automatic_export: bool = False, ) -> Path | None: """Export data to disk with file name `{prefix}{name_elements}.{ext}`. Name elements are names of dataset object attributes that are taken from the dataset and inserted into the name of the export file, for example if name elements are ``["captured_run_id", "guid"]``, then the file name will be `{prefix}{captured_run_id}_{guid}.{ext}`. Values for the export type, path, export_name_elements and prefix can also be set in the "dataset" section of qcodes config. Args: export_type: Data export type, e.g. DataExportType.NETCDF path: Export path, defaults to value set in config prefix: File prefix, e.g. "qcodes_", defaults to value set in config. automatic_export: Is this export automatic? Returns: str: Path file was saved to, returns None if no file was saved. """ # Set defaults to values in config if the value was not set # (defaults to None) path = path if path is not None else get_data_export_path() path.mkdir(exist_ok=True, parents=True) prefix = prefix if prefix is not None else get_data_export_prefix() if DataExportType.NETCDF == export_type: file_name = self._export_file_name( prefix=prefix, export_type=DataExportType.NETCDF ) export_path = Path(self._export_as_netcdf(path=path, file_name=file_name)) elif DataExportType.CSV == export_type: file_name = self._export_file_name( prefix=prefix, export_type=DataExportType.CSV ) export_path = Path(self._export_as_csv(path=path, file_name=file_name)) else: export_path = None for export_callback in _EXPORT_CALLBACKS: try: export_callback_function = export_callback.load() LOG.info("Executing on_export callback %s", export_callback.name) export_callback_function(export_path, automatic_export=automatic_export) except Exception: LOG.exception("Exception during export callback function") return export_path def _export_file_name(self, prefix: str, export_type: DataExportType) -> str: """Get export file name""" extension = export_type.value name_elements = get_data_export_name_elements() post_fix = "_".join([str(getattr(self, name)) for name in name_elements]) return f"{prefix}{post_fix}.{extension}" def _export_as_netcdf(self, path: Path, file_name: str) -> Path: """Export data as netcdf to a given path with file prefix""" file_path = path / file_name xarr_dataset = self.to_xarray_dataset() xarray_to_h5netcdf_with_complex_numbers(xarr_dataset, file_path) return file_path def _export_as_csv(self, path: Path, file_name: str) -> Path: """Export data as csv to a given path with file prefix.""" dfdict = self.to_pandas_dataframe_dict() dataframe_to_csv( dfdict=dfdict, path=path, single_file=True, single_file_name=file_name, ) return path / file_name def _add_metadata_to_netcdf_if_nc_exported(self, tag: str, data: Any) -> None: export_paths = self.export_info.export_paths nc_file = export_paths.get(DataExportType.NETCDF.value, None) if nc_file is not None: import h5netcdf # type: ignore[import-untyped] try: with h5netcdf.File( nc_file, mode="r+", decode_vlen_strings=False ) as h5nc_file: h5nc_file.attrs[tag] = data except ( FileNotFoundError, OSError, ): # older versions of h5py may throw a OSError here warnings.warn( f"Could not add metadata to the exported NetCDF file, " f"was the file moved? GUID {self.guid}, NetCDF file {nc_file}" ) @staticmethod def _validate_parameters(*params: str | ParamSpec | ParameterBase) -> list[str]: """ Validate that the provided parameters have a name and return those names as a list. The Parameters may be a mix of strings, ParamSpecs or ordinary QCoDeS parameters. """ valid_param_names = [] for maybe_param in params: if isinstance(maybe_param, str): valid_param_names.append(maybe_param) else: try: maybe_param_name = maybe_param.name except Exception as e: raise ValueError("This parameter does not have a name") from e valid_param_names.append(maybe_param_name) return valid_param_names @staticmethod def _reshape_array_for_cache( param: ParamSpecBase, param_data: np.ndarray ) -> np.ndarray: """ Shape cache data so it matches data read from database. This means: - Add an extra singleton dim to array data - flatten non array data into a linear array. """ param_data = np.atleast_1d(param_data) if param.type == "array": new_data = np.reshape(param_data, (1,) + param_data.shape) else: new_data = param_data.ravel() return new_data def run_timestamp(self, fmt: str = "%Y-%m-%d %H:%M:%S") -> str | None: """ Returns run timestamp in a human-readable format The run timestamp is the moment when the measurement for this run started. If the run has not yet been started, this function returns None. Consult with :func:`time.strftime` for information about the format. """ return raw_time_to_str_time(self.run_timestamp_raw, fmt) def completed_timestamp(self, fmt: str = "%Y-%m-%d %H:%M:%S") -> str | None: """ Returns timestamp when measurement run was completed in a human-readable format If the run (or the dataset) is not completed, then returns None. Consult with ``time.strftime`` for information about the format. """ return raw_time_to_str_time(self.completed_timestamp_raw, fmt) @property def dependent_parameters(self) -> tuple[ParamSpecBase, ...]: """ Return all the parameters that explicitly depend on other parameters """ return tuple(self.description.interdeps.dependencies.keys())
[docs] class DataSetType(str, Enum): DataSet = "DataSet" DataSetInMem = "DataSetInMem"