Source code for autogen_core._single_threaded_agent_runtime
from__future__importannotationsimportasyncioimportinspectimportloggingimportsysimportuuidimportwarningsfromasyncioimportCancelledError,Future,Queue,Taskfromcollections.abcimportSequencefromdataclassesimportdataclassfromtypingimportAny,Awaitable,Callable,Dict,List,Mapping,ParamSpec,Set,Type,TypeVar,castfromopentelemetry.traceimportTracerProviderfrom.loggingimport(AgentConstructionExceptionEvent,DeliveryStage,MessageDroppedEvent,MessageEvent,MessageHandlerExceptionEvent,MessageKind,)ifsys.version_info>=(3,13):fromasyncioimportQueue,QueueShutDownelse:from._queueimportQueue,QueueShutDown# type: ignorefrom._agentimportAgentfrom._agent_idimportAgentIdfrom._agent_instantiationimportAgentInstantiationContextfrom._agent_metadataimportAgentMetadatafrom._agent_runtimeimportAgentRuntimefrom._agent_typeimportAgentTypefrom._cancellation_tokenimportCancellationTokenfrom._interventionimportDropMessage,InterventionHandlerfrom._message_contextimportMessageContextfrom._message_handler_contextimportMessageHandlerContextfrom._runtime_impl_helpersimportSubscriptionManager,get_implfrom._serializationimportJSON_DATA_CONTENT_TYPE,MessageSerializer,SerializationRegistryfrom._subscriptionimportSubscriptionfrom._telemetryimportEnvelopeMetadata,MessageRuntimeTracingConfig,TraceHelper,get_telemetry_envelope_metadatafrom._topicimportTopicIdfrom.exceptionsimportMessageDroppedExceptionlogger=logging.getLogger("autogen_core")event_logger=logging.getLogger("autogen_core.events")# We use a type parameter in some functions which shadows the built-in `type` function.# This is a workaround to avoid shadowing the built-in `type` function.type_func_alias=type@dataclass(kw_only=True)classPublishMessageEnvelope:"""A message envelope for publishing messages to all agents that can handle the message of the type T."""message:Anycancellation_token:CancellationTokensender:AgentId|Nonetopic_id:TopicIdmetadata:EnvelopeMetadata|None=Nonemessage_id:str@dataclass(kw_only=True)classSendMessageEnvelope:"""A message envelope for sending a message to a specific agent that can handle the message of the type T."""message:Anysender:AgentId|Nonerecipient:AgentIdfuture:Future[Any]cancellation_token:CancellationTokenmetadata:EnvelopeMetadata|None=Nonemessage_id:str@dataclass(kw_only=True)classResponseMessageEnvelope:"""A message envelope for sending a response to a message."""message:Anyfuture:Future[Any]sender:AgentIdrecipient:AgentId|Nonemetadata:EnvelopeMetadata|None=NoneP=ParamSpec("P")T=TypeVar("T",bound=Agent)classRunContext:def__init__(self,runtime:SingleThreadedAgentRuntime)->None:self._runtime=runtimeself._run_task=asyncio.create_task(self._run())self._stopped=asyncio.Event()asyncdef_run(self)->None:whileTrue:ifself._stopped.is_set():returnawaitself._runtime._process_next()# type: ignoreasyncdefstop(self)->None:self._stopped.set()self._runtime._message_queue.shutdown(immediate=True)# type: ignoreawaitself._run_taskasyncdefstop_when_idle(self)->None:awaitself._runtime._message_queue.join()# type: ignoreself._stopped.set()self._runtime._message_queue.shutdown(immediate=True)# type: ignoreawaitself._run_taskasyncdefstop_when(self,condition:Callable[[],bool],check_period:float=1.0)->None:asyncdefcheck_condition()->None:whilenotcondition():awaitasyncio.sleep(check_period)awaitself.stop()awaitasyncio.create_task(check_condition())def_warn_if_none(value:Any,handler_name:str)->None:""" Utility function to check if the intervention handler returned None and issue a warning. Args: value: The return value to check handler_name: Name of the intervention handler method for the warning message """ifvalueisNone:warnings.warn(f"Intervention handler {handler_name} returned None. This might be unintentional. ""Consider returning the original message or DropMessage explicitly.",RuntimeWarning,stacklevel=2,)
[docs]classSingleThreadedAgentRuntime(AgentRuntime):"""A single-threaded agent runtime that processes all messages using a single asyncio queue. Messages are delivered in the order they are received, and the runtime processes each message in a separate asyncio task concurrently. .. note:: This runtime is suitable for development and standalone applications. It is not suitable for high-throughput or high-concurrency scenarios. Args: intervention_handlers (List[InterventionHandler], optional): A list of intervention handlers that can intercept messages before they are sent or published. Defaults to None. tracer_provider (TracerProvider, optional): The tracer provider to use for tracing. Defaults to None. ignore_unhandled_exceptions (bool, optional): Whether to ignore unhandled exceptions in that occur in agent event handlers. Any background exceptions will be raised on the next call to `process_next` or from an awaited `stop`, `stop_when_idle` or `stop_when`. Note, this does not apply to RPC handlers. Defaults to True. Examples: A simple example of creating a runtime, registering an agent, sending a message and stopping the runtime: .. code-block:: python import asyncio from dataclasses import dataclass from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): @message_handler async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None: print(f"Received message: {message.content}") async def main() -> None: # Create a runtime and register the agent runtime = SingleThreadedAgentRuntime() await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent")) # Start the runtime, send a message and stop the runtime runtime.start() await runtime.send_message(MyMessage("Hello, world!"), recipient=AgentId("my_agent", "default")) await runtime.stop() asyncio.run(main()) An example of creating a runtime, registering an agent, publishing a message and stopping the runtime: .. code-block:: python import asyncio from dataclasses import dataclass from autogen_core import ( DefaultTopicId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, default_subscription, message_handler, ) @dataclass class MyMessage: content: str # The agent is subscribed to the default topic. @default_subscription class MyAgent(RoutedAgent): @message_handler async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None: print(f"Received message: {message.content}") async def main() -> None: # Create a runtime and register the agent runtime = SingleThreadedAgentRuntime() await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent")) # Start the runtime. runtime.start() # Publish a message to the default topic that the agent is subscribed to. await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId()) # Wait for the message to be processed and then stop the runtime. await runtime.stop_when_idle() asyncio.run(main()) """def__init__(self,*,intervention_handlers:List[InterventionHandler]|None=None,tracer_provider:TracerProvider|None=None,ignore_unhandled_exceptions:bool=True,)->None:self._tracer_helper=TraceHelper(tracer_provider,MessageRuntimeTracingConfig("SingleThreadedAgentRuntime"))self._message_queue:Queue[PublishMessageEnvelope|SendMessageEnvelope|ResponseMessageEnvelope]=Queue()# (namespace, type) -> List[AgentId]self._agent_factories:Dict[str,Callable[[],Agent|Awaitable[Agent]]|Callable[[AgentRuntime,AgentId],Agent|Awaitable[Agent]]]={}self._instantiated_agents:Dict[AgentId,Agent]={}self._intervention_handlers=intervention_handlersself._background_tasks:Set[Task[Any]]=set()self._subscription_manager=SubscriptionManager()self._run_context:RunContext|None=Noneself._serialization_registry=SerializationRegistry()self._ignore_unhandled_handler_exceptions=ignore_unhandled_exceptionsself._background_exception:BaseException|None=None@propertydefunprocessed_messages_count(self,)->int:returnself._message_queue.qsize()@propertydef_known_agent_names(self)->Set[str]:returnset(self._agent_factories.keys())# Returns the response of the message
[docs]asyncdefsend_message(self,message:Any,recipient:AgentId,*,sender:AgentId|None=None,cancellation_token:CancellationToken|None=None,message_id:str|None=None,)->Any:ifcancellation_tokenisNone:cancellation_token=CancellationToken()ifmessage_idisNone:message_id=str(uuid.uuid4())event_logger.info(MessageEvent(payload=self._try_serialize(message),sender=sender,receiver=recipient,kind=MessageKind.DIRECT,delivery_stage=DeliveryStage.SEND,))withself._tracer_helper.trace_block("create",recipient,parent=None,extraAttributes={"message_type":type(message).__name__},):future=asyncio.get_event_loop().create_future()ifrecipient.typenotinself._known_agent_names:future.set_exception(Exception("Recipient not found"))content=message.__dict__ifhasattr(message,"__dict__")elsemessagelogger.info(f"Sending message of type {type(message).__name__} to {recipient.type}: {content}")awaitself._message_queue.put(SendMessageEnvelope(message=message,recipient=recipient,future=future,cancellation_token=cancellation_token,sender=sender,metadata=get_telemetry_envelope_metadata(),message_id=message_id,))cancellation_token.link_future(future)returnawaitfuture
[docs]asyncdefpublish_message(self,message:Any,topic_id:TopicId,*,sender:AgentId|None=None,cancellation_token:CancellationToken|None=None,message_id:str|None=None,)->None:withself._tracer_helper.trace_block("create",topic_id,parent=None,extraAttributes={"message_type":type(message).__name__},):ifcancellation_tokenisNone:cancellation_token=CancellationToken()content=message.__dict__ifhasattr(message,"__dict__")elsemessagelogger.info(f"Publishing message of type {type(message).__name__} to all subscribers: {content}")ifmessage_idisNone:message_id=str(uuid.uuid4())event_logger.info(MessageEvent(payload=self._try_serialize(message),sender=sender,receiver=topic_id,kind=MessageKind.PUBLISH,delivery_stage=DeliveryStage.SEND,))awaitself._message_queue.put(PublishMessageEnvelope(message=message,cancellation_token=cancellation_token,sender=sender,topic_id=topic_id,metadata=get_telemetry_envelope_metadata(),message_id=message_id,))
asyncdef_process_send(self,message_envelope:SendMessageEnvelope)->None:withself._tracer_helper.trace_block("send",message_envelope.recipient,parent=message_envelope.metadata):recipient=message_envelope.recipientifrecipient.typenotinself._known_agent_names:raiseLookupError(f"Agent type '{recipient.type}' does not exist.")try:sender_id=str(message_envelope.sender)ifmessage_envelope.senderisnotNoneelse"Unknown"logger.info(f"Calling message handler for {recipient} with message type {type(message_envelope.message).__name__} sent by {sender_id}")event_logger.info(MessageEvent(payload=self._try_serialize(message_envelope.message),sender=message_envelope.sender,receiver=recipient,kind=MessageKind.DIRECT,delivery_stage=DeliveryStage.DELIVER,))recipient_agent=awaitself._get_agent(recipient)message_context=MessageContext(sender=message_envelope.sender,topic_id=None,is_rpc=True,cancellation_token=message_envelope.cancellation_token,message_id=message_envelope.message_id,)withself._tracer_helper.trace_block("process",recipient_agent.id,parent=message_envelope.metadata):withMessageHandlerContext.populate_context(recipient_agent.id):response=awaitrecipient_agent.on_message(message_envelope.message,ctx=message_context,)exceptCancelledErrorase:ifnotmessage_envelope.future.cancelled():message_envelope.future.set_exception(e)self._message_queue.task_done()event_logger.info(MessageHandlerExceptionEvent(payload=self._try_serialize(message_envelope.message),handling_agent=recipient,exception=e,))returnexceptBaseExceptionase:message_envelope.future.set_exception(e)self._message_queue.task_done()event_logger.info(MessageHandlerExceptionEvent(payload=self._try_serialize(message_envelope.message),handling_agent=recipient,exception=e,))returnevent_logger.info(MessageEvent(payload=self._try_serialize(response),sender=message_envelope.recipient,receiver=message_envelope.sender,kind=MessageKind.RESPOND,delivery_stage=DeliveryStage.SEND,))awaitself._message_queue.put(ResponseMessageEnvelope(message=response,future=message_envelope.future,sender=message_envelope.recipient,recipient=message_envelope.sender,metadata=get_telemetry_envelope_metadata(),))self._message_queue.task_done()asyncdef_process_publish(self,message_envelope:PublishMessageEnvelope)->None:withself._tracer_helper.trace_block("publish",message_envelope.topic_id,parent=message_envelope.metadata):try:responses:List[Awaitable[Any]]=[]recipients=awaitself._subscription_manager.get_subscribed_recipients(message_envelope.topic_id)foragent_idinrecipients:# Avoid sending the message back to the senderifmessage_envelope.senderisnotNoneandagent_id==message_envelope.sender:continuesender_agent=(awaitself._get_agent(message_envelope.sender)ifmessage_envelope.senderisnotNoneelseNone)sender_name=str(sender_agent.id)ifsender_agentisnotNoneelse"Unknown"logger.info(f"Calling message handler for {agent_id.type} with message type {type(message_envelope.message).__name__} published by {sender_name}")event_logger.info(MessageEvent(payload=self._try_serialize(message_envelope.message),sender=message_envelope.sender,receiver=None,kind=MessageKind.PUBLISH,delivery_stage=DeliveryStage.DELIVER,))message_context=MessageContext(sender=message_envelope.sender,topic_id=message_envelope.topic_id,is_rpc=False,cancellation_token=message_envelope.cancellation_token,message_id=message_envelope.message_id,)agent=awaitself._get_agent(agent_id)asyncdef_on_message(agent:Agent,message_context:MessageContext)->Any:withself._tracer_helper.trace_block("process",agent.id,parent=message_envelope.metadata):withMessageHandlerContext.populate_context(agent.id):try:returnawaitagent.on_message(message_envelope.message,ctx=message_context,)exceptBaseExceptionase:logger.error(f"Error processing publish message for {agent.id}",exc_info=True)event_logger.info(MessageHandlerExceptionEvent(payload=self._try_serialize(message_envelope.message),handling_agent=agent.id,exception=e,))raiseefuture=_on_message(agent,message_context)responses.append(future)awaitasyncio.gather(*responses)exceptBaseExceptionase:ifnotself._ignore_unhandled_handler_exceptions:self._background_exception=efinally:self._message_queue.task_done()# TODO if responses are given for a publishasyncdef_process_response(self,message_envelope:ResponseMessageEnvelope)->None:withself._tracer_helper.trace_block("ack",message_envelope.recipient,parent=message_envelope.metadata):content=(message_envelope.message.__dict__ifhasattr(message_envelope.message,"__dict__")elsemessage_envelope.message)logger.info(f"Resolving response with message type {type(message_envelope.message).__name__} for recipient {message_envelope.recipient} from {message_envelope.sender.type}: {content}")event_logger.info(MessageEvent(payload=self._try_serialize(message_envelope.message),sender=message_envelope.sender,receiver=message_envelope.recipient,kind=MessageKind.RESPOND,delivery_stage=DeliveryStage.DELIVER,))ifnotmessage_envelope.future.cancelled():message_envelope.future.set_result(message_envelope.message)self._message_queue.task_done()
[docs]asyncdefprocess_next(self)->None:"""Process the next message in the queue. If there is an unhandled exception in the background task, it will be raised here. `process_next` cannot be called again after an unhandled exception is raised. """awaitself._process_next()
asyncdef_process_next(self)->None:"""Process the next message in the queue."""ifself._background_exceptionisnotNone:e=self._background_exceptionself._background_exception=Noneself._message_queue.shutdown(immediate=True)# type: ignoreraiseetry:message_envelope=awaitself._message_queue.get()exceptQueueShutDown:ifself._background_exceptionisnotNone:e=self._background_exceptionself._background_exception=NoneraiseefromNonereturnmatchmessage_envelope:caseSendMessageEnvelope(message=message,sender=sender,recipient=recipient,future=future):ifself._intervention_handlersisnotNone:forhandlerinself._intervention_handlers:withself._tracer_helper.trace_block("intercept",handler.__class__.__name__,parent=message_envelope.metadata):try:message_context=MessageContext(sender=sender,topic_id=None,is_rpc=True,cancellation_token=message_envelope.cancellation_token,message_id=message_envelope.message_id,)temp_message=awaithandler.on_send(message,message_context=message_context,recipient=recipient)_warn_if_none(temp_message,"on_send")exceptBaseExceptionase:future.set_exception(e)returniftemp_messageisDropMessageorisinstance(temp_message,DropMessage):event_logger.info(MessageDroppedEvent(payload=self._try_serialize(message),sender=sender,receiver=recipient,kind=MessageKind.DIRECT,))future.set_exception(MessageDroppedException())returnmessage_envelope.message=temp_messagetask=asyncio.create_task(self._process_send(message_envelope))self._background_tasks.add(task)task.add_done_callback(self._background_tasks.discard)casePublishMessageEnvelope(message=message,sender=sender,topic_id=topic_id,):ifself._intervention_handlersisnotNone:forhandlerinself._intervention_handlers:withself._tracer_helper.trace_block("intercept",handler.__class__.__name__,parent=message_envelope.metadata):try:message_context=MessageContext(sender=sender,topic_id=topic_id,is_rpc=False,cancellation_token=message_envelope.cancellation_token,message_id=message_envelope.message_id,)temp_message=awaithandler.on_publish(message,message_context=message_context)_warn_if_none(temp_message,"on_publish")exceptBaseExceptionase:# TODO: we should raise the intervention exception to the publisher.logger.error(f"Exception raised in in intervention handler: {e}",exc_info=True)returniftemp_messageisDropMessageorisinstance(temp_message,DropMessage):event_logger.info(MessageDroppedEvent(payload=self._try_serialize(message),sender=sender,receiver=topic_id,kind=MessageKind.PUBLISH,))returnmessage_envelope.message=temp_messagetask=asyncio.create_task(self._process_publish(message_envelope))self._background_tasks.add(task)task.add_done_callback(self._background_tasks.discard)caseResponseMessageEnvelope(message=message,sender=sender,recipient=recipient,future=future):ifself._intervention_handlersisnotNone:forhandlerinself._intervention_handlers:try:temp_message=awaithandler.on_response(message,sender=sender,recipient=recipient)_warn_if_none(temp_message,"on_response")exceptBaseExceptionase:# TODO: should we raise the exception to sender of the response instead?future.set_exception(e)returniftemp_messageisDropMessageorisinstance(temp_message,DropMessage):event_logger.info(MessageDroppedEvent(payload=self._try_serialize(message),sender=sender,receiver=recipient,kind=MessageKind.RESPOND,))future.set_exception(MessageDroppedException())returnmessage_envelope.message=temp_messagetask=asyncio.create_task(self._process_response(message_envelope))self._background_tasks.add(task)task.add_done_callback(self._background_tasks.discard)# Yield control to the message loop to allow other tasks to runawaitasyncio.sleep(0)
[docs]defstart(self)->None:"""Start the runtime message processing loop. This runs in a background task. Example: .. code-block:: python import asyncio from autogen_core import SingleThreadedAgentRuntime async def main() -> None: runtime = SingleThreadedAgentRuntime() runtime.start() # ... do other things ... await runtime.stop() asyncio.run(main()) """ifself._run_contextisnotNone:raiseRuntimeError("Runtime is already started")self._run_context=RunContext(self)
[docs]asyncdefclose(self)->None:"""Calls :meth:`stop` if applicable and the :meth:`Agent.close` method on all instantiated agents"""# stop the runtime if it hasn't been stopped yetifself._run_contextisnotNone:awaitself.stop()# close all the agents that have been instantiatedforagent_idinself._instantiated_agents:agent=awaitself._get_agent(agent_id)awaitagent.close()
[docs]asyncdefstop(self)->None:"""Immediately stop the runtime message processing loop. The currently processing message will be completed, but all others following it will be discarded."""ifself._run_contextisNone:raiseRuntimeError("Runtime is not started")try:awaitself._run_context.stop()finally:self._run_context=Noneself._message_queue=Queue()
[docs]asyncdefstop_when_idle(self)->None:"""Stop the runtime message processing loop when there is no outstanding message being processed or queued. This is the most common way to stop the runtime."""ifself._run_contextisNone:raiseRuntimeError("Runtime is not started")try:awaitself._run_context.stop_when_idle()finally:self._run_context=Noneself._message_queue=Queue()
[docs]asyncdefstop_when(self,condition:Callable[[],bool])->None:"""Stop the runtime message processing loop when the condition is met. .. caution:: This method is not recommended to be used, and is here for legacy reasons. It will spawn a busy loop to continually check the condition. It is much more efficient to call `stop_when_idle` or `stop` instead. If you need to stop the runtime based on a condition, consider using a background task and asyncio.Event to signal when the condition is met and the background task should call stop. """ifself._run_contextisNone:raiseRuntimeError("Runtime is not started")awaitself._run_context.stop_when(condition)self._run_context=Noneself._message_queue=Queue()
[docs]asyncdefregister_factory(self,type:str|AgentType,agent_factory:Callable[[],T|Awaitable[T]],*,expected_class:type[T]|None=None,)->AgentType:ifisinstance(type,str):type=AgentType(type)iftype.typeinself._agent_factories:raiseValueError(f"Agent with type {type} already exists.")asyncdeffactory_wrapper()->T:maybe_agent_instance=agent_factory()ifinspect.isawaitable(maybe_agent_instance):agent_instance=awaitmaybe_agent_instanceelse:agent_instance=maybe_agent_instanceifexpected_classisnotNoneandtype_func_alias(agent_instance)!=expected_class:raiseValueError("Factory registered using the wrong type.")returnagent_instanceself._agent_factories[type.type]=factory_wrapperreturntype
asyncdef_invoke_agent_factory(self,agent_factory:Callable[[],T|Awaitable[T]]|Callable[[AgentRuntime,AgentId],T|Awaitable[T]],agent_id:AgentId,)->T:withAgentInstantiationContext.populate_context((self,agent_id)):try:iflen(inspect.signature(agent_factory).parameters)==0:factory_one=cast(Callable[[],T],agent_factory)agent=factory_one()eliflen(inspect.signature(agent_factory).parameters)==2:warnings.warn("Agent factories that take two arguments are deprecated. Use AgentInstantiationContext instead. Two arg factories will be removed in a future version.",stacklevel=2,)factory_two=cast(Callable[[AgentRuntime,AgentId],T],agent_factory)agent=factory_two(self,agent_id)else:raiseValueError("Agent factory must take 0 or 2 arguments.")ifinspect.isawaitable(agent):returncast(T,awaitagent)returnagentexceptBaseExceptionase:event_logger.info(AgentConstructionExceptionEvent(agent_id=agent_id,exception=e,))logger.error(f"Error constructing agent {agent_id}",exc_info=True)raiseasyncdef_get_agent(self,agent_id:AgentId)->Agent:ifagent_idinself._instantiated_agents:returnself._instantiated_agents[agent_id]ifagent_id.typenotinself._agent_factories:raiseLookupError(f"Agent with name {agent_id.type} not found.")agent_factory=self._agent_factories[agent_id.type]agent=awaitself._invoke_agent_factory(agent_factory,agent_id)self._instantiated_agents[agent_id]=agentreturnagent# TODO: uncomment out the following type ignore when this is fixed in mypy: https://github.com/python/mypy/issues/3737
[docs]asyncdeftry_get_underlying_agent_instance(self,id:AgentId,type:Type[T]=Agent)->T:# type: ignore[assignment]ifid.typenotinself._agent_factories:raiseLookupError(f"Agent with name {id.type} not found.")# TODO: check if remoteagent_instance=awaitself._get_agent(id)ifnotisinstance(agent_instance,type):raiseTypeError(f"Agent with name {id.type} is not of type {type.__name__}. It is of type {type_func_alias(agent_instance).__name__}")returnagent_instance
def_try_serialize(self,message:Any)->str:try:type_name=self._serialization_registry.type_name(message)returnself._serialization_registry.serialize(message,type_name=type_name,data_content_type=JSON_DATA_CONTENT_TYPE).decode("utf-8")exceptValueError:return"Message could not be serialized"