{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Termination using Intervention Handler\n",
    "\n",
    "```{note}\n",
    "This method is valid when using {py:class}`~autogen_core.SingleThreadedAgentRuntime`.\n",
    "```\n",
    "\n",
    "There are many ways to handle termination in `autogen_core`. Ultimately, the goal is to detect that the runtime no longer needs to run so that you can proceed to finalization tasks. One way to do this is to use an {py:class}`autogen_core.InterventionHandler` to detect a termination message and then act on it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dataclasses import dataclass\n",
    "from typing import Any\n",
    "\n",
    "from autogen_core import (\n",
    "    DefaultInterventionHandler,\n",
    "    DefaultTopicId,\n",
    "    MessageContext,\n",
    "    RoutedAgent,\n",
    "    SingleThreadedAgentRuntime,\n",
    "    default_subscription,\n",
    "    message_handler,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, we define two dataclasses: one for regular messages and one for the message that signals termination."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dataclass\n",
    "class Message:\n",
    "    content: Any\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class Termination:\n",
    "    reason: str"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We write the agent to publish a termination message when it decides it is time to terminate. Here, that happens once it has received more than three messages."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "@default_subscription\n",
    "class AnAgent(RoutedAgent):\n",
    "    def __init__(self) -> None:\n",
    "        super().__init__(\"MyAgent\")\n",
    "        self.received = 0\n",
    "\n",
    "    @message_handler\n",
    "    async def on_new_message(self, message: Message, ctx: MessageContext) -> None:\n",
    "        self.received += 1\n",
    "        # The fourth message pushes the count past 3 and triggers the termination request.\n",
    "        if self.received > 3:\n",
    "            await self.publish_message(Termination(reason=\"Reached maximum number of messages\"), DefaultTopicId())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we create an `InterventionHandler` that detects the termination message and acts on it. This one hooks into publishes: when it encounters a `Termination` message, it stores it in its internal state to indicate that termination has been requested."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "class TerminationHandler(DefaultInterventionHandler):\n",
    "    def __init__(self) -> None:\n",
    "        self._termination_value: Termination | None = None\n",
    "\n",
    "    async def on_publish(self, message: Any, *, message_context: MessageContext) -> Any:\n",
    "        # Remember the termination message when one is published.\n",
    "        if isinstance(message, Termination):\n",
    "            self._termination_value = message\n",
    "        # Return the message unchanged so that it is still delivered.\n",
    "        return message\n",
    "\n",
    "    @property\n",
    "    def termination_value(self) -> Termination | None:\n",
    "        return self._termination_value\n",
    "\n",
    "    @property\n",
    "    def has_terminated(self) -> bool:\n",
    "        return self._termination_value is not None"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, we add the handler to the runtime and use it to stop the runtime once the termination message has been published."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Termination(reason='Reached maximum number of messages')\n"
     ]
    }
   ],
   "source": [
    "termination_handler = TerminationHandler()\n",
    "runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])\n",
    "\n",
    "await AnAgent.register(runtime, \"my_agent\", AnAgent)\n",
    "\n",
    "runtime.start()\n",
    "\n",
    "# Publish more than 3 messages to trigger termination.\n",
    "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
    "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
    "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
    "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
    "\n",
    "# Wait for termination.\n",
    "await runtime.stop_when(lambda: termination_handler.has_terminated)\n",
    "\n",
    "print(termination_handler.termination_value)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}