def get_wrapped_callback(self) -> AsyncInputFunc:
    async def user_input_func_wrapper(prompt: str, cancellation_token: Optional[CancellationToken]) -> str:
        # Look up the event for the prompt; if it exists, wait for it.
        # If it doesn't exist, create it and store it.
        # Get the request ID.
        request_id = UserProxyAgent.InputRequestContext.request_id()
        if request_id in self.input_events:
            event = self.input_events[request_id]
        else:
            event = asyncio.Event()
            self.input_events[request_id] = event

        await event.wait()

        del self.input_events[request_id]

        if iscoroutinefunction(self.callback):
            # Cast to AsyncInputFunc for proper typing.
            async_func = cast(AsyncInputFunc, self.callback)
            return await async_func(prompt, cancellation_token)
        else:
            # Cast to SyncInputFunc for proper typing.
            sync_func = cast(SyncInputFunc, self.callback)
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(None, sync_func, prompt)

    return user_input_func_wrapper
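# Illustrative counterpart (a sketch assuming the manager's internals; not the
# library's verbatim source): Console calls `notify_event_received(request_id)`
# on the user input manager when it sees a UserInputRequestedEvent, and setting
# the stored asyncio.Event wakes `user_input_func_wrapper` above. Creating the
# event pre-set when it has not been registered yet guards against the
# notification arriving before the wrapper has stored its own event.
def notify_event_received(self, request_id: str) -> None:
    if request_id in self.input_events:
        # The wrapper is already waiting: release it.
        self.input_events[request_id].set()
    else:
        # The notification arrived first: store an already-set event so the
        # wrapper's `await event.wait()` returns immediately.
        event = asyncio.Event()
        event.set()
        self.input_events[request_id] = event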
async def Console(
    stream: AsyncGenerator[AgentEvent | ChatMessage | T, None],
    *,
    no_inline_images: bool = False,
    output_stats: bool = False,
    user_input_manager: UserInputManager | None = None,
) -> T:
    """
    Consumes the message stream from :meth:`~autogen_agentchat.base.TaskRunner.run_stream`
    or :meth:`~autogen_agentchat.base.ChatAgent.on_messages_stream` and renders the messages to the console.
    Returns the last processed TaskResult or Response.

    .. note::

        `output_stats` is experimental and the stats may not be accurate.
        It will be improved in future releases.

    Args:
        stream (AsyncGenerator[AgentEvent | ChatMessage | TaskResult, None] | AsyncGenerator[AgentEvent | ChatMessage | Response, None]):
            Message stream to render. This can be from :meth:`~autogen_agentchat.base.TaskRunner.run_stream`
            or :meth:`~autogen_agentchat.base.ChatAgent.on_messages_stream`.
        no_inline_images (bool, optional): If the terminal is iTerm2, images are rendered inline by default.
            Set this to True to disable that behavior. Defaults to False.
        output_stats (bool, optional): (Experimental) If True, outputs a summary of the messages and inline
            token usage info. Defaults to False.
        user_input_manager (UserInputManager, optional): If provided, it is notified when a
            :class:`~autogen_agentchat.messages.UserInputRequestedEvent` is received, releasing the
            corresponding pending user input callback. Defaults to None.

    Returns:
        last_processed: A :class:`~autogen_agentchat.base.TaskResult` if the stream is from
            :meth:`~autogen_agentchat.base.TaskRunner.run_stream`, or a
            :class:`~autogen_agentchat.base.Response` if the stream is from
            :meth:`~autogen_agentchat.base.ChatAgent.on_messages_stream`.
    """
    render_image_iterm = _is_running_in_iterm() and _is_output_a_tty() and not no_inline_images
    start_time = time.time()
    total_usage = RequestUsage(prompt_tokens=0, completion_tokens=0)

    last_processed: Optional[T] = None

    streaming_chunks: List[str] = []

    async for message in stream:
        if isinstance(message, TaskResult):
            duration = time.time() - start_time
            if output_stats:
                output = (
                    f"{'-' * 10} Summary {'-' * 10}\n"
                    f"Number of messages: {len(message.messages)}\n"
                    f"Finish reason: {message.stop_reason}\n"
                    f"Total prompt tokens: {total_usage.prompt_tokens}\n"
                    f"Total completion tokens: {total_usage.completion_tokens}\n"
                    f"Duration: {duration:.2f} seconds\n"
                )
                await aprint(output, end="", flush=True)

            # mypy ignore
            last_processed = message  # type: ignore

        elif isinstance(message, Response):
            duration = time.time() - start_time

            # Print the final response.
            output = f"{'-' * 10} {message.chat_message.source} {'-' * 10}\n{_message_to_str(message.chat_message, render_image_iterm=render_image_iterm)}\n"
            if message.chat_message.models_usage:
                if output_stats:
                    output += f"[Prompt tokens: {message.chat_message.models_usage.prompt_tokens}, Completion tokens: {message.chat_message.models_usage.completion_tokens}]\n"
                total_usage.completion_tokens += message.chat_message.models_usage.completion_tokens
                total_usage.prompt_tokens += message.chat_message.models_usage.prompt_tokens
            await aprint(output, end="", flush=True)

            # Print the summary.
            if output_stats:
                if message.inner_messages is not None:
                    num_inner_messages = len(message.inner_messages)
                else:
                    num_inner_messages = 0
                output = (
                    f"{'-' * 10} Summary {'-' * 10}\n"
                    f"Number of inner messages: {num_inner_messages}\n"
                    f"Total prompt tokens: {total_usage.prompt_tokens}\n"
                    f"Total completion tokens: {total_usage.completion_tokens}\n"
                    f"Duration: {duration:.2f} seconds\n"
                )
                await aprint(output, end="", flush=True)

            # mypy ignore
            last_processed = message  # type: ignore

        # We don't print UserInputRequestedEvent messages; they are only used to signal the user input event.
        elif isinstance(message, UserInputRequestedEvent):
            if user_input_manager is not None:
                user_input_manager.notify_event_received(message.request_id)

        else:
            # Cast required for mypy to be happy.
            message = cast(AgentEvent | ChatMessage, message)  # type: ignore
            if not streaming_chunks:
                # Print the message sender.
                await aprint(f"{'-' * 10} {message.source} {'-' * 10}", end="\n", flush=True)
            if isinstance(message, ModelClientStreamingChunkEvent):
                await aprint(message.content, end="")
                streaming_chunks.append(message.content)
            else:
                if streaming_chunks:
                    streaming_chunks.clear()
                    # Chunked messages are already printed, so we just print a newline.
                    await aprint("", end="\n", flush=True)
                else:
                    # Print the message content.
                    await aprint(_message_to_str(message, render_image_iterm=render_image_iterm), end="\n", flush=True)
                if message.models_usage:
                    if output_stats:
                        await aprint(
                            f"[Prompt tokens: {message.models_usage.prompt_tokens}, Completion tokens: {message.models_usage.completion_tokens}]",
                            end="\n",
                            flush=True,
                        )
                    total_usage.completion_tokens += message.models_usage.completion_tokens
                    total_usage.prompt_tokens += message.models_usage.prompt_tokens

    if last_processed is None:
        raise ValueError("No TaskResult or Response was processed.")

    return last_processed
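# Example usage (an illustrative sketch, not part of the library source):
# Console accepts any async generator whose final item is a TaskResult, so it
# can be exercised without an agent or a model client. The `demo_stream` name
# and its contents are made up for this demo; in application code you would
# typically pass `team.run_stream(task=...)` or an agent's stream instead, and
# import Console via `from autogen_agentchat.ui import Console`.
if __name__ == "__main__":
    from autogen_agentchat.base import TaskResult
    from autogen_agentchat.messages import TextMessage

    async def demo_stream() -> AsyncGenerator[TextMessage | TaskResult, None]:
        # Yield one chat message, then the TaskResult that Console returns.
        msg = TextMessage(source="assistant", content="Hello from the demo stream.")
        yield msg
        yield TaskResult(messages=[msg], stop_reason="demo complete")

    result = asyncio.run(Console(demo_stream(), output_stats=True))
    print(f"Stop reason: {result.stop_reason}")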