Source code for autogen_core.base._cancellation_token
import threading
from asyncio import Future
from typing import Any, Callable, List
[docs]
class CancellationToken:
def __init__(self) -> None:
self._cancelled: bool = False
self._lock: threading.Lock = threading.Lock()
self._callbacks: List[Callable[[], None]] = []
[docs]
def cancel(self) -> None:
with self._lock:
if not self._cancelled:
self._cancelled = True
for callback in self._callbacks:
callback()
[docs]
def is_cancelled(self) -> bool:
with self._lock:
return self._cancelled
[docs]
def add_callback(self, callback: Callable[[], None]) -> None:
with self._lock:
if self._cancelled:
callback()
else:
self._callbacks.append(callback)
[docs]
def link_future(self, future: Future[Any]) -> Future[Any]:
with self._lock:
if self._cancelled:
future.cancel()
else:
def _cancel() -> None:
future.cancel()
self._callbacks.append(_cancel)
return future