Termination using Intervention Handler
Note
This method is valid when using SingleThreadedAgentRuntime.
There are many different ways to handle termination in autogen_core. Ultimately, the goal is to detect that the runtime no longer needs to keep running, so that you can proceed to finalization tasks. One way to do this is to use an autogen_core.base.intervention.InterventionHandler to detect a termination message and then act on it.
from dataclasses import dataclass
from typing import Any
from autogen_core import (
AgentId,
DefaultTopicId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
default_subscription,
message_handler,
)
from autogen_core.base.intervention import DefaultInterventionHandler
First, we define two dataclasses: one for regular messages and one for the message that will be used to signal termination.
@dataclass
class Message:
    """A regular message published to the agent in this example."""

    # Arbitrary payload; the agent in this example only counts messages
    # and does not inspect the content.
    content: Any
@dataclass
class Termination:
    """Message published to signal that termination has been requested."""

    # Human-readable explanation of why termination was requested.
    reason: str
We code our agent to publish a termination message when it decides it is time to terminate.
@default_subscription
class AnAgent(RoutedAgent):
    """Agent that counts incoming ``Message`` objects and requests termination.

    Once more than ``max_messages`` messages have been received, each further
    ``Message`` makes the agent publish a ``Termination`` to the default topic.
    """

    def __init__(self, max_messages: int = 3) -> None:
        """Set up the message counter.

        Args:
            max_messages: Number of messages accepted before the agent starts
                publishing ``Termination``. Defaults to 3.
        """
        super().__init__("MyAgent")
        self.received = 0
        self.max_messages = max_messages

    @message_handler
    async def on_new_message(self, message: Message, ctx: MessageContext) -> None:
        """Count the message and publish ``Termination`` once past the limit."""
        self.received += 1
        if self.received > self.max_messages:
            await self.publish_message(
                Termination(reason="Reached maximum number of messages"),
                DefaultTopicId(),
            )
Next, we create an InterventionHandler that will detect the termination message and act on it. This one hooks into publish calls: when it encounters a Termination message, it updates its internal state to indicate that termination has been requested.
class TerminationHandler(DefaultInterventionHandler):
    """Intervention handler that remembers a published ``Termination``.

    Every published message is passed through unchanged. When the message is
    a ``Termination``, it is stored so callers can poll ``has_terminated`` and
    read ``termination_value``. A later ``Termination`` replaces an earlier one.
    """

    def __init__(self) -> None:
        """Start with no termination recorded."""
        super().__init__()
        # Stays None until a Termination message is seen in on_publish.
        self._termination_value: Termination | None = None

    async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:
        """Record ``Termination`` messages and return the message unmodified."""
        if isinstance(message, Termination):
            self._termination_value = message
        return message

    @property
    def termination_value(self) -> Termination | None:
        """The recorded ``Termination``, or ``None`` if none has been seen."""
        return self._termination_value

    @property
    def has_terminated(self) -> bool:
        """Whether a ``Termination`` message has been recorded."""
        return self._termination_value is not None
Finally, we add this handler to the runtime and use it to detect termination and stop the runtime when the termination message is received.
termination_handler = TerminationHandler()
runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])
await AnAgent.register(runtime, "my_agent", AnAgent)
runtime.start()
# Publish more than 3 messages to trigger termination.
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
# Wait for termination.
await runtime.stop_when(lambda: termination_handler.has_terminated)
print(termination_handler.termination_value)
Termination(reason='Reached maximum number of messages')