{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Message and Communication\n", "\n", "An agent in AutoGen core can react to, send, and publish messages,\n", "and messages are the only means through which agents can communicate\n", "with each other." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Messages\n", "\n", "Messages are serializable objects, they can be defined using:\n", "\n", "- A subclass of Pydantic's {py:class}`pydantic.BaseModel`, or\n", "- A dataclass\n", "\n", "For example:" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "\n", "@dataclass\n", "class TextMessage:\n", " content: str\n", " source: str\n", "\n", "\n", "@dataclass\n", "class ImageMessage:\n", " url: str\n", " source: str" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```{note}\n", "Messages are purely data, and should not contain any logic.\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Message Handlers\n", "\n", "When an agent receives a message the runtime will invoke the agent's message handler\n", "({py:meth}`~autogen_core.base.Agent.on_message`) which should implement the agents message handling logic.\n", "If this message cannot be handled by the agent, the agent should raise a\n", "{py:class}`~autogen_core.base.exceptions.CantHandleException`.\n", "\n", "The base class {py:class}`~autogen_core.base.BaseAgent` provides no message handling logic\n", "and implementing the {py:meth}`~autogen_core.base.Agent.on_message` method directly is not recommended\n", "unless for the advanced use cases.\n", "\n", "Developers should start with implementing the {py:class}`~autogen_core.components.RoutedAgent` base class\n", "which provides built-in message routing capability.\n", "\n", "### Routing Messages by Type\n", "\n", "The {py:class}`~autogen_core.components.RoutedAgent` base class provides a mechanism\n", "for associating message types with message handlers \n", "with the {py:meth}`~autogen_core.components.message_handler` decorator,\n", "so developers do not need to implement the {py:meth}`~autogen_core.base.Agent.on_message` method.\n", "\n", "For example, the following type-routed agent responds to `TextMessage` and `ImageMessage`\n", "using different message handlers:" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "from autogen_core.application import SingleThreadedAgentRuntime\n", "from autogen_core.base import AgentId, MessageContext\n", "from autogen_core.components import RoutedAgent, message_handler\n", "\n", "\n", "class MyAgent(RoutedAgent):\n", " @message_handler\n", " async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello, {message.source}, you said {message.content}!\")\n", "\n", " @message_handler\n", " async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello, {message.source}, you sent me {message.url}!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the agent runtime and register the agent type (see [Agent and Agent Runtime](agent-and-agent-runtime.ipynb)):" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "AgentType(type='my_agent')" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await MyAgent.register(runtime, \"my_agent\", lambda: MyAgent(\"My Agent\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Test this agent with `TextMessage` and `ImageMessage`." ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Hello, User, you said Hello, World!!\n", "Hello, User, you sent me https://example.com/image.jpg!\n" ] } ], "source": [ "runtime.start()\n", "agent_id = AgentId(\"my_agent\", \"default\")\n", "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"User\"), agent_id)\n", "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"User\"), agent_id)\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The runtime automatically creates an instance of `MyAgent` with the \n", "agent ID `AgentId(\"my_agent\", \"default\")` when delivering the first message." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Routing Messages of the Same Type\n", "\n", "In some scenarios, it is useful to route messages of the same type to different handlers.\n", "For examples, messages from different sender agents should be handled differently.\n", "You can use the `match` parameter of the {py:meth}`~autogen_core.components.message_handler` decorator.\n", "\n", "The `match` parameter associates handlers for the same message type\n", "to a specific message -- it is secondary to the message type routing. \n", "It accepts a callable that takes the message and \n", "{py:class}`~autogen_core.base.MessageContext` as arguments, and\n", "returns a boolean indicating whether the message should be handled by the decorated handler.\n", "The callable is checked in the alphabetical order of the handlers.\n", "\n", "Here is an example of an agent that routes messages based on the sender agent\n", "using the `match` parameter:" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "class RoutedBySenderAgent(RoutedAgent):\n", " @message_handler(match=lambda msg, ctx: msg.source.startswith(\"user1\")) # type: ignore\n", " async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello from user 1 handler, {message.source}, you said {message.content}!\")\n", "\n", " @message_handler(match=lambda msg, ctx: msg.source.startswith(\"user2\")) # type: ignore\n", " async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello from user 2 handler, {message.source}, you said {message.content}!\")\n", "\n", " @message_handler(match=lambda msg, ctx: msg.source.startswith(\"user2\")) # type: ignore\n", " async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello, {message.source}, you sent me {message.url}!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The above agent uses the `source` field of the message to determine the sender agent.\n", "You can also use the `sender` field of {py:class}`~autogen_core.base.MessageContext` to determine the sender agent\n", "using the agent ID if available.\n", "\n", "Let's test this agent with messages with different `source` values:" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Hello from user 1 handler, user1-test, you said Hello, World!!\n", "Hello from user 2 handler, user2-test, you said Hello, World!!\n", "Hello, user2-test, you sent me https://example.com/image.jpg!\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await RoutedBySenderAgent.register(runtime, \"my_agent\", lambda: RoutedBySenderAgent(\"Routed by sender agent\"))\n", "runtime.start()\n", "agent_id = AgentId(\"my_agent\", \"default\")\n", "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"user1-test\"), agent_id)\n", "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"user2-test\"), agent_id)\n", "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"user1-test\"), agent_id)\n", "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"user2-test\"), agent_id)\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the above example, the first `ImageMessage` is not handled because the `source` field\n", "of the message does not match the handler's `match` condition." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Direct Messaging\n", "\n", "There are two types of communication in AutoGen core:\n", "\n", "- **Direct Messaging**: sends a direct message to another agent.\n", "- **Broadcast**: publishes a message to a topic.\n", "\n", "Let's first look at direct messaging.\n", "To send a direct message to another agent, within a message handler use\n", "the {py:meth}`autogen_core.base.BaseAgent.send_message` method,\n", "from the runtime use the {py:meth}`autogen_core.base.AgentRuntime.send_message` method.\n", "Awaiting calls to these methods will return the return value of the\n", "receiving agent's message handler.\n", "When the receiving agent's handler returns `None`, `None` will be returned.\n", "\n", "```{note}\n", "If the invoked agent raises an exception while the sender is awaiting,\n", "the exception will be propagated back to the sender.\n", "```\n", "\n", "### Request/Response\n", "\n", "Direct messaging can be used for request/response scenarios,\n", "where the sender expects a response from the receiver.\n", "The receiver can respond to the message by returning a value from its message handler.\n", "You can think of this as a function call between agents.\n", "\n", "For example, consider the following agents:" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "from autogen_core.application import SingleThreadedAgentRuntime\n", "from autogen_core.base import MessageContext\n", "from autogen_core.components import RoutedAgent, message_handler\n", "\n", "\n", "@dataclass\n", "class Message:\n", " content: str\n", "\n", "\n", "class InnerAgent(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:\n", " return Message(content=f\"Hello from inner, {message.content}\")\n", "\n", "\n", "class OuterAgent(RoutedAgent):\n", " def __init__(self, description: str, inner_agent_type: str):\n", " super().__init__(description)\n", " self.inner_agent_id = AgentId(inner_agent_type, self.id.key)\n", "\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " print(f\"Received message: {message.content}\")\n", " # Send a direct message to the inner agent and receves a response.\n", " response = await self.send_message(Message(f\"Hello from outer, {message.content}\"), self.inner_agent_id)\n", " print(f\"Received inner response: {response.content}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Upone receving a message, the `OuterAgent` sends a direct message to the `InnerAgent` and receives\n", "a message in response.\n", "\n", "We can test these agents by sending a `Message` to the `OuterAgent`." ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Received message: Hello, World!\n", "Received inner response: Hello from inner, Hello from outer, Hello, World!\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await InnerAgent.register(runtime, \"inner_agent\", lambda: InnerAgent(\"InnerAgent\"))\n", "await OuterAgent.register(runtime, \"outer_agent\", lambda: OuterAgent(\"OuterAgent\", \"inner_agent\"))\n", "runtime.start()\n", "outer_agent_id = AgentId(\"outer_agent\", \"default\")\n", "await runtime.send_message(Message(content=\"Hello, World!\"), outer_agent_id)\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Both outputs are produced by the `OuterAgent`'s message handler, however the second output is based on the response from the `InnerAgent`.\n", "\n", "Generally speaking, direct messaging is appropriate for scenarios when the sender and\n", "recipient are tightly coupled -- they are created together and the sender\n", "is linked to a specific instance of the recipient.\n", "For example, an agent executes tool calls by sending direct messages to\n", "an instance of {py:class}`~autogen_core.components.tool_agent.ToolAgent`,\n", "and uses the responses to form an action-observation loop." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Broadcast\n", "\n", "Broadcast is effectively the publish/subscribe model with topic and subscription.\n", "Read [Topic and Subscription](../core-concepts/topic-and-subscription.md)\n", "to learn the core concepts.\n", "\n", "The key difference between direct messaging and broadcast is that broadcast\n", "cannot be used for request/response scenarios.\n", "When an agent publishes a message it is one way only, it cannot receive a response\n", "from any other agent, even if a receiving agent's handler returns a value.\n", "\n", "```{note}\n", "If a response is given to a published message, it will be thrown away.\n", "```\n", "\n", "```{note}\n", "If an agent publishes a message type for which it is subscribed it will not\n", "receive the message it published. This is to prevent infinite loops.\n", "```\n", "\n", "### Subscribe and Publish to Topics\n", "\n", "[Type-based subscription](../core-concepts/topic-and-subscription.md#type-based-subscription)\n", "maps messages published to topics of a given topic type to \n", "agents of a given agent type. \n", "To make an agent that subsclasses {py:class}`~autogen_core.components.RoutedAgent`\n", "subscribe to a topic of a given topic type,\n", "you can use the {py:meth}`~autogen_core.components.type_subscription` class decorator.\n", "\n", "The following example shows a `ReceiverAgent` class that subscribes to topics of `\"default\"` topic type\n", "using the {py:meth}`~autogen_core.components.type_subscription` decorator.\n", "and prints the received messages." ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [], "source": [ "from autogen_core.components import RoutedAgent, message_handler, type_subscription\n", "\n", "\n", "@type_subscription(topic_type=\"default\")\n", "class ReceivingAgent(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " print(f\"Received a message: {message.content}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To publish a message from an agent's handler,\n", "use the {py:meth}`~autogen_core.base.BaseAgent.publish_message` method and specify\n", "a {py:class}`~autogen_core.base.TopicId`.\n", "This call must still be awaited to allow the runtime to schedule delivery of \n", "the message to all subscribers, but it will always return `None`.\n", "If an agent raises an exception while handling a published message,\n", "this will be logged but will not be propagated back to the publishing agent.\n", "\n", "The following example shows a `BroadcastingAgent` that \n", "publishes a message to a topic upon receiving a message. " ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [], "source": [ "from autogen_core.base import TopicId\n", "\n", "\n", "class BroadcastingAgent(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " await self.publish_message(\n", " Message(\"Publishing a message from broadcasting agent!\"),\n", " topic_id=TopicId(type=\"default\", source=self.id.key),\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`BroadcastingAgent` publishes message to a topic with type `\"default\"`\n", "and source assigned to the agent instance's agent key." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Subscriptions are registered with the agent runtime, either as part of\n", "agent type's registeration or through a separate API method.\n", "Here is how we register {py:class}`~autogen_core.components.TypeSubscription`\n", "for the receiving agent with the {py:meth}`~autogen_core.components.type_subscription` decorator,\n", "and for the broadcasting agent without the decorator." ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Received a message: Hello, World! From the runtime!\n", "Received a message: Publishing a message from broadcasting agent!\n" ] } ], "source": [ "from autogen_core.components import TypeSubscription\n", "\n", "runtime = SingleThreadedAgentRuntime()\n", "\n", "# Option 1: with type_subscription decorator\n", "# The type_subscription class decorator automatically adds a TypeSubscription to\n", "# the runtime when the agent is registered.\n", "await ReceivingAgent.register(runtime, \"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"))\n", "\n", "# Option 2: with TypeSubscription\n", "await BroadcastingAgent.register(runtime, \"broadcasting_agent\", lambda: BroadcastingAgent(\"Broadcasting Agent\"))\n", "await runtime.add_subscription(TypeSubscription(topic_type=\"default\", agent_type=\"broadcasting_agent\"))\n", "\n", "# Start the runtime and publish a message.\n", "runtime.start()\n", "await runtime.publish_message(\n", " Message(\"Hello, World! From the runtime!\"), topic_id=TopicId(type=\"default\", source=\"default\")\n", ")\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As shown in the above example, you can also publish directly to a topic\n", "through the runtime's {py:meth}`~autogen_core.base.AgentRuntime.publish_message` method\n", "without the need to create an agent instance.\n", "\n", "From the output, you can see two messages were received by the receiving agent:\n", "one was published through the runtime, and the other was published by the broadcasting agent." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Default Topic and Subscriptions\n", "\n", "In the above example, we used\n", "{py:class}`~autogen_core.base.TopicId` and {py:class}`~autogen_core.components.TypeSubscription`\n", "to specify the topic and subscriptions respectively.\n", "This is the appropriate way for many scenarios.\n", "However, when there is a single scope of publishing, that is, \n", "all agents publish and subscribe to all broadcasted messages,\n", "we can use the convenience classes {py:class}`~autogen_core.components.DefaultTopicId`\n", "and {py:meth}`~autogen_core.components.default_subscription` to simply our code.\n", "\n", "{py:class}`~autogen_core.components.DefaultTopicId` is\n", "for creating a topic that uses `\"default\"` as the default value for the topic type\n", "and the publishing agent's key as the default value for the topic source.\n", "{py:meth}`~autogen_core.components.default_subscription` is\n", "for creating a type subscription that subscribes to the default topic.\n", "We can simplify `BroadcastingAgent` by using\n", "{py:class}`~autogen_core.components.DefaultTopicId` and {py:meth}`~autogen_core.components.default_subscription`." ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [], "source": [ "from autogen_core.components import DefaultTopicId, default_subscription\n", "\n", "\n", "@default_subscription\n", "class BroadcastingAgentDefaultTopic(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " # Publish a message to all agents in the same namespace.\n", " await self.publish_message(\n", " Message(\"Publishing a message from broadcasting agent!\"),\n", " topic_id=DefaultTopicId(),\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When the runtime calls {py:meth}`~autogen_core.BaseAgent.register` to register the agent type,\n", "it creates a {py:class}`~autogen_core.components.TypeSubscription`\n", "whose topic type uses `\"default\"` as the default value and \n", "agent type uses the same agent type that is being registered in the same context." ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Received a message: Hello, World! From the runtime!\n", "Received a message: Publishing a message from broadcasting agent!\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await BroadcastingAgentDefaultTopic.register(\n", " runtime, \"broadcasting_agent\", lambda: BroadcastingAgentDefaultTopic(\"Broadcasting Agent\")\n", ")\n", "await ReceivingAgent.register(runtime, \"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"))\n", "runtime.start()\n", "await runtime.publish_message(Message(\"Hello, World! From the runtime!\"), topic_id=DefaultTopicId())\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```{note}\n", "If your scenario allows all agents to publish and subscribe to\n", "all broadcasted messages, use {py:class}`~autogen_core.components.DefaultTopicId`\n", "and {py:meth}`~autogen_core.components.default_subscription` to decorate your\n", "agent classes.\n", "```" ] } ], "metadata": { "kernelspec": { "display_name": "agnext", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 2 }