[docs]classBaseChatAgent(ChatAgent,ABC,ComponentBase[BaseModel]):"""Base class for a chat agent. This abstract class provides a base implementation for a :class:`ChatAgent`. To create a new chat agent, subclass this class and implement the :meth:`on_messages`, :meth:`on_reset`, and :attr:`produced_message_types`. If streaming is required, also implement the :meth:`on_messages_stream` method. An agent is considered stateful and maintains its state between calls to the :meth:`on_messages` or :meth:`on_messages_stream` methods. The agent should store its state in the agent instance. The agent should also implement the :meth:`on_reset` method to reset the agent to its initialization state. .. note:: The caller should only pass the new messages to the agent on each call to the :meth:`on_messages` or :meth:`on_messages_stream` method. Do not pass the entire conversation history to the agent on each call. This design principle must be followed when creating a new agent. """component_type="agent"def__init__(self,name:str,description:str)->None:self._name=nameifself._name.isidentifier()isFalse:raiseValueError("The agent name must be a valid Python identifier.")self._description=description@propertydefname(self)->str:"""The name of the agent. This is used by team to uniquely identify the agent. It should be unique within the team."""returnself._name@propertydefdescription(self)->str:"""The description of the agent. This is used by team to make decisions about which agents to use. The description should describe the agent's capabilities and how to interact with it."""returnself._description@property@abstractmethoddefproduced_message_types(self)->Sequence[type[ChatMessage]]:"""The types of messages that the agent produces in the :attr:`Response.chat_message` field. They must be :class:`ChatMessage` types."""...
[docs]@abstractmethodasyncdefon_messages(self,messages:Sequence[ChatMessage],cancellation_token:CancellationToken)->Response:"""Handles incoming messages and returns a response. .. note:: Agents are stateful and the messages passed to this method should be the new messages since the last call to this method. The agent should maintain its state between calls to this method. For example, if the agent needs to remember the previous messages to respond to the current message, it should store the previous messages in the agent state. """...
[docs]asyncdefon_messages_stream(self,messages:Sequence[ChatMessage],cancellation_token:CancellationToken)->AsyncGenerator[AgentEvent|ChatMessage|Response,None]:"""Handles incoming messages and returns a stream of messages and and the final item is the response. The base implementation in :class:`BaseChatAgent` simply calls :meth:`on_messages` and yields the messages in the response. .. note:: Agents are stateful and the messages passed to this method should be the new messages since the last call to this method. The agent should maintain its state between calls to this method. For example, if the agent needs to remember the previous messages to respond to the current message, it should store the previous messages in the agent state. """response=awaitself.on_messages(messages,cancellation_token)forinner_messageinresponse.inner_messagesor[]:yieldinner_messageyieldresponse
[docs]asyncdefrun(self,*,task:str|ChatMessage|Sequence[ChatMessage]|None=None,cancellation_token:CancellationToken|None=None,)->TaskResult:"""Run the agent with the given task and return the result."""ifcancellation_tokenisNone:cancellation_token=CancellationToken()input_messages:List[ChatMessage]=[]output_messages:List[AgentEvent|ChatMessage]=[]iftaskisNone:passelifisinstance(task,str):text_msg=TextMessage(content=task,source="user")input_messages.append(text_msg)output_messages.append(text_msg)elifisinstance(task,BaseChatMessage):input_messages.append(task)output_messages.append(task)else:ifnottask:raiseValueError("Task list cannot be empty.")# Task is a sequence of messages.formsgintask:ifisinstance(msg,BaseChatMessage):input_messages.append(msg)output_messages.append(msg)else:raiseValueError(f"Invalid message type in sequence: {type(msg)}")response=awaitself.on_messages(input_messages,cancellation_token)ifresponse.inner_messagesisnotNone:output_messages+=response.inner_messagesoutput_messages.append(response.chat_message)returnTaskResult(messages=output_messages)
[docs]asyncdefrun_stream(self,*,task:str|ChatMessage|Sequence[ChatMessage]|None=None,cancellation_token:CancellationToken|None=None,)->AsyncGenerator[AgentEvent|ChatMessage|TaskResult,None]:"""Run the agent with the given task and return a stream of messages and the final task result as the last item in the stream."""ifcancellation_tokenisNone:cancellation_token=CancellationToken()input_messages:List[ChatMessage]=[]output_messages:List[AgentEvent|ChatMessage]=[]iftaskisNone:passelifisinstance(task,str):text_msg=TextMessage(content=task,source="user")input_messages.append(text_msg)output_messages.append(text_msg)yieldtext_msgelifisinstance(task,BaseChatMessage):input_messages.append(task)output_messages.append(task)yieldtaskelse:ifnottask:raiseValueError("Task list cannot be empty.")formsgintask:ifisinstance(msg,BaseChatMessage):input_messages.append(msg)output_messages.append(msg)yieldmsgelse:raiseValueError(f"Invalid message type in sequence: {type(msg)}")asyncformessageinself.on_messages_stream(input_messages,cancellation_token):ifisinstance(message,Response):yieldmessage.chat_messageoutput_messages.append(message.chat_message)yieldTaskResult(messages=output_messages)else:output_messages.append(message)yieldmessage
[docs]@abstractmethodasyncdefon_reset(self,cancellation_token:CancellationToken)->None:"""Resets the agent to its initialization state."""...
[docs]asyncdefsave_state(self)->Mapping[str,Any]:"""Export state. Default implementation for stateless agents."""returnBaseState().model_dump()
[docs]asyncdefload_state(self,state:Mapping[str,Any])->None:"""Restore agent from saved state. Default implementation for stateless agents."""BaseState.model_validate(state)
[docs]asyncdefclose(self)->None:"""Called when the runtime is closed"""pass