Source code for autogen_ext.agents.openai._openai_agent

import asyncio
import json
import logging
import warnings
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional, Sequence, Type, TypedDict, Union, cast

from autogen_agentchat import EVENT_LOGGER_NAME
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import (
    AgentEvent,
    BaseChatMessage,
    ChatMessage,
    HandoffMessage,
    MultiModalMessage,
    StopMessage,
    TextMessage,
    ToolCallSummaryMessage,
)
from autogen_core import CancellationToken, Component, ComponentModel, FunctionCall
from autogen_core.models import UserMessage
from autogen_core.tools import Tool
from pydantic import BaseModel, Field

from openai import AsyncAzureOpenAI, AsyncOpenAI

# Maximum number of characters of image content (a URL or a base64 payload) shown in
# human-readable previews; base64 data can be extremely long, so previews are truncated.
IMAGE_CONTENT_PREVIEW_LENGTH: int = 50

# NOTE: This module targets the Responses API, so no ChatCompletion types are imported.

# Logger shared with autogen_agentchat's event stream; this module reports API and tool errors here.
event_logger: logging.Logger = logging.getLogger(EVENT_LOGGER_NAME)


def _convert_tool_to_function_schema(tool: "Tool") -> Dict[str, Any]:
    """Convert an AutoGen tool into an OpenAI function-tool definition.

    Args:
        tool: The AutoGen tool; only its ``schema`` mapping is read.

    Returns:
        A dict with ``name``, ``description`` (empty string when absent), ``parameters`` and
        ``strict`` keys. From the tool's parameter schema, ``type`` (default ``"object"``),
        ``properties`` (default ``{}``), and -- when present -- ``required`` and
        ``additionalProperties`` are forwarded. A tool without a parameter schema gets ``{}``.

        ``strict`` is copied from the tool schema and defaults to ``False``. It is always sent
        explicitly so that a missing flag is not interpreted as strict mode by the Responses API.
        Strict mode rejects schemas lacking ``additionalProperties: false`` / fully-required
        properties.
    """
    schema = tool.schema
    parameters: Dict[str, object] = {}
    if "parameters" in schema:
        source_parameters = schema["parameters"]
        parameters = {
            "type": source_parameters.get("type", "object"),
            "properties": source_parameters.get("properties", {}),
        }
        # Optional keywords are forwarded only when present so the schema is not altered.
        for optional_key in ("required", "additionalProperties"):
            if optional_key in source_parameters:
                parameters[optional_key] = source_parameters[optional_key]

    return {
        "name": schema["name"],
        "description": schema.get("description", ""),
        "parameters": parameters,
        "strict": bool(schema.get("strict", False)),
    }


class OpenAIMessageContent(TypedDict):
    """A textual content part of a multi-part OpenAI message."""

    type: str  # content-part discriminator (this module emits "text")
    text: str  # the text itself


class OpenAIImageUrlContent(TypedDict):
    """Wrapper holding the location of an image referenced from a message."""

    url: str  # image URL or data URI


class OpenAIImageContent(TypedDict):
    """An image content part of a multi-part OpenAI message."""

    type: str  # content-part discriminator (this module emits "image_url")
    image_url: OpenAIImageUrlContent  # where the image lives


class OpenAIMessage(TypedDict):
    """A role-tagged message in the dict form sent to the OpenAI API."""

    role: str  # "user", "system" or "assistant" in this module
    # Either plain text or a list of text / image content parts.
    content: Union[str, List[Union[OpenAIMessageContent, OpenAIImageContent]]]


def _convert_message_to_openai_message(
    message: Union[TextMessage, MultiModalMessage, StopMessage, ToolCallSummaryMessage, HandoffMessage],
) -> OpenAIMessage:
    """Convert an AutoGen message to an OpenAI message format.

    * ``TextMessage``: the role is the message ``source`` when that is ``"system"`` or
      ``"assistant"``; every other source (including ``"user"``) maps to ``"user"``.
    * ``MultiModalMessage``: becomes a ``"user"`` message whose content is a list of parts.

      - Plain strings and ``TextMessage`` parts become ``"text"`` parts.
      - ``ImageMessage`` parts and image objects exposing a string ``data_uri`` become
        ``"image_url"`` parts.
      - Anything else is skipped.
    * Any other message: a ``"user"`` message with ``str(message.content)``.

    NOTE(review): the part shapes ("text" / "image_url") follow the Chat Completions convention;
    the Responses API documents "input_text" / "input_image" parts -- confirm before relying on
    multimodal input.
    """
    if isinstance(message, TextMessage):
        role = message.source if message.source in ("system", "assistant") else "user"
        return {"role": role, "content": str(message.content)}

    if isinstance(message, MultiModalMessage):
        content_parts: List[Union[OpenAIMessageContent, OpenAIImageContent]] = []
        for part in message.content:
            if isinstance(part, str):
                # MultiModalMessage stores its text segments as plain strings.
                content_parts.append({"type": "text", "text": part})
            elif isinstance(part, TextMessage):
                content_parts.append({"type": "text", "text": str(part.content)})
            elif isinstance(part, ImageMessage):
                content_parts.append({"type": "image_url", "image_url": {"url": str(part.content)}})
            else:
                # Image objects (e.g. autogen_core.Image) are detected by duck typing on
                # ``data_uri`` to avoid importing the image type here.
                data_uri = getattr(part, "data_uri", None)
                if isinstance(data_uri, str):
                    content_parts.append({"type": "image_url", "image_url": {"url": data_uri}})
        return {"role": "user", "content": content_parts}

    return {"role": "user", "content": str(message.content)}


class OpenAIAgentState(BaseModel):
    """Serializable snapshot of an OpenAI agent's conversation state."""

    type: str = Field(default="OpenAIAgentState")
    # ID of the most recent Responses API response, or None before the first reply.
    response_id: Optional[str] = Field(default=None)
    # Message dicts accumulated by the agent so far.
    history: List[Dict[str, Any]] = Field(default_factory=list)


class OpenAIAgentConfig(BaseModel):
    """Declarative configuration of an OpenAI Responses API agent."""

    # Identity of the agent.
    name: str
    description: str
    # Model selection and system-level instructions.
    model: str
    instructions: str
    # Serialized tool components, if any.
    tools: Optional[List[ComponentModel]] = None
    # Sampling / output controls forwarded to the API.
    temperature: Optional[float] = 1
    max_output_tokens: Optional[int] = None
    json_mode: bool = False
    # Response storage and context truncation settings.
    store: bool = True
    truncation: str = "disabled"


class FunctionExecutionResult(BaseModel):
    """Outcome of running one tool call requested by the model."""

    content: str  # tool output rendered as a string, or an error description
    call_id: str  # identifier of the tool call this result answers
    name: str  # name of the tool that was invoked
    is_error: bool = False  # True when content describes a failure


class OpenAIAgent(BaseChatAgent, Component[OpenAIAgentConfig]):
    """
    An agent implementation that uses the OpenAI Responses API to generate responses.

    Installation:

    .. code-block:: bash

        pip install "autogen-ext[openai]"
        # pip install "autogen-ext[openai,azure]"  # For Azure OpenAI Assistant

    This agent leverages the Responses API to generate responses with capabilities like:

    * Custom function calling (tool calls requested by the model are executed locally and their
      results are sent back to the model until it produces a final answer)
    * Multi-turn conversations

    Conversation state: with ``store=True`` turns are chained server-side via
    ``previous_response_id`` and only the new messages of each turn are sent. With
    ``store=False`` responses cannot be referenced later, so the locally tracked history is
    replayed on every request instead.

    Example:

    .. code-block:: python

        import asyncio

        from openai import AsyncOpenAI
        from autogen_core import CancellationToken
        from autogen_ext.agents.openai import OpenAIAgent
        from autogen_agentchat.messages import TextMessage


        async def example():
            cancellation_token = CancellationToken()
            client = AsyncOpenAI()
            agent = OpenAIAgent(
                name="simple_agent",
                description="A simple OpenAI agent using the Responses API",
                client=client,
                model="gpt-4.1",
                instructions="You are a helpful assistant.",
            )
            response = await agent.on_messages([TextMessage(source="user", content="Hello!")], cancellation_token)
            print(response)


        asyncio.run(example())

    TODO: Add support for advanced features (vector store, multimodal, etc.) in future PRs.
    """

    component_config_schema = OpenAIAgentConfig
    component_provider_override = "autogen_ext.agents.openai.OpenAIAgent"

    # Upper bound on model -> tools -> model round trips within a single turn, so a model that
    # keeps requesting tool calls cannot loop forever.
    _max_tool_iterations: int = 10

    def __init__(
        self: "OpenAIAgent",
        name: str,
        description: str,
        client: Union[AsyncOpenAI, AsyncAzureOpenAI],
        model: str,
        instructions: str,
        tools: Optional[List[Tool]] = None,
        temperature: Optional[float] = 1,
        max_output_tokens: Optional[int] = None,
        json_mode: bool = False,
        store: bool = True,
        truncation: str = "disabled",
    ) -> None:
        """Create the agent.

        Args:
            name: Agent name.
            description: Agent description.
            client: OpenAI or Azure OpenAI async client used for all API calls.
            model: Model name passed to the Responses API.
            instructions: System instructions, sent as a system message unless the input
                already contains one.
            tools: Tools the model may call; they are executed locally by this agent.
            temperature: Sampling temperature (omitted from requests when ``None``).
            max_output_tokens: Output token cap (omitted from requests when ``None``).
            json_mode: Request JSON-object output.
            store: Store responses server-side, which enables ``previous_response_id`` chaining.
            truncation: Value forwarded as the API's ``truncation`` parameter.
        """
        super().__init__(name, description)
        self._client: Union[AsyncOpenAI, AsyncAzureOpenAI] = client
        self._model: str = model
        self._instructions: str = instructions
        self._temperature: Optional[float] = temperature
        self._max_output_tokens: Optional[int] = max_output_tokens
        self._json_mode: bool = json_mode
        self._store: bool = store
        self._truncation: str = truncation
        self._last_response_id: Optional[str] = None
        self._message_history: List[Dict[str, Any]] = []
        self._tools: List[Dict[str, Any]] = []
        self._tool_map: Dict[str, Tool] = {}
        for tool in tools or []:
            # The Responses API takes *flat* function tools ({"type": "function", "name": ...,
            # "parameters": ...}); nesting under a "function" key is the Chat Completions format.
            self._tools.append({"type": "function", **_convert_tool_to_function_schema(tool)})
            self._tool_map[tool.name] = tool

    def _convert_message_to_dict(self: "OpenAIAgent", message: OpenAIMessage) -> Dict[str, Any]:
        """Convert an OpenAIMessage to a Dict[str, Any]."""
        return dict(message)

    def _get_assistants_api(self: "OpenAIAgent") -> Any:
        """Return the client's Assistants API resource, or ``None`` when it has none.

        A top-level ``client.assistants`` attribute is preferred when present; otherwise the
        ``client.beta.assistants`` resource exposed by the official ``openai`` SDK is used.
        """
        client_any = cast(Any, self._client)
        assistants = getattr(client_any, "assistants", None)
        if assistants is None:
            assistants = getattr(getattr(client_any, "beta", None), "assistants", None)
        return assistants

    @staticmethod
    def _response_to_dict(response: Any) -> Dict[str, Any]:
        """Convert an SDK response (pydantic model or mapping) into a plain dict."""
        if hasattr(response, "model_dump"):
            return cast(Dict[str, Any], response.model_dump())
        return cast(Dict[str, Any], dict(response))

    async def list_assistants(
        self: "OpenAIAgent",
        after: Optional[str] = None,
        before: Optional[str] = None,
        limit: Optional[int] = 20,
        order: Optional[str] = "desc",
    ) -> Dict[str, Any]:  # noqa: D102
        """
        List assistants using the OpenAI Assistants API.

        Args:
            after (Optional[str]): Cursor for pagination (fetch after this assistant ID).
            before (Optional[str]): Cursor for pagination (fetch before this assistant ID).
            limit (Optional[int]): Number of assistants to return (1-100, default 20). ``None`` omits it.
            order (Optional[str]): 'asc' or 'desc' by created_at (default 'desc'). ``None`` omits it.

        Returns:
            Dict[str, Any]: The OpenAI API list response converted to a dict (e.g. ``data`` with
            the assistant objects and ``has_more``).

        Raises:
            NotImplementedError: If the client exposes no Assistants API.

        Example:

        .. code-block:: python

            import asyncio
            from typing import Dict, Any
            from autogen_ext.agents.openai import OpenAIAgent
            from openai import AsyncOpenAI


            async def example() -> None:
                client = AsyncOpenAI()
                agent = OpenAIAgent(
                    name="test_agent",
                    description="Test agent",
                    client=client,
                    model="gpt-4",
                    instructions="You are a helpful assistant.",
                )
                assistants: Dict[str, Any] = await agent.list_assistants(limit=5)
                print(assistants)


            asyncio.run(example())
        """
        assistants_api = self._get_assistants_api()
        if assistants_api is None:
            raise NotImplementedError("The OpenAI client does not support listing assistants.")
        params: Dict[str, Any] = {}
        # None means "use the API default", so such values are left out of the request.
        for key, value in (("limit", limit), ("order", order)):
            if value is not None:
                params[key] = value
        if after:
            params["after"] = after
        if before:
            params["before"] = before
        response = await assistants_api.list(**params)
        return self._response_to_dict(response)

    async def retrieve_assistant(self: "OpenAIAgent", assistant_id: str) -> Dict[str, Any]:  # noqa: D102
        """
        Retrieve a single assistant by its ID using the OpenAI Assistants API.

        Args:
            assistant_id (str): The ID of the assistant to retrieve.

        Returns:
            Dict[str, Any]: The assistant object.

        Raises:
            NotImplementedError: If the client exposes no Assistants API.

        Example:

        .. code-block:: python

            import asyncio
            from typing import Dict, Any
            from autogen_ext.agents.openai import OpenAIAgent
            from openai import AsyncOpenAI


            async def example() -> None:
                client = AsyncOpenAI()
                agent = OpenAIAgent(
                    name="test_agent",
                    description="Test agent",
                    client=client,
                    model="gpt-4",
                    instructions="You are a helpful assistant.",
                )
                assistant: Dict[str, Any] = await agent.retrieve_assistant("asst_abc123")
                print(assistant)


            asyncio.run(example())
        """
        assistants_api = self._get_assistants_api()
        if assistants_api is None:
            raise NotImplementedError("The OpenAI client does not support retrieving assistants.")
        response = await assistants_api.retrieve(assistant_id=assistant_id)
        return self._response_to_dict(response)

    async def modify_assistant(
        self: "OpenAIAgent",
        assistant_id: str,
        name: Optional[str] = None,
        description: Optional[str] = None,
        instructions: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        model: Optional[str] = None,
        reasoning_effort: Optional[str] = None,
        response_format: Optional[str] = None,
        temperature: Optional[float] = None,
        tool_resources: Optional[Dict[str, Any]] = None,
        tools: Optional[List[Any]] = None,
        top_p: Optional[float] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:  # noqa: D102
        """
        Modify (update) an assistant by its ID using the OpenAI Assistants API.

        Only arguments that are not ``None`` are sent; ``kwargs`` are forwarded verbatim and
        override the named arguments.

        Args:
            assistant_id (str): The ID of the assistant to update.
            name (Optional[str]): New name for the assistant.
            description (Optional[str]): New description.
            instructions (Optional[str]): New instructions.
            metadata (Optional[dict]): New metadata.
            model (Optional[str]): New model.
            reasoning_effort (Optional[str]): New reasoning effort.
            response_format (Optional[str]): New response format.
            temperature (Optional[float]): New temperature.
            tool_resources (Optional[dict]): New tool resources.
            tools (Optional[list]): New tools.
            top_p (Optional[float]): New top_p value.
            **kwargs: Additional keyword arguments.

        Returns:
            Dict[str, Any]: The updated assistant object.

        Raises:
            NotImplementedError: If the client exposes no Assistants API.

        Example:

        .. code-block:: python

            import asyncio
            from typing import Dict, Any
            from autogen_ext.agents.openai import OpenAIAgent
            from openai import AsyncOpenAI


            async def example() -> None:
                client = AsyncOpenAI()
                agent = OpenAIAgent(
                    name="test_agent",
                    description="Test agent",
                    client=client,
                    model="gpt-4",
                    instructions="You are a helpful assistant.",
                )
                updated: Dict[str, Any] = await agent.modify_assistant(
                    assistant_id="asst_123",
                    instructions="You are an HR bot, and you have access to files to answer employee "
                    "questions about company policies. Always respond with info from either of the files.",
                    tools=[{"type": "file_search"}],
                    tool_resources={"file_search": {"vector_store_ids": []}},
                )
                print(updated)


            asyncio.run(example())
        """
        candidate_updates: Dict[str, Any] = {
            "name": name,
            "description": description,
            "instructions": instructions,
            "metadata": metadata,
            "model": model,
            "reasoning_effort": reasoning_effort,
            "response_format": response_format,
            "temperature": temperature,
            "tool_resources": tool_resources,
            "tools": tools,
            "top_p": top_p,
        }
        params = {key: value for key, value in candidate_updates.items() if value is not None}
        params.update(kwargs)
        assistants_api = self._get_assistants_api()
        if assistants_api is None:
            raise NotImplementedError("The OpenAI client does not support modifying assistants.")
        response = await assistants_api.update(assistant_id=assistant_id, **params)
        return self._response_to_dict(response)

    async def delete_assistant(self: "OpenAIAgent", assistant_id: str) -> Dict[str, Any]:  # noqa: D102
        """
        Delete an assistant by its ID using the OpenAI Assistants API.

        Args:
            assistant_id (str): The ID of the assistant to delete.

        Returns:
            Dict[str, Any]: The deletion status object
            (e.g., {"id": ..., "object": "assistant.deleted", "deleted": true}).

        Raises:
            NotImplementedError: If the client exposes no Assistants API.

        Example:

        .. code-block:: python

            import asyncio
            from typing import Dict, Any
            from autogen_ext.agents.openai import OpenAIAgent
            from openai import AsyncOpenAI


            async def example() -> None:
                client = AsyncOpenAI()
                agent = OpenAIAgent(
                    name="test_agent",
                    description="Test agent",
                    client=client,
                    model="gpt-4",
                    instructions="You are a helpful assistant.",
                )
                result: Dict[str, Any] = await agent.delete_assistant("asst_abc123")
                print(result)


            asyncio.run(example())
        """
        assistants_api = self._get_assistants_api()
        if assistants_api is None:
            raise NotImplementedError("The OpenAI client does not support deleting assistants.")
        response = await assistants_api.delete(assistant_id=assistant_id)
        return self._response_to_dict(response)

    @property
    def produced_message_types(
        self: "OpenAIAgent",
    ) -> Sequence[
        Union[
            Type[TextMessage],
            Type[MultiModalMessage],
            Type[StopMessage],
            Type[ToolCallSummaryMessage],
            Type[HandoffMessage],
        ]
    ]:
        """Return the types of messages that this agent can produce."""
        return [TextMessage, MultiModalMessage, StopMessage, ToolCallSummaryMessage, HandoffMessage]

    async def _execute_tool_call(
        self: "OpenAIAgent", tool_call: FunctionCall, cancellation_token: CancellationToken
    ) -> FunctionExecutionResult:
        """Run one tool call and capture its outcome.

        Unknown tools, malformed JSON arguments and exceptions raised by the tool are reported as
        ``is_error=True`` results (never raised) so the model can see and react to the failure.
        """
        tool_name = tool_call.name
        if tool_name not in self._tool_map:
            return FunctionExecutionResult(
                content=f"Error: Tool '{tool_name}' is not available",
                call_id=tool_call.id,
                name=tool_name,
                is_error=True,
            )

        tool = self._tool_map[tool_name]
        try:
            try:
                arguments = json.loads(tool_call.arguments)
            except json.JSONDecodeError as json_err:
                return FunctionExecutionResult(
                    content=f"Error: Invalid JSON in tool arguments - {str(json_err)}",
                    call_id=tool_call.id,
                    name=tool_name,
                    is_error=True,
                )

            result = await tool.run_json(arguments, cancellation_token, call_id=tool_call.id)
            return FunctionExecutionResult(
                content=tool.return_value_as_string(result), call_id=tool_call.id, name=tool_name, is_error=False
            )
        except Exception as e:
            error_msg = f"Error: {str(e)}"
            event_logger.warning(f"Tool execution error in {tool_name}: {error_msg}")
            return FunctionExecutionResult(content=error_msg, call_id=tool_call.id, name=tool_name, is_error=True)

    def _build_api_parameters(
        self: "OpenAIAgent",
        messages: List[Dict[str, Any]],
        previous_response_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Assemble keyword arguments for ``client.responses.create``.

        Args:
            messages: Input items for this request.
            previous_response_id: Stored response to chain from. When given, the request
                continues that server-side conversation, so the instructions are not prepended
                again.

        Returns:
            The request parameters. ``messages`` itself is not mutated.
        """
        input_items: List[Dict[str, Any]] = list(messages)
        has_system_message = any(item.get("role") == "system" for item in input_items)
        if self._instructions and previous_response_id is None and not has_system_message:
            input_items = [{"role": "system", "content": self._instructions}] + input_items

        api_params: Dict[str, Any] = {
            "model": self._model,
            "input": input_items,  # Responses API expects 'input'
        }
        if self._temperature is not None:
            api_params["temperature"] = self._temperature
        if self._max_output_tokens is not None:
            api_params["max_output_tokens"] = self._max_output_tokens
        if self._tools:
            api_params["tools"] = self._tools
        if self._json_mode:
            # The Responses API nests the output format under ``text.format``.
            api_params["text"] = {"format": {"type": "json_object"}}
        api_params["store"] = self._store
        api_params["truncation"] = self._truncation
        if previous_response_id is not None:
            api_params["previous_response_id"] = previous_response_id
        return api_params

    async def _create_response(
        self: "OpenAIAgent", api_params: Dict[str, Any], cancellation_token: CancellationToken
    ) -> Any:
        """Call ``client.responses.create`` with the parameters, cancellable via the token."""
        client = cast(Any, self._client)
        return await cancellation_token.link_future(asyncio.ensure_future(client.responses.create(**api_params)))

    @staticmethod
    def _extract_function_calls(response_obj: Any) -> List[FunctionCall]:
        """Return the ``function_call`` output items of a Responses API result as FunctionCalls."""
        return [
            FunctionCall(id=str(item.call_id), name=str(item.name), arguments=str(item.arguments))
            for item in (getattr(response_obj, "output", None) or [])
            if getattr(item, "type", None) == "function_call"
        ]

    async def on_messages(
        self: "OpenAIAgent", messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> Response:
        """Process ``messages`` and return the final response of ``on_messages_stream``.

        Raises:
            ValueError: If the stream produced no ``Response``.
        """
        response: Optional[Response] = None
        inner_messages: List[
            Union[AgentEvent, TextMessage, MultiModalMessage, StopMessage, ToolCallSummaryMessage, HandoffMessage]
        ] = []
        async for msg in self.on_messages_stream(messages, cancellation_token):
            if isinstance(msg, Response):
                response = msg
            else:
                inner_messages.append(msg)

        if response is None:
            raise ValueError("No response was generated")

        # Merge streamed inner messages into the response without duplicating entries.
        merged = list(response.inner_messages or [])
        for msg in inner_messages:
            if msg not in merged:
                merged.append(msg)
        response.inner_messages = merged
        return response

    async def on_messages_stream(
        self: "OpenAIAgent", messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[
        Union[
            AgentEvent, TextMessage, MultiModalMessage, StopMessage, ToolCallSummaryMessage, HandoffMessage, Response
        ],
        None,
    ]:
        """Send ``messages`` to the model, run any requested tools, and yield the final Response.

        API errors are reported as a ``Response`` whose text describes the error instead of being
        raised.
        """
        # Only stored responses can be referenced by later requests.
        previous_response_id = self._last_response_id if self._store else None

        new_messages: List[Dict[str, Any]] = []
        for message in messages:
            if isinstance(
                message, (TextMessage, MultiModalMessage, StopMessage, ToolCallSummaryMessage, HandoffMessage)
            ):
                dict_message = self._convert_message_to_dict(_convert_message_to_openai_message(message))
            else:
                msg_content = str(cast(Any, message).content) if hasattr(message, "content") else str(message)
                dict_message = {"role": "user", "content": msg_content}
            new_messages.append(dict_message)
            self._message_history.append(dict_message)

        # A chained request already carries all earlier turns server-side, so resending the local
        # history would duplicate it; without chaining the full history (incl. new messages) is replayed.
        input_items: List[Dict[str, Any]] = (
            new_messages if previous_response_id is not None else list(self._message_history)
        )

        inner_messages: List[AgentEvent | ChatMessage] = []
        try:
            response_obj = await self._create_response(
                self._build_api_parameters(input_items, previous_response_id), cancellation_token
            )
            pending_calls = self._extract_function_calls(response_obj)
            iterations = 0
            while pending_calls and iterations < self._max_tool_iterations:
                iterations += 1
                tool_outputs: List[Dict[str, Any]] = []
                for call in pending_calls:
                    result = await self._execute_tool_call(call, cancellation_token)
                    tool_outputs.append(
                        {"type": "function_call_output", "call_id": result.call_id, "output": result.content}
                    )
                intermediate_id = getattr(response_obj, "id", None)
                if self._store and intermediate_id is not None:
                    # The stored response already contains the function calls; send only outputs.
                    previous_response_id = intermediate_id
                    input_items = tool_outputs
                else:
                    # Nothing to chain from: echo the model's calls back along with their outputs.
                    echoed_calls: List[Dict[str, Any]] = [
                        {"type": "function_call", "call_id": call.id, "name": call.name, "arguments": call.arguments}
                        for call in pending_calls
                    ]
                    input_items = input_items + echoed_calls + tool_outputs
                response_obj = await self._create_response(
                    self._build_api_parameters(input_items, previous_response_id), cancellation_token
                )
                pending_calls = self._extract_function_calls(response_obj)
            if pending_calls:
                event_logger.warning(
                    f"Stopped after {self._max_tool_iterations} tool-call rounds with unanswered tool calls."
                )

            content = getattr(response_obj, "output_text", None)
            text = str(content) if content is not None else ""
            self._last_response_id = getattr(response_obj, "id", None)
            self._message_history.append({"role": "assistant", "content": text})
            final_message = TextMessage(source=self.name, content=text)
            result_response = Response(chat_message=final_message, inner_messages=inner_messages)
        except Exception as e:
            error_message = f"Error generating response: {str(e)}"
            event_logger.error(f"API error: {error_message}", exc_info=True)
            error_response = TextMessage(source=self.name, content=error_message)
            result_response = Response(chat_message=error_response, inner_messages=inner_messages)
        # Yield outside the try so errors raised by the consumer are not misreported as API errors.
        yield result_response

    async def on_reset(self: "OpenAIAgent", cancellation_token: CancellationToken) -> None:
        """Forget the conversation: clear the chained response ID and the local history."""
        self._last_response_id = None
        self._message_history = []

    async def save_state(self: "OpenAIAgent") -> Mapping[str, Any]:
        """Return the last response ID and message history as a serializable mapping."""
        state = OpenAIAgentState(
            response_id=self._last_response_id,
            history=self._message_history,
        )
        return state.model_dump()

    async def load_state(self: "OpenAIAgent", state: Mapping[str, Any]) -> None:
        """Restore the last response ID and message history from ``save_state`` output."""
        agent_state = OpenAIAgentState.model_validate(state)
        self._last_response_id = agent_state.response_id
        self._message_history = agent_state.history

    def _to_config(self: "OpenAIAgent") -> OpenAIAgentConfig:
        """Convert the OpenAI agent to a declarative config.

        Tools that cannot be dumped are recorded as a ``FunctionTool`` stub containing only their
        name and description, and a warning is emitted.
        """
        tool_configs: List[Dict[str, Any]] = []
        for tool in self._tool_map.values():
            try:
                if hasattr(tool, "dump_component"):
                    tool_any = cast(Any, tool)
                    tool_configs.append(tool_any.dump_component())
                else:
                    tool_configs.append(
                        {
                            "provider": "autogen_core.tools.FunctionTool",
                            "config": {
                                "name": tool.name,
                                "description": getattr(tool, "description", ""),
                            },
                        }
                    )
            except Exception as e:
                warnings.warn(f"Error serializing tool: {e}", stacklevel=2)
                tool_configs.append(
                    {
                        "provider": "autogen_core.tools.FunctionTool",
                        "config": {
                            "name": getattr(tool, "name", "unknown_tool"),
                            "description": getattr(tool, "description", ""),
                        },
                    }
                )

        return OpenAIAgentConfig(
            name=self.name,
            description=self.description,
            model=self._model,
            instructions=self._instructions,
            tools=cast(List[ComponentModel], tool_configs),
            temperature=self._temperature,
            max_output_tokens=self._max_output_tokens,
            json_mode=self._json_mode,
            store=self._store,
            truncation=self._truncation,
        )

    @classmethod
    def _from_config(cls: Type["OpenAIAgent"], config: OpenAIAgentConfig) -> "OpenAIAgent":
        """Create an OpenAI agent from a declarative config.

        A fresh ``AsyncOpenAI()`` client is created. Tools are restored through their component
        loader (``load_component``) when available. A tool that fails to load is replaced by a
        placeholder ``FunctionTool`` and a warning is emitted.

        NOTE(review): restoring a serialized ``FunctionTool`` executes its stored source code, so
        only load configs from trusted sources.
        """
        client = AsyncOpenAI()

        tools: Optional[List[Tool]] = None
        if config.tools:
            tools_list: List[Tool] = []
            for tool_config in config.tools:
                try:
                    provider = tool_config.provider
                    module_name, class_name = provider.rsplit(".", 1)
                    module = __import__(module_name, fromlist=[class_name])
                    tool_cls = getattr(module, class_name)
                    if hasattr(tool_cls, "load_component"):
                        # The component loader understands the serialized config format
                        # (e.g. FunctionTool's source code), unlike the plain constructor.
                        tool = tool_cls.load_component(tool_config)
                    else:
                        tool = tool_cls(**tool_config.config)
                    tools_list.append(cast(Tool, tool))
                except Exception as e:
                    warnings.warn(f"Error loading tool: {e}", stacklevel=2)
                    from autogen_core.tools import FunctionTool

                    async def dummy_func(*args: Any, **kwargs: Any) -> str:
                        return "Tool not fully restored"

                    tool = FunctionTool(
                        name=tool_config.config.get("name", "unknown_tool"),
                        description=tool_config.config.get("description", ""),
                        func=dummy_func,
                    )
                    tools_list.append(tool)

            tools = tools_list

        return cls(
            name=config.name,
            description=config.description,
            client=client,
            model=config.model,
            instructions=config.instructions,
            tools=tools,
            temperature=config.temperature,
            max_output_tokens=config.max_output_tokens,
            json_mode=config.json_mode,
            store=config.store,
            truncation=config.truncation,
        )
# Local stand-in for an image message type, which the imported autogen_agentchat does not provide.
class ImageMessage(BaseChatMessage):
    """A chat message whose content is a single image reference."""

    content: str  # URL or base64 string

    def to_model_message(self) -> UserMessage:
        """Wrap the raw image reference in a UserMessage from the same source."""
        return UserMessage(content=self.content, source=self.source)

    def to_model_text(self) -> str:
        """Return the placeholder text used when images are rendered for a model."""
        return "[image]"

    def to_text(self) -> str:
        """Return a human-readable preview, truncating long URLs / base64 payloads."""
        preview = self.content
        if len(preview) > IMAGE_CONTENT_PREVIEW_LENGTH:
            preview = preview[:IMAGE_CONTENT_PREVIEW_LENGTH] + "..."
        return f"[Image: {preview}]"