Source code for autogen_ext.agents.openai._openai_assistant_agent
import asyncio
import json
import logging
import os
from typing import (
    Any,
    AsyncGenerator,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    List,
    Literal,
    Optional,
    Sequence,
    Set,
    Union,
    cast,
)

import aiofiles
from autogen_agentchat import EVENT_LOGGER_NAME
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import (
    AgentEvent,
    ChatMessage,
    HandoffMessage,
    MultiModalMessage,
    StopMessage,
    TextMessage,
    ToolCallExecutionEvent,
    ToolCallRequestEvent,
)
from autogen_core import CancellationToken, FunctionCall
from autogen_core.models._model_client import ChatCompletionClient
from autogen_core.models._types import FunctionExecutionResult
from autogen_core.tools import FunctionTool, Tool
from openai import NOT_GIVEN, AsyncAzureOpenAI, AsyncOpenAI, NotGiven
from openai.pagination import AsyncCursorPage
from openai.resources.beta.threads import AsyncMessages, AsyncRuns, AsyncThreads
from openai.types import FileObject
from openai.types.beta import thread_update_params
from openai.types.beta.assistant import Assistant
from openai.types.beta.assistant_response_format_option_param import AssistantResponseFormatOptionParam
from openai.types.beta.assistant_tool_param import AssistantToolParam
from openai.types.beta.code_interpreter_tool_param import CodeInterpreterToolParam
from openai.types.beta.file_search_tool_param import FileSearchToolParam
from openai.types.beta.function_tool_param import FunctionToolParam
from openai.types.beta.thread import Thread, ToolResources, ToolResourcesCodeInterpreter
from openai.types.beta.threads import Message, MessageDeleted, Run
from openai.types.beta.vector_store import VectorStore
from openai.types.shared_params.function_definition import FunctionDefinition

event_logger = logging.getLogger(EVENT_LOGGER_NAME)


def _convert_tool_to_function_param(tool: Tool) -> "FunctionToolParam":
    """Convert an autogen Tool to an OpenAI Assistant function tool parameter.

    Copies the tool's JSON schema (type/properties/required) into an OpenAI
    ``FunctionDefinition`` and wraps it in a ``FunctionToolParam``.
    """
    schema = tool.schema
    # Only forward a parameter spec when the tool declares one; otherwise the
    # assistant function is registered with an empty parameter object.
    parameters: Dict[str, object] = {}
    if "parameters" in schema:
        parameters = {
            "type": schema["parameters"]["type"],
            "properties": schema["parameters"]["properties"],
        }
        if "required" in schema["parameters"]:
            parameters["required"] = schema["parameters"]["required"]

    function_def = FunctionDefinition(
        name=schema["name"],
        description=schema.get("description", ""),
        parameters=parameters,
    )
    return FunctionToolParam(type="function", function=function_def)
class OpenAIAssistantAgent(BaseChatAgent):
    """An agent implementation that uses the Assistant API to generate responses.

    Installation:

    .. code-block:: bash

        pip install "autogen-ext[openai]"
        # pip install "autogen-ext[openai,azure]"  # For Azure OpenAI Assistant

    This agent leverages the Assistant API to create AI assistants with capabilities like:

    * Code interpretation and execution
    * File handling and search
    * Custom function calling
    * Multi-turn conversations

    The agent maintains a thread of conversation and can use various tools including

    * Code interpreter: For executing code and working with files
    * File search: For searching through uploaded documents
    * Custom functions: For extending capabilities with user-defined tools

    Key Features:

    * Supports multiple file formats including code, documents, images
    * Can handle up to 128 tools per assistant
    * Maintains conversation context in threads
    * Supports file uploads for code interpreter and search
    * Vector store integration for efficient file search
    * Automatic file parsing and embedding

    You can use an existing thread or assistant by providing the `thread_id` or
    `assistant_id` parameters.

    Args:
        name (str): Name of the assistant
        description (str): Description of the assistant's purpose
        client (AsyncOpenAI | AsyncAzureOpenAI): OpenAI client or Azure OpenAI client instance
        model (str): Model to use (e.g. "gpt-4")
        instructions (str): System instructions for the assistant
        tools (Optional[Iterable[Union[Literal["code_interpreter", "file_search"], Tool | Callable[..., Any] | Callable[..., Awaitable[Any]]]]]):
            Tools the assistant can use
        assistant_id (Optional[str]): ID of existing assistant to use
        thread_id (Optional[str]): ID of existing thread to use
        metadata (Optional[object]): Additional metadata for the assistant
        response_format (Optional[AssistantResponseFormatOptionParam]): Response format settings
        temperature (Optional[float]): Temperature for response generation
        tool_resources (Optional[ToolResources]): Additional tool configuration
        top_p (Optional[float]): Top p sampling parameter
    """

    def __init__(
        self,
        name: str,
        description: str,
        client: AsyncOpenAI | AsyncAzureOpenAI,
        model: str,
        instructions: str,
        tools: Optional[
            Iterable[
                Union[
                    Literal["code_interpreter", "file_search"],
                    Tool | Callable[..., Any] | Callable[..., Awaitable[Any]],
                ]
            ]
        ] = None,
        assistant_id: Optional[str] = None,
        thread_id: Optional[str] = None,
        metadata: Optional[object] = None,
        response_format: Optional["AssistantResponseFormatOptionParam"] = None,
        temperature: Optional[float] = None,
        tool_resources: Optional["ToolResources"] = None,
        top_p: Optional[float] = None,
    ) -> None:
        # Guard against the common mistake of passing an AutoGen model client.
        if isinstance(client, ChatCompletionClient):
            raise ValueError(
                "Incorrect client passed to OpenAIAssistantAgent. Please use an OpenAI AsyncClient instance instead of an AutoGen ChatCompletionClient instance."
            )
        super().__init__(name, description)
        if tools is None:
            tools = []

        # Keep the original autogen Tool objects (needed to execute calls
        # locally) separate from the OpenAI API tool parameters.
        self._original_tools: List[Tool] = []
        converted_tools: List["AssistantToolParam"] = []
        for tool in tools:
            if isinstance(tool, str):
                # Built-in assistant tools are referenced by name.
                if tool == "code_interpreter":
                    converted_tools.append(CodeInterpreterToolParam(type="code_interpreter"))
                elif tool == "file_search":
                    converted_tools.append(FileSearchToolParam(type="file_search"))
            elif isinstance(tool, Tool):
                self._original_tools.append(tool)
                converted_tools.append(_convert_tool_to_function_param(tool))
            elif callable(tool):
                # Wrap bare callables as FunctionTools, using the docstring as
                # the tool description when one is available.
                tool_description = tool.__doc__ if (hasattr(tool, "__doc__") and tool.__doc__ is not None) else ""
                function_tool = FunctionTool(tool, description=tool_description)
                self._original_tools.append(function_tool)
                converted_tools.append(_convert_tool_to_function_param(function_tool))
            else:
                raise ValueError(f"Unsupported tool type: {type(tool)}")

        self._client = client
        self._assistant: Optional["Assistant"] = None
        self._thread: Optional["Thread"] = None
        self._init_thread_id = thread_id
        self._model = model
        self._instructions = instructions
        self._api_tools = converted_tools
        self._assistant_id = assistant_id
        self._metadata = metadata
        self._response_format = response_format
        self._temperature = temperature
        self._tool_resources = tool_resources
        self._top_p = top_p
        self._vector_store_id: Optional[str] = None
        self._uploaded_file_ids: List[str] = []

        # Variables to track initial state (used by on_reset to know which
        # thread messages pre-date this agent instance).
        self._initial_message_ids: Set[str] = set()
        self._initial_state_retrieved: bool = False

    async def _ensure_initialized(self) -> None:
        """Ensure assistant and thread are created."""
        if self._assistant is None:
            if self._assistant_id:
                # Reuse an existing assistant when an ID was supplied.
                self._assistant = await self._client.beta.assistants.retrieve(assistant_id=self._assistant_id)
            else:
                self._assistant = await self._client.beta.assistants.create(
                    model=self._model,
                    description=self.description,
                    instructions=self._instructions,
                    tools=self._api_tools,
                    metadata=self._metadata,
                    response_format=self._response_format if self._response_format else NOT_GIVEN,  # type: ignore
                    temperature=self._temperature,
                    tool_resources=self._tool_resources if self._tool_resources else NOT_GIVEN,  # type: ignore
                    top_p=self._top_p,
                )

        if self._thread is None:
            if self._init_thread_id:
                self._thread = await self._client.beta.threads.retrieve(thread_id=self._init_thread_id)
            else:
                self._thread = await self._client.beta.threads.create()

        # Retrieve initial state only once.
        if not self._initial_state_retrieved:
            await self._retrieve_initial_state()
            self._initial_state_retrieved = True

    async def _retrieve_initial_state(self) -> None:
        """Retrieve and store the initial state of messages and runs."""
        # Page through the thread to record every pre-existing message ID.
        initial_message_ids: Set[str] = set()
        after: str | NotGiven = NOT_GIVEN
        while True:
            msgs: AsyncCursorPage[Message] = await self._client.beta.threads.messages.list(
                self._thread_id, after=after, order="asc", limit=100
            )
            for msg in msgs.data:
                initial_message_ids.add(msg.id)
            if not msgs.has_next_page():
                break
            after = msgs.data[-1].id
        self._initial_message_ids = initial_message_ids

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        """The types of messages that the assistant agent produces."""
        return (TextMessage,)

    @property
    def threads(self) -> AsyncThreads:
        return self._client.beta.threads

    @property
    def runs(self) -> AsyncRuns:
        return self._client.beta.threads.runs

    @property
    def messages(self) -> AsyncMessages:
        return self._client.beta.threads.messages

    @property
    def _get_assistant_id(self) -> str:
        if self._assistant is None:
            raise ValueError("Assistant not initialized")
        return self._assistant.id

    @property
    def _thread_id(self) -> str:
        if self._thread is None:
            raise ValueError("Thread not initialized")
        return self._thread.id

    async def _execute_tool_call(self, tool_call: FunctionCall, cancellation_token: CancellationToken) -> str:
        """Execute a tool call and return the result.

        Any failure (unknown tool, bad arguments, tool exception) is reported
        back to the assistant as an "Error: ..." string rather than raised.
        """
        try:
            if not self._original_tools:
                raise ValueError("No tools are available.")
            tool = next((t for t in self._original_tools if t.name == tool_call.name), None)
            if tool is None:
                raise ValueError(f"The tool '{tool_call.name}' is not available.")
            arguments = json.loads(tool_call.arguments)
            result = await tool.run_json(arguments, cancellation_token)
            return tool.return_value_as_string(result)
        except Exception as e:
            return f"Error: {e}"
[docs]asyncdefon_messages(self,messages:Sequence[ChatMessage],cancellation_token:CancellationToken)->Response:"""Handle incoming messages and return a response."""asyncformessageinself.on_messages_stream(messages,cancellation_token):ifisinstance(message,Response):returnmessageraiseAssertionError("The stream should have returned the final result.")
[docs]asyncdefon_messages_stream(self,messages:Sequence[ChatMessage],cancellation_token:CancellationToken)->AsyncGenerator[AgentEvent|ChatMessage|Response,None]:"""Handle incoming messages and return a response."""awaitself._ensure_initialized()# Process all messages in sequenceformessageinmessages:ifisinstance(message,(TextMessage,MultiModalMessage)):awaitself.handle_text_message(str(message.content),cancellation_token)elifisinstance(message,(StopMessage,HandoffMessage)):awaitself.handle_text_message(message.content,cancellation_token)# Inner messages for tool callsinner_messages:List[AgentEvent|ChatMessage]=[]# Create and start a runrun:Run=awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.threads.runs.create(thread_id=self._thread_id,assistant_id=self._get_assistant_id,)))# Wait for run completion by pollingwhileTrue:run=awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.threads.runs.retrieve(thread_id=self._thread_id,run_id=run.id,)))ifrun.status=="failed":raiseValueError(f"Run failed: {run.last_error}")# If the run requires action (function calls), execute tools and continueifrun.status=="requires_action"andrun.required_actionisnotNone:tool_calls:List[FunctionCall]=[]forrequired_tool_callinrun.required_action.submit_tool_outputs.tool_calls:ifrequired_tool_call.type=="function":tool_calls.append(FunctionCall(id=required_tool_call.id,name=required_tool_call.function.name,arguments=required_tool_call.function.arguments,))# Add tool call message to inner messagestool_call_msg=ToolCallRequestEvent(source=self.name,content=tool_calls)inner_messages.append(tool_call_msg)event_logger.debug(tool_call_msg)yieldtool_call_msg# Execute tool calls and get resultstool_outputs:List[FunctionExecutionResult]=[]fortool_callintool_calls:result=awaitself._execute_tool_call(tool_call,cancellation_token)tool_outputs.append(FunctionExecutionResult(content=result,call_id=tool_call.id))# Add tool result message to inner 
messagestool_result_msg=ToolCallExecutionEvent(source=self.name,content=tool_outputs)inner_messages.append(tool_result_msg)event_logger.debug(tool_result_msg)yieldtool_result_msg# Submit tool outputs back to the runrun=awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.threads.runs.submit_tool_outputs(thread_id=self._thread_id,run_id=run.id,tool_outputs=[{"tool_call_id":t.call_id,"output":t.content}fortintool_outputs],)))continueifrun.status=="completed":breakawaitasyncio.sleep(0.5)# Get messages after run completionassistant_messages:AsyncCursorPage[Message]=awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.threads.messages.list(thread_id=self._thread_id,order="desc",limit=1)))ifnotassistant_messages.data:raiseValueError("No messages received from assistant")# Get the last message's contentlast_message=assistant_messages.data[0]ifnotlast_message.content:raiseValueError(f"No content in the last message: {last_message}")# Extract text contenttext_content=[contentforcontentinlast_message.contentifcontent.type=="text"]ifnottext_content:raiseValueError(f"Expected text content in the last message: {last_message.content}")# Return the assistant's response as a Response with inner messageschat_message=TextMessage(source=self.name,content=text_content[0].text.value)yieldResponse(chat_message=chat_message,inner_messages=inner_messages)
[docs]asyncdefhandle_text_message(self,content:str,cancellation_token:CancellationToken)->None:"""Handle regular text messages by adding them to the thread."""awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.threads.messages.create(thread_id=self._thread_id,content=content,role="user",)))
[docs]asyncdefon_reset(self,cancellation_token:CancellationToken)->None:"""Handle reset command by deleting new messages and runs since initialization."""awaitself._ensure_initialized()# Retrieve all message IDs in the threadnew_message_ids:List[str]=[]after:str|NotGiven=NOT_GIVENwhileTrue:msgs:AsyncCursorPage[Message]=awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.threads.messages.list(self._thread_id,after=after,order="asc",limit=100)))formsginmsgs.data:ifmsg.idnotinself._initial_message_ids:new_message_ids.append(msg.id)ifnotmsgs.has_next_page():breakafter=msgs.data[-1].id# Delete new messagesformsg_idinnew_message_ids:status:MessageDeleted=awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.threads.messages.delete(message_id=msg_id,thread_id=self._thread_id)))assertstatus.deletedisTrue
asyncdef_upload_files(self,file_paths:str|Iterable[str],cancellation_token:CancellationToken)->List[str]:"""Upload files and return their IDs."""awaitself._ensure_initialized()ifisinstance(file_paths,str):file_paths=[file_paths]file_ids:List[str]=[]forfile_pathinfile_paths:asyncwithaiofiles.open(file_path,mode="rb")asf:file_content=awaitcancellation_token.link_future(asyncio.ensure_future(f.read()))file_name=os.path.basename(file_path)file:FileObject=awaitcancellation_token.link_future(asyncio.ensure_future(self._client.files.create(file=(file_name,file_content),purpose="assistants")))file_ids.append(file.id)self._uploaded_file_ids.append(file.id)returnfile_ids
[docs]asyncdefon_upload_for_code_interpreter(self,file_paths:str|Iterable[str],cancellation_token:CancellationToken)->None:"""Handle file uploads for the code interpreter."""awaitself._ensure_initialized()file_ids=awaitself._upload_files(file_paths,cancellation_token)# Update thread with the new filesthread=awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.threads.retrieve(thread_id=self._thread_id)))tool_resources:ToolResources=thread.tool_resourcesorToolResources()code_interpreter:ToolResourcesCodeInterpreter=(tool_resources.code_interpreterorToolResourcesCodeInterpreter())existing_file_ids:List[str]=code_interpreter.file_idsor[]existing_file_ids.extend(file_ids)tool_resources.code_interpreter=ToolResourcesCodeInterpreter(file_ids=existing_file_ids)awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.threads.update(thread_id=self._thread_id,tool_resources=cast(thread_update_params.ToolResources,tool_resources.model_dump()),)))
[docs]asyncdefon_upload_for_file_search(self,file_paths:str|Iterable[str],cancellation_token:CancellationToken)->None:"""Handle file uploads for file search."""awaitself._ensure_initialized()# Check if file_search is enabled in toolsifnotany(tool.get("type")=="file_search"fortoolinself._api_tools):raiseValueError("File search is not enabled for this assistant. Add a file_search tool when creating the assistant.")# Create vector store if not already createdifself._vector_store_idisNone:vector_store:VectorStore=awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.vector_stores.create()))self._vector_store_id=vector_store.id# Update assistant with vector store IDawaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.assistants.update(assistant_id=self._get_assistant_id,tool_resources={"file_search":{"vector_store_ids":[self._vector_store_id]}},)))file_ids=awaitself._upload_files(file_paths,cancellation_token)# Create file batch with the file IDsawaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.vector_stores.file_batches.create_and_poll(vector_store_id=self._vector_store_id,file_ids=file_ids)))
[docs]asyncdefdelete_uploaded_files(self,cancellation_token:CancellationToken)->None:"""Delete all files that were uploaded by this agent instance."""awaitself._ensure_initialized()forfile_idinself._uploaded_file_ids:try:awaitcancellation_token.link_future(asyncio.ensure_future(self._client.files.delete(file_id=file_id)))exceptExceptionase:event_logger.error(f"Failed to delete file {file_id}: {str(e)}")self._uploaded_file_ids=[]
[docs]asyncdefdelete_assistant(self,cancellation_token:CancellationToken)->None:"""Delete the assistant if it was created by this instance."""awaitself._ensure_initialized()ifself._assistantisnotNoneandnotself._assistant_id:try:awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.assistants.delete(assistant_id=self._get_assistant_id)))self._assistant=NoneexceptExceptionase:event_logger.error(f"Failed to delete assistant: {str(e)}")
[docs]asyncdefdelete_vector_store(self,cancellation_token:CancellationToken)->None:"""Delete the vector store if it was created by this instance."""awaitself._ensure_initialized()ifself._vector_store_idisnotNone:try:awaitcancellation_token.link_future(asyncio.ensure_future(self._client.beta.vector_stores.delete(vector_store_id=self._vector_store_id)))self._vector_store_id=NoneexceptExceptionase:event_logger.error(f"Failed to delete vector store: {str(e)}")