{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Distributed Agent Runtime\n", "\n", "```{attention}\n", "The distributed agent runtime is an experimental feature. Expect breaking changes\n", "to the API.\n", "```\n", "\n", "A distributed agent runtime facilitates communication and agent lifecycle management\n", "across process boundaries.\n", "It consists of a host service and at least one worker runtime.\n", "\n", "The host service maintains connections to all active worker runtimes,\n", "facilitates message delivery, and keeps sessions for all direct messages (i.e., RPCs).\n", "A worker runtime runs application code (agents) and connects to the host service.\n", "It also advertises the agents it supports to the host service,\n", "so the host service can deliver messages to the correct worker.\n", "\n", "We can start a host service using {py:class}`~autogen_core.application.WorkerAgentRuntimeHost`." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from autogen_core.application import WorkerAgentRuntimeHost\n", "\n", "host = WorkerAgentRuntimeHost(address=\"localhost:50051\")\n", "host.start()  # Start a host service in the background." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The above code starts the host service in the background and accepts\n", "worker connections on port 50051.\n", "\n", "Before running worker runtimes, let's define our agent.\n", "The agent publishes a new message each time it receives one.\n", "It also keeps track of how many messages it has published, and\n", "stops publishing new messages once it has published 5 messages."
] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "from autogen_core.base import MessageContext\n", "from autogen_core.components import DefaultTopicId, RoutedAgent, default_subscription, message_handler\n", "\n", "\n", "@dataclass\n", "class MyMessage:\n", "    content: str\n", "\n", "\n", "@default_subscription\n", "class MyAgent(RoutedAgent):\n", "    def __init__(self, name: str) -> None:\n", "        super().__init__(\"My agent\")\n", "        self._name = name\n", "        self._counter = 0\n", "\n", "    @message_handler\n", "    async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> None:\n", "        # Count each received message and stop replying after the fifth.\n", "        self._counter += 1\n", "        if self._counter > 5:\n", "            return\n", "        content = f\"{self._name}: Hello x {self._counter}\"\n", "        print(content)\n", "        await self.publish_message(MyMessage(content=content), DefaultTopicId())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can set up the worker agent runtimes.\n", "We use {py:class}`~autogen_core.application.WorkerAgentRuntime`.\n", "We set up two worker runtimes. Each runtime hosts one agent.\n", "All agents publish and subscribe to the default topic, so they can see all\n", "messages being published.\n", "\n", "To run the agents, we publish a message from a worker."
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "worker1: Hello x 1\n", "worker2: Hello x 1\n", "worker2: Hello x 2\n", "worker1: Hello x 2\n", "worker1: Hello x 3\n", "worker2: Hello x 3\n", "worker2: Hello x 4\n", "worker1: Hello x 4\n", "worker1: Hello x 5\n", "worker2: Hello x 5\n" ] } ], "source": [ "import asyncio\n", "\n", "from autogen_core.application import WorkerAgentRuntime\n", "\n", "worker1 = WorkerAgentRuntime(host_address=\"localhost:50051\")\n", "worker1.start()\n", "await MyAgent.register(worker1, \"worker1\", lambda: MyAgent(\"worker1\"))\n", "\n", "worker2 = WorkerAgentRuntime(host_address=\"localhost:50051\")\n", "worker2.start()\n", "await MyAgent.register(worker2, \"worker2\", lambda: MyAgent(\"worker2\"))\n", "\n", "await worker2.publish_message(MyMessage(content=\"Hello!\"), DefaultTopicId())\n", "\n", "# Let the agents run for a while.\n", "await asyncio.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see each agent published exactly 5 messages.\n", "\n", "To stop the worker runtimes, we can call {py:meth}`~autogen_core.application.WorkerAgentRuntime.stop`." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "await worker1.stop()\n", "await worker2.stop()\n", "\n", "# To keep the worker running until a termination signal is received (e.g., SIGTERM).\n", "# await worker1.stop_when_signal()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can call {py:meth}`~autogen_core.application.WorkerAgentRuntimeHost.stop`\n", "to stop the host service." 
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "await host.stop()\n", "\n", "# To keep the host service running until a termination signal is received (e.g., SIGTERM).\n", "# await host.stop_when_signal()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Next Steps\n", "To see complete examples of using the distributed runtime, take a look at the following samples:\n", "\n", "- [Distributed Workers](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/worker)\n", "- [Distributed Semantic Router](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/semantic_router)\n", "- [Distributed Group Chat](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/distributed-group-chat)\n" ] } ], "metadata": { "kernelspec": { "display_name": "agnext", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 2 }