Source code for qcodes.dataset.database_extract_runs

from __future__ import annotations

import logging
import os
from contextlib import closing
from pathlib import Path
from typing import TYPE_CHECKING, Literal
from warnings import warn

import numpy as np
from opentelemetry import trace
from tqdm.auto import tqdm

from qcodes.dataset.data_set import DataSet, load_by_id
from qcodes.dataset.data_set_in_memory import load_from_netcdf
from qcodes.dataset.dataset_helpers import _add_run_to_runs_table
from qcodes.dataset.experiment_container import _create_exp_if_needed
from qcodes.dataset.export_config import get_data_export_path
from qcodes.dataset.sqlite.connection import AtomicConnection, atomic
from qcodes.dataset.sqlite.database import (
    connect,
    get_db_version_and_newest_available_version,
)
from qcodes.dataset.sqlite.queries import (
    _populate_results_table,
    get_exp_ids_from_run_ids,
    get_experiment_attributes_by_exp_id,
    get_runid_from_guid,
    get_runs,
    is_run_id_in_database,
)

if TYPE_CHECKING:
    from qcodes.dataset.data_set_protocol import DataSetProtocol

_LOG = logging.getLogger(__name__)
_TRACER = trace.get_tracer(__name__)


@_TRACER.start_as_current_span(f"{__name__}.extract_runs_into_db")
def extract_runs_into_db(
    source_db_path: str | Path,
    target_db_path: str | Path,
    *run_ids: int,
    upgrade_source_db: bool = False,
    upgrade_target_db: bool = False,
) -> None:
    """
    Extract a selection of runs into another DB file. All runs must come from
    the same experiment. They will be added to an experiment with the same
    name and ``sample_name`` in the target db. If such an experiment does not
    exist, it will be created.

    Args:
        source_db_path: Path to the source DB file
        target_db_path: Path to the target DB file. The target DB file will
            be created if it does not exist.
        run_ids: The ``run_id``s of the runs to copy into the target DB file
        upgrade_source_db: If the source DB is found to be in a version that
            is not the newest, should it be upgraded?
        upgrade_target_db: If the target DB is found to be in a version that
            is not the newest, should it be upgraded?
    """
    # Check for versions
    (s_v, new_v) = get_db_version_and_newest_available_version(source_db_path)
    if s_v < new_v and not upgrade_source_db:
        warn(
            f"Source DB version is {s_v}, but this function needs it to be"
            f" in version {new_v}. Run this function again with "
            "upgrade_source_db=True to auto-upgrade the source DB file."
        )
        return

    if os.path.exists(target_db_path):
        (t_v, new_v) = get_db_version_and_newest_available_version(target_db_path)
        if t_v < new_v and not upgrade_target_db:
            warn(
                f"Target DB version is {t_v}, but this function needs it to "
                f"be in version {new_v}. Run this function again with "
                "upgrade_target_db=True to auto-upgrade the target DB file."
            )
            return

    source_conn = connect(source_db_path)

    # Validate that all runs are in the source database
    do_runs_exist = is_run_id_in_database(source_conn, *run_ids)
    if False in do_runs_exist.values():
        source_conn.close()
        non_existing_ids = [rid for rid in run_ids if not do_runs_exist[rid]]
        err_mssg = (
            "Error: not all run_ids exist in the source database. "
            "The following run(s) is/are not present: "
            f"{non_existing_ids}"
        )
        raise ValueError(err_mssg)

    # Validate that all runs are from the same experiment
    source_exp_ids = np.unique(get_exp_ids_from_run_ids(source_conn, run_ids))
    if len(source_exp_ids) != 1:
        source_conn.close()
        raise ValueError(
            "Did not receive runs from a single experiment. "
            f"Got runs from experiments {source_exp_ids}"
        )

    # Fetch the attributes of the runs' experiment
    # hopefully, this is enough to uniquely identify the experiment
    exp_attrs = get_experiment_attributes_by_exp_id(source_conn, source_exp_ids[0])

    # Massage the target DB file to accommodate the runs
    # (create a new experiment if needed)
    target_conn = connect(target_db_path)

    # _create_exp_if_needed raises if the target DB file has several
    # experiments matching both the name and the sample_name
    try:
        with atomic(target_conn) as target_conn:
            target_exp_id = _create_exp_if_needed(
                target_conn,
                exp_attrs["name"],
                exp_attrs["sample_name"],
                exp_attrs["format_string"],
                exp_attrs["start_time"],
                exp_attrs["end_time"],
            )

            # Finally insert the runs
            for run_id in run_ids:
                _extract_single_dataset_into_db(
                    DataSet(run_id=run_id, conn=source_conn),
                    target_conn,
                    target_exp_id,
                )
    finally:
        source_conn.close()
        target_conn.close()
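
A minimal usage sketch, assuming a hypothetical source file ``experiments.db`` that holds runs 1, 2 and 5 from a single experiment (all paths and run IDs below are made up for illustration):

    from qcodes.dataset.database_extract_runs import extract_runs_into_db

    # Copy runs 1, 2 and 5 into subset.db; the target file and the matching
    # experiment are created on demand. Paths and run IDs are hypothetical.
    extract_runs_into_db(
        "experiments.db",
        "subset.db",
        1, 2, 5,
        upgrade_target_db=True,  # auto-upgrade an existing target if its schema is old
    )
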
def _extract_single_dataset_into_db(
    dataset: DataSet, target_conn: AtomicConnection, target_exp_id: int
) -> None:
    """
    NB: This function should only be called from within
    :meth:`extract_runs_into_db`

    Insert the given dataset into the specified database file as the latest
    run.

    Trying to insert a run that is already in the DB is a NOOP: runs are
    identified by their GUID.

    Args:
        dataset: A dataset representing the run to be copied
        target_conn: Connection to the target DB. Must be atomically guarded
        target_exp_id: The ``exp_id`` of the (target DB) experiment in which
            to insert the run
    """
    if not dataset.completed:
        raise ValueError(
            "Dataset not completed. An incomplete dataset "
            "cannot be copied. The incomplete dataset has "
            f"GUID: {dataset.guid} and run_id: {dataset.run_id}"
        )

    source_conn = dataset.conn

    run_id = get_runid_from_guid(target_conn, dataset.guid)
    if run_id is not None:
        # The run is already present in the target DB: nothing to do
        return

    _, _, target_table_name = _add_run_to_runs_table(
        dataset, target_conn, target_exp_id
    )
    assert target_table_name is not None

    _populate_results_table(
        source_conn, target_conn, dataset.table_name, target_table_name
    )
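
Because the helper above returns early when the run's GUID is already present in the target DB, extraction is idempotent. A sketch (hypothetical paths and run ID):

    from qcodes.dataset.database_extract_runs import extract_runs_into_db

    extract_runs_into_db("experiments.db", "subset.db", 1)
    # Re-running the same extraction is a NOOP: the run is looked up
    # by GUID in the target DB before insertion.
    extract_runs_into_db("experiments.db", "subset.db", 1)
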
@_TRACER.start_as_current_span(f"{__name__}.export_datasets_and_create_metadata_db")
def export_datasets_and_create_metadata_db(
    source_db_path: str | Path,
    target_db_path: str | Path,
    export_path: str | Path | None = None,
) -> dict[int, Literal["exported", "copied_as_is", "failed"]]:
    """
    Export all datasets from a source database to NetCDF files and create a
    new database file containing only metadata (no raw data) for those
    exported datasets. Datasets that cannot be exported to NetCDF will be
    transferred as-is to the new database file.

    This function is useful for reducing the size of database files by
    offloading raw data to NetCDF files while preserving all metadata
    information in a database file.

    Args:
        source_db_path: Path to the source database file
        target_db_path: Path to the target database file that will be
            created. An error is raised if it already exists.
        export_path: Optional path where NetCDF files should be exported.
            If None, the default export path from the QCoDeS configuration
            is used.

    Returns:
        A dictionary mapping run_id to status ("exported", "copied_as_is"
        or "failed")
    """
    span = trace.get_current_span()
    span.set_attribute("source_db_path", str(source_db_path))
    span.set_attribute("target_db_path", str(target_db_path))
    span.set_attribute("export_path", str(export_path))

    source_db_path = Path(source_db_path)
    if not source_db_path.exists():
        raise FileNotFoundError(f"Source database file not found: {source_db_path}")

    with closing(connect(source_db_path)) as source_con:
        run_ids = sorted(get_runs(source_con))
    _LOG.debug(f"Found {len(run_ids)} datasets to process")

    if not run_ids:
        _LOG.warning(
            f"No datasets found in source database {source_db_path}, "
            "nothing to export"
        )
        return {}

    target_db_path = Path(target_db_path)
    if target_db_path.exists():
        raise FileExistsError(
            f"Target database file already exists: {target_db_path}. "
            "Please choose a different path or remove the existing file."
        )

    span.set_attribute("export_path_from_qcodes_config", str(get_data_export_path()))

    if export_path is None:
        export_path = get_data_export_path()
    else:
        export_path = Path(export_path)
    try:
        export_path.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        raise RuntimeError(f"Failed to create export directory {export_path}") from e

    _LOG.info(
        f"Starting NetCDF export process from {source_db_path} to {export_path}, "
        f"and creating metadata-only database file {target_db_path}."
    )

    # Process datasets by experiment to preserve structure
    result_status: dict[int, Literal["exported", "copied_as_is", "failed"]] = {}
    processed_experiments: dict[int, int] = {}  # Map source exp_id to target exp_id

    with (
        closing(connect(source_db_path)) as source_conn,
        closing(connect(target_db_path)) as target_conn,
    ):
        for run_id in tqdm(run_ids):
            try:
                dataset = load_by_id(run_id, conn=source_conn)
                exp_id = dataset.exp_id

                # Create the experiment in the target DB if not already done
                if exp_id not in processed_experiments:
                    exp_attrs = get_experiment_attributes_by_exp_id(
                        source_conn, exp_id
                    )
                    with atomic(target_conn) as atomic_target_conn:
                        target_exp_id = _create_exp_if_needed(
                            atomic_target_conn,
                            exp_attrs["name"],
                            exp_attrs["sample_name"],
                            exp_attrs["format_string"],
                            exp_attrs["start_time"],
                            exp_attrs["end_time"],
                        )
                    processed_experiments[exp_id] = target_exp_id
                    _LOG.info(
                        f"Created experiment `{exp_attrs['name']}` on "
                        f"`{exp_attrs['sample_name']}` with ID {target_exp_id} "
                        "in target database"
                    )
                else:
                    target_exp_id = processed_experiments[exp_id]

                # Try to export the dataset to NetCDF and create a
                # metadata-only version in the target DB
                status = _process_single_dataset(
                    dataset, source_conn, target_conn, export_path, target_exp_id
                )
                result_status[run_id] = status

            except Exception:
                _LOG.exception(f"Failed to process dataset {run_id}")
                result_status[run_id] = "failed"

    _LOG.info("Exporting complete.")
    return result_status
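
A usage sketch for the export entry point (paths are hypothetical); the returned dictionary can be inspected to find runs that could not be transferred:

    from qcodes.dataset.database_extract_runs import (
        export_datasets_and_create_metadata_db,
    )

    # Offload raw data to NetCDF files and keep only metadata in the new DB.
    status = export_datasets_and_create_metadata_db(
        "experiments.db",       # existing source DB
        "metadata_only.db",     # must not exist yet
        export_path="netcdf",   # optional; defaults to the QCoDeS export path
    )
    failed = [run_id for run_id, s in status.items() if s == "failed"]
    if failed:
        print(f"Runs that could not be transferred: {failed}")
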
@_TRACER.start_as_current_span(f"{__name__}._process_single_dataset")
def _process_single_dataset(
    dataset: DataSetProtocol,
    source_conn: AtomicConnection,
    target_conn: AtomicConnection,
    export_path: Path,
    target_exp_id: int,
) -> Literal["exported", "copied_as_is", "failed"]:
    """
    Export a dataset to NetCDF and add its metadata to the target database
    file, or, if that fails, copy the dataset directly into the target
    database file.

    Returns:
        Status string indicating what was done with the dataset
    """
    span = trace.get_current_span()
    span.set_attribute("guid", dataset.guid)
    span.set_attribute("given_export_path", str(export_path))
    run_id = dataset.run_id
    span.set_attribute("run_id", run_id)

    netcdf_export_path = None
    existing_netcdf_path = dataset.export_info.export_paths.get("nc")
    span.set_attribute("dataset_netcdf_export_path", str(existing_netcdf_path))

    if existing_netcdf_path is not None:
        existing_path = Path(existing_netcdf_path)
        # Check if the existing export path matches the desired export path
        if existing_path.exists() and existing_path.parent == export_path:
            _LOG.debug(
                f"Dataset {run_id} already exported to NetCDF at "
                f"{existing_netcdf_path}"
            )
            netcdf_export_path = existing_netcdf_path
        else:
            _LOG.info(
                f"Dataset {run_id} was exported to a different location, "
                f"re-exporting to {export_path}"
            )
    else:
        _LOG.debug(f"Attempting to export dataset {run_id} to NetCDF")

    if netcdf_export_path is None:
        try:
            dataset.export("netcdf", path=export_path)
            netcdf_export_path = dataset.export_info.export_paths.get("nc")
            if netcdf_export_path is None:
                raise RuntimeError(
                    f"Failed to get NetCDF export path for dataset {run_id}. "
                    "Export appears to have succeeded but no path was recorded."
                )
        except Exception:
            _LOG.exception(
                f"Failed to export dataset {run_id} to NetCDF, copying as-is"
            )
            return _copy_dataset_as_is(
                dataset, source_conn, target_conn, target_exp_id
            )

    _LOG.debug(f"Dataset {run_id} available as NetCDF at {netcdf_export_path}")

    netcdf_dataset = load_from_netcdf(
        netcdf_export_path, path_to_db=target_conn.path_to_dbfile
    )
    netcdf_dataset.write_metadata_to_db()
    _LOG.info(
        f"Successfully wrote dataset metadata of {run_id} to "
        f"{target_conn.path_to_dbfile}"
    )
    return "exported"


def _copy_dataset_as_is(
    dataset: DataSetProtocol,
    source_conn: AtomicConnection,
    target_conn: AtomicConnection,
    target_exp_id: int,
) -> Literal["copied_as_is", "failed"]:
    """Copy a dataset, raw data included, into the target database file."""
    try:
        dataset_obj = DataSet(run_id=dataset.run_id, conn=source_conn)
        with atomic(target_conn) as target_conn_atomic:
            _extract_single_dataset_into_db(
                dataset_obj, target_conn_atomic, target_exp_id
            )
        _LOG.debug(f"Successfully copied dataset {dataset.run_id} as-is")
        return "copied_as_is"
    except Exception:
        _LOG.exception(f"Failed to copy dataset {dataset.run_id} as-is")
        return "failed"
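
Once the metadata-only database has been created, the exported runs can be loaded from it with the usual loaders; a run whose raw data lives in NetCDF reads it back through the export path recorded in its metadata. A verification sketch reusing this module's own imports (paths and run ID are hypothetical):

    from qcodes.dataset.data_set import load_by_id
    from qcodes.dataset.sqlite.database import connect

    conn = connect("metadata_only.db")
    ds = load_by_id(1, conn=conn)
    print(ds.guid, ds.export_info.export_paths.get("nc"))
    conn.close()
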