importloggingfromfunctoolsimportwrapsfromtypingimport(Any,Callable,Coroutine,DefaultDict,List,Literal,Protocol,Sequence,Tuple,Type,TypeVar,cast,get_type_hints,overload,runtime_checkable,)from._base_agentimportBaseAgentfrom._message_contextimportMessageContextfrom._serializationimportMessageSerializer,try_get_known_serializers_for_typefrom._type_helpersimportAnyType,get_typesfrom.exceptionsimportCantHandleExceptionlogger=logging.getLogger("autogen_core")AgentT=TypeVar("AgentT")ReceivesT=TypeVar("ReceivesT")ProducesT=TypeVar("ProducesT",covariant=True)# TODO: Generic typevar bound binding U to agent type# Can't do because python doesnt support it# Pyright and mypy disagree on the variance of ReceivesT. Mypy thinks it should be contravariant here.# Revisit this later to see if we can remove the ignore.@runtime_checkableclassMessageHandler(Protocol[AgentT,ReceivesT,ProducesT]):# type: ignoretarget_types:Sequence[type]produces_types:Sequence[type]is_message_handler:Literal[True]router:Callable[[ReceivesT,MessageContext],bool]# agent_instance binds to self in the method@staticmethodasyncdef__call__(agent_instance:AgentT,message:ReceivesT,ctx:MessageContext)->ProducesT:...# NOTE: this works on concrete types and not inheritance# TODO: Use a protocol for the outer function to check checked arg names@overloaddefmessage_handler(func:Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,ProducesT]],)->MessageHandler[AgentT,ReceivesT,ProducesT]:...@overloaddefmessage_handler(func:None=None,*,match:None=...,strict:bool=...,)->Callable[[Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,ProducesT]]],MessageHandler[AgentT,ReceivesT,ProducesT],]:...@overloaddefmessage_handler(func:None=None,*,match:Callable[[ReceivesT,MessageContext],bool],strict:bool=...,)->Callable[[Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,ProducesT]]],MessageHandler[AgentT,ReceivesT,ProducesT],]:...
[docs]defmessage_handler(func:None|Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,ProducesT]]=None,*,strict:bool=True,match:None|Callable[[ReceivesT,MessageContext],bool]=None,)->(Callable[[Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,ProducesT]]],MessageHandler[AgentT,ReceivesT,ProducesT],]|MessageHandler[AgentT,ReceivesT,ProducesT]):"""Decorator for generic message handlers. Add this decorator to methods in a :class:`RoutedAgent` class that are intended to handle both event and RPC messages. These methods must have a specific signature that needs to be followed for it to be valid: - The method must be an `async` method. - The method must be decorated with the `@message_handler` decorator. - The method must have exactly 3 arguments: 1. `self` 2. `message`: The message to be handled, this must be type-hinted with the message type that it is intended to handle. 3. `ctx`: A :class:`autogen_core.MessageContext` object. - The method must be type hinted with what message types it can return as a response, or it can return `None` if it does not return anything. Handlers can handle more than one message type by accepting a Union of the message types. It can also return more than one message type by returning a Union of the message types. Args: func: The function to be decorated. strict: If `True`, the handler will raise an exception if the message type or return type is not in the target types. If `False`, it will log a warning instead. match: A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If `None`, the first handler in alphabetical order matching the same message type will be called. """defdecorator(func:Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,ProducesT]],)->MessageHandler[AgentT,ReceivesT,ProducesT]:type_hints=get_type_hints(func)if"message"notintype_hints:raiseAssertionError("message parameter not found in function signature")if"return"notintype_hints:raiseAssertionError("return not found in function signature")# Get the type of the message parametertarget_types=get_types(type_hints["message"])iftarget_typesisNone:raiseAssertionError("Message type not found")# print(type_hints)return_types=get_types(type_hints["return"])ifreturn_typesisNone:raiseAssertionError("Return type not found")# Convert target_types to list and stash@wraps(func)asyncdefwrapper(self:AgentT,message:ReceivesT,ctx:MessageContext)->ProducesT:iftype(message)notintarget_types:ifstrict:raiseCantHandleException(f"Message type {type(message)} not in target types {target_types}")else:logger.warning(f"Message type {type(message)} not in target types {target_types}")return_value=awaitfunc(self,message,ctx)ifAnyTypenotinreturn_typesandtype(return_value)notinreturn_types:ifstrict:raiseValueError(f"Return type {type(return_value)} not in return types {return_types}")else:logger.warning(f"Return type {type(return_value)} not in return types {return_types}")returnreturn_valuewrapper_handler=cast(MessageHandler[AgentT,ReceivesT,ProducesT],wrapper)wrapper_handler.target_types=list(target_types)wrapper_handler.produces_types=list(return_types)wrapper_handler.is_message_handler=Truewrapper_handler.router=matchor(lambda_message,_ctx:True)returnwrapper_handleriffuncisNoneandnotcallable(func):returndecoratorelifcallable(func):returndecorator(func)else:raiseValueError("Invalid arguments")
[docs]defevent(func:None|Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,None]]=None,*,strict:bool=True,match:None|Callable[[ReceivesT,MessageContext],bool]=None,)->(Callable[[Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,None]]],MessageHandler[AgentT,ReceivesT,None],]|MessageHandler[AgentT,ReceivesT,None]):"""Decorator for event message handlers. Add this decorator to methods in a :class:`RoutedAgent` class that are intended to handle event messages. These methods must have a specific signature that needs to be followed for it to be valid: - The method must be an `async` method. - The method must be decorated with the `@message_handler` decorator. - The method must have exactly 3 arguments: 1. `self` 2. `message`: The event message to be handled, this must be type-hinted with the message type that it is intended to handle. 3. `ctx`: A :class:`autogen_core.MessageContext` object. - The method must return `None`. Handlers can handle more than one message type by accepting a Union of the message types. Args: func: The function to be decorated. strict: If `True`, the handler will raise an exception if the message type is not in the target types. If `False`, it will log a warning instead. match: A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If `None`, the first handler in alphabetical order matching the same message type will be called. """defdecorator(func:Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,None]],)->MessageHandler[AgentT,ReceivesT,None]:type_hints=get_type_hints(func)if"message"notintype_hints:raiseAssertionError("message parameter not found in function signature")if"return"notintype_hints:raiseAssertionError("return not found in function signature")# Get the type of the message parametertarget_types=get_types(type_hints["message"])iftarget_typesisNone:raiseAssertionError("Message type not found. Please provide a type hint for the message parameter.")return_types=get_types(type_hints["return"])ifreturn_typesisNone:raiseAssertionError("Return type not found. Please use `None` as the type hint of the return type.")# Convert target_types to list and stash@wraps(func)asyncdefwrapper(self:AgentT,message:ReceivesT,ctx:MessageContext)->None:iftype(message)notintarget_types:ifstrict:raiseCantHandleException(f"Message type {type(message)} not in target types {target_types}")else:logger.warning(f"Message type {type(message)} not in target types {target_types}")return_value=awaitfunc(self,message,ctx)# type: ignoreifreturn_valueisnotNone:ifstrict:raiseValueError(f"Return type {type(return_value)} is not None.")else:logger.warning(f"Return type {type(return_value)} is not None. It will be ignored.")returnNonewrapper_handler=cast(MessageHandler[AgentT,ReceivesT,None],wrapper)wrapper_handler.target_types=list(target_types)wrapper_handler.produces_types=list(return_types)wrapper_handler.is_message_handler=True# Wrap the match function with a check on the is_rpc flag.wrapper_handler.router=lambda_message,_ctx:(not_ctx.is_rpc)and(match(_message,_ctx)ifmatchelseTrue)returnwrapper_handleriffuncisNoneandnotcallable(func):returndecoratorelifcallable(func):returndecorator(func)else:raiseValueError("Invalid arguments")
[docs]defrpc(func:None|Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,ProducesT]]=None,*,strict:bool=True,match:None|Callable[[ReceivesT,MessageContext],bool]=None,)->(Callable[[Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,ProducesT]]],MessageHandler[AgentT,ReceivesT,ProducesT],]|MessageHandler[AgentT,ReceivesT,ProducesT]):"""Decorator for RPC message handlers. Add this decorator to methods in a :class:`RoutedAgent` class that are intended to handle RPC messages. These methods must have a specific signature that needs to be followed for it to be valid: - The method must be an `async` method. - The method must be decorated with the `@message_handler` decorator. - The method must have exactly 3 arguments: 1. `self` 2. `message`: The message to be handled, this must be type-hinted with the message type that it is intended to handle. 3. `ctx`: A :class:`autogen_core.MessageContext` object. - The method must be type hinted with what message types it can return as a response, or it can return `None` if it does not return anything. Handlers can handle more than one message type by accepting a Union of the message types. It can also return more than one message type by returning a Union of the message types. Args: func: The function to be decorated. strict: If `True`, the handler will raise an exception if the message type or return type is not in the target types. If `False`, it will log a warning instead. match: A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If `None`, the first handler in alphabetical order matching the same message type will be called. """defdecorator(func:Callable[[AgentT,ReceivesT,MessageContext],Coroutine[Any,Any,ProducesT]],)->MessageHandler[AgentT,ReceivesT,ProducesT]:type_hints=get_type_hints(func)if"message"notintype_hints:raiseAssertionError("message parameter not found in function signature")if"return"notintype_hints:raiseAssertionError("return not found in function signature")# Get the type of the message parametertarget_types=get_types(type_hints["message"])iftarget_typesisNone:raiseAssertionError("Message type not found")# print(type_hints)return_types=get_types(type_hints["return"])ifreturn_typesisNone:raiseAssertionError("Return type not found")# Convert target_types to list and stash@wraps(func)asyncdefwrapper(self:AgentT,message:ReceivesT,ctx:MessageContext)->ProducesT:iftype(message)notintarget_types:ifstrict:raiseCantHandleException(f"Message type {type(message)} not in target types {target_types}")else:logger.warning(f"Message type {type(message)} not in target types {target_types}")return_value=awaitfunc(self,message,ctx)ifAnyTypenotinreturn_typesandtype(return_value)notinreturn_types:ifstrict:raiseValueError(f"Return type {type(return_value)} not in return types {return_types}")else:logger.warning(f"Return type {type(return_value)} not in return types {return_types}")returnreturn_valuewrapper_handler=cast(MessageHandler[AgentT,ReceivesT,ProducesT],wrapper)wrapper_handler.target_types=list(target_types)wrapper_handler.produces_types=list(return_types)wrapper_handler.is_message_handler=Truewrapper_handler.router=lambda_message,_ctx:(_ctx.is_rpc)and(match(_message,_ctx)ifmatchelseTrue)returnwrapper_handleriffuncisNoneandnotcallable(func):returndecoratorelifcallable(func):returndecorator(func)else:raiseValueError("Invalid arguments")
[docs]classRoutedAgent(BaseAgent):"""A base class for agents that route messages to handlers based on the type of the message and optional matching functions. To create a routed agent, subclass this class and add message handlers as methods decorated with either :func:`event` or :func:`rpc` decorator. Example: .. code-block:: python from dataclasses import dataclass from autogen_core import MessageContext from autogen_core import RoutedAgent, event, rpc @dataclass class Message: pass @dataclass class MessageWithContent: content: str @dataclass class Response: pass class MyAgent(RoutedAgent): def __init__(self): super().__init__("MyAgent") @event async def handle_event_message(self, message: Message, ctx: MessageContext) -> None: assert ctx.topic_id is not None await self.publish_message(MessageWithContent("event handled"), ctx.topic_id) @rpc(match=lambda message, ctx: message.content == "special") # type: ignore async def handle_special_rpc_message(self, message: MessageWithContent, ctx: MessageContext) -> Response: return Response() """def__init__(self,description:str)->None:# Self is already bound to the handlersself._handlers:DefaultDict[Type[Any],List[MessageHandler[RoutedAgent,Any,Any]],]=DefaultDict(list)handlers=self._discover_handlers()formessage_handlerinhandlers:fortarget_typeinmessage_handler.target_types:self._handlers[target_type].append(message_handler)super().__init__(description)
[docs]asyncdefon_message_impl(self,message:Any,ctx:MessageContext)->Any|None:"""Handle a message by routing it to the appropriate message handler. Do not override this method in subclasses. Instead, add message handlers as methods decorated with either the :func:`event` or :func:`rpc` decorator."""key_type:Type[Any]=type(message)# type: ignorehandlers=self._handlers.get(key_type)# type: ignoreifhandlersisnotNone:# Iterate over all handlers for this matching message type.# Call the first handler whose router returns True and then return the result.forhinhandlers:ifh.router(message,ctx):returnawaith(self,message,ctx)returnawaitself.on_unhandled_message(message,ctx)# type: ignore
[docs]asyncdefon_unhandled_message(self,message:Any,ctx:MessageContext)->None:"""Called when a message is received that does not have a matching message handler. The default implementation logs an info message."""logger.info(f"Unhandled message: {message}")
@classmethoddef_discover_handlers(cls)->Sequence[MessageHandler[Any,Any,Any]]:handlers:List[MessageHandler[Any,Any,Any]]=[]forattrindir(cls):ifcallable(getattr(cls,attr,None)):# Since we are getting it from the class, self is not boundhandler=getattr(cls,attr)ifhasattr(handler,"is_message_handler"):handlers.append(cast(MessageHandler[Any,Any,Any],handler))returnhandlers@classmethoddef_handles_types(cls)->List[Tuple[Type[Any],List[MessageSerializer[Any]]]]:# TODO handle deduplicationhandlers=cls._discover_handlers()types:List[Tuple[Type[Any],List[MessageSerializer[Any]]]]=[]types.extend(cls.internal_extra_handles_types)forhandlerinhandlers:fortinhandler.target_types:# TODO: support different serializersserializers=try_get_known_serializers_for_type(t)iflen(serializers)==0:raiseValueError(f"No serializers found for type {t}.")types.append((t,try_get_known_serializers_for_type(t)))returntypes