from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, List, Mapping, Sequence
from autogen_core import CancellationToken
from ..base import ChatAgent, Response, TaskResult
from ..messages import (
AgentEvent,
BaseChatMessage,
ChatMessage,
TextMessage,
)
from ..state import BaseState
[docs]
class BaseChatAgent(ChatAgent, ABC):
"""Base class for a chat agent.
This abstract class provides a base implementation for a :class:`ChatAgent`.
To create a new chat agent, subclass this class and implement the
:meth:`on_messages`, :meth:`on_reset`, and :attr:`produced_message_types`.
If streaming is required, also implement the :meth:`on_messages_stream` method.
An agent is considered stateful and maintains its state between calls to
the :meth:`on_messages` or :meth:`on_messages_stream` methods.
The agent should store its state in the
agent instance. The agent should also implement the :meth:`on_reset` method
to reset the agent to its initialization state.
.. note::
The caller should only pass the new messages to the agent on each call
to the :meth:`on_messages` or :meth:`on_messages_stream` method.
Do not pass the entire conversation history to the agent on each call.
This design principle must be followed when creating a new agent.
"""
def __init__(self, name: str, description: str) -> None:
self._name = name
if self._name.isidentifier() is False:
raise ValueError("The agent name must be a valid Python identifier.")
self._description = description
@property
def name(self) -> str:
"""The name of the agent. This is used by team to uniquely identify
the agent. It should be unique within the team."""
return self._name
@property
def description(self) -> str:
"""The description of the agent. This is used by team to
make decisions about which agents to use. The description should
describe the agent's capabilities and how to interact with it."""
return self._description
@property
@abstractmethod
def produced_message_types(self) -> Sequence[type[ChatMessage]]:
"""The types of messages that the agent produces in the
:attr:`Response.chat_message` field. They must be :class:`ChatMessage` types."""
...
[docs]
@abstractmethod
async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
"""Handles incoming messages and returns a response.
.. note::
Agents are stateful and the messages passed to this method should
be the new messages since the last call to this method. The agent
should maintain its state between calls to this method. For example,
if the agent needs to remember the previous messages to respond to
the current message, it should store the previous messages in the
agent state.
"""
...
[docs]
async def on_messages_stream(
self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
"""Handles incoming messages and returns a stream of messages and
and the final item is the response. The base implementation in
:class:`BaseChatAgent` simply calls :meth:`on_messages` and yields
the messages in the response.
.. note::
Agents are stateful and the messages passed to this method should
be the new messages since the last call to this method. The agent
should maintain its state between calls to this method. For example,
if the agent needs to remember the previous messages to respond to
the current message, it should store the previous messages in the
agent state.
"""
response = await self.on_messages(messages, cancellation_token)
for inner_message in response.inner_messages or []:
yield inner_message
yield response
[docs]
async def run(
self,
*,
task: str | ChatMessage | Sequence[ChatMessage] | None = None,
cancellation_token: CancellationToken | None = None,
) -> TaskResult:
"""Run the agent with the given task and return the result."""
if cancellation_token is None:
cancellation_token = CancellationToken()
input_messages: List[ChatMessage] = []
output_messages: List[AgentEvent | ChatMessage] = []
if task is None:
pass
elif isinstance(task, str):
text_msg = TextMessage(content=task, source="user")
input_messages.append(text_msg)
output_messages.append(text_msg)
elif isinstance(task, BaseChatMessage):
input_messages.append(task)
output_messages.append(task)
else:
if not task:
raise ValueError("Task list cannot be empty.")
# Task is a sequence of messages.
for msg in task:
if isinstance(msg, BaseChatMessage):
input_messages.append(msg)
output_messages.append(msg)
else:
raise ValueError(f"Invalid message type in sequence: {type(msg)}")
response = await self.on_messages(input_messages, cancellation_token)
if response.inner_messages is not None:
output_messages += response.inner_messages
output_messages.append(response.chat_message)
return TaskResult(messages=output_messages)
[docs]
async def run_stream(
self,
*,
task: str | ChatMessage | Sequence[ChatMessage] | None = None,
cancellation_token: CancellationToken | None = None,
) -> AsyncGenerator[AgentEvent | ChatMessage | TaskResult, None]:
"""Run the agent with the given task and return a stream of messages
and the final task result as the last item in the stream."""
if cancellation_token is None:
cancellation_token = CancellationToken()
input_messages: List[ChatMessage] = []
output_messages: List[AgentEvent | ChatMessage] = []
if task is None:
pass
elif isinstance(task, str):
text_msg = TextMessage(content=task, source="user")
input_messages.append(text_msg)
output_messages.append(text_msg)
yield text_msg
elif isinstance(task, BaseChatMessage):
input_messages.append(task)
output_messages.append(task)
yield task
else:
if not task:
raise ValueError("Task list cannot be empty.")
for msg in task:
if isinstance(msg, BaseChatMessage):
input_messages.append(msg)
output_messages.append(msg)
yield msg
else:
raise ValueError(f"Invalid message type in sequence: {type(msg)}")
async for message in self.on_messages_stream(input_messages, cancellation_token):
if isinstance(message, Response):
yield message.chat_message
output_messages.append(message.chat_message)
yield TaskResult(messages=output_messages)
else:
output_messages.append(message)
yield message
[docs]
@abstractmethod
async def on_reset(self, cancellation_token: CancellationToken) -> None:
"""Resets the agent to its initialization state."""
...
[docs]
async def save_state(self) -> Mapping[str, Any]:
"""Export state. Default implementation for stateless agents."""
return BaseState().model_dump()
[docs]
async def load_state(self, state: Mapping[str, Any]) -> None:
"""Restore agent from saved state. Default implementation for stateless agents."""
BaseState.model_validate(state)