autogen_core#

class Agent(*args, **kwargs)[source]#

Bases: Protocol

property id: AgentId#

ID of the agent.

async load_state(state: Mapping[str, Any]) None[source]#

Load in the state of the agent obtained from save_state.

Parameters:

state (Mapping[str, Any]) – State of the agent. Must be JSON serializable.

property metadata: AgentMetadata#

Metadata of the agent.

async on_message(message: Any, ctx: MessageContext) Any[source]#

Message handler for the agent. This should only be called by the runtime, not by other agents.

Parameters:
  • message (Any) – Received message. Type is one of the types in subscriptions.

  • ctx (MessageContext) – Context of the message.

Returns:

Any – Response to the message. Can be None.

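Raises:
  • asyncio.CancelledError – If the message was cancelled.

  • CantHandleException – If the agent cannot handle the message.
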
async save_state() Mapping[str, Any][source]#

Save the state of the agent. The result must be JSON serializable.

class AgentId(type: str | AgentType, key: str)[source]#

Bases: object

classmethod from_str(agent_id: str) Self[source]#
property key: str#
property type: str#
class AgentInstantiationContext[source]#

Bases: object

classmethod current_agent_id() AgentId[source]#
classmethod current_runtime() AgentRuntime[source]#
class AgentMetadata[source]#

Bases: TypedDict

description: str#
key: str#
type: str#
class AgentProxy(agent: AgentId, runtime: AgentRuntime)[source]#

Bases: object

property id: AgentId#

Target agent for this proxy

async load_state(state: Mapping[str, Any]) None[source]#

Load in the state of the agent obtained from save_state.

Parameters:

state (Mapping[str, Any]) – State of the agent. Must be JSON serializable.

property metadata: Awaitable[AgentMetadata]#

Metadata of the agent.

async save_state() Mapping[str, Any][source]#

Save the state of the agent. The result must be JSON serializable.

async send_message(message: Any, *, sender: AgentId, cancellation_token: CancellationToken | None = None) Any[source]#
class AgentRuntime(*args, **kwargs)[source]#

Bases: Protocol

add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]#

Add a new message serializer to the runtime.

Note: This will deduplicate serializers based on the type_name and data_content_type properties

Parameters:

serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – The serializer/s to add

async add_subscription(subscription: Subscription) None[source]#

Add a new subscription that the runtime should fulfill when processing published messages

Parameters:

subscription (Subscription) – The subscription to add

async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[source]#

Load the state of a single agent.

Parameters:
  • agent (AgentId) – The agent id.

  • state (Mapping[str, Any]) – The saved state.

async agent_metadata(agent: AgentId) AgentMetadata[source]#

Get the metadata for an agent.

Parameters:

agent (AgentId) – The agent id.

Returns:

AgentMetadata – The agent metadata.

async agent_save_state(agent: AgentId) Mapping[str, Any][source]#

Save the state of a single agent.

The structure of the state is implementation defined and can be any JSON serializable object.

Parameters:

agent (AgentId) – The agent id.

Returns:

Mapping[str, Any] – The saved state.

async get(id: AgentId, /, *, lazy: bool = True) AgentId[source]#
async get(type: AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId
async load_state(state: Mapping[str, Any]) None[source]#

Load the state of the entire runtime, including all hosted agents. The state should be the same as the one returned by save_state().

Parameters:

state (Mapping[str, Any]) – The saved state.

async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[source]#

Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.

No responses are expected from publishing.

Parameters:
  • message (Any) – The message to publish.

  • topic_id (TopicId) – The topic to publish the message to.

  • sender (AgentId | None, optional) – The agent which sent the message. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress publish. Defaults to None.

  • message_id (str | None, optional) – The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique, and is recommended to be a UUID.

Raises:

UndeliverableException – If the message cannot be delivered.

register(type: str, agent_factory: Callable[[], T | Awaitable[T]], subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | list[Subscription] | None = None) AgentType[source]#

Register an agent factory with the runtime associated with a specific type. The type must be unique.

Deprecated since version 0.4.0.dev1: Use a specific agent’s register method directly instead of this method. For example: BaseAgent.register()

Parameters:
  • type (str) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.

  • agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.AgentInstantiationContext to access variables like the current runtime and agent ID.

  • subscriptions (Callable[[], list[Subscription]] | list[Subscription] | None, optional) – The subscriptions that the agent should be subscribed to. Defaults to None.

async register_factory(*, type: AgentType, agent_factory: Callable[[], T | Awaitable[T]], expected_class: type[T]) AgentType[source]#

Register an agent factory with the runtime associated with a specific type. The type must be unique.

Parameters:
  • type (AgentType) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.

  • agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.AgentInstantiationContext to access variables like the current runtime and agent ID.

async remove_subscription(id: str) None[source]#

Remove a subscription from the runtime

Parameters:

id (str) – id of the subscription to remove

Raises:

LookupError – If the subscription does not exist

async save_state() Mapping[str, Any][source]#

Save the state of the entire runtime, including all hosted agents. The only way to restore the state is to pass it to load_state().

The structure of the state is implementation defined and can be any JSON serializable object.

Returns:

Mapping[str, Any] – The saved state.

async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None) Any[source]#

Send a message to an agent and get a response.

Parameters:
  • message (Any) – The message to send.

  • recipient (AgentId) – The agent to send the message to.

  • sender (AgentId | None, optional) – Agent which sent the message. Should only be None if this was sent from no agent, such as directly to the runtime externally. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress send. Defaults to None.

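Raises:
  • CantHandleException – If the recipient cannot handle the message.

  • UndeliverableException – If the message cannot be delivered.

  • Other – Any other exception raised by the recipient.
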
Returns:

Any – The response from the agent.

async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T[source]#

Try to get the underlying agent instance by name and namespace. This is generally discouraged (hence the long name), but can be useful in some cases.

If the underlying agent is not accessible, this will raise an exception.

Parameters:
  • id (AgentId) – The agent id.

  • type (Type[T], optional) – The expected type of the agent. Defaults to Agent.

Returns:

T – The concrete agent instance.

Raises:
  • LookupError – If the agent is not found.

  • NotAccessibleError – If the agent is not accessible, for example if it is located remotely.

  • TypeError – If the agent is not of the expected type.

class AgentType(type: str)[source]#

Bases: object

type: str#

String representation of this agent type.

class BaseAgent(description: str)[source]#

Bases: ABC, Agent

property id: AgentId#

ID of the agent.

async load_state(state: Mapping[str, Any]) None[source]#

Load in the state of the agent obtained from save_state.

Parameters:

state (Mapping[str, Any]) – State of the agent. Must be JSON serializable.

property metadata: AgentMetadata#

Metadata of the agent.

final async on_message(message: Any, ctx: MessageContext) Any[source]#

Message handler for the agent. This should only be called by the runtime, not by other agents.

Parameters:
  • message (Any) – Received message. Type is one of the types in subscriptions.

  • ctx (MessageContext) – Context of the message.

Returns:

Any – Response to the message. Can be None.

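Raises:
  • asyncio.CancelledError – If the message was cancelled.

  • CantHandleException – If the agent cannot handle the message.
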
abstract async on_message_impl(message: Any, ctx: MessageContext) Any[source]#
async publish_message(message: Any, topic_id: TopicId, *, cancellation_token: CancellationToken | None = None) None[source]#
async classmethod register(runtime: AgentRuntime, type: str, factory: Callable[[], Self | Awaitable[Self]], *, skip_class_subscriptions: bool = False, skip_direct_message_subscription: bool = False) AgentType[source]#

Register a factory for this agent class with the given runtime under the given agent type.

Returns the AgentType that the agent was registered under.

property runtime: AgentRuntime#
async save_state() Mapping[str, Any][source]#

Save the state of the agent. The result must be JSON serializable.

async send_message(message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None) Any[source]#

See autogen_core.AgentRuntime.send_message() for more information.

property type: str#
class CancellationToken[source]#

Bases: object

add_callback(callback: Callable[[], None]) None[source]#
cancel() None[source]#
is_cancelled() bool[source]#
class ClosureAgent(description: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]])[source]#

Bases: BaseAgent, ClosureContext

property id: AgentId#

ID of the agent.

async load_state(state: Mapping[str, Any]) None[source]#

Closure agents do not have state. So this method does nothing.

property metadata: AgentMetadata#

Metadata of the agent.

async on_message_impl(message: Any, ctx: MessageContext) Any[source]#
async classmethod register_closure(runtime: AgentRuntime, type: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, skip_class_subscriptions: bool = False, skip_direct_message_subscription: bool = False, description: str = '', subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None = None) AgentType[source]#
property runtime: AgentRuntime#
async save_state() Mapping[str, Any][source]#

Closure agents do not have state. So this method always returns an empty dictionary.

class ClosureContext(*args, **kwargs)[source]#

Bases: Protocol

property id: AgentId#
async publish_message(message: Any, topic_id: TopicId, *, cancellation_token: CancellationToken | None = None) None[source]#
async send_message(message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None) Any[source]#
class DefaultSubscription(topic_type: str = 'default', agent_type: str | None = None)[source]#

Bases: TypeSubscription

The default subscription is designed to be a sensible default for applications that only need global scope for agents.

This subscription by default uses the “default” topic type and attempts to detect the agent type to use based on the instantiation context.

Parameters:
  • topic_type (str, optional) – The topic type to subscribe to. Defaults to “default”.

  • agent_type (str, optional) – The agent type to use for the subscription. Defaults to None, in which case it will attempt to detect the agent type based on the instantiation context.

class DefaultTopicId(type: str = 'default', source: str | None = None)[source]#

Bases: TopicId

DefaultTopicId provides a sensible default for the type and source fields of a TopicId.

If created in the context of a message handler, the source will be set to the agent_id of the message handler, otherwise it will be set to “default”.

Parameters:
  • type (str, optional) – Topic type to publish message to. Defaults to “default”.

  • source (str | None, optional) – Topic source to publish message to. If None, the source will be set to the agent_id of the message handler if in the context of a message handler, otherwise it will be set to “default”. Defaults to None.

EVENT_LOGGER_NAME = 'autogen_core.events'#

The name of the logger used for structured events.

class FunctionCall(id: 'str', arguments: 'str', name: 'str')[source]#

Bases: object

arguments: str#
id: str#
name: str#
class Image(image: Image)[source]#

Bases: object

property data_uri: str#
classmethod from_base64(base64_str: str) Image[source]#
classmethod from_file(file_path: Path) Image[source]#
classmethod from_pil(pil_image: Image) Image[source]#
classmethod from_uri(uri: str) Image[source]#
async classmethod from_url(url: str) Image[source]#
to_base64() str[source]#
to_openai_format(detail: Literal['auto', 'low', 'high'] = 'auto') ChatCompletionContentPartImageParam[source]#
JSON_DATA_CONTENT_TYPE = 'application/json'#

The content type for JSON data.

class MessageContext(sender: AgentId | None, topic_id: TopicId | None, is_rpc: bool, cancellation_token: CancellationToken, message_id: str)[source]#

Bases: object

cancellation_token: CancellationToken#
is_rpc: bool#
message_id: str#
sender: AgentId | None#
topic_id: TopicId | None#
class MessageHandlerContext[source]#

Bases: object

classmethod agent_id() AgentId[source]#
class MessageSerializer(*args, **kwargs)[source]#

Bases: Protocol[T]

property data_content_type: str#
deserialize(payload: bytes) T[source]#
serialize(message: T) bytes[source]#
property type_name: str#
PROTOBUF_DATA_CONTENT_TYPE = 'application/x-protobuf'#

The content type for Protobuf data.

ROOT_LOGGER_NAME = 'autogen_core'#

The name of the root logger.

class RoutedAgent(description: str)[source]#

Bases: BaseAgent

A base class for agents that route messages to handlers based on the type of the message and optional matching functions.

To create a routed agent, subclass this class and add message handlers as methods decorated with either event() or rpc() decorator.

Example:

from dataclasses import dataclass
from autogen_core import MessageContext
from autogen_core import RoutedAgent, event, rpc


@dataclass
class Message:
    pass


@dataclass
class MessageWithContent:
    content: str


@dataclass
class Response:
    pass


class MyAgent(RoutedAgent):
    def __init__(self):
        super().__init__("MyAgent")

    @event
    async def handle_event_message(self, message: Message, ctx: MessageContext) -> None:
        assert ctx.topic_id is not None
        await self.publish_message(MessageWithContent("event handled"), ctx.topic_id)

    @rpc(match=lambda message, ctx: message.content == "special")  # type: ignore
    async def handle_special_rpc_message(self, message: MessageWithContent, ctx: MessageContext) -> Response:
        return Response()
async on_message_impl(message: Any, ctx: MessageContext) Any | None[source]#

Handle a message by routing it to the appropriate message handler. Do not override this method in subclasses. Instead, add message handlers as methods decorated with either the event() or rpc() decorator.

async on_unhandled_message(message: Any, ctx: MessageContext) None[source]#

Called when a message is received that does not have a matching message handler. The default implementation logs an info message.

class SingleThreadedAgentRuntime(*, intervention_handlers: List[InterventionHandler] | None = None, tracer_provider: TracerProvider | None = None)[source]#

Bases: AgentRuntime

add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]#

Add a new message serializer to the runtime.

Note: This will deduplicate serializers based on the type_name and data_content_type properties

Parameters:

serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – The serializer/s to add

async add_subscription(subscription: Subscription) None[source]#

Add a new subscription that the runtime should fulfill when processing published messages

Parameters:

subscription (Subscription) – The subscription to add

async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[source]#

Load the state of a single agent.

Parameters:
  • agent (AgentId) – The agent id.

  • state (Mapping[str, Any]) – The saved state.

async agent_metadata(agent: AgentId) AgentMetadata[source]#

Get the metadata for an agent.

Parameters:

agent (AgentId) – The agent id.

Returns:

AgentMetadata – The agent metadata.

async agent_save_state(agent: AgentId) Mapping[str, Any][source]#

Save the state of a single agent.

The structure of the state is implementation defined and can be any JSON serializable object.

Parameters:

agent (AgentId) – The agent id.

Returns:

Mapping[str, Any] – The saved state.

async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId[source]#
property idle: bool#
async load_state(state: Mapping[str, Any]) None[source]#

Load the state of the entire runtime, including all hosted agents. The state should be the same as the one returned by save_state().

Parameters:

state (Mapping[str, Any]) – The saved state.

property outstanding_tasks: int#
async process_next() None[source]#

Process the next message in the queue.

async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[source]#

Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.

No responses are expected from publishing.

Parameters:
  • message (Any) – The message to publish.

  • topic_id (TopicId) – The topic to publish the message to.

  • sender (AgentId | None, optional) – The agent which sent the message. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress publish. Defaults to None.

  • message_id (str | None, optional) – The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique, and is recommended to be a UUID.

Raises:

UndeliverableException – If the message cannot be delivered.

register(type: str, agent_factory: Callable[[], T | Awaitable[T]] | Callable[[AgentRuntime, AgentId], T | Awaitable[T]], subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | list[Subscription] | None = None) AgentType[source]#

Register an agent factory with the runtime associated with a specific type. The type must be unique.

Deprecated since version 0.4.0.dev1: Use a specific agent’s register method directly instead of this method. For example: BaseAgent.register()

Parameters:
  • type (str) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.

  • agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.AgentInstantiationContext to access variables like the current runtime and agent ID.

  • subscriptions (Callable[[], list[Subscription]] | list[Subscription] | None, optional) – The subscriptions that the agent should be subscribed to. Defaults to None.

async register_factory(*, type: AgentType, agent_factory: Callable[[], T | Awaitable[T]], expected_class: type[T]) AgentType[source]#

Register an agent factory with the runtime associated with a specific type. The type must be unique.

Parameters:
  • type (AgentType) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.

  • agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.AgentInstantiationContext to access variables like the current runtime and agent ID.

async remove_subscription(id: str) None[source]#

Remove a subscription from the runtime

Parameters:

id (str) – id of the subscription to remove

Raises:

LookupError – If the subscription does not exist

async save_state() Mapping[str, Any][source]#

Save the state of the entire runtime, including all hosted agents. The only way to restore the state is to pass it to load_state().

The structure of the state is implementation defined and can be any JSON serializable object.

Returns:

Mapping[str, Any] – The saved state.

async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None) Any[source]#

Send a message to an agent and get a response.

Parameters:
  • message (Any) – The message to send.

  • recipient (AgentId) – The agent to send the message to.

  • sender (AgentId | None, optional) – Agent which sent the message. Should only be None if this was sent from no agent, such as directly to the runtime externally. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress send. Defaults to None.

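Raises:
  • CantHandleException – If the recipient cannot handle the message.

  • UndeliverableException – If the message cannot be delivered.

  • Other – Any other exception raised by the recipient.
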
Returns:

Any – The response from the agent.

start() None[source]#

Start the runtime message processing loop.

async stop() None[source]#

Stop the runtime message processing loop.

async stop_when(condition: Callable[[], bool]) None[source]#

Stop the runtime message processing loop when the condition is met.

async stop_when_idle() None[source]#

Stop the runtime message processing loop when there is no outstanding message being processed or queued.

async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T[source]#

Try to get the underlying agent instance by name and namespace. This is generally discouraged (hence the long name), but can be useful in some cases.

If the underlying agent is not accessible, this will raise an exception.

Parameters:
  • id (AgentId) – The agent id.

  • type (Type[T], optional) – The expected type of the agent. Defaults to Agent.

Returns:

T – The concrete agent instance.

Raises:
  • LookupError – If the agent is not found.

  • NotAccessibleError – If the agent is not accessible, for example if it is located remotely.

  • TypeError – If the agent is not of the expected type.

property unprocessed_messages: Sequence[PublishMessageEnvelope | SendMessageEnvelope | ResponseMessageEnvelope]#
class Subscription(*args, **kwargs)[source]#

Bases: Protocol

Subscriptions define the topics that an agent is interested in.

property id: str#

Get the ID of the subscription.

Implementations should return a unique ID for the subscription. Usually this is a UUID.

Returns:

str – ID of the subscription.

is_match(topic_id: TopicId) bool[source]#

Check if a given topic_id matches the subscription.

Parameters:

topic_id (TopicId) – TopicId to check.

Returns:

bool – True if the topic_id matches the subscription, False otherwise.

map_to_agent(topic_id: TopicId) AgentId[source]#

Map a topic_id to an agent. Should only be called if is_match returns True for the given topic_id.

Parameters:

topic_id (TopicId) – TopicId to map.

Returns:

AgentId – ID of the agent that should handle the topic_id.

Raises:

CantHandleException – If the subscription cannot handle the topic_id.

class SubscriptionInstantiationContext[source]#

Bases: object

classmethod agent_type() AgentType[source]#
TRACE_LOGGER_NAME = 'autogen_core.trace'#

Logger name used for developer intended trace logging. The content and format of this log should not be depended upon.

class TopicId(type: str, source: str)[source]#

Bases: object

classmethod from_str(topic_id: str) Self[source]#
source: str#

Identifies the context in which an event happened. Adheres to the CloudEvents spec.

Learn more here: cloudevents/spec

type: str#

Type of the event that this topic_id contains. Adheres to the CloudEvents spec.

Must match the pattern: ^[\w\-\.\:\=]+\Z

Learn more here: cloudevents/spec

class TypePrefixSubscription(topic_type_prefix: str, agent_type: str)[source]#

Bases: Subscription

This subscription matches on topics based on a prefix of the type and maps to agents using the source of the topic as the agent key.

This subscription causes each source to have its own agent instance.

Example

from autogen_core import TypePrefixSubscription

subscription = TypePrefixSubscription(topic_type_prefix="t1", agent_type="a1")

In this case:

  • A topic_id with type t1 and source s1 will be handled by an agent of type a1 with key s1

  • A topic_id with type t1 and source s2 will be handled by an agent of type a1 with key s2.

  • A topic_id with type t1SUFFIX and source s2 will be handled by an agent of type a1 with key s2.

Parameters:
  • topic_type_prefix (str) – Topic type prefix to match against

  • agent_type (str) – Agent type to handle this subscription

property agent_type: str#
property id: str#

Get the ID of the subscription.

Implementations should return a unique ID for the subscription. Usually this is a UUID.

Returns:

str – ID of the subscription.

is_match(topic_id: TopicId) bool[source]#

Check if a given topic_id matches the subscription.

Parameters:

topic_id (TopicId) – TopicId to check.

Returns:

bool – True if the topic_id matches the subscription, False otherwise.

map_to_agent(topic_id: TopicId) AgentId[source]#

Map a topic_id to an agent. Should only be called if is_match returns True for the given topic_id.

Parameters:

topic_id (TopicId) – TopicId to map.

Returns:

AgentId – ID of the agent that should handle the topic_id.

Raises:

CantHandleException – If the subscription cannot handle the topic_id.

property topic_type_prefix: str#
class TypeSubscription(topic_type: str, agent_type: str)[source]#

Bases: Subscription

This subscription matches on topics based on the type and maps to agents using the source of the topic as the agent key.

This subscription causes each source to have its own agent instance.

Example

from autogen_core import TypeSubscription

subscription = TypeSubscription(topic_type="t1", agent_type="a1")

In this case:

  • A topic_id with type t1 and source s1 will be handled by an agent of type a1 with key s1

  • A topic_id with type t1 and source s2 will be handled by an agent of type a1 with key s2.

Parameters:
  • topic_type (str) – Topic type to match against

  • agent_type (str) – Agent type to handle this subscription

property agent_type: str#
property id: str#

Get the ID of the subscription.

Implementations should return a unique ID for the subscription. Usually this is a UUID.

Returns:

str – ID of the subscription.

is_match(topic_id: TopicId) bool[source]#

Check if a given topic_id matches the subscription.

Parameters:

topic_id (TopicId) – TopicId to check.

Returns:

bool – True if the topic_id matches the subscription, False otherwise.

map_to_agent(topic_id: TopicId) AgentId[source]#

Map a topic_id to an agent. Should only be called if is_match returns True for the given topic_id.

Parameters:

topic_id (TopicId) – TopicId to map.

Returns:

AgentId – ID of the agent that should handle the topic_id.

Raises:

CantHandleException – If the subscription cannot handle the topic_id.

property topic_type: str#
class UnknownPayload(type_name: str, data_content_type: str, payload: bytes)[source]#

Bases: object

data_content_type: str#
payload: bytes#
type_name: str#
default_subscription(cls: Type[BaseAgentType] | None = None) Callable[[Type[BaseAgentType]], Type[BaseAgentType]] | Type[BaseAgentType][source]#
event(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]]], MessageHandler[AgentT, ReceivesT, None]] | MessageHandler[AgentT, ReceivesT, None][source]#

Decorator for event message handlers.

Add this decorator to methods in a RoutedAgent class that are intended to handle event messages. These methods must have a specific signature that needs to be followed for it to be valid:

  • The method must be an async method.

  • The method must be decorated with the @event decorator.

  • The method must have exactly 3 arguments:
    1. self

    2. message: The event message to be handled, this must be type-hinted with the message type that it is intended to handle.

    3. ctx: An autogen_core.MessageContext object.

  • The method must return None.

Handlers can handle more than one message type by accepting a Union of the message types.

Parameters:
  • func – The function to be decorated.

  • strict – If True, the handler will raise an exception if the message type is not in the target types. If False, it will log a warning instead.

  • match – A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If None, the first handler in alphabetical order matching the same message type will be called.

message_handler(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT][source]#

Decorator for generic message handlers.

Add this decorator to methods in a RoutedAgent class that are intended to handle both event and RPC messages. These methods must have a specific signature that needs to be followed for it to be valid:

  • The method must be an async method.

  • The method must be decorated with the @message_handler decorator.

  • The method must have exactly 3 arguments:
    1. self

    2. message: The message to be handled, this must be type-hinted with the message type that it is intended to handle.

    3. ctx: An autogen_core.MessageContext object.

  • The method must be type hinted with what message types it can return as a response, or it can return None if it does not return anything.

Handlers can handle more than one message type by accepting a Union of the message types. It can also return more than one message type by returning a Union of the message types.

Parameters:
  • func – The function to be decorated.

  • strict – If True, the handler will raise an exception if the message type or return type is not in the target types. If False, it will log a warning instead.

  • match – A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If None, the first handler in alphabetical order matching the same message type will be called.

rpc(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT][source]#

Decorator for RPC message handlers.

Add this decorator to methods in a RoutedAgent class that are intended to handle RPC messages. These methods must have a specific signature that needs to be followed for it to be valid:

  • The method must be an async method.

  • The method must be decorated with the @rpc decorator.

  • The method must have exactly 3 arguments:
    1. self

    2. message: The message to be handled, this must be type-hinted with the message type that it is intended to handle.

    3. ctx: An autogen_core.MessageContext object.

  • The method must be type hinted with what message types it can return as a response, or it can return None if it does not return anything.

Handlers can handle more than one message type by accepting a Union of the message types. It can also return more than one message type by returning a Union of the message types.

Parameters:
  • func – The function to be decorated.

  • strict – If True, the handler will raise an exception if the message type or return type is not in the target types. If False, it will log a warning instead.

  • match – A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If None, the first handler in alphabetical order matching the same message type will be called.

type_subscription(topic_type: str) Callable[[Type[BaseAgentType]], Type[BaseAgentType]][source]#