# Source code for qcodes.dataset.sqlite.connection
"""
This module provides :class:`AtomicConnection`, a subclass of
:class:`sqlite3.Connection`, together with functions built around it that
allow performing nested atomic transactions on an SQLite database.
"""
from __future__ import annotations
import logging
import sqlite3
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any
from qcodes.utils import DelayedKeyboardInterrupt
if TYPE_CHECKING:
from collections.abc import Iterator
# Module-level logger; atomic() uses it to record rollbacks.
log = logging.getLogger(__name__)
# [docs]
class AtomicConnection(sqlite3.Connection):
    """
    An :class:`sqlite3.Connection` that carries the extra state needed to
    perform additional operations atomically.

    Instances are best created through the function :func:`connect` rather
    than by calling this class directly.
    """

    atomic_in_progress: bool = False
    """
    Whether the connection is currently inside an atomic block of
    transactions. This flag is what allows `atomic` context managers
    to be nested.
    """

    path_to_dbfile: str = ""
    """
    Path to the database file this connection is attached to.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Open the connection and record the path of its database file.

        All positional and keyword arguments are forwarded unchanged to
        :class:`sqlite3.Connection`.
        """
        super().__init__(*args, **kwargs)
        # Inside a method the bare name resolves to the module-level
        # function ``path_to_dbfile``, not to the class attribute of the
        # same name; the result then shadows the class-level default.
        self.path_to_dbfile = path_to_dbfile(self)
@contextmanager
def atomic(conn: AtomicConnection) -> Iterator[AtomicConnection]:
    """
    Guard a series of transactions so that they succeed or fail together.

    If any statement inside the ``with`` block raises, everything done so
    far is rolled back and no further transactions are performed. Blocks may
    be nested: only the outermost block issues ``BEGIN`` and commits, inner
    blocks just run inside the already open transaction.

    NB: 'BEGIN' is by default only inserted before INSERT/UPDATE/DELETE/REPLACE
    but we want to guard any transaction that modifies the database (e.g. also
    ALTER)

    Args:
        conn: connection to guard

    Yields:
        The same ``conn`` object, for use inside the ``with`` block.

    Raises:
        ValueError: if ``conn`` is not an :class:`AtomicConnection`.
        RuntimeError: if this is the outermost atomic block and the
            connection already has an open transaction, or (chained from the
            original error) if an exception was raised inside the block and
            the transaction was rolled back.
    """
    # NOTE(review): DelayedKeyboardInterrupt presumably postpones Ctrl-C until
    # the guarded section has finished -- confirm against qcodes.utils.
    with DelayedKeyboardInterrupt(context={"reason": "sqlite atomic operation"}):
        if not isinstance(conn, AtomicConnection):
            raise ValueError(
                "atomic context manager only accepts "
                "AtomicConnection database connection objects."
            )

        # Remember the flag as found so it can be restored on the way out;
        # when it is unset nobody above us opened an atomic block.
        previously_in_progress = conn.atomic_in_progress
        outermost = not previously_in_progress

        if outermost and conn.in_transaction:
            raise RuntimeError(
                "SQLite connection has uncommitted transactions. "
                "Please commit those before starting an atomic transaction."
            )

        conn.atomic_in_progress = True
        saved_isolation_level = conn.isolation_level

        try:
            if outermost:
                # Take over transaction control from the sqlite3 module and
                # open the transaction explicitly, so that statements such as
                # ALTER are guarded as well.
                conn.isolation_level = None
                conn.cursor().execute("BEGIN")
            yield conn
        except Exception as e:
            conn.rollback()
            log.exception("Rolling back due to unhandled exception")
            raise RuntimeError("Rolling back due to unhandled exception") from e
        else:
            # Inner blocks leave the commit to the outermost one.
            if outermost:
                conn.commit()
        finally:
            if outermost:
                conn.isolation_level = saved_isolation_level
            conn.atomic_in_progress = previously_in_progress
def transaction(conn: AtomicConnection, sql: str, *args: Any) -> sqlite3.Cursor:
    """Perform a transaction.

    The statement is executed on a fresh cursor; nothing is committed here,
    so the transaction still needs to be committed or rolled back by the
    caller.

    Args:
        conn: database connection
        sql: formatted string
        *args: arguments to use for parameter substitution

    Returns:
        sqlite cursor on which the statement was executed
    """
    cursor = conn.cursor()
    # ``Cursor.execute`` defaults its parameters to an empty tuple, so passing
    # ``args`` through unconditionally covers the "no arguments" case too.
    cursor.execute(sql, args)
    return cursor
def atomic_transaction(conn: AtomicConnection, sql: str, *args: Any) -> sqlite3.Cursor:
    """Perform an **atomic** transaction.

    The statement is run through :func:`transaction` inside an
    :func:`atomic` block: it is committed if no exception occurs, otherwise
    it is rolled back.

    NB: 'BEGIN' is by default only inserted before INSERT/UPDATE/DELETE/REPLACE
    but we want to guard any transaction that modifies the database (e.g. also
    ALTER). 'BEGIN' marks a place to commit from/roll back to

    Args:
        conn: database connection
        sql: formatted string
        *args: arguments to use for parameter substitution

    Returns:
        sqlite cursor
    """
    with atomic(conn) as guarded_conn:
        cursor = transaction(guarded_conn, sql, *args)
    # Returned only after the atomic block has exited.
    return cursor
def path_to_dbfile(conn: AtomicConnection | sqlite3.Connection) -> str:
    """
    Return the path of the database file that the conn object is connected to.

    Args:
        conn: connection to query

    Returns:
        The file entry of the first row reported by ``PRAGMA database_list``.
    """
    # According to https://www.sqlite.org/pragma.html#pragma_database_list
    # the 3rd column (index 2 here) of each row is the path.
    database_rows = conn.cursor().execute("PRAGMA database_list").fetchall()
    first_database = database_rows[0]
    return first_database[2]