Source code for autogen_core.base._cancellation_token

import threading
from asyncio import Future
from typing import Any, Callable, List


[docs] class CancellationToken: def __init__(self) -> None: self._cancelled: bool = False self._lock: threading.Lock = threading.Lock() self._callbacks: List[Callable[[], None]] = []
[docs] def cancel(self) -> None: with self._lock: if not self._cancelled: self._cancelled = True for callback in self._callbacks: callback()
[docs] def is_cancelled(self) -> bool: with self._lock: return self._cancelled
[docs] def add_callback(self, callback: Callable[[], None]) -> None: with self._lock: if self._cancelled: callback() else: self._callbacks.append(callback)