Source code for autogen_agentchat.base._termination

import asyncio
from abc import ABC, abstractmethod
from typing import List, Sequence

from ..messages import AgentEvent, ChatMessage, StopMessage


[docs] class TerminatedException(BaseException): ...
[docs] class TerminationCondition(ABC): """A stateful condition that determines when a conversation should be terminated. A termination condition is a callable that takes a sequence of ChatMessage objects since the last time the condition was called, and returns a StopMessage if the conversation should be terminated, or None otherwise. Once a termination condition has been reached, it must be reset before it can be used again. Termination conditions can be combined using the AND and OR operators. Example: .. code-block:: python import asyncio from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination async def main() -> None: # Terminate the conversation after 10 turns or if the text "TERMINATE" is mentioned. cond1 = MaxMessageTermination(10) | TextMentionTermination("TERMINATE") # Terminate the conversation after 10 turns and if the text "TERMINATE" is mentioned. cond2 = MaxMessageTermination(10) & TextMentionTermination("TERMINATE") # ... # Reset the termination condition. await cond1.reset() await cond2.reset() asyncio.run(main()) """ @property @abstractmethod def terminated(self) -> bool: """Check if the termination condition has been reached""" ... @abstractmethod async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None: """Check if the conversation should be terminated based on the messages received since the last time the condition was called. Return a StopMessage if the conversation should be terminated, or None otherwise. Args: messages: The messages received since the last time the condition was called. Returns: StopMessage | None: A StopMessage if the conversation should be terminated, or None otherwise. Raises: TerminatedException: If the termination condition has already been reached.""" ...
[docs] @abstractmethod async def reset(self) -> None: """Reset the termination condition.""" ...
def __and__(self, other: "TerminationCondition") -> "TerminationCondition": """Combine two termination conditions with an AND operation.""" return _AndTerminationCondition(self, other) def __or__(self, other: "TerminationCondition") -> "TerminationCondition": """Combine two termination conditions with an OR operation.""" return _OrTerminationCondition(self, other)
class _AndTerminationCondition(TerminationCondition): def __init__(self, *conditions: TerminationCondition) -> None: self._conditions = conditions self._stop_messages: List[StopMessage] = [] @property def terminated(self) -> bool: return all(condition.terminated for condition in self._conditions) async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None: if self.terminated: raise TerminatedException("Termination condition has already been reached.") # Check all remaining conditions. stop_messages = await asyncio.gather( *[condition(messages) for condition in self._conditions if not condition.terminated] ) # Collect stop messages. for stop_message in stop_messages: if stop_message is not None: self._stop_messages.append(stop_message) if any(stop_message is None for stop_message in stop_messages): # If any remaining condition has not reached termination, it is not terminated. return None content = ", ".join(stop_message.content for stop_message in self._stop_messages) source = ", ".join(stop_message.source for stop_message in self._stop_messages) return StopMessage(content=content, source=source) async def reset(self) -> None: for condition in self._conditions: await condition.reset() self._stop_messages.clear() class _OrTerminationCondition(TerminationCondition): def __init__(self, *conditions: TerminationCondition) -> None: self._conditions = conditions @property def terminated(self) -> bool: return any(condition.terminated for condition in self._conditions) async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None: if self.terminated: raise RuntimeError("Termination condition has already been reached") stop_messages = await asyncio.gather(*[condition(messages) for condition in self._conditions]) if any(stop_message is not None for stop_message in stop_messages): content = ", ".join(stop_message.content for stop_message in stop_messages if stop_message is not None) source = ", ".join(stop_message.source for stop_message in stop_messages if stop_message is not None) return StopMessage(content=content, source=source) return None async def reset(self) -> None: for condition in self._conditions: await condition.reset()