Custom Agents#

You may have agents with behaviors that do not fall into a preset. In such cases, you can build custom agents.

All agents in AgentChat inherit from BaseChatAgent class and implement the following abstract methods and attributes:

  • on_messages(): The abstract method that defines the behavior of the agent in response to messages. This method is called when the agent is asked to provide a response in run(). It returns a Response object.

  • on_reset(): The abstract method that resets the agent to its initial state. This method is called when the agent is asked to reset itself.

  • produced_message_types: The list of possible ChatMessage message types the agent can produce in its response.

Optionally, you can implement the the on_messages_stream() method to stream messages as they are generated by the agent. If this method is not implemented, the agent uses the default implementation of on_messages_stream() that calls the on_messages() method and yields all messages in the response.

CounterDownAgent#

In this example, we create a simple agent that counts down from a given number to zero, and produces a stream of messages with the current count.

from typing import AsyncGenerator, List, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentMessage, ChatMessage, TextMessage
from autogen_core.base import CancellationToken


class CountDownAgent(BaseChatAgent):
    def __init__(self, name: str, count: int = 3):
        super().__init__(name, "A simple agent that counts down.")
        self._count = count

    @property
    def produced_message_types(self) -> List[type[ChatMessage]]:
        return [TextMessage]

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        # Calls the on_messages_stream.
        response: Response | None = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                response = message
        assert response is not None
        return response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentMessage | Response, None]:
        inner_messages: List[AgentMessage] = []
        for i in range(self._count, 0, -1):
            msg = TextMessage(content=f"{i}...", source=self.name)
            inner_messages.append(msg)
            yield msg
        # The response is returned at the end of the stream.
        # It contains the final message and all the inner messages.
        yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass


async def run_countdown_agent() -> None:
    # Create a countdown agent.
    countdown_agent = CountDownAgent("countdown")

    # Run the agent with a given task and stream the response.
    async for message in countdown_agent.on_messages_stream([], CancellationToken()):
        if isinstance(message, Response):
            print(message.chat_message.content)
        else:
            print(message.content)


# Use asyncio.run(run_countdown_agent()) when running in a script.
await run_countdown_agent()

UserProxyAgent#

A common use case for building a custom agent is to create an agent that acts as a proxy for the user.

In the example below we show how to implement a UserProxyAgent - an agent that asks the user to enter some text through console and then returns that message as a response.

import asyncio
from typing import List, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import ChatMessage
from autogen_core.base import CancellationToken


class UserProxyAgent(BaseChatAgent):
    def __init__(self, name: str) -> None:
        super().__init__(name, "A human user.")

    @property
    def produced_message_types(self) -> List[type[ChatMessage]]:
        return [TextMessage]

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        user_input = await asyncio.get_event_loop().run_in_executor(None, input, "Enter your response: ")
        return Response(chat_message=TextMessage(content=user_input, source=self.name))

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass


async def run_user_proxy_agent() -> None:
    user_proxy_agent = UserProxyAgent(name="user_proxy_agent")
    response = await user_proxy_agent.on_messages([], CancellationToken())
    print(response.chat_message.content)


# Use asyncio.run(run_user_proxy_agent()) when running in a script.
await run_user_proxy_agent()