"""Extract a selection of runs from one QCoDeS database file into another."""
from __future__ import annotations
import os
from typing import TYPE_CHECKING
from warnings import warn
import numpy as np
from qcodes.dataset.data_set import DataSet
from qcodes.dataset.dataset_helpers import _add_run_to_runs_table
from qcodes.dataset.experiment_container import _create_exp_if_needed
from qcodes.dataset.sqlite.connection import ConnectionPlus, atomic
from qcodes.dataset.sqlite.database import (
connect,
get_db_version_and_newest_available_version,
)
from qcodes.dataset.sqlite.queries import (
_populate_results_table,
get_exp_ids_from_run_ids,
get_experiment_attributes_by_exp_id,
get_runid_from_guid,
is_run_id_in_database,
)
if TYPE_CHECKING:
from pathlib import Path
def extract_runs_into_db(
    source_db_path: str | Path,
    target_db_path: str | Path,
    *run_ids: int,
    upgrade_source_db: bool = False,
    upgrade_target_db: bool = False,
) -> None:
    """
    Extract a selection of runs into another DB file. All runs must come from
    the same experiment. They will be added to an experiment with the same name
    and ``sample_name`` in the target db. If such an experiment does not exist,
    it will be created.

    Args:
        source_db_path: Path to the source DB file
        target_db_path: Path to the target DB file. The target DB file will be
            created if it does not exist.
        run_ids: The ``run_id``'s of the runs to copy into the target DB file
        upgrade_source_db: If the source DB is found to be in a version that is
            not the newest, should it be upgraded?
        upgrade_target_db: If the target DB is found to be in a version that is
            not the newest, should it be upgraded?
    """
    # Refuse to proceed (with a warning, not an exception) if either DB file
    # is at an older schema version and the caller did not opt into upgrading.
    (s_v, new_v) = get_db_version_and_newest_available_version(source_db_path)
    if s_v < new_v and not upgrade_source_db:
        warn(
            f"Source DB version is {s_v}, but this function needs it to be"
            f" in version {new_v}. Run this function again with "
            "upgrade_source_db=True to auto-upgrade the source DB file."
        )
        return

    if os.path.exists(target_db_path):
        (t_v, new_v) = get_db_version_and_newest_available_version(target_db_path)
        if t_v < new_v and not upgrade_target_db:
            warn(
                f"Target DB version is {t_v}, but this function needs it to "
                f"be in version {new_v}. Run this function again with "
                "upgrade_target_db=True to auto-upgrade the target DB file."
            )
            return

    source_conn = connect(source_db_path)
    # Guard the source connection from the moment it is opened so it is
    # closed on *every* exit path (the original code leaked it if e.g.
    # fetching the experiment attributes or opening the target DB raised).
    try:
        # Validate that all runs are in the source database
        do_runs_exist = is_run_id_in_database(source_conn, *run_ids)
        if False in do_runs_exist.values():
            non_existing_ids = [rid for rid in run_ids if not do_runs_exist[rid]]
            err_mssg = (
                "Error: not all run_ids exist in the source database. "
                "The following run(s) is/are not present: "
                f"{non_existing_ids}"
            )
            raise ValueError(err_mssg)

        # Validate that all runs are from the same experiment
        source_exp_ids = np.unique(get_exp_ids_from_run_ids(source_conn, run_ids))
        if len(source_exp_ids) != 1:
            raise ValueError(
                "Did not receive runs from a single experiment. "
                f"Got runs from experiments {source_exp_ids}"
            )

        # Fetch the attributes of the runs' experiment
        # hopefully, this is enough to uniquely identify the experiment
        exp_attrs = get_experiment_attributes_by_exp_id(source_conn, source_exp_ids[0])

        # Massage the target DB file to accommodate the runs
        # (create new experiment if needed)
        target_conn = connect(target_db_path)
        try:
            # All target-side writes happen inside one atomic transaction;
            # _create_exp_if_needed raises if the target DB file has several
            # experiments matching both the name and sample_name.
            with atomic(target_conn) as atomic_conn:
                target_exp_id = _create_exp_if_needed(
                    atomic_conn,
                    exp_attrs["name"],
                    exp_attrs["sample_name"],
                    exp_attrs["format_string"],
                    exp_attrs["start_time"],
                    exp_attrs["end_time"],
                )

                # Finally insert the runs
                for run_id in run_ids:
                    _extract_single_dataset_into_db(
                        DataSet(run_id=run_id, conn=source_conn),
                        atomic_conn,
                        target_exp_id,
                    )
        finally:
            target_conn.close()
    finally:
        source_conn.close()
def _extract_single_dataset_into_db(
    dataset: DataSet, target_conn: ConnectionPlus, target_exp_id: int
) -> None:
    """
    Insert the given dataset into the specified database file as the latest
    run.

    NB: This function should only be called from within
    :meth:`extract_runs_into_db`. Trying to insert a run already present in
    the target DB is a NOOP.

    Args:
        dataset: A dataset representing the run to be copied
        target_conn: connection to the DB. Must be atomically guarded
        target_exp_id: The ``exp_id`` of the (target DB) experiment in which to
            insert the run
    """
    # Copying a run that never finished would produce a truncated record.
    if not dataset.completed:
        raise ValueError(
            "Dataset not completed. An incomplete dataset "
            "can not be copied. The incomplete dataset has "
            f"GUID: {dataset.guid} and run_id: {dataset.run_id}"
        )

    # The GUID identifies the run globally; if it already exists in the
    # target DB there is nothing to do.
    if get_runid_from_guid(target_conn, dataset.guid) is not None:
        return

    _, _, new_table_name = _add_run_to_runs_table(dataset, target_conn, target_exp_id)
    assert new_table_name is not None

    # Stream the actual measurement data from source to target.
    _populate_results_table(
        dataset.conn, target_conn, dataset.table_name, new_table_name
    )