Source code for autogen_agentchat.agents._assistant_agent
import asyncio
import json
import logging
import warnings
from typing import (
    Any,
    AsyncGenerator,
    Awaitable,
    Callable,
    Dict,
    List,
    Mapping,
    Sequence,
)

from autogen_core import CancellationToken, Component, ComponentModel, FunctionCall
from autogen_core.memory import Memory
from autogen_core.model_context import (
    ChatCompletionContext,
    UnboundedChatCompletionContext,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    FunctionExecutionResult,
    FunctionExecutionResultMessage,
    LLMMessage,
    SystemMessage,
    UserMessage,
)
from autogen_core.tools import FunctionTool, Tool
from pydantic import BaseModel
from typing_extensions import Self

from .. import EVENT_LOGGER_NAME
from ..base import Handoff as HandoffBase
from ..base import Response
from ..messages import (
    AgentEvent,
    ChatMessage,
    HandoffMessage,
    MemoryQueryEvent,
    MultiModalMessage,
    TextMessage,
    ToolCallExecutionEvent,
    ToolCallRequestEvent,
    ToolCallSummaryMessage,
)
from ..state import AssistantAgentState
from ._base_chat_agent import BaseChatAgent

event_logger = logging.getLogger(EVENT_LOGGER_NAME)


class AssistantAgentConfig(BaseModel):
    """The declarative configuration for the assistant agent."""

    name: str
    model_client: ComponentModel
    # tools: List[Any] | None = None  # TBD
    handoffs: List[HandoffBase | str] | None = None
    model_context: ComponentModel | None = None
    description: str
    system_message: str | None = None
    reflect_on_tool_use: bool
    tool_call_summary_format: str


class AssistantAgent(BaseChatAgent, Component[AssistantAgentConfig]):
    """An agent that provides assistance with tool use.

    The :meth:`on_messages` returns a :class:`~autogen_agentchat.base.Response`
    in which :attr:`~autogen_agentchat.base.Response.chat_message` is the final response message.

    The :meth:`on_messages_stream` creates an async generator that produces
    the inner messages as they are created, and the :class:`~autogen_agentchat.base.Response`
    object as the last item before closing the generator.

    .. attention::

        The caller must only pass the new messages to the agent on each call
        to the :meth:`on_messages` or :meth:`on_messages_stream` method.
        The agent maintains its state between calls to these methods.
        Do not pass the entire conversation history to the agent on each call.

    .. warning::

        The assistant agent is not thread-safe or coroutine-safe.
        It should not be shared between multiple tasks or coroutines, and it
        should not call its methods concurrently.

    The following diagram shows how the assistant agent works:

    .. image:: ../../images/assistant-agent.svg

    Tool call behavior:

    * If the model returns no tool call, then the response is immediately returned as a :class:`~autogen_agentchat.messages.TextMessage` in :attr:`~autogen_agentchat.base.Response.chat_message`.
    * When the model returns tool calls, they will be executed right away:
        - When `reflect_on_tool_use` is False (default), the tool call results are returned as a :class:`~autogen_agentchat.messages.ToolCallSummaryMessage` in :attr:`~autogen_agentchat.base.Response.chat_message`. `tool_call_summary_format` can be used to customize the tool call summary.
        - When `reflect_on_tool_use` is True, another model inference is made using the tool calls and results, and the text response is returned as a :class:`~autogen_agentchat.messages.TextMessage` in :attr:`~autogen_agentchat.base.Response.chat_message`.
    * If the model returns multiple tool calls, they will be executed concurrently. To disable parallel tool calls, you need to configure the model client. For example, set `parallel_tool_calls=False` for :class:`~autogen_ext.models.openai.OpenAIChatCompletionClient` and :class:`~autogen_ext.models.openai.AzureOpenAIChatCompletionClient`.

    .. tip::
        By default, the tool call results are returned as the response when tool calls are made,
        so it is recommended to pay attention to the formatting of the tools' return values,
        especially if another agent is expecting them in a specific format.
        Use `tool_call_summary_format` to customize the tool call summary, if needed.

    Hand off behavior:

    * If a handoff is triggered, a :class:`~autogen_agentchat.messages.HandoffMessage` will be returned in :attr:`~autogen_agentchat.base.Response.chat_message`.
    * If there are tool calls, they will also be executed right away before returning the handoff.
    * The tool calls and results are passed to the target agent through :attr:`~autogen_agentchat.messages.HandoffMessage.context`.

    .. note::
        If multiple handoffs are detected, only the first handoff is executed.
        To avoid this, disable parallel tool calls in the model client configuration.

    Limit context size sent to the model:

    You can limit the number of messages sent to the model by setting
    the `model_context` parameter to a :class:`~autogen_core.model_context.BufferedChatCompletionContext`.
    This will limit the number of recent messages sent to the model and can be useful
    when the model has a limit on the number of tokens it can process.
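
    For example, the following is a minimal sketch (the buffer size shown is
    illustrative) that keeps only the five most recent messages in the model context:

    .. code-block:: python

        from autogen_core.model_context import BufferedChatCompletionContext
        from autogen_ext.models.openai import OpenAIChatCompletionClient
        from autogen_agentchat.agents import AssistantAgent

        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        # Only the five most recent messages are sent to the model on each inference.
        agent = AssistantAgent(
            name="assistant",
            model_client=model_client,
            model_context=BufferedChatCompletionContext(buffer_size=5),
        )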

    Args:
        name (str): The name of the agent.
        model_client (ChatCompletionClient): The model client to use for inference.
        tools (List[Tool | Callable[..., Any] | Callable[..., Awaitable[Any]]] | None, optional): The tools to register with the agent.
        handoffs (List[HandoffBase | str] | None, optional): The handoff configurations for the agent,
            allowing it to transfer to other agents by responding with a :class:`HandoffMessage`.
            The transfer is only executed when the team is in :class:`~autogen_agentchat.teams.Swarm`.
            If a handoff is a string, it should represent the target agent's name.
        model_context (ChatCompletionContext | None, optional): The model context for storing and retrieving
            :class:`~autogen_core.models.LLMMessage`. It can be preloaded with initial messages.
            The initial messages will be cleared when the agent is reset.
        description (str, optional): The description of the agent.
        system_message (str, optional): The system message for the model. If provided, it will be prepended to the
            messages in the model context when making an inference. Set to `None` to disable.
        reflect_on_tool_use (bool, optional): If `True`, the agent will make another model inference using the tool call
            and result to generate a response. If `False`, the tool call result will be returned as the response.
            Defaults to `False`.
        tool_call_summary_format (str, optional): The format string used to create a tool call summary for every
            tool call result. Defaults to "{result}".
            When `reflect_on_tool_use` is `False`, a concatenation of all the tool call summaries,
            separated by a new line character ('\\n'), will be returned as the response.
            Available variables: `{tool_name}`, `{arguments}`, `{result}`.
            For example, `"{tool_name}: {result}"` will create a summary like `"tool_name: result"`.
        memory (Sequence[Memory] | None, optional): The memory store to use for the agent. Defaults to `None`.

    Raises:
        ValueError: If tool names are not unique.
        ValueError: If handoff names are not unique.
        ValueError: If handoff names are not unique from tool names.

    Examples:

        The following example demonstrates how to create an assistant agent with
        a model client and generate a response to a simple task.

        .. code-block:: python

            import asyncio
            from autogen_core import CancellationToken
            from autogen_ext.models.openai import OpenAIChatCompletionClient
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.messages import TextMessage


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(
                    model="gpt-4o",
                    # api_key = "your_openai_api_key"
                )
                agent = AssistantAgent(name="assistant", model_client=model_client)

                response = await agent.on_messages(
                    [TextMessage(content="What is the capital of France?", source="user")], CancellationToken()
                )
                print(response)


            asyncio.run(main())
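
        The following sketch shows how `tool_call_summary_format` can be customized
        so that the tool name appears in the response; the `get_current_time` tool
        here is illustrative.

        .. code-block:: python

            import asyncio
            from autogen_core import CancellationToken
            from autogen_ext.models.openai import OpenAIChatCompletionClient
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.messages import TextMessage


            async def get_current_time() -> str:
                return "The current time is 12:00 PM."


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(
                    model="gpt-4o",
                    # api_key = "your_openai_api_key"
                )
                # With reflect_on_tool_use=False (the default), the response is a
                # ToolCallSummaryMessage formatted as "get_current_time: <result>".
                agent = AssistantAgent(
                    name="assistant",
                    model_client=model_client,
                    tools=[get_current_time],
                    tool_call_summary_format="{tool_name}: {result}",
                )
                response = await agent.on_messages(
                    [TextMessage(content="What is the current time?", source="user")], CancellationToken()
                )
                print(response.chat_message.content)


            asyncio.run(main())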

        The following example demonstrates how to create an assistant agent with
        a model client and a tool, generate a stream of messages for a task, and
        print the messages to the console.

        .. code-block:: python

            import asyncio
            from autogen_ext.models.openai import OpenAIChatCompletionClient
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.messages import TextMessage
            from autogen_agentchat.ui import Console
            from autogen_core import CancellationToken


            async def get_current_time() -> str:
                return "The current time is 12:00 PM."


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(
                    model="gpt-4o",
                    # api_key = "your_openai_api_key"
                )
                agent = AssistantAgent(name="assistant", model_client=model_client, tools=[get_current_time])

                await Console(
                    agent.on_messages_stream(
                        [TextMessage(content="What is the current time?", source="user")], CancellationToken()
                    )
                )


            asyncio.run(main())


        The following example shows how to use the `o1-mini` model with the assistant agent.

        .. code-block:: python

            import asyncio
            from autogen_core import CancellationToken
            from autogen_ext.models.openai import OpenAIChatCompletionClient
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.messages import TextMessage


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(
                    model="o1-mini",
                    # api_key = "your_openai_api_key"
                )
                # The system message is not supported by the o1 series model.
                agent = AssistantAgent(name="assistant", model_client=model_client, system_message=None)

                response = await agent.on_messages(
                    [TextMessage(content="What is the capital of France?", source="user")], CancellationToken()
                )
                print(response)


            asyncio.run(main())


        .. note::

            The `o1-preview` and `o1-mini` models do not support system messages or function calling.
            So the `system_message` should be set to `None`, and the `tools` and `handoffs` should not be set.
            See `o1 beta limitations <https://platform.openai.com/docs/guides/reasoning#beta-limitations>`_ for more details.
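
        The following is a minimal sketch of using the `memory` parameter, assuming
        :class:`~autogen_core.memory.ListMemory` as the memory store (the stored
        fact is illustrative).

        .. code-block:: python

            import asyncio
            from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType
            from autogen_ext.models.openai import OpenAIChatCompletionClient
            from autogen_agentchat.agents import AssistantAgent


            async def main() -> None:
                # Preload a fact. Before each inference, the memory updates the
                # model context and the agent emits a MemoryQueryEvent.
                memory = ListMemory()
                await memory.add(
                    MemoryContent(content="The user prefers short answers.", mime_type=MemoryMimeType.TEXT)
                )

                model_client = OpenAIChatCompletionClient(
                    model="gpt-4o",
                    # api_key = "your_openai_api_key"
                )
                agent = AssistantAgent(name="assistant", model_client=model_client, memory=[memory])


            asyncio.run(main())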
    """

    component_config_schema = AssistantAgentConfig
    component_provider_override = "autogen_agentchat.agents.AssistantAgent"

    def __init__(
        self,
        name: str,
        model_client: ChatCompletionClient,
        *,
        tools: List[Tool | Callable[..., Any] | Callable[..., Awaitable[Any]]] | None = None,
        handoffs: List[HandoffBase | str] | None = None,
        model_context: ChatCompletionContext | None = None,
        description: str = "An agent that provides assistance with ability to use tools.",
        system_message: (
            str | None
        ) = "You are a helpful AI assistant. Solve tasks using your tools. Reply with TERMINATE when the task has been completed.",
        reflect_on_tool_use: bool = False,
        tool_call_summary_format: str = "{result}",
        memory: Sequence[Memory] | None = None,
    ):
        super().__init__(name=name, description=description)
        self._model_client = model_client
        self._memory = None
        if memory is not None:
            if isinstance(memory, list):
                self._memory = memory
            else:
                raise TypeError(f"Expected Memory, List[Memory], or None, got {type(memory)}")

        self._system_messages: List[
            SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage
        ] = []
        if system_message is None:
            self._system_messages = []
        else:
            self._system_messages = [SystemMessage(content=system_message)]
        self._tools: List[Tool] = []
        if tools is not None:
            if model_client.model_info["function_calling"] is False:
                raise ValueError("The model does not support function calling.")
            for tool in tools:
                if isinstance(tool, Tool):
                    self._tools.append(tool)
                elif callable(tool):
                    if hasattr(tool, "__doc__") and tool.__doc__ is not None:
                        description = tool.__doc__
                    else:
                        description = ""
                    self._tools.append(FunctionTool(tool, description=description))
                else:
                    raise ValueError(f"Unsupported tool type: {type(tool)}")
        # Check if tool names are unique.
        tool_names = [tool.name for tool in self._tools]
        if len(tool_names) != len(set(tool_names)):
            raise ValueError(f"Tool names must be unique: {tool_names}")
        # Handoff tools.
        self._handoff_tools: List[Tool] = []
        self._handoffs: Dict[str, HandoffBase] = {}
        if handoffs is not None:
            if model_client.model_info["function_calling"] is False:
                raise ValueError("The model does not support function calling, which is needed for handoffs.")
            for handoff in handoffs:
                if isinstance(handoff, str):
                    handoff = HandoffBase(target=handoff)
                if isinstance(handoff, HandoffBase):
                    self._handoff_tools.append(handoff.handoff_tool)
                    self._handoffs[handoff.name] = handoff
                else:
                    raise ValueError(f"Unsupported handoff type: {type(handoff)}")
        # Check if handoff tool names are unique.
        handoff_tool_names = [tool.name for tool in self._handoff_tools]
        if len(handoff_tool_names) != len(set(handoff_tool_names)):
            raise ValueError(f"Handoff names must be unique: {handoff_tool_names}")
        # Check that handoff tool names do not collide with tool names.
        if any(name in tool_names for name in handoff_tool_names):
            raise ValueError(
                "Handoff names must be unique from tool names. "
                f"Handoff names: {handoff_tool_names}; tool names: {tool_names}"
            )
        if model_context is not None:
            self._model_context = model_context
        else:
            self._model_context = UnboundedChatCompletionContext()
        self._reflect_on_tool_use = reflect_on_tool_use
        self._tool_call_summary_format = tool_call_summary_format
        self._is_running = False

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        """The types of messages that the assistant agent produces."""
        message_types: List[type[ChatMessage]] = [TextMessage]
        if self._handoffs:
            message_types.append(HandoffMessage)
        if self._tools:
            message_types.append(ToolCallSummaryMessage)
        return tuple(message_types)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                return message
        raise AssertionError("The stream should have returned the final result.")

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        # Add messages to the model context.
        for msg in messages:
            if isinstance(msg, MultiModalMessage) and self._model_client.model_info["vision"] is False:
                raise ValueError("The model does not support vision.")
            if isinstance(msg, HandoffMessage):
                # Add handoff context to the model context.
                for context_msg in msg.context:
                    await self._model_context.add_message(context_msg)
            await self._model_context.add_message(UserMessage(content=msg.content, source=msg.source))

        # Inner messages.
        inner_messages: List[AgentEvent | ChatMessage] = []

        # Update the model context with memory content.
        if self._memory:
            for memory in self._memory:
                update_context_result = await memory.update_context(self._model_context)
                if update_context_result and len(update_context_result.memories.results) > 0:
                    memory_query_event_msg = MemoryQueryEvent(
                        content=update_context_result.memories.results, source=self.name
                    )
                    inner_messages.append(memory_query_event_msg)
                    yield memory_query_event_msg

        # Generate an inference result based on the current model context.
        llm_messages = self._system_messages + await self._model_context.get_messages()
        model_result = await self._model_client.create(
            llm_messages, tools=self._tools + self._handoff_tools, cancellation_token=cancellation_token
        )

        # Add the response to the model context.
        await self._model_context.add_message(AssistantMessage(content=model_result.content, source=self.name))

        # Check if the response is a string and return it.
        if isinstance(model_result.content, str):
            yield Response(
                chat_message=TextMessage(
                    content=model_result.content, source=self.name, models_usage=model_result.usage
                ),
                inner_messages=inner_messages,
            )
            return

        # Process tool calls.
        assert isinstance(model_result.content, list) and all(
            isinstance(item, FunctionCall) for item in model_result.content
        )
        tool_call_msg = ToolCallRequestEvent(
            content=model_result.content, source=self.name, models_usage=model_result.usage
        )
        event_logger.debug(tool_call_msg)
        # Add the tool call message to the output.
        inner_messages.append(tool_call_msg)
        yield tool_call_msg

        # Execute the tool calls.
        exec_results = await asyncio.gather(
            *[self._execute_tool_call(call, cancellation_token) for call in model_result.content]
        )
        tool_call_result_msg = ToolCallExecutionEvent(content=exec_results, source=self.name)
        event_logger.debug(tool_call_result_msg)
        await self._model_context.add_message(FunctionExecutionResultMessage(content=exec_results))
        inner_messages.append(tool_call_result_msg)
        yield tool_call_result_msg

        # Correlate tool call results with tool calls.
        tool_calls = [call for call in model_result.content if call.name not in self._handoffs]
        tool_call_results: List[FunctionExecutionResult] = []
        for tool_call in tool_calls:
            found = False
            for exec_result in exec_results:
                if exec_result.call_id == tool_call.id:
                    found = True
                    tool_call_results.append(exec_result)
                    break
            if not found:
                raise RuntimeError(f"Tool call result not found for call id: {tool_call.id}")

        # Detect handoff requests.
        handoff_reqs = [call for call in model_result.content if call.name in self._handoffs]
        if len(handoff_reqs) > 0:
            handoffs = [self._handoffs[call.name] for call in handoff_reqs]
            if len(handoffs) > 1:
                # Show a warning if multiple handoffs are detected.
                warnings.warn(
""Disable parallel tool call in the model client to avoid this warning."),stacklevel=2,)# Current context for handoff.handoff_context:List[LLMMessage]=[]iflen(tool_calls)>0:handoff_context.append(AssistantMessage(content=tool_calls,source=self.name))handoff_context.append(FunctionExecutionResultMessage(content=tool_call_results))# Return the output messages to signal the handoff.yieldResponse(chat_message=HandoffMessage(content=handoffs[0].message,target=handoffs[0].target,source=self.name,context=handoff_context),inner_messages=inner_messages,)returnifself._reflect_on_tool_use:# Generate another inference result based on the tool call and result.llm_messages=self._system_messages+awaitself._model_context.get_messages()model_result=awaitself._model_client.create(llm_messages,cancellation_token=cancellation_token)assertisinstance(model_result.content,str)# Add the response to the model context.awaitself._model_context.add_message(AssistantMessage(content=model_result.content,source=self.name))# Yield the response.yieldResponse(chat_message=TextMessage(content=model_result.content,source=self.name,models_usage=model_result.usage),inner_messages=inner_messages,)else:# Return tool call result as the response.tool_call_summaries:List[str]=[]fortool_call,tool_call_resultinzip(tool_calls,tool_call_results,strict=False):tool_call_summaries.append(self._tool_call_summary_format.format(tool_name=tool_call.name,arguments=tool_call.arguments,result=tool_call_result.content,),)tool_call_summary="\n".join(tool_call_summaries)yieldResponse(chat_message=ToolCallSummaryMessage(content=tool_call_summary,source=self.name),inner_messages=inner_messages,)

    async def _execute_tool_call(
        self, tool_call: FunctionCall, cancellation_token: CancellationToken
    ) -> FunctionExecutionResult:
        """Execute a tool call and return the result."""
        try:
            if not self._tools + self._handoff_tools:
                raise ValueError("No tools are available.")
            tool = next((t for t in self._tools + self._handoff_tools if t.name == tool_call.name), None)
            if tool is None:
                raise ValueError(f"The tool '{tool_call.name}' is not available.")
            arguments = json.loads(tool_call.arguments)
            result = await tool.run_json(arguments, cancellation_token)
            result_as_str = tool.return_value_as_string(result)
            return FunctionExecutionResult(content=result_as_str, call_id=tool_call.id)
        except Exception as e:
            return FunctionExecutionResult(content=f"Error: {e}", call_id=tool_call.id)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the assistant agent to its initialization state."""
        await self._model_context.clear()

    async def save_state(self) -> Mapping[str, Any]:
        """Save the current state of the assistant agent."""
        model_context_state = await self._model_context.save_state()
        return AssistantAgentState(llm_context=model_context_state).model_dump()

    async def load_state(self, state: Mapping[str, Any]) -> None:
        """Load the state of the assistant agent."""
        assistant_agent_state = AssistantAgentState.model_validate(state)
        # Load the model context state.
        await self._model_context.load_state(assistant_agent_state.llm_context)

    def _to_config(self) -> AssistantAgentConfig:
        """Convert the assistant agent to a declarative config."""
        # Raise an error if tools is not empty until serialization is implemented.
        # TBD: Implement serializing tools and remove this check.
        if self._tools and len(self._tools) > 0:
            raise NotImplementedError("Serializing tools is not implemented yet.")

        return AssistantAgentConfig(
            name=self.name,
            model_client=self._model_client.dump_component(),
            # tools=[],  # TBD
            handoffs=list(self._handoffs.values()),
            model_context=self._model_context.dump_component(),
            description=self.description,
            system_message=self._system_messages[0].content
            if self._system_messages and isinstance(self._system_messages[0].content, str)
            else None,
            reflect_on_tool_use=self._reflect_on_tool_use,
            tool_call_summary_format=self._tool_call_summary_format,
        )

    @classmethod
    def _from_config(cls, config: AssistantAgentConfig) -> Self:
        """Create an assistant agent from a declarative config."""
        return cls(
            name=config.name,
            model_client=ChatCompletionClient.load_component(config.model_client),
            # tools=[],  # TBD
            handoffs=config.handoffs,
            model_context=None,
            description=config.description,
            system_message=config.system_message,
            reflect_on_tool_use=config.reflect_on_tool_use,
            tool_call_summary_format=config.tool_call_summary_format,
        )
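

# A minimal sketch of the declarative config round-trip implemented by
# `_to_config` / `_from_config` above. It assumes a component-serializable model
# client such as OpenAIChatCompletionClient; note that `_to_config` raises
# NotImplementedError when tools are registered.
if __name__ == "__main__":
    from autogen_ext.models.openai import OpenAIChatCompletionClient

    _agent = AssistantAgent(
        name="assistant",
        model_client=OpenAIChatCompletionClient(
            model="gpt-4o",
            # api_key = "your_openai_api_key"
        ),
    )
    # dump_component() is provided by the Component base class and calls _to_config().
    _config = _agent.dump_component()
    # load_component() validates the config and calls _from_config().
    _restored = AssistantAgent.load_component(_config)
    assert _restored.name == "assistant"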