Source code for autogen_core._cancellation_token

import threading
from asyncio import Future
from typing import Any, Callable, List


class CancellationToken:
    """A token used to cancel pending async calls.

    The cancelled flag and the list of registered callbacks are guarded by
    an internal lock. Callbacks themselves are always invoked *after* the
    lock has been released, so a callback may safely call back into this
    token (for example ``is_cancelled()`` or ``add_callback()``).
    """

    def __init__(self) -> None:
        self._cancelled: bool = False
        # Guards ``_cancelled`` and ``_callbacks``. This is a plain,
        # non-reentrant lock, so it must never be held while user code runs.
        self._lock: threading.Lock = threading.Lock()
        self._callbacks: List[Callable[[], None]] = []

    def cancel(self) -> None:
        """Cancel pending async calls linked to this cancellation token.

        The first call marks the token as cancelled and then invokes every
        registered callback, in registration order, on the calling thread.
        Subsequent calls do nothing. If a callback raises, the exception
        propagates to the caller and the remaining callbacks are not run.
        """
        with self._lock:
            if self._cancelled:
                return
            self._cancelled = True
            # Snapshot under the lock; invoke after releasing it so that a
            # callback touching this token cannot deadlock.
            pending = list(self._callbacks)
        for callback in pending:
            callback()

    def is_cancelled(self) -> bool:
        """Check if the CancellationToken has been used.

        Returns:
            ``True`` once ``cancel()`` has been called, ``False`` otherwise.
        """
        with self._lock:
            return self._cancelled

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Attach a callback that will be called when cancel is invoked.

        If the token has already been cancelled, the callback is invoked
        immediately on the calling thread instead of being stored.

        Args:
            callback: A zero-argument callable to run on cancellation.
        """
        with self._lock:
            if not self._cancelled:
                self._callbacks.append(callback)
                return
        # Already cancelled: run the callback outside the lock so it may
        # freely call back into this token.
        callback()