Source code for autogen_agentchat.agents._user_proxy_agent
importasyncioimportuuidfromcontextlibimportcontextmanagerfromcontextvarsimportContextVarfrominspectimportiscoroutinefunctionfromtypingimportAny,AsyncGenerator,Awaitable,Callable,ClassVar,Generator,Optional,Sequence,Union,castfromautogen_coreimportCancellationToken,ComponentfrompydanticimportBaseModelfromtyping_extensionsimportSelffrom..baseimportResponsefrom..messagesimportAgentEvent,ChatMessage,HandoffMessage,TextMessage,UserInputRequestedEventfrom._base_chat_agentimportBaseChatAgentSyncInputFunc=Callable[[str],str]AsyncInputFunc=Callable[[str,Optional[CancellationToken]],Awaitable[str]]InputFuncType=Union[SyncInputFunc,AsyncInputFunc]# TODO: check if using to_thread fixes this in jupyterasyncdefcancellable_input(prompt:str,cancellation_token:Optional[CancellationToken])->str:task:asyncio.Task[str]=asyncio.create_task(asyncio.to_thread(input,prompt))ifcancellation_tokenisnotNone:cancellation_token.link_future(task)returnawaittaskclassUserProxyAgentConfig(BaseModel):"""Declarative configuration for the UserProxyAgent."""name:strdescription:str="A human user"input_func:str|None=None
[docs]classUserProxyAgent(BaseChatAgent,Component[UserProxyAgentConfig]):"""An agent that can represent a human user through an input function. This agent can be used to represent a human user in a chat system by providing a custom input function. Args: name (str): The name of the agent. description (str, optional): A description of the agent. input_func (Optional[Callable[[str], str]], Callable[[str, Optional[CancellationToken]], Awaitable[str]]): A function that takes a prompt and returns a user input string. .. note:: Using :class:`UserProxyAgent` puts a running team in a temporary blocked state until the user responds. So it is important to time out the user input function and cancel using the :class:`~autogen_core.CancellationToken` if the user does not respond. The input function should also handle exceptions and return a default response if needed. For typical use cases that involve slow human responses, it is recommended to use termination conditions such as :class:`~autogen_agentchat.conditions.HandoffTermination` or :class:`~autogen_agentchat.conditions.SourceMatchTermination` to stop the running team and return the control to the application. You can run the team again with the user input. This way, the state of the team can be saved and restored when the user responds. See `Human-in-the-loop <https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/human-in-the-loop.html>`_ for more information. Example: Simple usage case:: import asyncio from autogen_core import CancellationToken from autogen_agentchat.agents import UserProxyAgent from autogen_agentchat.messages import TextMessage async def simple_user_agent(): agent = UserProxyAgent("user_proxy") response = await asyncio.create_task( agent.on_messages( [TextMessage(content="What is your name? ", source="user")], cancellation_token=CancellationToken(), ) ) print(f"Your name is {response.chat_message.content}") Example: Cancellable usage case:: import asyncio from typing import Any from autogen_core import CancellationToken from autogen_agentchat.agents import UserProxyAgent from autogen_agentchat.messages import TextMessage token = CancellationToken() agent = UserProxyAgent("user_proxy") async def timeout(delay: float): await asyncio.sleep(delay) def cancellation_callback(task: asyncio.Task[Any]): token.cancel() async def cancellable_user_agent(): try: timeout_task = asyncio.create_task(timeout(3)) timeout_task.add_done_callback(cancellation_callback) agent_task = asyncio.create_task( agent.on_messages( [TextMessage(content="What is your name? ", source="user")], cancellation_token=token, ) ) response = await agent_task print(f"Your name is {response.chat_message.content}") except Exception as e: print(f"Exception: {e}") except BaseException as e: print(f"BaseException: {e}") """component_type="agent"component_provider_override="autogen_agentchat.agents.UserProxyAgent"component_config_schema=UserProxyAgentConfig
[docs]classInputRequestContext:def__init__(self)->None:raiseRuntimeError("InputRequestContext cannot be instantiated. It is a static class that provides context management for user input requests.")_INPUT_REQUEST_CONTEXT_VAR:ClassVar[ContextVar[str]]=ContextVar("_INPUT_REQUEST_CONTEXT_VAR")@classmethod@contextmanagerdefpopulate_context(cls,ctx:str)->Generator[None,Any,None]:""":meta private:"""token=UserProxyAgent.InputRequestContext._INPUT_REQUEST_CONTEXT_VAR.set(ctx)try:yieldfinally:UserProxyAgent.InputRequestContext._INPUT_REQUEST_CONTEXT_VAR.reset(token)
[docs]@classmethoddefrequest_id(cls)->str:try:returncls._INPUT_REQUEST_CONTEXT_VAR.get()exceptLookupErrorase:raiseRuntimeError("InputRequestContext.runtime() must be called within the input callback of a UserProxyAgent.")frome
def__init__(self,name:str,*,description:str="A human user",input_func:Optional[InputFuncType]=None,)->None:"""Initialize the UserProxyAgent."""super().__init__(name=name,description=description)self.input_func=input_funcorcancellable_inputself._is_async=iscoroutinefunction(self.input_func)@propertydefproduced_message_types(self)->Sequence[type[ChatMessage]]:"""Message types this agent can produce."""return(TextMessage,HandoffMessage)def_get_latest_handoff(self,messages:Sequence[ChatMessage])->Optional[HandoffMessage]:"""Find the HandoffMessage in the message sequence that addresses this agent."""iflen(messages)>0andisinstance(messages[-1],HandoffMessage):ifmessages[-1].target==self.name:returnmessages[-1]else:raiseRuntimeError(f"Handoff message target does not match agent name: {messages[-1].source}")returnNoneasyncdef_get_input(self,prompt:str,cancellation_token:Optional[CancellationToken])->str:"""Handle input based on function signature."""try:ifself._is_async:# Cast to AsyncInputFunc for proper typingasync_func=cast(AsyncInputFunc,self.input_func)returnawaitasync_func(prompt,cancellation_token)else:# Cast to SyncInputFunc for proper typingsync_func=cast(SyncInputFunc,self.input_func)loop=asyncio.get_event_loop()returnawaitloop.run_in_executor(None,sync_func,prompt)exceptasyncio.CancelledError:raiseexceptExceptionase:raiseRuntimeError(f"Failed to get user input: {str(e)}")frome
[docs]asyncdefon_messages(self,messages:Sequence[ChatMessage],cancellation_token:CancellationToken)->Response:asyncformessageinself.on_messages_stream(messages,cancellation_token):ifisinstance(message,Response):returnmessageraiseAssertionError("The stream should have returned the final result.")
[docs]asyncdefon_messages_stream(self,messages:Sequence[ChatMessage],cancellation_token:CancellationToken)->AsyncGenerator[AgentEvent|ChatMessage|Response,None]:"""Handle incoming messages by requesting user input."""try:# Check for handoff firsthandoff=self._get_latest_handoff(messages)prompt=(f"Handoff received from {handoff.source}. Enter your response: "ifhandoffelse"Enter your response: ")request_id=str(uuid.uuid4())input_requested_event=UserInputRequestedEvent(request_id=request_id,source=self.name)yieldinput_requested_eventwithUserProxyAgent.InputRequestContext.populate_context(request_id):user_input=awaitself._get_input(prompt,cancellation_token)# Return appropriate message type based on handoff presenceifhandoff:yieldResponse(chat_message=HandoffMessage(content=user_input,target=handoff.source,source=self.name))else:yieldResponse(chat_message=TextMessage(content=user_input,source=self.name))exceptasyncio.CancelledError:raiseexceptExceptionase:raiseRuntimeError(f"Failed to get user input: {str(e)}")frome