Custom Agents#
You may have agents with behaviors that do not fall into a preset. In such cases, you can build custom agents.
All agents in AgentChat inherit from BaseChatAgent
class and implement the following abstract methods and attributes:
on_messages()
: The abstract method that defines the behavior of the agent in response to messages. This method is called when the agent is asked to provide a response inrun()
. It returns aResponse
object.on_reset()
: The abstract method that resets the agent to its initial state. This method is called when the agent is asked to reset itself.produced_message_types
: The list of possibleChatMessage
message types the agent can produce in its response.
Optionally, you can implement the the on_messages_stream()
method to stream messages as they are generated by the agent. If this method is not implemented, the agent
uses the default implementation of on_messages_stream()
that calls the on_messages()
method and
yields all messages in the response.
CounterDownAgent#
In this example, we create a simple agent that counts down from a given number to zero, and produces a stream of messages with the current count.
from typing import AsyncGenerator, List, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentMessage, ChatMessage, TextMessage
from autogen_core.base import CancellationToken
class CountDownAgent(BaseChatAgent):
def __init__(self, name: str, count: int = 3):
super().__init__(name, "A simple agent that counts down.")
self._count = count
@property
def produced_message_types(self) -> List[type[ChatMessage]]:
return [TextMessage]
async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
# Calls the on_messages_stream.
response: Response | None = None
async for message in self.on_messages_stream(messages, cancellation_token):
if isinstance(message, Response):
response = message
assert response is not None
return response
async def on_messages_stream(
self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[AgentMessage | Response, None]:
inner_messages: List[AgentMessage] = []
for i in range(self._count, 0, -1):
msg = TextMessage(content=f"{i}...", source=self.name)
inner_messages.append(msg)
yield msg
# The response is returned at the end of the stream.
# It contains the final message and all the inner messages.
yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
pass
async def run_countdown_agent() -> None:
# Create a countdown agent.
countdown_agent = CountDownAgent("countdown")
# Run the agent with a given task and stream the response.
async for message in countdown_agent.on_messages_stream([], CancellationToken()):
if isinstance(message, Response):
print(message.chat_message.content)
else:
print(message.content)
# Use asyncio.run(run_countdown_agent()) when running in a script.
await run_countdown_agent()
UserProxyAgent#
A common use case for building a custom agent is to create an agent that acts as a proxy for the user.
In the example below we show how to implement a UserProxyAgent
- an agent that asks the user to enter
some text through console and then returns that message as a response.
import asyncio
from typing import List, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import ChatMessage
from autogen_core.base import CancellationToken
class UserProxyAgent(BaseChatAgent):
def __init__(self, name: str) -> None:
super().__init__(name, "A human user.")
@property
def produced_message_types(self) -> List[type[ChatMessage]]:
return [TextMessage]
async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
user_input = await asyncio.get_event_loop().run_in_executor(None, input, "Enter your response: ")
return Response(chat_message=TextMessage(content=user_input, source=self.name))
async def on_reset(self, cancellation_token: CancellationToken) -> None:
pass
async def run_user_proxy_agent() -> None:
user_proxy_agent = UserProxyAgent(name="user_proxy_agent")
response = await user_proxy_agent.on_messages([], CancellationToken())
print(response.chat_message.content)
# Use asyncio.run(run_user_proxy_agent()) when running in a script.
await run_user_proxy_agent()