from __future__ import annotations

import inspect
import warnings
from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import Any, Awaitable, Callable, ClassVar, List, Mapping, Tuple, Type, TypeVar, final

from typing_extensions import Self

from ._agent import Agent
from ._agent_id import AgentId
from ._agent_instantiation import AgentInstantiationContext
from ._agent_metadata import AgentMetadata
from ._agent_runtime import AgentRuntime
from ._agent_type import AgentType
from ._cancellation_token import CancellationToken
from ._message_context import MessageContext
from ._serialization import MessageSerializer, try_get_known_serializers_for_type
from ._subscription import Subscription, UnboundSubscription
from ._subscription_context import SubscriptionInstantiationContext
from ._topic import TopicId
from ._type_prefix_subscription import TypePrefixSubscription

T = TypeVar("T", bound=Agent)

BaseAgentType = TypeVar("BaseAgentType", bound="BaseAgent")


# Decorator for adding an unbound subscription to an agent
def subscription_factory(subscription: UnboundSubscription) -> Callable[[Type[BaseAgentType]], Type[BaseAgentType]]:
    """Return a class decorator that records ``subscription`` on the decorated class.

    The decorator appends ``subscription`` to the class's
    ``internal_unbound_subscriptions_list`` and returns the class unchanged.

    :meta private:
    """

    def decorator(cls: Type[BaseAgentType]) -> Type[BaseAgentType]:
        cls.internal_unbound_subscriptions_list.append(subscription)
        return cls

    return decorator


def handles(
    type: Type[Any], serializer: MessageSerializer[Any] | List[MessageSerializer[Any]] | None = None
) -> Callable[[Type[BaseAgentType]], Type[BaseAgentType]]:
    """Return a class decorator declaring that the agent class handles ``type``.

    Args:
        type: The message type to record on the decorated class.
        serializer: A single serializer, a sequence of serializers, or ``None``.
            When ``None``, the serializers are looked up with
            ``try_get_known_serializers_for_type(type)``.

    Returns:
        A decorator that appends ``(type, serializers)`` to the class's
        ``internal_extra_handles_types`` and returns the class unchanged.

    Raises:
        ValueError: Raised when the decorator is applied and no serializer is
            available for ``type`` (the lookup or the supplied sequence is empty).
    """

    def decorator(cls: Type[BaseAgentType]) -> Type[BaseAgentType]:
        if serializer is None:
            serializer_list = try_get_known_serializers_for_type(type)
        elif isinstance(serializer, Sequence):
            # Copy so the stored value is a real list and is not aliased to
            # an object the caller may mutate afterwards.
            serializer_list = list(serializer)
        else:
            serializer_list = [serializer]

        if not serializer_list:
            raise ValueError(f"No serializers found for type {type}. Please provide an explicit serializer.")

        cls.internal_extra_handles_types.append((type, serializer_list))
        return cls

    return decorator
class BaseAgent(ABC, Agent):
    """Base class for agents that are created by, and bound to, an ``AgentRuntime``.

    The class keeps two per-subclass registries (unbound subscriptions and
    extra handled message types with their serializers), captures the runtime
    and agent id from ``AgentInstantiationContext`` at construction time, and
    offers ``send_message`` and ``register`` helpers that delegate to the runtime.
    """

    internal_unbound_subscriptions_list: ClassVar[List[UnboundSubscription]] = []
    """:meta private:"""
    internal_extra_handles_types: ClassVar[List[Tuple[Type[Any], List[MessageSerializer[Any]]]]] = []
    """:meta private:"""

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Give every subclass its own, initially empty, registries."""
        super().__init_subclass__(**kwargs)
        # Automatically set class_variable in each subclass so that they are not shared between subclasses
        cls.internal_extra_handles_types = []
        cls.internal_unbound_subscriptions_list = []

    @classmethod
    def _handles_types(cls) -> List[Tuple[Type[Any], List[MessageSerializer[Any]]]]:
        """Return the ``(message type, serializers)`` pairs recorded on this class."""
        return cls.internal_extra_handles_types

    @classmethod
    def _unbound_subscriptions(cls) -> List[UnboundSubscription]:
        """Return the unbound subscriptions recorded on this class."""
        return cls.internal_unbound_subscriptions_list

    @property
    def metadata(self) -> AgentMetadata:
        """Metadata built from this agent's id key, id type and description."""
        assert self._id is not None
        return AgentMetadata(key=self._id.key, type=self._id.type, description=self._description)

    def __init__(self, description: str) -> None:
        """Capture the current runtime and agent id, and store ``description``.

        Args:
            description: Human-readable description of the agent.

        Raises:
            RuntimeError: If ``AgentInstantiationContext`` has no current
                runtime or agent id (the lookup raised ``LookupError``).
            ValueError: If ``description`` is not a ``str``.
        """
        try:
            runtime = AgentInstantiationContext.current_runtime()
            agent_id = AgentInstantiationContext.current_agent_id()
        except LookupError as e:
            raise RuntimeError(
                "BaseAgent must be instantiated within the context of an AgentRuntime. It cannot be directly instantiated."
            ) from e

        self._runtime: AgentRuntime = runtime
        self._id: AgentId = agent_id
        if not isinstance(description, str):
            raise ValueError("Agent description must be a string")
        self._description = description

    @property
    def type(self) -> str:
        """The ``type`` component of this agent's id."""
        return self.id.type

    @property
    def id(self) -> AgentId:
        """The id captured when the agent was constructed."""
        return self._id

    @property
    def runtime(self) -> AgentRuntime:
        """The runtime captured when the agent was constructed."""
        return self._runtime

    async def send_message(
        self,
        message: Any,
        recipient: AgentId,
        *,
        cancellation_token: CancellationToken | None = None,
        message_id: str | None = None,
    ) -> Any:
        """See :py:meth:`autogen_core.AgentRuntime.send_message` for more information."""
        # A fresh token is created when the caller does not supply one, so the
        # runtime always receives a non-None cancellation token.
        if cancellation_token is None:
            cancellation_token = CancellationToken()

        return await self._runtime.send_message(
            message,
            sender=self.id,
            recipient=recipient,
            cancellation_token=cancellation_token,
            message_id=message_id,
        )

    @classmethod
    async def register(
        cls,
        runtime: AgentRuntime,
        type: str,
        factory: Callable[[], Self | Awaitable[Self]],
        *,
        skip_class_subscriptions: bool = False,
        skip_direct_message_subscription: bool = False,
    ) -> AgentType:
        """Register a factory for this agent class with ``runtime``.

        Args:
            runtime: The runtime to register the factory, subscriptions and
                serializers with.
            type: Name of the agent type to register.
            factory: Zero-argument callable returning an instance of this
                class, or an awaitable resolving to one.
            skip_class_subscriptions: When true, the unbound subscriptions
                recorded on the class are not instantiated or added.
            skip_direct_message_subscription: When true, the
                ``TypePrefixSubscription`` for ``"<agent type>:"`` is not added.

        Returns:
            The ``AgentType`` returned by ``runtime.register_factory``.
        """
        agent_type = AgentType(type)
        agent_type = await runtime.register_factory(type=agent_type, agent_factory=factory, expected_class=cls)

        if not skip_class_subscriptions:
            # Unbound subscriptions are called while the subscription context
            # is populated with the newly registered agent type.
            with SubscriptionInstantiationContext.populate_context(agent_type):
                subscriptions: List[Subscription] = []
                for unbound_subscription in cls._unbound_subscriptions():
                    subscriptions_list_result = unbound_subscription()
                    # An unbound subscription may return its result directly or as an awaitable.
                    if inspect.isawaitable(subscriptions_list_result):
                        subscriptions_list = await subscriptions_list_result
                    else:
                        subscriptions_list = subscriptions_list_result

                    subscriptions.extend(subscriptions_list)
            for subscription in subscriptions:
                await runtime.add_subscription(subscription)

        if not skip_direct_message_subscription:
            # Additionally adds a special prefix subscription for this agent to receive direct messages
            await runtime.add_subscription(
                TypePrefixSubscription(
                    # The prefix MUST include ":" to avoid collisions with other agents
                    topic_type_prefix=agent_type.type + ":",
                    agent_type=agent_type.type,
                )
            )

        # TODO: deduplication
        for _message_type, serializer in cls._handles_types():
            runtime.add_message_serializer(serializer)

        return agent_type