Source code for qcodes.dataset.database_extract_runs

from __future__ import annotations

import os
from typing import TYPE_CHECKING
from warnings import warn

import numpy as np

from qcodes.dataset.data_set import DataSet
from qcodes.dataset.dataset_helpers import _add_run_to_runs_table
from qcodes.dataset.experiment_container import _create_exp_if_needed
from qcodes.dataset.sqlite.connection import ConnectionPlus, atomic
from qcodes.dataset.sqlite.database import (
    connect,
    get_db_version_and_newest_available_version,
)
from qcodes.dataset.sqlite.queries import (
    _populate_results_table,
    get_exp_ids_from_run_ids,
    get_experiment_attributes_by_exp_id,
    get_runid_from_guid,
    is_run_id_in_database,
)

if TYPE_CHECKING:
    from pathlib import Path


def extract_runs_into_db(
    source_db_path: str | Path,
    target_db_path: str | Path,
    *run_ids: int,
    upgrade_source_db: bool = False,
    upgrade_target_db: bool = False,
) -> None:
    """
    Extract a selection of runs into another DB file. All runs must come
    from the same experiment. They will be added to an experiment with the
    same name and ``sample_name`` in the target db. If such an experiment
    does not exist, it will be created.

    Args:
        source_db_path: Path to the source DB file
        target_db_path: Path to the target DB file. The target DB file will
            be created if it does not exist.
        run_ids: The ``run_id``'s of the runs to copy into the target DB
            file
        upgrade_source_db: If the source DB is found to be in a version that
            is not the newest, should it be upgraded?
        upgrade_target_db: If the target DB is found to be in a version that
            is not the newest, should it be upgraded?
    """
    # Check for versions
    (s_v, new_v) = get_db_version_and_newest_available_version(source_db_path)
    if s_v < new_v and not upgrade_source_db:
        warn(f'Source DB version is {s_v}, but this function needs it to be'
             f' in version {new_v}. Run this function again with '
             'upgrade_source_db=True to auto-upgrade the source DB file.')
        return

    if os.path.exists(target_db_path):
        (t_v, new_v) = get_db_version_and_newest_available_version(
            target_db_path)
        if t_v < new_v and not upgrade_target_db:
            warn(f'Target DB version is {t_v}, but this function needs it to '
                 f'be in version {new_v}. Run this function again with '
                 'upgrade_target_db=True to auto-upgrade the target DB file.')
            return

    source_conn = connect(source_db_path)

    # Validate that all runs are in the source database
    do_runs_exist = is_run_id_in_database(source_conn, *run_ids)
    if False in do_runs_exist.values():
        source_conn.close()
        non_existing_ids = [rid for rid in run_ids if not do_runs_exist[rid]]
        err_mssg = ("Error: not all run_ids exist in the source database. "
                    "The following run(s) is/are not present: "
                    f"{non_existing_ids}")
        raise ValueError(err_mssg)

    # Validate that all runs are from the same experiment
    source_exp_ids = np.unique(get_exp_ids_from_run_ids(source_conn, run_ids))
    if len(source_exp_ids) != 1:
        source_conn.close()
        raise ValueError('Did not receive runs from a single experiment. '
                         f'Got runs from experiments {source_exp_ids}')

    # Fetch the attributes of the runs' experiment
    # hopefully, this is enough to uniquely identify the experiment
    exp_attrs = get_experiment_attributes_by_exp_id(source_conn,
                                                    source_exp_ids[0])

    # Massage the target DB file to accommodate the runs
    # (create new experiment if needed)
    target_conn = connect(target_db_path)

    # this function raises if the target DB file has several experiments
    # matching both the name and sample_name
    try:
        with atomic(target_conn) as target_conn:
            target_exp_id = _create_exp_if_needed(target_conn,
                                                  exp_attrs['name'],
                                                  exp_attrs['sample_name'],
                                                  exp_attrs['format_string'],
                                                  exp_attrs['start_time'],
                                                  exp_attrs['end_time'])

            # Finally insert the runs
            for run_id in run_ids:
                _extract_single_dataset_into_db(DataSet(run_id=run_id,
                                                        conn=source_conn),
                                                target_conn,
                                                target_exp_id)
    finally:
        source_conn.close()
        target_conn.close()
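
# Usage sketch for the function above: copy runs 1 and 2 (assumed to belong
# to the same experiment) into a new DB file. The file names and run ids
# below are hypothetical placeholders, for illustration only.
#
#     extract_runs_into_db(
#         "./source_experiments.db",
#         "./extracted_runs.db",
#         1, 2,
#     )
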
def _extract_single_dataset_into_db(dataset: DataSet,
                                    target_conn: ConnectionPlus,
                                    target_exp_id: int) -> None:
    """
    NB: This function should only be called from within
    :meth:`extract_runs_into_db`

    Insert the given dataset into the specified database file as the latest
    run.

    Trying to insert a run already in the DB is a NOOP.

    Args:
        dataset: A dataset representing the run to be copied
        target_conn: connection to the DB. Must be atomically guarded
        target_exp_id: The ``exp_id`` of the (target DB) experiment in which
            to insert the run
    """
    if not dataset.completed:
        raise ValueError('Dataset not completed. An incomplete dataset '
                         'cannot be copied. The incomplete dataset has '
                         f'GUID: {dataset.guid} and run_id: {dataset.run_id}')

    source_conn = dataset.conn

    run_id = get_runid_from_guid(target_conn, dataset.guid)
    if run_id is not None:
        return

    _, _, target_table_name = _add_run_to_runs_table(
        dataset, target_conn, target_exp_id
    )
    assert target_table_name is not None
    _populate_results_table(
        source_conn, target_conn, dataset.table_name, target_table_name
    )
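
The sketch below illustrates the NOOP behavior documented above: because
_extract_single_dataset_into_db first looks up each run's GUID in the target
DB via get_runid_from_guid and returns early on a match, extracting the same
run twice leaves only a single copy in the target file. The file paths and
run id are hypothetical placeholders.

    from qcodes.dataset.database_extract_runs import extract_runs_into_db

    # Hypothetical paths and run id, for illustration only.
    extract_runs_into_db("./experiments.db", "./subset.db", 1)
    # Second call is a NOOP for run 1: its GUID already exists in the
    # target DB, so no duplicate run is inserted.
    extract_runs_into_db("./experiments.db", "./subset.db", 1)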