autogen_core.base#

The autogen_core.base module provides the foundational generic interfaces upon which all else is built. This module must not depend on any other module.

class autogen_core.base.Agent(*args, **kwargs)[source]#

Bases: Protocol

property id: AgentId#

ID of the agent.

async load_state(state: Mapping[str, Any]) None[source]#

Load in the state of the agent obtained from save_state.

Parameters:

state (Mapping[str, Any]) – State of the agent. Must be JSON serializable.

property metadata: AgentMetadata#

Metadata of the agent.

async on_message(message: Any, ctx: MessageContext) Any[source]#

Message handler for the agent. This should only be called by the runtime, not by other agents.

Parameters:
  • message (Any) – Received message. Type is one of the types in subscriptions.

  • ctx (MessageContext) – Context of the message.

Returns:

Any – Response to the message. Can be None.

Raises:
async save_state() Mapping[str, Any][source]#

Save the state of the agent. The result must be JSON serializable.

class autogen_core.base.AgentChildren(*args, **kwargs)[source]#

Bases: Protocol

property children: Sequence[AgentId]#

Ids of the children of the agent.

class autogen_core.base.AgentId(type: str | AgentType, key: str)[source]#

Bases: object

classmethod from_str(agent_id: str) Self[source]#
property key: str#
property type: str#
class autogen_core.base.AgentInstantiationContext[source]#

Bases: object

AGENT_INSTANTIATION_CONTEXT_VAR: ClassVar[ContextVar[tuple[AgentRuntime, AgentId]]] = <ContextVar name='AGENT_INSTANTIATION_CONTEXT_VAR'>#
classmethod current_agent_id() AgentId[source]#
classmethod current_runtime() AgentRuntime[source]#
classmethod populate_context(ctx: tuple[AgentRuntime, AgentId]) Generator[None, Any, None][source]#
class autogen_core.base.AgentMetadata[source]#

Bases: TypedDict

description: str#
key: str#
type: str#
class autogen_core.base.AgentProxy(agent: AgentId, runtime: AgentRuntime)[source]#

Bases: object

property id: AgentId#

Target agent for this proxy

async load_state(state: Mapping[str, Any]) None[source]#

Load in the state of the agent obtained from save_state.

Parameters:

state (Mapping[str, Any]) – State of the agent. Must be JSON serializable.

property metadata: Awaitable[AgentMetadata]#

Metadata of the agent.

async save_state() Mapping[str, Any][source]#

Save the state of the agent. The result must be JSON serializable.

async send_message(message: Any, *, sender: AgentId, cancellation_token: CancellationToken | None = None) Any[source]#
class autogen_core.base.AgentRuntime(*args, **kwargs)[source]#

Bases: Protocol

add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]#

Add a new message serialization serializer to the runtime

Note: This will deduplicate serializers based on the type_name and data_content_type properties

Parameters:

serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – The serializer/s to add

async add_subscription(subscription: Subscription) None[source]#

Add a new subscription that the runtime should fulfill when processing published messages

Parameters:

subscription (Subscription) – The subscription to add

async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[source]#

Load the state of a single agent.

Parameters:
  • agent (AgentId) – The agent id.

  • state (Mapping[str, Any]) – The saved state.

async agent_metadata(agent: AgentId) AgentMetadata[source]#

Get the metadata for an agent.

Parameters:

agent (AgentId) – The agent id.

Returns:

AgentMetadata – The agent metadata.

async agent_save_state(agent: AgentId) Mapping[str, Any][source]#

Save the state of a single agent.

The structure of the state is implementation defined and can be any JSON serializable object.

Parameters:

agent (AgentId) – The agent id.

Returns:

Mapping[str, Any] – The saved state.

async get(id: AgentId, /, *, lazy: bool = True) AgentId[source]#
async get(type: AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId
async load_state(state: Mapping[str, Any]) None[source]#

Load the state of the entire runtime, including all hosted agents. The state should be the same as the one returned by save_state().

Parameters:

state (Mapping[str, Any]) – The saved state.

async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None) None[source]#

Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.

No responses are expected from publishing.

Parameters:
  • message (Any) – The message to publish.

  • topic (TopicId) – The topic to publish the message to.

  • sender (AgentId | None, optional) – The agent which sent the message. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in progress . Defaults to None.

Raises:

UndeliverableException – If the message cannot be delivered.

register(type: str, agent_factory: Callable[[], T | Awaitable[T]], subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | list[Subscription] | None = None) AgentType[source]#

Register an agent factory with the runtime associated with a specific type. The type must be unique.

Parameters:
  • type (str) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.

  • agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.base.AgentInstantiationContext to access variables like the current runtime and agent ID.

  • subscriptions (Callable[[], list[Subscription]] | list[Subscription] | None, optional) – The subscriptions that the agent should be subscribed to. Defaults to None.

Example

runtime.register(
    "chat_agent",
    lambda: ChatCompletionAgent(
        description="A generic chat agent.",
        system_messages=[SystemMessage("You are a helpful assistant")],
        model_client=OpenAIChatCompletionClient(model="gpt-4o"),
        memory=BufferedChatMemory(buffer_size=10),
    ),
)
async register_factory(*, type: AgentType, agent_factory: Callable[[], T | Awaitable[T]], expected_class: type[T]) AgentType[source]#

Register an agent factory with the runtime associated with a specific type. The type must be unique.

Parameters:
  • type (str) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.

  • agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.base.AgentInstantiationContext to access variables like the current runtime and agent ID.

Example

runtime.register(
    "chat_agent",
    lambda: ChatCompletionAgent(
        description="A generic chat agent.",
        system_messages=[SystemMessage("You are a helpful assistant")],
        model_client=OpenAIChatCompletionClient(model="gpt-4o"),
        memory=BufferedChatMemory(buffer_size=10),
    ),
)
async remove_subscription(id: str) None[source]#

Remove a subscription from the runtime

Parameters:

id (str) – id of the subscription to remove

Raises:

LookupError – If the subscription does not exist

async save_state() Mapping[str, Any][source]#

Save the state of the entire runtime, including all hosted agents. The only way to restore the state is to pass it to load_state().

The structure of the state is implementation defined and can be any JSON serializable object.

Returns:

Mapping[str, Any] – The saved state.

async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None) Any[source]#

Send a message to an agent and get a response.

Parameters:
  • message (Any) – The message to send.

  • recipient (AgentId) – The agent to send the message to.

  • sender (AgentId | None, optional) – Agent which sent the message. Should only be None if this was sent from no agent, such as directly to the runtime externally. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in progress . Defaults to None.

Raises:
Returns:

Any – The response from the agent.

async try_get_underlying_agent_instance(id: ~autogen_core.base._agent_id.AgentId, type: ~typing.Type[~autogen_core.base._agent_runtime.T] = <class 'autogen_core.base._agent.Agent'>) T[source]#

Try to get the underlying agent instance by name and namespace. This is generally discouraged (hence the long name), but can be useful in some cases.

If the underlying agent is not accessible, this will raise an exception.

Parameters:
  • id (AgentId) – The agent id.

  • type (Type[T], optional) – The expected type of the agent. Defaults to Agent.

Returns:

T – The concrete agent instance.

Raises:
  • LookupError – If the agent is not found.

  • NotAccessibleError – If the agent is not accessible, for example if it is located remotely.

  • TypeError – If the agent is not of the expected type.

class autogen_core.base.AgentType(type: str)[source]#

Bases: object

type: str#

String representation of this agent type.

class autogen_core.base.BaseAgent(description: str)[source]#

Bases: ABC, Agent

property id: AgentId#

ID of the agent.

internal_extra_handles_types: ClassVar[List[Tuple[Type[Any], List[MessageSerializer[Any]]]]] = []#
internal_unbound_subscriptions_list: ClassVar[List[Callable[[], list[Subscription] | Awaitable[list[Subscription]]]]] = []#
async load_state(state: Mapping[str, Any]) None[source]#

Load in the state of the agent obtained from save_state.

Parameters:

state (Mapping[str, Any]) – State of the agent. Must be JSON serializable.

property metadata: AgentMetadata#

Metadata of the agent.

abstract async on_message(message: Any, ctx: MessageContext) Any[source]#

Message handler for the agent. This should only be called by the runtime, not by other agents.

Parameters:
  • message (Any) – Received message. Type is one of the types in subscriptions.

  • ctx (MessageContext) – Context of the message.

Returns:

Any – Response to the message. Can be None.

Raises:
async publish_message(message: Any, topic_id: TopicId, *, cancellation_token: CancellationToken | None = None) None[source]#
async classmethod register(runtime: AgentRuntime, type: str, factory: Callable[[], Self | Awaitable[Self]], *, skip_class_subscriptions: bool = False) AgentType[source]#

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

property runtime: AgentRuntime#
async save_state() Mapping[str, Any][source]#

Save the state of the agent. The result must be JSON serializable.

async send_message(message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None) Any[source]#

See autogen_core.base.AgentRuntime.send_message() for more information.

property type: str#
class autogen_core.base.CancellationToken[source]#

Bases: object

add_callback(callback: Callable[[], None]) None[source]#
cancel() None[source]#
is_cancelled() bool[source]#
class autogen_core.base.MessageContext(sender: autogen_core.base._agent_id.AgentId | None, topic_id: autogen_core.base._topic.TopicId | None, is_rpc: bool, cancellation_token: autogen_core.base._cancellation_token.CancellationToken)[source]#

Bases: object

cancellation_token: CancellationToken#
is_rpc: bool#
sender: AgentId | None#
topic_id: TopicId | None#
class autogen_core.base.MessageHandlerContext[source]#

Bases: object

MESSAGE_HANDLER_CONTEXT: ClassVar[ContextVar[AgentId]] = <ContextVar name='MESSAGE_HANDLER_CONTEXT'>#
classmethod agent_id() AgentId[source]#
classmethod populate_context(ctx: AgentId) Generator[None, Any, None][source]#
class autogen_core.base.MessageSerializer(*args, **kwargs)[source]#

Bases: Protocol[T]

property data_content_type: str#
deserialize(payload: bytes) T[source]#
serialize(message: T) bytes[source]#
property type_name: str#
class autogen_core.base.SerializationRegistry[source]#

Bases: object

add_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]#
deserialize(payload: bytes, *, type_name: str, data_content_type: str) Any[source]#
is_registered(type_name: str, data_content_type: str) bool[source]#
serialize(message: Any, *, type_name: str, data_content_type: str) bytes[source]#
type_name(message: Any) str[source]#
class autogen_core.base.Subscription(*args, **kwargs)[source]#

Bases: Protocol

Subscriptions define the topics that an agent is interested in.

property id: str#

Get the ID of the subscription.

Implementations should return a unique ID for the subscription. Usually this is a UUID.

Returns:

str – ID of the subscription.

is_match(topic_id: TopicId) bool[source]#

Check if a given topic_id matches the subscription.

Parameters:

topic_id (TopicId) – TopicId to check.

Returns:

bool – True if the topic_id matches the subscription, False otherwise.

map_to_agent(topic_id: TopicId) AgentId[source]#

Map a topic_id to an agent. Should only be called if is_match returns True for the given topic_id.

Parameters:

topic_id (TopicId) – TopicId to map.

Returns:

AgentId – ID of the agent that should handle the topic_id.

Raises:

CantHandleException – If the subscription cannot handle the topic_id.

class autogen_core.base.SubscriptionInstantiationContext[source]#

Bases: object

SUBSCRIPTION_CONTEXT_VAR: ClassVar[ContextVar[AgentType]] = <ContextVar name='SUBSCRIPTION_CONTEXT_VAR'>#
classmethod agent_type() AgentType[source]#
classmethod populate_context(ctx: AgentType) Generator[None, Any, None][source]#
class autogen_core.base.TopicId(type: str, source: str)[source]#

Bases: object

classmethod from_str(topic_id: str) Self[source]#
source: str#

Identifies the context in which an event happened. Adhere’s to the cloud event spec.

Learn more here: cloudevents/spec

type: str#

Type of the event that this topic_id contains. Adhere’s to the cloud event spec.

Learn more here: cloudevents/spec

class autogen_core.base.UnknownPayload(type_name: str, data_content_type: str, payload: bytes)[source]#

Bases: object

data_content_type: str#
payload: bytes#
type_name: str#
autogen_core.base.subscription_factory(subscription: Callable[[], list[Subscription] | Awaitable[list[Subscription]]]) Callable[[Type[BaseAgentType]], Type[BaseAgentType]][source]#
autogen_core.base.try_get_known_serializers_for_type(cls: type[Any]) list[MessageSerializer[Any]][source]#