Source code for qcodes.dataset.data_set

from __future__ import annotations

import importlib
import json
import logging
import tempfile
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import TYPE_CHECKING, Any, Literal

import numpy
from tqdm.auto import trange

import qcodes
from qcodes.dataset.data_set_protocol import (
    SPECS,
    BaseDataSet,
    CompletedError,
    DataSetProtocol,
    ParameterData,
    SpecsOrInterDeps,
    values_type,
)
from qcodes.dataset.descriptions.dependencies import InterDependencies_
from qcodes.dataset.descriptions.rundescriber import RunDescriber
from qcodes.dataset.descriptions.versioning.converters import new_to_old, old_to_new
from qcodes.dataset.descriptions.versioning.v0 import InterDependencies
from qcodes.dataset.experiment_settings import get_default_experiment_id
from qcodes.dataset.export_config import (
    DataExportType,
)
from qcodes.dataset.guids import filter_guids_by_parts, generate_guid, parse_guid
from qcodes.dataset.linked_datasets.links import Link, links_to_str, str_to_links
from qcodes.dataset.sqlite.connection import ConnectionPlus, atomic, atomic_transaction
from qcodes.dataset.sqlite.database import (
    conn_from_dbpath_or_conn,
    connect,
    get_DB_location,
)
from qcodes.dataset.sqlite.queries import (
    _check_if_table_found,
    _get_result_table_name_by_guid,
    _query_guids_from_run_spec,
    add_data_to_dynamic_columns,
    add_parameter,
    completed,
    create_run,
    get_completed_timestamp_from_run_id,
    get_data_by_tag_and_table_name,
    get_experiment_name_from_experiment_id,
    get_guid_from_expid_and_counter,
    get_guid_from_run_id,
    get_metadata_from_run_id,
    get_parameter_data,
    get_parent_dataset_links,
    get_run_description,
    get_run_timestamp_from_run_id,
    get_runid_from_guid,
    get_sample_name_from_experiment_id,
    mark_run_complete,
    remove_trigger,
    run_exists,
    set_run_timestamp,
    update_parent_datasets,
    update_run_description,
)
from qcodes.dataset.sqlite.query_helpers import (
    VALUE,
    VALUES,
    insert_many_values,
    length,
    one,
    select_one_where,
)
from qcodes.utils import (
    NumpyJSONEncoder,
)

from .data_set_cache import DataSetCacheWithDBBackend
from .data_set_in_memory import DataSetInMem, load_from_file
from .descriptions.versioning import serialization as serial
from .exporters.export_info import ExportInfo
from .exporters.export_to_csv import dataframe_to_csv
from .exporters.export_to_pandas import (
    load_to_concatenated_dataframe,
    load_to_dataframe_dict,
)
from .exporters.export_to_xarray import (
    load_to_xarray_dataarray_dict,
    load_to_xarray_dataset,
    xarray_to_h5netcdf_with_complex_numbers,
)
from .subscriber import _Subscriber

if TYPE_CHECKING:
    from collections.abc import Callable, Mapping, Sequence

    import pandas as pd
    import xarray as xr

    from qcodes.dataset.descriptions.param_spec import ParamSpec, ParamSpecBase
    from qcodes.dataset.descriptions.versioning.rundescribertypes import Shapes
    from qcodes.parameters import ParameterBase


log = logging.getLogger(__name__)


# TODO: storing parameters in separate table as an extension (dropping
# the column parametenrs would be much nicer

# TODO: metadata split between well known columns and maybe something else is
# not such a good idea. The problem is if we allow for specific columns then
# how do the user/us know which are metatadata?  I THINK the only sane solution
# is to store JSON in a column called metadata

# TODO: fixix  a subset of metadata that we define well known (and create them)
# i.e. no dynamic creation of metadata columns, but add stuff to
# a json inside a 'metadata' column


class _BackgroundWriter(Thread):
    """
    Write the results from the DataSet's dataqueue in a new thread
    """

    def __init__(self, queue: Queue[Any], conn: ConnectionPlus):
        super().__init__(daemon=True)
        self.queue = queue
        self.path = conn.path_to_dbfile
        self.keep_writing = True

    def run(self) -> None:
        self.conn = connect(self.path)

        while self.keep_writing:
            item = self.queue.get()
            if item["keys"] == "stop":
                self.keep_writing = False
                self.conn.close()
            elif item["keys"] == "finalize":
                _WRITERS[self.path].active_datasets.remove(item["values"])
            else:
                self.write_results(item["keys"], item["values"], item["table_name"])
            self.queue.task_done()

    def write_results(
        self, keys: Sequence[str], values: Sequence[list[Any]], table_name: str
    ) -> None:
        insert_many_values(self.conn, table_name, keys, values)

    def shutdown(self) -> None:
        """
        Send a termination signal to the data writing queue, wait for the
        queue to empty and the thread to join.

        If the background writing thread is not alive this will do nothing.
        """
        if self.is_alive():
            self.queue.put({"keys": "stop", "values": []})
            self.queue.join()
            self.join()


@dataclass
class _WriterStatus:
    bg_writer: _BackgroundWriter | None
    write_in_background: bool | None
    data_write_queue: Queue[Any]
    active_datasets: set[int]


_WRITERS: dict[str, _WriterStatus] = {}


class DataSet(BaseDataSet):
    # the "persistent traits" are the attributes/properties of the DataSet
    # that are NOT tied to the representation of the DataSet in any particular
    # database
    persistent_traits = (
        "name",
        "guid",
        "number_of_results",
        "parameters",
        "paramspecs",
        "exp_name",
        "sample_name",
        "completed",
        "snapshot",
        "run_timestamp_raw",
        "description",
        "completed_timestamp_raw",
        "metadata",
        "dependent_parameters",
        "parent_dataset_links",
        "captured_run_id",
        "captured_counter",
    )
    background_sleep_time = 1e-3

    def __init__(
        self,
        path_to_db: str | None = None,
        run_id: int | None = None,
        conn: ConnectionPlus | None = None,
        exp_id: int | None = None,
        name: str | None = None,
        specs: SpecsOrInterDeps | None = None,
        values: VALUES | None = None,
        metadata: Mapping[str, Any] | None = None,
        shapes: Shapes | None = None,
        in_memory_cache: bool = True,
    ) -> None:
        """
        Create a new :class:`.DataSet` object. The object can either hold a new run or
        an already existing run. If a ``run_id`` is provided, then an old run is
        looked up, else a new run is created.

        Args:
            path_to_db: path to the sqlite file on disk. If not provided, the
                path will be read from the config.
            run_id: provide this when loading an existing run, leave it
                as None when creating a new run
            conn: connection to the DB; if provided and ``path_to_db`` is
                provided as well, then a ``ValueError`` is raised (this is to
                prevent the possibility of providing a connection to a DB
                file that is different from ``path_to_db``)
            exp_id: the id of the experiment in which to create a new run.
                Ignored if ``run_id`` is provided.
            name: the name of the dataset. Ignored if ``run_id`` is provided.
            specs: paramspecs belonging to the dataset or an ``InterDependencies_``
                object that describes the dataset. Ignored if ``run_id``
                is provided.
            values: values to insert into the dataset. Ignored if ``run_id`` is
                provided.
            metadata: metadata to insert into the dataset. Ignored if ``run_id``
                is provided.
            shapes:
                An optional dict from names of dependent parameters to the shape
                of the data captured as a list of integers. The list is in the
                same order as the interdependencies or paramspecs provided.
                Ignored if ``run_id`` is provided.
            in_memory_cache: Should measured data be keep in memory
                and available as part of the `dataset.cache` object.

        """
        self.conn = conn_from_dbpath_or_conn(conn, path_to_db)

        self._debug = False
        self.subscribers: dict[str, _Subscriber] = {}
        self._parent_dataset_links: list[Link]
        #: In memory representation of the data in the dataset.
        self._cache: DataSetCacheWithDBBackend = DataSetCacheWithDBBackend(self)
        self._results: list[dict[str, VALUE]] = []
        self._in_memory_cache = in_memory_cache

        if run_id is not None:
            if not run_exists(self.conn, run_id):
                raise ValueError(
                    f"Run with run_id {run_id} does not exist in the database"
                )
            self._run_id = run_id
            self._completed = completed(self.conn, self.run_id)
            run_desc = self._get_run_description_from_db()
            self._rundescriber = run_desc
            self._metadata = get_metadata_from_run_id(self.conn, self.run_id)
            self._started = self.run_timestamp_raw is not None
            self._parent_dataset_links = str_to_links(
                get_parent_dataset_links(self.conn, self.run_id)
            )
            self._export_info = ExportInfo.from_str(
                self.metadata.get("export_info", "")
            )
        else:
            # Actually perform all the side effects needed for the creation
            # of a new dataset. Note that a dataset is created (in the DB)
            # with no parameters; they are written to disk when the dataset
            # is marked as started
            if exp_id is None:
                exp_id = get_default_experiment_id(self.conn)
            name = name or "dataset"
            _, run_id, __ = create_run(
                self.conn,
                exp_id,
                name,
                generate_guid(),
                parameters=None,
                values=values,
                metadata=metadata,
            )
            # this is really the UUID (an ever increasing count in the db)
            self._run_id = run_id
            self._completed = False
            self._started = False

            if isinstance(specs, InterDependencies_):
                interdeps = specs
            elif specs is not None:
                interdeps = old_to_new(InterDependencies(*specs))
            else:
                interdeps = InterDependencies_()

            self.set_interdependencies(interdeps=interdeps, shapes=shapes)

            self._metadata = get_metadata_from_run_id(self.conn, self.run_id)
            self._parent_dataset_links = []
            self._export_info = ExportInfo({})
        assert self.path_to_db is not None
        if _WRITERS.get(self.path_to_db) is None:
            queue: Queue[Any] = Queue()
            ws: _WriterStatus = _WriterStatus(
                bg_writer=None,
                write_in_background=None,
                data_write_queue=queue,
                active_datasets=set(),
            )
            _WRITERS[self.path_to_db] = ws

    def prepare(
        self,
        *,
        snapshot: Mapping[Any, Any],
        interdeps: InterDependencies_,
        shapes: Shapes | None = None,
        parent_datasets: Sequence[Mapping[Any, Any]] = (),
        write_in_background: bool = False,
    ) -> None:
        self.add_snapshot(json.dumps(snapshot, cls=NumpyJSONEncoder))

        if interdeps == InterDependencies_():
            raise RuntimeError("No parameters supplied")

        self.set_interdependencies(interdeps, shapes)
        links = [Link(head=self.guid, **pdict) for pdict in parent_datasets]
        self.parent_dataset_links = links
        self.mark_started(start_bg_writer=write_in_background)

    @property
    def cache(self) -> DataSetCacheWithDBBackend:
        return self._cache

    @property
    def run_id(self) -> int:
        return self._run_id

    @property
    def captured_run_id(self) -> int:
        run_id = select_one_where(
            self.conn, "runs", "captured_run_id", "run_id", self.run_id
        )
        assert isinstance(run_id, int)
        return run_id

    @property
    def path_to_db(self) -> str | None:
        return self.conn.path_to_dbfile

    @property
    def name(self) -> str:
        name = select_one_where(self.conn, "runs", "name", "run_id", self.run_id)
        assert isinstance(name, str)
        return name

    @property
    def table_name(self) -> str:
        table_name = select_one_where(
            self.conn, "runs", "result_table_name", "run_id", self.run_id
        )
        assert isinstance(table_name, str)
        return table_name

    @property
    def guid(self) -> str:
        guid = get_guid_from_run_id(self.conn, self.run_id)
        assert guid is not None
        return guid

    @property
    def snapshot(self) -> dict[str, Any] | None:
        """Snapshot of the run as dictionary (or None)"""
        snapshot_json = self.snapshot_raw
        if snapshot_json is not None:
            return json.loads(snapshot_json)
        else:
            return None

    @property
    def _snapshot_raw(self) -> str | None:
        """Snapshot of the run as a JSON-formatted string (or None)"""
        snapshot_raw = select_one_where(
            self.conn, "runs", "snapshot", "run_id", self.run_id
        )
        assert isinstance(snapshot_raw, (str, type(None)))
        return snapshot_raw

    @property
    def snapshot_raw(self) -> str | None:
        """Snapshot of the run as a JSON-formatted string (or None)"""
        return self._snapshot_raw

    @property
    def number_of_results(self) -> int:
        sql = f'SELECT COUNT(*) FROM "{self.table_name}"'
        cursor = atomic_transaction(self.conn, sql)
        return one(cursor, "COUNT(*)")

    @property
    def counter(self) -> int:
        counter = select_one_where(
            self.conn, "runs", "result_counter", "run_id", self.run_id
        )
        assert isinstance(counter, int)
        return counter

    @property
    def captured_counter(self) -> int:
        captured_counter = select_one_where(
            self.conn, "runs", "captured_counter", "run_id", self.run_id
        )
        assert isinstance(captured_counter, int)
        return captured_counter

    @property
    def _parameters(self) -> str | None:
        if self.pristine:
            psnames = [ps.name for ps in self.description.interdeps.paramspecs]
            if len(psnames) > 0:
                return ",".join(psnames)
            else:
                return None
        else:
            parameters = select_one_where(
                self.conn, "runs", "parameters", "run_id", self.run_id
            )
            assert isinstance(parameters, (str, type(None)))
            return parameters

    @property
    def parameters(self) -> str | None:
        return self._parameters

    @property
    def paramspecs(self) -> dict[str, ParamSpec]:
        return {ps.name: ps for ps in self.get_parameters()}

    @property
    def exp_id(self) -> int:
        exp_id = select_one_where(self.conn, "runs", "exp_id", "run_id", self.run_id)
        assert isinstance(exp_id, int)
        return exp_id

    @property
    def exp_name(self) -> str:
        return get_experiment_name_from_experiment_id(self.conn, self.exp_id)

    @property
    def sample_name(self) -> str:
        return get_sample_name_from_experiment_id(self.conn, self.exp_id)

    @property
    def run_timestamp_raw(self) -> float | None:
        """
        Returns run timestamp as number of seconds since the Epoch

        The run timestamp is the moment when the measurement for this run
        started.
        """
        return get_run_timestamp_from_run_id(self.conn, self.run_id)

    @property
    def description(self) -> RunDescriber:
        return self._rundescriber

    @property
    def metadata(self) -> dict[str, Any]:
        return self._metadata

    @property
    def parent_dataset_links(self) -> list[Link]:
        """
        Return a list of Link objects. Each Link object describes a link from
        this dataset to one of its parent datasets
        """
        return self._parent_dataset_links

    @parent_dataset_links.setter
    def parent_dataset_links(self, links: list[Link]) -> None:
        """
        Assign one or more links to parent datasets to this dataset. It is an
        error to assign links to a non-pristine dataset

        Args:
            links: The links to assign to this dataset

        """
        if not self.pristine:
            raise RuntimeError(
                "Can not set parent dataset links on a dataset "
                "that has been started."
            )

        if not all(isinstance(link, Link) for link in links):
            raise ValueError("Invalid input. Did not receive a list of Links")

        for link in links:
            if link.head != self.guid:
                raise ValueError(
                    "Invalid input. All links must point to this dataset. "
                    "Got link(s) with head(s) pointing to another dataset."
                )

        self._parent_dataset_links = links

    @property
    def _writer_status(self) -> _WriterStatus:
        assert self.path_to_db is not None
        return _WRITERS[self.path_to_db]

    @property
    def completed_timestamp_raw(self) -> float | None:
        """
        Returns timestamp when measurement run was completed
        as number of seconds since the Epoch

        If the run (or the dataset) is not completed, then returns None.
        """
        return get_completed_timestamp_from_run_id(self.conn, self.run_id)

    def _get_run_description_from_db(self) -> RunDescriber:
        """
        Look up the run_description from the database
        """
        desc_str = get_run_description(self.conn, self.run_id)
        return serial.from_json_to_current(desc_str)

    def toggle_debug(self) -> None:
        """
        Toggle debug mode, if debug mode is on all the queries made are
        echoed back.
        """
        path_to_db = self.path_to_db
        assert path_to_db is not None
        self._debug = not self._debug
        self.conn.close()
        self.conn = connect(path_to_db, self._debug)

    def set_interdependencies(
        self, interdeps: InterDependencies_, shapes: Shapes | None = None
    ) -> None:
        """
        Set the interdependencies object (which holds all added
        parameters and their relationships) of this dataset and
        optionally the shapes object that holds information about
        the shape of the data to be measured.
        """
        if not isinstance(interdeps, InterDependencies_):
            raise TypeError(
                f"Wrong input type. Expected InterDepencies_, got {type(interdeps)}"
            )

        if not self.pristine:
            mssg = "Can not set interdependencies on a DataSet that has been started."
            raise RuntimeError(mssg)
        self._rundescriber = RunDescriber(interdeps, shapes=shapes)

    def add_metadata(self, tag: str, metadata: Any) -> None:
        """
        Adds metadata to the :class:`.DataSet`. The metadata is stored under the
        provided tag. Note that None is not allowed as a metadata value, and the
        tag has to be a valid python identified (e.g. containing alphanumeric
        characters and underscores).

        Args:
            tag: represents the key in the metadata dictionary
            metadata: actual metadata

        """

        self._metadata[tag] = metadata

        # `add_data_to_dynamic_columns` is not atomic by itself, hence using `atomic`
        with atomic(self.conn) as conn:
            add_data_to_dynamic_columns(conn, self.run_id, {tag: metadata})

        self._add_metadata_to_netcdf_if_nc_exported(tag, metadata)

    def add_snapshot(self, snapshot: str, overwrite: bool = False) -> None:
        """
        Adds a snapshot to this run

        Args:
            snapshot: the raw JSON dump of the snapshot
            overwrite: force overwrite an existing snapshot

        """
        if self.snapshot is None or overwrite:
            with atomic(self.conn) as conn:
                add_data_to_dynamic_columns(conn, self.run_id, {"snapshot": snapshot})
        elif self.snapshot is not None and not overwrite:
            log.warning(
                "This dataset already has a snapshot. Use overwrite"
                "=True to overwrite that"
            )

    @property
    def pristine(self) -> bool:
        """
        Is this :class:`.DataSet` pristine? A pristine :class:`.DataSet` has not yet been started,
        meaning that parameters can still be added and removed, but results
        can not be added.
        """
        return not (self._started or self._completed)

    @property
    def running(self) -> bool:
        """
        Is this :class:`.DataSet` currently running? A running :class:`.DataSet` has been started,
        but not yet completed.
        """
        return self._started and not self._completed

    @property
    def started(self) -> bool:
        """
        Has this :class:`.DataSet` been started? A :class:`.DataSet` not started can not have any
        results added to it.
        """
        return self._started

    @property
    def completed(self) -> bool:
        """
        Is this :class:`.DataSet` completed? A completed :class:`.DataSet` may not be modified in
        any way.
        """
        return self._completed

    @completed.setter
    def completed(self, value: bool) -> None:
        self._completed = value
        if value:
            mark_run_complete(self.conn, self.run_id)

    def mark_started(self, start_bg_writer: bool = False) -> None:
        """
        Mark this :class:`.DataSet` as started. A :class:`.DataSet` that has been started can not
        have its parameters modified.

        Calling this on an already started :class:`.DataSet` is a NOOP.

        Args:
            start_bg_writer: If True, the add_results method will write to the
                database in a separate thread.

        """
        if not self._started:
            self._perform_start_actions(start_bg_writer=start_bg_writer)
            self._started = True

    def _perform_start_actions(self, start_bg_writer: bool) -> None:
        """
        Perform the actions that must take place once the run has been started
        """
        paramspecs = new_to_old(self._rundescriber.interdeps).paramspecs

        for spec in paramspecs:
            add_parameter(
                spec, conn=self.conn, run_id=self.run_id, insert_into_results_table=True
            )

        desc_str = serial.to_json_for_storage(self.description)

        update_run_description(self.conn, self.run_id, desc_str)

        set_run_timestamp(self.conn, self.run_id)

        pdl_str = links_to_str(self._parent_dataset_links)
        update_parent_datasets(self.conn, self.run_id, pdl_str)

        writer_status = self._writer_status

        write_in_background_status = writer_status.write_in_background
        if (
            write_in_background_status is not None
            and write_in_background_status != start_bg_writer
        ):
            raise RuntimeError(
                "All datasets written to the same database must "
                "be written either in the background or in the "
                "main thread. You cannot mix."
            )
        if start_bg_writer:
            writer_status.write_in_background = True
            if writer_status.bg_writer is None:
                writer_status.bg_writer = _BackgroundWriter(
                    writer_status.data_write_queue, self.conn
                )
            if not writer_status.bg_writer.is_alive():
                writer_status.bg_writer.start()
        else:
            writer_status.write_in_background = False

        writer_status.active_datasets.add(self.run_id)
        self.cache.prepare()

    def mark_completed(self) -> None:
        """
        Mark :class:`.DataSet` as complete and thus read only and notify the subscribers
        """
        if self.completed:
            return
        if self.pristine:
            raise RuntimeError(
                "Can not mark DataSet as complete before it "
                "has been marked as started."
            )

        self._perform_completion_actions()
        self.completed = True

    def _perform_completion_actions(self) -> None:
        """
        Perform the necessary clean-up
        """
        for sub in self.subscribers.values():
            sub.done_callback()
        self._ensure_dataset_written()

    def add_results(self, results: Sequence[Mapping[str, VALUE]]) -> None:
        """
        Adds a sequence of results to the :class:`.DataSet`.

        Args:
            results: list of name-value dictionaries where each dictionary
                provides the values for the parameters in that result. If some
                parameters are missing the corresponding values are assumed
                to be None

        It is an error to provide a value for a key or keyword that is not
        the name of a parameter in this :class:`.DataSet`.

        It is an error to add results to a completed :class:`.DataSet`.

        """

        self._raise_if_not_writable()

        expected_keys = frozenset.union(*(frozenset(d) for d in results))
        values = [[d.get(k, None) for k in expected_keys] for d in results]

        writer_status = self._writer_status

        if writer_status.write_in_background:
            item = {
                "keys": list(expected_keys),
                "values": values,
                "table_name": self.table_name,
            }
            writer_status.data_write_queue.put(item)
        else:
            insert_many_values(self.conn, self.table_name, list(expected_keys), values)

    def _raise_if_not_writable(self) -> None:
        if self.pristine:
            raise RuntimeError(
                "This DataSet has not been marked as started. "
                "Please mark the DataSet as started before "
                "adding results to it."
            )
        if self.completed:
            raise CompletedError(
                "This DataSet is complete, no further results can be added to it."
            )

    def _ensure_dataset_written(self) -> None:
        writer_status = self._writer_status

        if writer_status.write_in_background:
            writer_status.data_write_queue.put(
                {"keys": "finalize", "values": self.run_id}
            )
            while self.run_id in writer_status.active_datasets:
                time.sleep(self.background_sleep_time)
        elif self.run_id in writer_status.active_datasets:
            writer_status.active_datasets.remove(self.run_id)
        if len(writer_status.active_datasets) == 0:
            writer_status.write_in_background = None
            if writer_status.bg_writer is not None:
                writer_status.bg_writer.shutdown()
                writer_status.bg_writer = None

    def get_parameter_data(
        self,
        *params: str | ParamSpec | ParameterBase,
        start: int | None = None,
        end: int | None = None,
        callback: Callable[[float], None] | None = None,
    ) -> ParameterData:
        """
        Returns the values stored in the :class:`.DataSet` for the specified parameters
        and their dependencies. If no parameters are supplied the values will
        be returned for all parameters that are not them self dependencies.

        The values are returned as a dictionary with names of the requested
        parameters as keys and values consisting of dictionaries with the
        names of the parameters and its dependencies as keys and numpy arrays
        of the data as values. If the dataset has a shape recorded
        in its metadata and the number of datapoints recorded matches the
        expected number of points the data will be returned as numpy arrays
        in this shape. If there are less datapoints recorded than expected
        from the metadata the dataset will be returned as is. This could happen
        if you call `get_parameter_data` on an incomplete dataset. See
        :py:meth:`dataset.cache.data <.DataSetCache.data>` for an implementation that
        returns the data with the expected shape using `NaN` or zeros as
        placeholders.

        If there are more datapoints than expected the dataset will be returned
        as is and a warning raised.

        If some of the parameters are stored as arrays
        the remaining parameters are expanded to the same shape as these.

        If provided, the start and end arguments select a range of results
        by result count (index). If the range is empty - that is, if the end is
        less than or equal to the start, or if start is after the current end
        of the :class:`.DataSet` - then a list of empty arrays is returned.

        Args:
            *params: string parameter names, QCoDeS Parameter objects, and
                ParamSpec objects. If no parameters are supplied data for
                all parameters that are not a dependency of another
                parameter will be returned.
            start: start value of selection range (by result count); ignored
                if None
            end: end value of selection range (by results count); ignored if
                None
            callback: Function called during the data loading every
                config.dataset.callback_percent.

        Returns:
            Dictionary from requested parameters to Dict of parameter names
            to numpy arrays containing the data points of type numeric,
            array or string.

        """
        if len(params) == 0:
            valid_param_names = [
                ps.name for ps in self._rundescriber.interdeps.non_dependencies
            ]
        else:
            valid_param_names = self._validate_parameters(*params)
        return get_parameter_data(
            self.conn, self.table_name, valid_param_names, start, end, callback
        )

    def to_pandas_dataframe_dict(
        self,
        *params: str | ParamSpec | ParameterBase,
        start: int | None = None,
        end: int | None = None,
    ) -> dict[str, pd.DataFrame]:
        """
        Returns the values stored in the :class:`.DataSet` for the specified parameters
        and their dependencies as a dict of :py:class:`pandas.DataFrame` s
        Each element in the dict is indexed by the names of the requested
        parameters.

        Each DataFrame contains a column for the data and is indexed by a
        :py:class:`pandas.MultiIndex` formed from all the setpoints
        of the parameter.

        If no parameters are supplied data will be be
        returned for all parameters in the :class:`.DataSet` that are not them self
        dependencies of other parameters.

        If provided, the start and end arguments select a range of results
        by result count (index). If the range is empty - that is, if the end is
        less than or equal to the start, or if start is after the current end
        of the :class:`.DataSet` - then a dict of empty :py:class:`pandas.DataFrame` s is
        returned.

        Args:
            *params: string parameter names, QCoDeS Parameter objects, and
                ParamSpec objects. If no parameters are supplied data for
                all parameters that are not a dependency of another
                parameter will be returned.
            start: start value of selection range (by result count); ignored
                if None
            end: end value of selection range (by results count); ignored if
                None

        Returns:
            Dictionary from requested parameter names to
            :py:class:`pandas.DataFrame` s with the requested parameter as
            a column and a indexed by a :py:class:`pandas.MultiIndex` formed
            by the dependencies.

        """
        datadict = self.get_parameter_data(*params, start=start, end=end)
        dfs_dict = load_to_dataframe_dict(datadict)
        return dfs_dict

    def to_pandas_dataframe(
        self,
        *params: str | ParamSpec | ParameterBase,
        start: int | None = None,
        end: int | None = None,
    ) -> pd.DataFrame:
        """
        Returns the values stored in the :class:`.DataSet` for the specified parameters
        and their dependencies as a concatenated :py:class:`pandas.DataFrame` s

        The DataFrame contains a column for the data and is indexed by a
        :py:class:`pandas.MultiIndex` formed from all the setpoints
        of the parameter.

        If no parameters are supplied data will be be
        returned for all parameters in the :class:`.DataSet` that are not them self
        dependencies of other parameters.

        If provided, the start and end arguments select a range of results
        by result count (index). If the range is empty - that is, if the end is
        less than or equal to the start, or if start is after the current end
        of the :class:`.DataSet` - then a dict of empty :py:class:`pandas.DataFrame` s is
        returned.

        Args:
            *params: string parameter names, QCoDeS Parameter objects, and
                ParamSpec objects. If no parameters are supplied data for
                all parameters that are not a dependency of another
                parameter will be returned.
            start: start value of selection range (by result count); ignored
                if None
            end: end value of selection range (by results count); ignored if
                None

        Returns:
            :py:class:`pandas.DataFrame` s with the requested parameter as
            a column and a indexed by a :py:class:`pandas.MultiIndex` formed
            by the dependencies.

        Example:
            Return a pandas DataFrame with
                df = ds.to_pandas_dataframe()

        """
        datadict = self.get_parameter_data(*params, start=start, end=end)
        return load_to_concatenated_dataframe(datadict)

    def to_xarray_dataarray_dict(
        self,
        *params: str | ParamSpec | ParameterBase,
        start: int | None = None,
        end: int | None = None,
        use_multi_index: Literal["auto", "always", "never"] = "auto",
    ) -> dict[str, xr.DataArray]:
        """
        Returns the values stored in the :class:`.DataSet` for the specified parameters
        and their dependencies as a dict of :py:class:`xr.DataArray` s
        Each element in the dict is indexed by the names of the requested
        parameters.

        If no parameters are supplied data will be be
        returned for all parameters in the :class:`.DataSet` that are not them self
        dependencies of other parameters.

        If provided, the start and end arguments select a range of results
        by result count (index). If the range is empty - that is, if the end is
        less than or equal to the start, or if start is after the current end
        of the :class:`.DataSet` - then a dict of empty :py:class:`xr.DataArray` s is
        returned.

        The dependent parameters of the Dataset are normally used as coordinates of the
        XArray dataframe. However if non unique values are found for the dependent parameter
        values we will fall back to using an index as coordinates.

        Args:
            *params: string parameter names, QCoDeS Parameter objects, and
                ParamSpec objects. If no parameters are supplied data for
                all parameters that are not a dependency of another
                parameter will be returned.
            start: start value of selection range (by result count); ignored
                if None
            end: end value of selection range (by results count); ignored if
                None
            use_multi_index: Should the data be exported using a multi index
                rather than regular cartesian indexes. With regular cartesian
                coordinates, the xarray dimensions are calculated from the sets or all
                values along the setpoint axis of the QCoDeS dataset. Any position
                in this grid not corresponding to a measured value will be filled
                with a placeholder (typically NaN) potentially creating a sparse
                dataset with significant storage overhead.
                Multi index avoids this and is therefor better
                suited for data that is known to not be on a grid.
                If set to "auto" multi index will be used if projecting the data onto
                a grid requires filling non measured values with NaN  and the shapes
                of the data has not been set in the run description.

        Returns:
            Dictionary from requested parameter names to :py:class:`xr.DataArray` s
            with the requested parameter(s) as a column(s) and coordinates
            formed by the dependencies.

        Example:
            Return a dict of xr.DataArray with

                dataarray_dict = ds.to_xarray_dataarray_dict()

        """
        data = self.get_parameter_data(*params, start=start, end=end)
        datadict = load_to_xarray_dataarray_dict(
            self, data, use_multi_index=use_multi_index
        )

        return datadict

    def to_xarray_dataset(
        self,
        *params: str | ParamSpec | ParameterBase,
        start: int | None = None,
        end: int | None = None,
        use_multi_index: Literal["auto", "always", "never"] = "auto",
    ) -> xr.Dataset:
        """
        Returns the values stored in the :class:`.DataSet` for the specified parameters
        and their dependencies as a :py:class:`xr.Dataset` object.

        If no parameters are supplied data will be be
        returned for all parameters in the :class:`.DataSet` that are not then self
        dependencies of other parameters.

        If provided, the start and end arguments select a range of results
        by result count (index). If the range is empty - that is, if the end is
        less than or equal to the start, or if start is after the current end
        of the :class:`.DataSet` - then a empty :py:class:`xr.Dataset` s is
        returned.

        The dependent parameters of the Dataset are normally used as coordinates of the
        XArray dataframe. However if non unique values are found for the dependent parameter
        values we will fall back to using an index as coordinates.

        Args:
            *params: string parameter names, QCoDeS Parameter objects, and
                ParamSpec objects. If no parameters are supplied data for
                all parameters that are not a dependency of another
                parameter will be returned.
            start: start value of selection range (by result count); ignored
                if None
            end: end value of selection range (by results count); ignored if
                None
            use_multi_index: Should the data be exported using a multi index
                rather than regular cartesian indexes. With regular cartesian
                coordinates, the xarray dimensions are calculated from the sets or all
                values along the setpoint axis of the QCoDeS dataset. Any position
                in this grid not corresponding to a measured value will be filled
                with a placeholder (typically NaN) potentially creating a sparse
                dataset with significant storage overhead.
                Multi index avoids this and is therefor better
                suited for data that is known to not be on a grid.
                If set to "auto" multi index will be used if projecting the data onto
                a grid requires filling non measured values with NaN  and the shapes
                of the data has not been set in the run description.

        Returns:
            :py:class:`xr.Dataset` with the requested parameter(s) data as
            :py:class:`xr.DataArray` s and coordinates formed by the dependencies.

        Example:
            Return a concatenated xr.Dataset with

                xds = ds.to_xarray_dataset()

        """
        data = self.get_parameter_data(*params, start=start, end=end)

        return load_to_xarray_dataset(self, data, use_multi_index=use_multi_index)

    def write_data_to_text_file(
        self, path: str, single_file: bool = False, single_file_name: str | None = None
    ) -> None:
        """
        An auxiliary function to export data to a text file. When the data with more
        than one dependent variables, say "y(x)" and "z(x)", is concatenated to a single file
        it reads:

                    x1  y1(x1)  z1(x1)
                    x2  y2(x2)  z2(x2)
                    ..    ..      ..
                    xN  yN(xN)  zN(xN)

        For each new independent variable, say "k", the expansion is in the y-axis:

                    x1  y1(x1)  z1(x1)
                    x2  y2(x2)  z2(x2)
                    ..    ..      ..
                    xN  yN(xN)  zN(xN)
                    k1  y1(k1)  z1(k1)
                    k2  y2(k2)  z2(k2)
                    ..    ..      ..
                    kN  yN(kN)  zN(kN)

        Args:
            path: User defined path where the data to be exported
            single_file: If true, merges the data of same length of multiple
                         dependent parameters to a single file.
            single_file_name: User defined name for the data to be concatenated.
                              If no extension is passed (.dat, .csv or .txt),
                              .dat is automatically appended.

        Raises:
            DataLengthException: If the data of multiple parameters have not same
                                 length and wanted to be merged in a single file.
            DataPathException: If the data of multiple parameters are wanted to be merged
                               in a single file but no filename provided.

        """
        dfdict = self.to_pandas_dataframe_dict()
        dataframe_to_csv(
            dfdict=dfdict,
            path=path,
            single_file=single_file,
            single_file_name=single_file_name,
        )

    def subscribe(
        self,
        callback: Callable[[Any, int, Any | None], None],
        min_wait: int = 0,
        min_count: int = 1,
        state: Any | None = None,
        callback_kwargs: Mapping[str, Any] | None = None,
    ) -> str:
        subscriber_id = uuid.uuid4().hex
        subscriber = _Subscriber(
            self, subscriber_id, callback, state, min_wait, min_count, callback_kwargs
        )
        self.subscribers[subscriber_id] = subscriber
        subscriber.start()
        return subscriber_id

    def subscribe_from_config(self, name: str) -> str:
        """
        Subscribe a subscriber defined in the `qcodesrc.json` config file to
        the data of this :class:`.DataSet`. The definition can be found at
        ``subscription.subscribers`` in the ``qcodesrc.json`` config file.

        Args:
            name: identifier of the subscriber. Equal to the key of the entry
                in ``qcodesrc.json::subscription.subscribers``.

        """
        subscribers = qcodes.config.subscription.subscribers
        try:
            subscriber_info = getattr(subscribers, name)
        # the dot dict behind the config does not convert the error and
        # actually raises a `KeyError`
        except (AttributeError, KeyError):
            keys = ",".join(subscribers.keys())
            raise RuntimeError(
                f'subscribe_from_config: failed to subscribe "{name}" to '
                f"DataSet from list of subscribers in `qcodesrc.json` "
                f"(subscriptions.subscribers). Chose one of: {keys}"
            )
        # get callback from string
        parts = subscriber_info.factory.split(".")
        import_path, type_name = ".".join(parts[:-1]), parts[-1]
        module = importlib.import_module(import_path)
        factory = getattr(module, type_name)

        kwargs = {k: v for k, v in subscriber_info.subscription_kwargs.items()}
        kwargs["callback"] = factory(self, **subscriber_info.factory_kwargs)
        kwargs["state"] = {}
        return self.subscribe(**kwargs)

    def unsubscribe(self, uuid: str) -> None:
        """
        Remove subscriber with the provided uuid
        """
        with atomic(self.conn) as conn:
            sub = self.subscribers[uuid]
            remove_trigger(conn, sub.trigger_id)
            sub.schedule_stop()
            sub.join()
            del self.subscribers[uuid]

    def unsubscribe_all(self) -> None:
        """
        Remove all subscribers
        """
        sql = """
        SELECT name FROM sqlite_master
        WHERE type = 'trigger'
        """
        triggers = atomic_transaction(self.conn, sql).fetchall()
        with atomic(self.conn) as conn:
            for (trigger,) in triggers:
                remove_trigger(conn, trigger)
            for sub in self.subscribers.values():
                sub.schedule_stop()
                sub.join()
            self.subscribers.clear()

    def get_metadata(self, tag: str) -> VALUE | None:
        """Get metadata by tag. Returns None if no metadata is stored under that tag"""
        return get_data_by_tag_and_table_name(self.conn, tag, self.table_name)

    def __len__(self) -> int:
        return length(self.conn, self.table_name)

    def __repr__(self) -> str:
        out = []
        heading = f"{self.name} #{self.run_id}@{self.path_to_db}"
        out.append(heading)
        out.append("-" * len(heading))
        ps = self.get_parameters()
        if len(ps) > 0:
            for p in ps:
                out.append(f"{p.name} - {p.type}")

        return "\n".join(out)

    def _enqueue_results(
        self, result_dict: Mapping[ParamSpecBase, numpy.ndarray]
    ) -> None:
        """
        Enqueue the results into self._results

        Before we can enqueue the results, all values of the results dict
        must have the same length. We enqueue each parameter tree separately,
        effectively mimicking making one call to add_results per parameter
        tree.

        Deal with 'numeric' type parameters. If a 'numeric' top level parameter
        has non-scalar shape, it must be unrolled into a list of dicts of
        single values (database).
        """
        self._raise_if_not_writable()
        interdeps = self._rundescriber.interdeps

        toplevel_params = set(interdeps.dependencies).intersection(set(result_dict))

        new_results: dict[str, dict[str, numpy.ndarray]] = {}

        for toplevel_param in toplevel_params:
            inff_params = set(interdeps.inferences.get(toplevel_param, ()))
            deps_params = set(interdeps.dependencies.get(toplevel_param, ()))
            all_params = inff_params.union(deps_params).union({toplevel_param})

            if self._in_memory_cache:
                new_results[toplevel_param.name] = {}
                new_results[toplevel_param.name][toplevel_param.name] = (
                    self._reshape_array_for_cache(
                        toplevel_param, result_dict[toplevel_param]
                    )
                )
                for param in all_params:
                    if param is not toplevel_param:
                        new_results[toplevel_param.name][param.name] = (
                            self._reshape_array_for_cache(param, result_dict[param])
                        )

            if toplevel_param.type == "array":
                res_list = self._finalize_res_dict_array(result_dict, all_params)
            elif toplevel_param.type in ("numeric", "text", "complex"):
                res_list = self._finalize_res_dict_numeric_text_or_complex(
                    result_dict, toplevel_param, inff_params, deps_params
                )
            else:
                res_dict: dict[str, VALUE] = {
                    ps.name: result_dict[ps] for ps in all_params
                }
                res_list = [res_dict]
            self._results += res_list

        # Finally, handle standalone parameters

        standalones = set(interdeps.standalones).intersection(set(result_dict))

        if standalones:
            stdln_dict = {st: result_dict[st] for st in standalones}
            self._results += self._finalize_res_dict_standalones(stdln_dict)
            if self._in_memory_cache:
                for st in standalones:
                    new_results[st.name] = {
                        st.name: self._reshape_array_for_cache(st, result_dict[st])
                    }

        if self._in_memory_cache:
            self.cache.add_data(new_results)

    @staticmethod
    def _finalize_res_dict_array(
        result_dict: Mapping[ParamSpecBase, values_type], all_params: set[ParamSpecBase]
    ) -> list[dict[str, VALUE]]:
        """
        Make a list of res_dicts out of the results for a 'array' type
        parameter. The results are assumed to already have been validated for
        type and shape
        """

        def reshaper(val: Any, ps: ParamSpecBase) -> VALUE:
            paramtype = ps.type
            if paramtype == "numeric":
                return float(val)
            elif paramtype == "text":
                return str(val)
            elif paramtype == "complex":
                return complex(val)
            elif paramtype == "array":
                if val.shape:
                    return val
                else:
                    return numpy.reshape(val, (1,))
            else:
                raise ValueError(
                    f"Cannot handle unknown paramtype {paramtype!r} of {ps!r}."
                )

        res_dict = {ps.name: reshaper(result_dict[ps], ps) for ps in all_params}

        return [res_dict]

    @staticmethod
    def _finalize_res_dict_numeric_text_or_complex(
        result_dict: Mapping[ParamSpecBase, numpy.ndarray],
        toplevel_param: ParamSpecBase,
        inff_params: set[ParamSpecBase],
        deps_params: set[ParamSpecBase],
    ) -> list[dict[str, VALUE]]:
        """
        Make a res_dict in the format expected by DataSet.add_results out
        of the results for a 'numeric' or text type parameter. This includes
        replicating and unrolling values as needed and also handling the corner
        case of np.array(1) kind of values
        """

        res_list: list[dict[str, VALUE]] = []
        all_params = inff_params.union(deps_params).union({toplevel_param})

        t_map = {"numeric": float, "text": str, "complex": complex}

        toplevel_shape = result_dict[toplevel_param].shape
        if toplevel_shape == ():
            # In the case of a single value, life is reasonably simple
            res_list = [{ps.name: t_map[ps.type](result_dict[ps]) for ps in all_params}]
        else:
            # We first massage all values into np.arrays of the same
            # shape
            flat_results: dict[str, numpy.ndarray] = {}

            toplevel_val = result_dict[toplevel_param]
            flat_results[toplevel_param.name] = toplevel_val.ravel()
            N = len(flat_results[toplevel_param.name])
            for dep in deps_params:
                if result_dict[dep].shape == ():
                    flat_results[dep.name] = numpy.repeat(result_dict[dep], N)
                else:
                    flat_results[dep.name] = result_dict[dep].ravel()
            for inff in inff_params:
                if numpy.shape(result_dict[inff]) == ():
                    flat_results[inff.name] = numpy.repeat(result_dict[inff], N)
                else:
                    flat_results[inff.name] = result_dict[inff].ravel()

            # And then put everything into the list

            res_list = [
                {p.name: flat_results[p.name][ind] for p in all_params}
                for ind in range(N)
            ]

        return res_list

    @staticmethod
    def _finalize_res_dict_standalones(
        result_dict: Mapping[ParamSpecBase, numpy.ndarray],
    ) -> list[dict[str, VALUE]]:
        """
        Massage all standalone parameters into the correct shape
        """
        res_list: list[dict[str, VALUE]] = []
        for param, value in result_dict.items():
            if param.type == "text":
                if value.shape:
                    new_res: list[dict[str, VALUE]] = [
                        {param.name: str(val)} for val in value
                    ]
                    res_list += new_res
                else:
                    new_res = [{param.name: str(value)}]
                    res_list += new_res
            elif param.type == "numeric":
                if value.shape:
                    res_list += [{param.name: number} for number in value]
                else:
                    new_res = [{param.name: float(value)}]
                    res_list += new_res
            elif param.type == "complex":
                if value.shape:
                    res_list += [{param.name: number} for number in value]
                else:
                    new_res = [{param.name: complex(value)}]
                    res_list += new_res
            else:
                new_res = [{param.name: value}]
                res_list += new_res

        return res_list

    def _flush_data_to_database(self, block: bool = False) -> None:
        """
        Write the in-memory results to the database.

        Args:
            block: If writing using a background thread block until the
                background thread has written all data to disc. The
                argument has no effect if not using a background thread.

        """

        log.debug("Flushing to database")
        writer_status = self._writer_status
        if len(self._results) > 0:
            try:
                self.add_results(self._results)
                if writer_status.write_in_background:
                    log.debug("Successfully enqueued result for write thread")
                else:
                    log.debug("Successfully wrote result to disk")
                self._results = []
            except Exception as e:
                if writer_status.write_in_background:
                    log.warning(f"Could not enqueue result; {e}")
                else:
                    log.warning(f"Could not commit to database; {e}")
        else:
            log.debug("No results to flush")

        if writer_status.write_in_background and block:
            log.debug("Waiting for write queue to empty.")
            writer_status.data_write_queue.join()

    @property
    def export_info(self) -> ExportInfo:
        return self._export_info

    def _set_export_info(self, export_info: ExportInfo) -> None:
        tag = "export_info"
        data = export_info.to_str()

        self._metadata[tag] = data
        # `add_data_to_dynamic_columns` is not atomic by itself, hence using `atomic`
        with atomic(self.conn) as conn:
            add_data_to_dynamic_columns(conn, self.run_id, {tag: data})

        self._export_info = export_info

    def _export_as_netcdf(self, path: Path, file_name: str) -> Path:
        """Export data as netcdf to a given path with file prefix"""
        import xarray as xr

        file_path = path / file_name
        if (
            qcodes.config.dataset.export_chunked_export_of_large_files_enabled
            and self._estimate_ds_size()
            > qcodes.config.dataset.export_chunked_threshold
        ):
            log.info(
                "Dataset is expected to be larger that threshold. Using distributed export.",
                extra={
                    "file_name": str(file_path),
                    "qcodes_guid": self.guid,
                    "ds_name": self.name,
                    "exp_name": self.exp_name,
                    "_export_limit": qcodes.config.dataset.export_chunked_threshold,
                    "_estimated_ds_size": self._estimate_ds_size(),
                },
            )
            print(
                "Large dataset detected. Will write to multiple files first and combine after, to reduce memory overhead."
            )
            with tempfile.TemporaryDirectory() as temp_dir:
                temp_path = Path(temp_dir)
                log.info(
                    "Writing individual files to temp dir.",
                    extra={
                        "file_name": str(file_path),
                        "qcodes_guid": self.guid,
                        "ds_name": self.name,
                        "exp_name": self.exp_name,
                        "temp_dir": temp_dir,
                    },
                )
                num_files = len(self)
                num_digits = len(str(num_files))
                file_name_template = f"ds_{{:0{num_digits}d}}.nc"
                for i in trange(num_files, desc="Writing individual files"):
                    xarray_to_h5netcdf_with_complex_numbers(
                        self.to_xarray_dataset(start=i + 1, end=i + 1),
                        temp_path / file_name_template.format(i),
                    )
                files = tuple(temp_path.glob("*.nc"))
                data = xr.open_mfdataset(files)
                try:
                    log.info(
                        "Combining temp files into one file.",
                        extra={
                            "file_name": str(file_path),
                            "qcodes_guid": self.guid,
                            "ds_name": self.name,
                            "exp_name": self.exp_name,
                            "temp_dir": temp_dir,
                        },
                    )
                    xarray_to_h5netcdf_with_complex_numbers(
                        data, file_path, compute=False
                    )
                finally:
                    data.close()
        else:
            log.info(
                "Writing netcdf file directly.",
                extra={"file_name": str(file_path)},
            )

            file_path = super()._export_as_netcdf(path=path, file_name=file_name)
        return file_path

    def _estimate_ds_size(self) -> float:
        """
        Give an estimated size of the dataset as the size of a single row
        times the len of the dataset. Result is returned in Mega Bytes.

        Note that this does not take overhead from storing the array into account
        so it is assumed that the total array is large compared to the overhead.
        """
        sample_data = self.get_parameter_data(start=1, end=1)
        row_size = 0.0

        for param_data in sample_data.values():
            for array in param_data.values():
                row_size += array.size * array.dtype.itemsize
        return row_size * len(self) / 1024 / 1024


# public api
[docs] def load_by_run_spec( *, captured_run_id: int | None = None, captured_counter: int | None = None, experiment_name: str | None = None, sample_name: str | None = None, # guid parts sample_id: int | None = None, location: int | None = None, work_station: int | None = None, conn: ConnectionPlus | None = None, ) -> DataSetProtocol: """ Load a run from one or more pieces of runs specification. All fields are optional but the function will raise an error if more than one run matching the supplied specification is found. Along with the error specs of the runs found will be printed. If the raw data is in the database this will be loaded as a :class:`DataSet`. Otherwise it will be loaded as a :class:`.DataSetInMemory` If the raw data is exported to netcdf and ``qcodes.config.dataset.load_from_exported_file`` is set to True. this will be loaded from file as a :class:`DataSetInMemory`. regardless. Args: captured_run_id: The ``run_id`` that was originally assigned to this at the time of capture. captured_counter: The counter that was originally assigned to this at the time of capture. experiment_name: name of the experiment that the run was captured sample_name: The name of the sample given when creating the experiment. sample_id: The sample_id assigned as part of the GUID. location: The location code assigned as part of GUID. work_station: The workstation assigned as part of the GUID. conn: An optional connection to the database. If no connection is supplied a connection to the default database will be opened. Raises: NameError: if no run or more than one run with the given specification exists in the database Returns: :class:`qcodes.dataset.data_set.DataSet` or :class:`.DataSetInMemory` matching the provided specification. """ internal_conn = conn or connect(get_DB_location()) d: DataSetProtocol | None = None try: guids = get_guids_by_run_spec( captured_run_id=captured_run_id, captured_counter=captured_counter, experiment_name=experiment_name, sample_name=sample_name, # guid parts sample_id=sample_id, location=location, work_station=work_station, conn=internal_conn, ) if len(guids) == 1: d = load_by_guid(guids[0], internal_conn) elif len(guids) > 1: print(generate_dataset_table(guids, conn=internal_conn)) raise NameError( "More than one matching dataset found. " "Please supply more information to uniquely" "identify a dataset" ) else: raise NameError("No run matching the supplied information found.") finally: if not conn and not isinstance(d, DataSet): internal_conn.close() assert d is not None return d
[docs] def get_guids_by_run_spec( *, captured_run_id: int | None = None, captured_counter: int | None = None, experiment_name: str | None = None, sample_name: str | None = None, # guid parts sample_id: int | None = None, location: int | None = None, work_station: int | None = None, conn: ConnectionPlus | None = None, ) -> list[str]: """ Get a list of matching guids from one or more pieces of runs specification. All fields are optional. Args: captured_run_id: The ``run_id`` that was originally assigned to this at the time of capture. captured_counter: The counter that was originally assigned to this at the time of capture. experiment_name: name of the experiment that the run was captured sample_name: The name of the sample given when creating the experiment. sample_id: The sample_id assigned as part of the GUID. location: The location code assigned as part of GUID. work_station: The workstation assigned as part of the GUID. conn: An optional connection to the database. If no connection is supplied a connection to the default database will be opened. Returns: List of guids matching the run spec. """ internal_conn = conn or connect(get_DB_location()) try: guids = _query_guids_from_run_spec( internal_conn, captured_run_id=captured_run_id, captured_counter=captured_counter, experiment_name=experiment_name, sample_name=sample_name, ) matched_guids = filter_guids_by_parts(guids, location, sample_id, work_station) finally: if not conn: internal_conn.close() return matched_guids
[docs] def load_by_id(run_id: int, conn: ConnectionPlus | None = None) -> DataSetProtocol: """ Load a dataset by run id If no connection is provided, lookup is performed in the database file that is specified in the config. Note that the ``run_id`` used in this function in not preserved when copying data to another db file. We recommend using :func:`.load_by_run_spec` which does not have this issue and is significantly more flexible. If the raw data is in the database this will be loaded as a :class:`DataSet`. Otherwise it will be loaded as a :class:`.DataSetInMemory` If the raw data is exported to netcdf and ``qcodes.config.dataset.load_from_exported_file`` is set to True. this will be loaded from file as a :class:`DataSetInMemory`. regardless. Args: run_id: run id of the dataset conn: connection to the database to load from Returns: :class:`qcodes.dataset.data_set.DataSet` or :class:`.DataSetInMemory` with the given run id """ if run_id is None: raise ValueError("run_id has to be a positive integer, not None.") internal_conn = conn or connect(get_DB_location()) d: DataSetProtocol | None = None try: guid = get_guid_from_run_id(internal_conn, run_id) if guid is None: raise ValueError( f"Run with run_id {run_id} does not exist in the database: {internal_conn.path_to_dbfile}" ) d = _get_datasetprotocol_from_guid(guid, internal_conn) finally: # dataset takes ownership of the connection but DataSetInMem does not if not conn and not isinstance(d, DataSet): internal_conn.close() assert d is not None return d
[docs] def load_by_guid(guid: str, conn: ConnectionPlus | None = None) -> DataSetProtocol: """ Load a dataset by its GUID If no connection is provided, lookup is performed in the database file that is specified in the config. If the raw data is in the database this will be loaded as a :class:`DataSet`. Otherwise it will be loaded as a :class:`.DataSetInMemory` If the raw data is exported to netcdf and ``qcodes.config.dataset.load_from_exported_file`` is set to True. this will be loaded from file as a :class:`DataSetInMemory`. regardless. Args: guid: guid of the dataset conn: connection to the database to load from Returns: :class:`qcodes.dataset.data_set.DataSet` or :class:`.DataSetInMemory` with the given guid Raises: NameError: if no run with the given GUID exists in the database RuntimeError: if several runs with the given GUID are found """ internal_conn = conn or connect(get_DB_location()) d: DataSetProtocol | None = None # this function raises a RuntimeError if more than one run matches the GUID try: d = _get_datasetprotocol_from_guid(guid, internal_conn) finally: # dataset takes ownership of the connection but DataSetInMem does not if not conn and not isinstance(d, DataSet): internal_conn.close() assert d is not None return d
[docs] def load_by_counter( counter: int, exp_id: int, conn: ConnectionPlus | None = None ) -> DataSetProtocol: """ Load a dataset given its counter in a given experiment Lookup is performed in the database file that is specified in the config. Note that the `counter` used in this function in not preserved when copying data to another db file. We recommend using :func:`.load_by_run_spec` which does not have this issue and is significantly more flexible. If the raw data is in the database this will be loaded as a :class:`DataSet`. Otherwise it will be loaded as a :class:`.DataSetInMemory` If the raw data is exported to netcdf and ``qcodes.config.dataset.load_from_exported_file`` is set to True. this will be loaded from file as a :class:`DataSetInMemory`. regardless. Args: counter: counter of the dataset within the given experiment exp_id: id of the experiment where to look for the dataset conn: connection to the database to load from. If not provided, a connection to the DB file specified in the config is made Returns: :class:`DataSet` or :class:`.DataSetInMemory` of the given counter in the given experiment """ internal_conn = conn or connect(get_DB_location()) d: DataSetProtocol | None = None # this function raises a RuntimeError if more than one run matches the GUID try: guid = get_guid_from_expid_and_counter(internal_conn, exp_id, counter) d = _get_datasetprotocol_from_guid(guid, internal_conn) finally: # dataset takes ownership of the connection but DataSetInMem does not if not conn and not isinstance(d, DataSet): internal_conn.close() assert d is not None return d
def _get_datasetprotocol_from_guid(guid: str, conn: ConnectionPlus) -> DataSetProtocol: run_id = get_runid_from_guid(conn, guid) if run_id is None: raise NameError( "No run with GUID: %s found in database: %s", guid, conn.path_to_dbfile ) if qcodes.config.dataset.load_from_exported_file: export_info = _get_datasetprotocol_export_info(run_id=run_id, conn=conn) export_file_path = export_info.export_paths.get( DataExportType.NETCDF.value, None ) if export_file_path is not None: try: d: DataSetProtocol = load_from_file(export_file_path) except (ValueError, FileNotFoundError) as e: log.warning("Cannot load data from file: %s", e) else: return d result_table_name = _get_result_table_name_by_guid(conn, guid) if _check_if_table_found(conn, result_table_name): d = DataSet(conn=conn, run_id=run_id) else: d = DataSetInMem._load_from_db(conn=conn, guid=guid) return d def _get_datasetprotocol_export_info(run_id: int, conn: ConnectionPlus) -> ExportInfo: metadata = get_metadata_from_run_id(conn=conn, run_id=run_id) export_info_str = metadata.get("export_info", "") export_info = ExportInfo.from_str(export_info_str) return export_info
[docs] def new_data_set( name: str, exp_id: int | None = None, specs: SPECS | None = None, values: VALUES | None = None, metadata: Any | None = None, conn: ConnectionPlus | None = None, in_memory_cache: bool = True, ) -> DataSet: """ Create a new dataset in the currently active/selected database. If ``exp_id`` is not specified, the last experiment will be loaded by default. Args: name: the name of the new dataset exp_id: the id of the experiments this dataset belongs to, defaults to the last experiment specs: list of parameters to create this dataset with values: the values to associate with the parameters metadata: the metadata to associate with the dataset conn: Existing connection to the database. in_memory_cache: Should measured data be keep in memory and available as part of the `dataset.cache` object. Return: the newly created :class:`qcodes.dataset.data_set.DataSet` """ # note that passing `conn` is a secret feature that is unfortunately used # in `Runner` to pass a connection from an existing `Experiment`. d = DataSet( path_to_db=None, run_id=None, conn=conn, name=name, specs=specs, values=values, metadata=metadata, exp_id=exp_id, in_memory_cache=in_memory_cache, ) return d
def generate_dataset_table( guids: Sequence[str], conn: ConnectionPlus | None = None ) -> str: """ Generate an ASCII art table of information about the runs attached to the supplied guids. Args: guids: Sequence of one or more guids conn: A ConnectionPlus object with a connection to the database. Returns: ASCII art table of information about the supplied guids. """ from tabulate import tabulate headers = ( "captured_run_id", "captured_counter", "experiment_name", "sample_name", "location", "work_station", ) table = [] for guid in guids: ds = load_by_guid(guid, conn=conn) parsed_guid = parse_guid(guid) table.append( [ ds.captured_run_id, ds.captured_counter, ds.exp_name, ds.sample_name, parsed_guid["location"], parsed_guid["work_station"], ] ) return tabulate(table, headers=headers)