[docs]classCancellationToken:"""A token used to cancel pending async calls"""def__init__(self)->None:self._cancelled:bool=Falseself._lock:threading.Lock=threading.Lock()self._callbacks:List[Callable[[],None]]=[]
[docs]defcancel(self)->None:"""Cancel pending async calls linked to this cancellation token."""withself._lock:ifnotself._cancelled:self._cancelled=Trueforcallbackinself._callbacks:callback()
[docs]defis_cancelled(self)->bool:"""Check if the CancellationToken has been used"""withself._lock:returnself._cancelled
[docs]defadd_callback(self,callback:Callable[[],None])->None:"""Attach a callback that will be called when cancel is invoked"""withself._lock:ifself._cancelled:callback()else:self._callbacks.append(callback)
[docs]deflink_future(self,future:Future[Any])->Future[Any]:"""Link a pending async call to a token to allow its cancellation"""withself._lock:ifself._cancelled:future.cancel()else:def_cancel()->None:future.cancel()self._callbacks.append(_cancel)returnfuture