Source code for qcodes.utils.threading_utils
import logging
import threading
from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar
if TYPE_CHECKING:
    from collections.abc import Callable, Sequence
_LOG = logging.getLogger(__name__)
T = TypeVar("T")
[docs]
class RespondingThread(threading.Thread, Generic[T]):
    """
    Thread subclass for parallelizing execution. Behaves like a
    regular thread but returns a value from target, and propagates
    exceptions back to the main thread when this value is collected.
    The `output` method joins the thread, then checks for errors and
    returns the output value.
    so, if you have a function `f` where `f(1, 2, a=3) == 4`, then:
    >>> thread = RespondingThread(target=f, args=(1, 2), kwargs={'a': 3})
    >>> thread.start()
    >>> # do other things while this is running
    >>> out = thread.output()  # out is 4
    """
    def __init__(
        self,
        target: "Callable[..., T]",
        args: "Sequence[Any]" = (),
        kwargs: dict[str, Any] | None = None,
        *args2: Any,
        **kwargs2: Any,
    ):
        if kwargs is None:
            kwargs = {}
        super().__init__(*args2, **kwargs2)
        self._target = target
        self._args = args
        self._kwargs = kwargs
        self._exception: Exception | None = None
        self._output: T | None = None
[docs]
    def run(self) -> None:
        _LOG.debug(f"Executing {self._target} on thread: {threading.get_ident()}")
        try:
            self._output = self._target(*self._args, **self._kwargs)
        except Exception as e:
            self._exception = e 
[docs]
    def output(self, timeout: float | None = None) -> T | None:
        self.join(timeout=timeout)
        if self._exception:
            e = self._exception
            self._exception = None
            raise e
        return self._output 
 
[docs]
def thread_map(
    callables: "Sequence[Callable[..., T]]",
    args: Optional["Sequence[Sequence[Any]]"] = None,
    kwargs: Optional["Sequence[dict[str, Any]]"] = None,
) -> list[T | None]:
    """
    Evaluate a sequence of callables in separate threads, returning
    a list of their return values.
    Args:
        callables: A sequence of callables.
        args (Optional): A sequence of sequences containing the positional
            arguments for each callable.
        kwargs (Optional): A sequence of dicts containing the keyword arguments
            for each callable.
    """
    if args is None:
        args = ((),) * len(callables)
    if kwargs is None:
        empty_dict: dict[str, Any] = {}
        kwargs = (empty_dict,) * len(callables)
    threads = [
        RespondingThread(target=c, args=a, kwargs=k)
        for c, a, k in zip(callables, args, kwargs)
    ]
    for t in threads:
        t.start()
    return [t.output() for t in threads]