{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Termination using Intervention Handler\n", "\n", "```{note}\n", "This method is valid when using {py:class}`~autogen_core.application.SingleThreadedAgentRuntime`.\n", "```\n", "\n", "There are many ways to handle termination in `autogen_core`. Ultimately, the goal is to detect that the runtime no longer needs to run so that you can proceed to finalization tasks. One way to do this is to use an {py:class}`autogen_core.base.intervention.InterventionHandler` to detect a termination message and then act on it." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "from typing import Any\n", "\n", "from autogen_core.application import SingleThreadedAgentRuntime\n", "from autogen_core.base import AgentId, MessageContext\n", "from autogen_core.base.intervention import DefaultInterventionHandler\n", "from autogen_core.components import DefaultTopicId, RoutedAgent, default_subscription, message_handler" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we define two dataclasses: one for regular messages and one for the message that signals termination." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "@dataclass\n", "class Message:\n", "    content: Any\n", "\n", "\n", "@dataclass\n", "class Termination:\n", "    reason: str" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We then write an agent that publishes a `Termination` message when it decides it is time to stop; here, that is after it has received more than three messages."
] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "@default_subscription\n", "class AnAgent(RoutedAgent):\n", "    def __init__(self) -> None:\n", "        super().__init__(\"MyAgent\")\n", "        self.received = 0\n", "\n", "    @message_handler\n", "    async def on_new_message(self, message: Message, ctx: MessageContext) -> None:\n", "        self.received += 1\n", "        # Request termination once more than three messages have arrived.\n", "        if self.received > 3:\n", "            await self.publish_message(Termination(reason=\"Reached maximum number of messages\"), DefaultTopicId())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we create an `InterventionHandler` that detects the termination message and acts on it. This one hooks into `on_publish`: when it sees a published `Termination` message, it records that message in its internal state to indicate that termination has been requested." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "class TerminationHandler(DefaultInterventionHandler):\n", "    def __init__(self) -> None:\n", "        self._termination_value: Termination | None = None\n", "\n", "    async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:\n", "        # Record the termination request, then pass the message through unchanged.\n", "        if isinstance(message, Termination):\n", "            self._termination_value = message\n", "        return message\n", "\n", "    @property\n", "    def termination_value(self) -> Termination | None:\n", "        return self._termination_value\n", "\n", "    @property\n", "    def has_terminated(self) -> bool:\n", "        return self._termination_value is not None" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we register the handler with the runtime and use it to stop the runtime once the termination message has been published."
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Termination(reason='Reached maximum number of messages')\n" ] } ], "source": [ "termination_handler = TerminationHandler()\n", "runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])\n", "\n", "await AnAgent.register(runtime, \"my_agent\", AnAgent)\n", "\n", "runtime.start()\n", "\n", "# Publish more than 3 messages to trigger termination.\n", "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", "\n", "# Wait for termination.\n", "await runtime.stop_when(lambda: termination_handler.has_terminated)\n", "\n", "print(termination_handler.termination_value)" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 2 }