[docs]classRespondingThread(threading.Thread,Generic[T]):""" Thread subclass for parallelizing execution. Behaves like a regular thread but returns a value from target, and propagates exceptions back to the main thread when this value is collected. The `output` method joins the thread, then checks for errors and returns the output value. so, if you have a function `f` where `f(1, 2, a=3) == 4`, then: >>> thread = RespondingThread(target=f, args=(1, 2), kwargs={'a': 3}) >>> thread.start() >>> # do other things while this is running >>> out = thread.output() # out is 4 """def__init__(self,target:"Callable[..., T]",args:"Sequence[Any]"=(),kwargs:dict[str,Any]|None=None,*args2:Any,**kwargs2:Any,):ifkwargsisNone:kwargs={}super().__init__(*args2,**kwargs2)self._target=targetself._args=argsself._kwargs=kwargsself._exception:Exception|None=Noneself._output:T|None=None
[docs]defrun(self)->None:_LOG.debug(f"Executing {self._target} on thread: {threading.get_ident()}")try:self._output=self._target(*self._args,**self._kwargs)exceptExceptionase:self._exception=e
def thread_map(
    callables: "Sequence[Callable[..., T]]",
    args: Optional["Sequence[Sequence[Any]]"] = None,
    kwargs: Optional["Sequence[dict[str, Any]]"] = None,
) -> list[T | None]:
    """
    Run each callable on its own thread and gather the return values.

    All threads are started before any result is collected, and results are
    returned in the same order as ``callables``.

    Args:
        callables: A sequence of callables.
        args (Optional): A sequence of sequences containing the positional
            arguments for each callable. Defaults to no positional arguments.
        kwargs (Optional): A sequence of dicts containing the keyword
            arguments for each callable. Defaults to no keyword arguments.

    Returns:
        A list with one entry per started thread, holding the value obtained
        from that thread's ``output()`` call.

    Note:
        ``callables``, ``args`` and ``kwargs`` are zipped together, so if their
        lengths differ only as many callables as the shortest one are run.
        Results are collected through ``RespondingThread.output``, which is
        documented to re-raise an exception raised by the callable.
    """
    if args is None:
        no_positional: tuple = ()
        args = tuple(no_positional for _ in range(len(callables)))
    if kwargs is None:
        # A single empty dict is reused for every callable.
        no_keywords: dict[str, Any] = {}
        kwargs = tuple(no_keywords for _ in range(len(callables)))

    workers = []
    for func, positional, keywords in zip(callables, args, kwargs):
        workers.append(RespondingThread(target=func, args=positional, kwargs=keywords))

    # Start everything first so the callables run concurrently.
    for worker in workers:
        worker.start()

    results = []
    for worker in workers:
        results.append(worker.output())
    return results