autogen_core#
- class Agent(*args, **kwargs)[source]#
Bases:
Protocol
- property metadata: AgentMetadata#
Metadata of the agent.
- async on_message(message: Any, ctx: MessageContext) Any [source]#
Message handler for the agent. This should only be called by the runtime, not by other agents.
- Parameters:
message (Any) – Received message. Type is one of the types in subscriptions.
ctx (MessageContext) – Context of the message.
- Returns:
Any – Response to the message. Can be None.
- Raises:
CancelledError – If the message was cancelled.
CantHandleException – If the agent cannot handle the message.
- class AgentId(type: str | AgentType, key: str)[source]#
Bases:
object
Agent ID uniquely identifies an agent instance within an agent runtime - including distributed runtime. It is the ‘address’ of the agent instance for receiving messages.
See here for more information: Agent Identity and Lifecycle
- classmethod from_str(agent_id: str) Self [source]#
Convert a string of the format type/key into an AgentId.
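The type/key format can be illustrated with a minimal stand-alone sketch. It does not import autogen_core: AgentIdSketch and parse_agent_id are hypothetical names, and splitting at the first "/" and raising ValueError on malformed input are assumptions, not the library's confirmed behaviour.

```python
from typing import NamedTuple


class AgentIdSketch(NamedTuple):
    """Hypothetical stand-in for AgentId: the two parts of an agent address."""

    type: str
    key: str


def parse_agent_id(agent_id: str) -> AgentIdSketch:
    """Split a 'type/key' string into its type and key parts.

    Assumption: the string is split at the first '/', so the key may itself
    contain further slashes.
    """
    agent_type, separator, key = agent_id.partition("/")
    if not separator or not agent_type or not key:
        raise ValueError(f"expected a string of the form 'type/key', got {agent_id!r}")
    return AgentIdSketch(agent_type, key)
```

With this sketch, parse_agent_id("my_agent/default") yields type "my_agent" and key "default".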
- class AgentProxy(agent: AgentId, runtime: AgentRuntime)[source]#
Bases:
object
A helper class that allows you to use an AgentId in place of its associated Agent.
- property metadata: Awaitable[AgentMetadata]#
Metadata of the agent.
- async send_message(message: Any, *, sender: AgentId, cancellation_token: CancellationToken | None = None) Any [source]#
- class AgentRuntime(*args, **kwargs)[source]#
Bases:
Protocol
- async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None) Any [source]#
Send a message to an agent and get a response.
- Parameters:
message (Any) – The message to send.
recipient (AgentId) – The agent to send the message to.
sender (AgentId | None, optional) – Agent which sent the message. Should only be None if this was sent from no agent, such as directly to the runtime externally. Defaults to None.
cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.
- Raises:
CantHandleException – If the recipient cannot handle the message.
UndeliverableException – If the message cannot be delivered.
Other – Any other exception raised by the recipient.
- Returns:
Any – The response from the agent.
- async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None [source]#
Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.
No responses are expected from publishing.
- Parameters:
message (Any) – The message to publish.
topic_id (TopicId) – The topic to publish the message to.
sender (AgentId | None, optional) – The agent which sent the message. Defaults to None.
cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.
message_id (str | None, optional) – The message id. If None, a new message id will be generated. Defaults to None. The message id must be unique, and a UUID is recommended.
- Raises:
UndeliverableException – If the message cannot be delivered.
- async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType [source]#
Register an agent factory with the runtime associated with a specific type. The type must be unique. This API does not add any subscriptions.
Note
This is a low level API and usually the agent class’s register method should be used instead, as this also handles subscriptions automatically.
Example:
from dataclasses import dataclass

from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def my_agent_factory():
    return MyAgent()


async def main() -> None:
    runtime: AgentRuntime = ...  # type: ignore
    await runtime.register_factory("my_agent", lambda: MyAgent())


import asyncio

asyncio.run(main())
- Parameters:
type (str) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.
agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.AgentInstantiationContext to access variables like the current runtime and agent ID.
expected_class (type[T] | None, optional) – The expected class of the agent, used for runtime validation of the factory. Defaults to None.
- async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T [source]#
Try to get the underlying agent instance by name and namespace. This is generally discouraged (hence the long name), but can be useful in some cases.
If the underlying agent is not accessible, this will raise an exception.
- Parameters:
id (AgentId) – The agent id.
type (Type[T], optional) – The expected type of the agent. Defaults to Agent.
- Returns:
T – The concrete agent instance.
- Raises:
LookupError – If the agent is not found.
NotAccessibleError – If the agent is not accessible, for example if it is located remotely.
TypeError – If the agent is not of the expected type.
- async get(id: AgentId, /, *, lazy: bool = True) AgentId [source]#
- async get(type: AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId
- async save_state() Mapping[str, Any] [source]#
Save the state of the entire runtime, including all hosted agents. The only way to restore the state is to pass it to load_state().
The structure of the state is implementation defined and can be any JSON serializable object.
- Returns:
Mapping[str, Any] – The saved state.
- async load_state(state: Mapping[str, Any]) None [source]#
Load the state of the entire runtime, including all hosted agents. The state should be the same as the one returned by save_state().
- Parameters:
state (Mapping[str, Any]) – The saved state.
- async agent_metadata(agent: AgentId) AgentMetadata [source]#
Get the metadata for an agent.
- Parameters:
agent (AgentId) – The agent id.
- Returns:
AgentMetadata – The agent metadata.
- async agent_save_state(agent: AgentId) Mapping[str, Any] [source]#
Save the state of a single agent.
The structure of the state is implementation defined and can be any JSON serializable object.
- Parameters:
agent (AgentId) – The agent id.
- Returns:
Mapping[str, Any] – The saved state.
- async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None [source]#
Load the state of a single agent.
- async add_subscription(subscription: Subscription) None [source]#
Add a new subscription that the runtime should fulfill when processing published messages
- Parameters:
subscription (Subscription) – The subscription to add
- async remove_subscription(id: str) None [source]#
Remove a subscription from the runtime
- Parameters:
id (str) – id of the subscription to remove
- Raises:
LookupError – If the subscription does not exist
- add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None [source]#
Add a new message serializer to the runtime.
Note: This will deduplicate serializers based on the type_name and data_content_type properties
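The deduplication rule in the note can be sketched without the library. SerializerSketch and add_serializers are hypothetical stand-ins, and keeping the first serializer registered for a given key is an assumption; the note only states which properties form the key.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Tuple


@dataclass(frozen=True)
class SerializerSketch:
    """Hypothetical stand-in exposing the two properties used for deduplication."""

    type_name: str
    data_content_type: str


Registry = Dict[Tuple[str, str], SerializerSketch]


def add_serializers(registry: Registry, serializers: Iterable[SerializerSketch]) -> None:
    """Add serializers, skipping any whose (type_name, data_content_type) is already present."""
    for serializer in serializers:
        key = (serializer.type_name, serializer.data_content_type)
        # Assumption: the first serializer registered for a key is the one kept.
        registry.setdefault(key, serializer)
```

Under this sketch, adding a JSON serializer for the same type twice leaves a single entry, while adding a Protobuf serializer for that type creates a second one.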
- Parameters:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – The serializer/s to add
- class BaseAgent(description: str)[source]#
- property metadata: AgentMetadata#
Metadata of the agent.
- property runtime: AgentRuntime#
- final async on_message(message: Any, ctx: MessageContext) Any [source]#
Message handler for the agent. This should only be called by the runtime, not by other agents.
- Parameters:
message (Any) – Received message. Type is one of the types in subscriptions.
ctx (MessageContext) – Context of the message.
- Returns:
Any – Response to the message. Can be None.
- Raises:
CancelledError – If the message was cancelled.
CantHandleException – If the agent cannot handle the message.
- abstract async on_message_impl(message: Any, ctx: MessageContext) Any [source]#
- async send_message(message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None) Any [source]#
See autogen_core.AgentRuntime.send_message() for more information.
- async publish_message(message: Any, topic_id: TopicId, *, cancellation_token: CancellationToken | None = None) None [source]#
- async save_state() Mapping[str, Any] [source]#
Save the state of the agent. The result must be JSON serializable.
- async load_state(state: Mapping[str, Any]) None [source]#
Load in the state of the agent obtained from save_state.
- Parameters:
state (Mapping[str, Any]) – State of the agent. Must be JSON serializable.
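The JSON-serializable requirement means the saved state should survive a round trip through json.dumps and json.loads. The sketch below illustrates that contract with a hypothetical CounterSketch class that only mirrors the two method signatures; it is not a BaseAgent subclass and does not import autogen_core.

```python
import asyncio
import json
from typing import Any, Mapping


class CounterSketch:
    """Hypothetical agent-like object with a single piece of state."""

    def __init__(self) -> None:
        self.count = 0

    async def save_state(self) -> Mapping[str, Any]:
        # Only JSON-friendly values (str, int, float, bool, None, list, dict).
        return {"count": self.count}

    async def load_state(self, state: Mapping[str, Any]) -> None:
        self.count = int(state["count"])


async def demo() -> int:
    original = CounterSketch()
    original.count = 3
    # Serialize to text, as one would before writing to a file or database.
    blob = json.dumps(await original.save_state())
    restored = CounterSketch()
    await restored.load_state(json.loads(blob))
    return restored.count


if __name__ == "__main__":
    print(asyncio.run(demo()))
```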
- async classmethod register(runtime: AgentRuntime, type: str, factory: Callable[[], Self | Awaitable[Self]], *, skip_class_subscriptions: bool = False, skip_direct_message_subscription: bool = False) AgentType [source]#
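Register the agent type with the runtime, using the given factory to create instances. Unless skip_class_subscriptions or skip_direct_message_subscription is set, the corresponding subscriptions are added as well.
Returns the registered AgentType.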
- class CancellationToken[source]#
Bases:
object
A token used to cancel pending async calls
- class AgentInstantiationContext[source]#
Bases:
object
- classmethod current_runtime() AgentRuntime [source]#
- class TopicId(type: str, source: str)[source]#
Bases:
object
TopicId defines the scope of a broadcast message. In essence, agent runtime implements a publish-subscribe model through its broadcast API: when publishing a message, the topic must be specified.
See here for more information: Topic
- type: str#
Type of the event that this topic_id contains. Adheres to the CloudEvents spec.
Must match the pattern: ^[\w\-\.\:\=]+\Z
Learn more here: cloudevents/spec
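A topic type can be checked against the pattern before publishing. The sketch below assumes the intended pattern is ^[\w\-\.\:\=]+\Z (word characters plus "-", ".", ":" and "="); is_valid_topic_type is a hypothetical helper, not part of autogen_core.

```python
import re

# Assumed pattern: one or more word characters, '-', '.', ':' or '='.
# \Z anchors at the very end of the string, so a trailing newline is rejected.
TOPIC_TYPE_PATTERN = re.compile(r"^[\w\-\.\:\=]+\Z")


def is_valid_topic_type(value: str) -> bool:
    """Return True if the value consists only of characters the pattern allows."""
    return TOPIC_TYPE_PATTERN.match(value) is not None
```

For instance, "orders.created" and "a:b=c-d" pass this check, while an empty string or a value containing a space does not.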
- source: str#
Identifies the context in which an event happened. Adheres to the CloudEvents spec.
Learn more here: cloudevents/spec
- class Subscription(*args, **kwargs)[source]#
Bases:
Protocol
Subscriptions define the topics that an agent is interested in.
- property id: str#
Get the ID of the subscription.
Implementations should return a unique ID for the subscription. Usually this is a UUID.
- Returns:
str – ID of the subscription.
- is_match(topic_id: TopicId) bool [source]#
Check if a given topic_id matches the subscription.
- Parameters:
topic_id (TopicId) – TopicId to check.
- Returns:
bool – True if the topic_id matches the subscription, False otherwise.
- map_to_agent(topic_id: TopicId) AgentId [source]#
Map a topic_id to an agent. Should only be called if is_match returns True for the given topic_id.
- Parameters:
topic_id (TopicId) – TopicId to map.
- Returns:
AgentId – ID of the agent that should handle the topic_id.
- Raises:
CantHandleException – If the subscription cannot handle the topic_id.
- class MessageContext(sender: AgentId | None, topic_id: TopicId | None, is_rpc: bool, cancellation_token: CancellationToken, message_id: str)[source]#
Bases:
object
- cancellation_token: CancellationToken#
- class RoutedAgent(description: str)[source]#
Bases:
BaseAgent
A base class for agents that route messages to handlers based on the type of the message and optional matching functions.
To create a routed agent, subclass this class and add message handlers as methods decorated with either the event() or rpc() decorator.
Example:
from dataclasses import dataclass

from autogen_core import MessageContext
from autogen_core import RoutedAgent, event, rpc


@dataclass
class Message:
    pass


@dataclass
class MessageWithContent:
    content: str


@dataclass
class Response:
    pass


class MyAgent(RoutedAgent):
    def __init__(self):
        super().__init__("MyAgent")

    @event
    async def handle_event_message(self, message: Message, ctx: MessageContext) -> None:
        assert ctx.topic_id is not None
        await self.publish_message(MessageWithContent("event handled"), ctx.topic_id)

    @rpc(match=lambda message, ctx: message.content == "special")  # type: ignore
    async def handle_special_rpc_message(self, message: MessageWithContent, ctx: MessageContext) -> Response:
        return Response()
- async on_message_impl(message: Any, ctx: MessageContext) Any | None [source]#
Handle a message by routing it to the appropriate message handler. Do not override this method in subclasses. Instead, add message handlers as methods decorated with either the event() or rpc() decorator.
- async on_unhandled_message(message: Any, ctx: MessageContext) None [source]#
Called when a message is received that does not have a matching message handler. The default implementation logs an info message.
- class ClosureAgent(description: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, unknown_type_policy: Literal['error', 'warn', 'ignore'] = 'warn')[source]#
Bases:
BaseAgent, ClosureContext
- property metadata: AgentMetadata#
Metadata of the agent.
- property runtime: AgentRuntime#
- async on_message_impl(message: Any, ctx: MessageContext) Any [source]#
- async save_state() Mapping[str, Any] [source]#
Closure agents do not have state. So this method always returns an empty dictionary.
- async load_state(state: Mapping[str, Any]) None [source]#
Closure agents do not have state. So this method does nothing.
- async classmethod register_closure(runtime: AgentRuntime, type: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, unknown_type_policy: Literal['error', 'warn', 'ignore'] = 'warn', skip_direct_message_subscription: bool = False, description: str = '', subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None = None) AgentType [source]#
The closure agent allows you to define an agent using a closure, or function without needing to define a class. It allows values to be extracted out of the runtime.
The closure can define the type of message which is expected, or Any can be used to accept any type of message.
Example:
import asyncio
from autogen_core import SingleThreadedAgentRuntime, MessageContext, ClosureAgent, ClosureContext
from dataclasses import dataclass

from autogen_core._default_subscription import DefaultSubscription
from autogen_core._default_topic import DefaultTopicId


@dataclass
class MyMessage:
    content: str


async def main():
    queue = asyncio.Queue[MyMessage]()

    async def output_result(_ctx: ClosureContext, message: MyMessage, ctx: MessageContext) -> None:
        await queue.put(message)

    runtime = SingleThreadedAgentRuntime()
    await ClosureAgent.register_closure(
        runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
    )

    runtime.start()
    await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId())
    await runtime.stop_when_idle()

    result = await queue.get()
    print(result)


asyncio.run(main())
- Parameters:
runtime (AgentRuntime) – Runtime to register the agent to
type (str) – Agent type of registered agent
closure (Callable[[ClosureContext, T, MessageContext], Awaitable[Any]]) – Closure to handle messages
unknown_type_policy (Literal["error", "warn", "ignore"], optional) – What to do if a type is encountered that does not match the closure type. Defaults to “warn”.
skip_direct_message_subscription (bool, optional) – Do not add direct message subscription for this agent. Defaults to False.
description (str, optional) – Description of what agent does. Defaults to “”.
subscriptions (Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None, optional) – List of subscriptions for this closure agent. Defaults to None.
- Returns:
AgentType – Type of the agent that was registered
- class ClosureContext(*args, **kwargs)[source]#
Bases:
Protocol
- message_handler(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT] [source]#
Decorator for generic message handlers.
Add this decorator to methods in a RoutedAgent class that are intended to handle both event and RPC messages. These methods must have a specific signature that needs to be followed for it to be valid:
The method must be an async method.
The method must be decorated with the @message_handler decorator.
- The method must have exactly 3 arguments:
self
message: The message to be handled, this must be type-hinted with the message type that it is intended to handle.
ctx: An autogen_core.MessageContext object.
The method must be type hinted with what message types it can return as a response, or it can return None if it does not return anything.
Handlers can handle more than one message type by accepting a Union of the message types. It can also return more than one message type by returning a Union of the message types.
- Parameters:
func – The function to be decorated.
strict – If True, the handler will raise an exception if the message type or return type is not in the target types. If False, it will log a warning instead.
match – A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If None, the first handler in alphabetical order matching the same message type will be called.
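The secondary routing described for match can be sketched as a small stand-alone function. This is an illustration of the stated rule (alphabetical order, first match wins, a handler without a match function accepts everything), not the library's dispatch code; pick_handler and the handler names are hypothetical.

```python
from typing import Any, Callable, Dict, Optional

# (message, ctx) -> bool, or None when the handler has no match function.
MatchFn = Optional[Callable[[Any, Any], bool]]


def pick_handler(handlers: Dict[str, MatchFn], message: Any, ctx: Any = None) -> Optional[str]:
    """Return the name of the first handler, in alphabetical order, that accepts the message."""
    for name in sorted(handlers):
        match = handlers[name]
        # A handler without a match function accepts every message of its type.
        if match is None or match(message, ctx):
            return name
    return None


# Hypothetical handlers that all address the same message type (str here).
handlers: Dict[str, MatchFn] = {
    "b_fallback": None,
    "a_special": lambda message, ctx: message == "special",
}
```

Under this reading, a handler without a match function shadows every handler that sorts after it, so a catch-all handler is best given a name that sorts last.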
- event(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]]], MessageHandler[AgentT, ReceivesT, None]] | MessageHandler[AgentT, ReceivesT, None] [source]#
Decorator for event message handlers.
Add this decorator to methods in a RoutedAgent class that are intended to handle event messages. These methods must have a specific signature that needs to be followed for it to be valid:
The method must be an async method.
The method must be decorated with the @event decorator.
- The method must have exactly 3 arguments:
self
message: The event message to be handled, this must be type-hinted with the message type that it is intended to handle.
ctx: An autogen_core.MessageContext object.
The method must return None.
Handlers can handle more than one message type by accepting a Union of the message types.
- Parameters:
func – The function to be decorated.
strict – If True, the handler will raise an exception if the message type is not in the target types. If False, it will log a warning instead.
match – A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If None, the first handler in alphabetical order matching the same message type will be called.
- rpc(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT] [source]#
Decorator for RPC message handlers.
Add this decorator to methods in a RoutedAgent class that are intended to handle RPC messages. These methods must have a specific signature that needs to be followed for it to be valid:
The method must be an async method.
The method must be decorated with the @rpc decorator.
- The method must have exactly 3 arguments:
self
message: The message to be handled, this must be type-hinted with the message type that it is intended to handle.
ctx: An autogen_core.MessageContext object.
The method must be type hinted with what message types it can return as a response, or it can return None if it does not return anything.
Handlers can handle more than one message type by accepting a Union of the message types. It can also return more than one message type by returning a Union of the message types.
- Parameters:
func – The function to be decorated.
strict – If True, the handler will raise an exception if the message type or return type is not in the target types. If False, it will log a warning instead.
match – A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If None, the first handler in alphabetical order matching the same message type will be called.
- class TypeSubscription(topic_type: str, agent_type: str | AgentType)[source]#
Bases:
Subscription
This subscription matches on topics based on the type and maps to agents using the source of the topic as the agent key.
This subscription causes each source to have its own agent instance.
Example
from autogen_core import TypeSubscription

subscription = TypeSubscription(topic_type="t1", agent_type="a1")
In this case:
A topic_id with type t1 and source s1 will be handled by an agent of type a1 with key s1
A topic_id with type t1 and source s2 will be handled by an agent of type a1 with key s2.
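The mapping above can be reproduced with a stand-alone sketch of the two subscription methods. The classes below are hypothetical stand-ins, not the library's types, and ValueError is used in place of CantHandleException so the snippet runs without autogen_core.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TopicSketch:
    type: str
    source: str


@dataclass(frozen=True)
class AgentIdSketch:
    type: str
    key: str


@dataclass(frozen=True)
class TypeSubscriptionSketch:
    topic_type: str
    agent_type: str

    def is_match(self, topic: TopicSketch) -> bool:
        # Matches on an exact topic type.
        return topic.type == self.topic_type

    def map_to_agent(self, topic: TopicSketch) -> AgentIdSketch:
        if not self.is_match(topic):
            # Stand-in for CantHandleException.
            raise ValueError(f"topic type {topic.type!r} does not match {self.topic_type!r}")
        # The topic source becomes the agent key, so each source gets its own instance.
        return AgentIdSketch(self.agent_type, topic.source)
```

With topic_type="t1" and agent_type="a1", a topic of type t1 and source s1 maps to agent a1 with key s1, and source s2 maps to the same agent type with key s2.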
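- Parameters:
topic_type (str) – Topic type to match against.
agent_type (str | AgentType) – Agent type to handle this subscription.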
- property id: str#
Get the ID of the subscription.
Implementations should return a unique ID for the subscription. Usually this is a UUID.
- Returns:
str – ID of the subscription.
- is_match(topic_id: TopicId) bool [source]#
Check if a given topic_id matches the subscription.
- Parameters:
topic_id (TopicId) – TopicId to check.
- Returns:
bool – True if the topic_id matches the subscription, False otherwise.
- map_to_agent(topic_id: TopicId) AgentId [source]#
Map a topic_id to an agent. Should only be called if is_match returns True for the given topic_id.
- Parameters:
topic_id (TopicId) – TopicId to map.
- Returns:
AgentId – ID of the agent that should handle the topic_id.
- Raises:
CantHandleException – If the subscription cannot handle the topic_id.
- class DefaultSubscription(topic_type: str = 'default', agent_type: str | AgentType | None = None)[source]#
Bases:
TypeSubscription
The default subscription is designed to be a sensible default for applications that only need global scope for agents.
This subscription by default uses the “default” topic type and attempts to detect the agent type to use based on the instantiation context.
- class DefaultTopicId(type: str = 'default', source: str | None = None)[source]#
Bases:
TopicId
DefaultTopicId provides a sensible default for the type and source fields of a TopicId.
If created in the context of a message handler, the source will be set to the agent_id of the message handler, otherwise it will be set to “default”.
- Parameters:
type (str, optional) – Topic type to publish message to. Defaults to “default”.
source (str | None, optional) – Topic source to publish message to. If None, the source will be set to the agent_id of the message handler if in the context of a message handler, otherwise it will be set to “default”. Defaults to None.
- default_subscription(cls: Type[BaseAgentType] | None = None) Callable[[Type[BaseAgentType]], Type[BaseAgentType]] | Type[BaseAgentType] [source]#
- class TypePrefixSubscription(topic_type_prefix: str, agent_type: str | AgentType)[source]#
Bases:
Subscription
This subscription matches on topics based on a prefix of the type and maps to agents using the source of the topic as the agent key.
This subscription causes each source to have its own agent instance.
Example
from autogen_core import TypePrefixSubscription

subscription = TypePrefixSubscription(topic_type_prefix="t1", agent_type="a1")
In this case:
A topic_id with type t1 and source s1 will be handled by an agent of type a1 with key s1
A topic_id with type t1 and source s2 will be handled by an agent of type a1 with key s2.
A topic_id with type t1SUFFIX and source s2 will be handled by an agent of type a1 with key s2.
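The prefix rule behind these three cases can be sketched as a single function. route_by_prefix is a hypothetical helper that returns an (agent type, agent key) pair instead of an AgentId, and it treats "prefix" as a plain str.startswith check, which is an assumption about the matching rule.

```python
from typing import Optional, Tuple


def route_by_prefix(
    topic_type_prefix: str, agent_type: str, topic_type: str, topic_source: str
) -> Optional[Tuple[str, str]]:
    """Return (agent type, agent key) if the topic type starts with the prefix, else None."""
    if not topic_type.startswith(topic_type_prefix):
        return None
    # As with TypeSubscription, the topic source is used as the agent key.
    return (agent_type, topic_source)
```

Calling route_by_prefix("t1", "a1", "t1SUFFIX", "s2") gives ("a1", "s2"), whereas a topic type of "t2" gives None.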
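- Parameters:
topic_type_prefix (str) – Topic type prefix to match against.
agent_type (str | AgentType) – Agent type to handle this subscription.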
- property id: str#
Get the ID of the subscription.
Implementations should return a unique ID for the subscription. Usually this is a UUID.
- Returns:
str – ID of the subscription.
- is_match(topic_id: TopicId) bool [source]#
Check if a given topic_id matches the subscription.
- Parameters:
topic_id (TopicId) – TopicId to check.
- Returns:
bool – True if the topic_id matches the subscription, False otherwise.
- map_to_agent(topic_id: TopicId) AgentId [source]#
Map a topic_id to an agent. Should only be called if is_match returns True for the given topic_id.
- Parameters:
topic_id (TopicId) – TopicId to map.
- Returns:
AgentId – ID of the agent that should handle the topic_id.
- Raises:
CantHandleException – If the subscription cannot handle the topic_id.
- JSON_DATA_CONTENT_TYPE = 'application/json'#
The content type for JSON data.
- PROTOBUF_DATA_CONTENT_TYPE = 'application/x-protobuf'#
The content type for Protobuf data.
- class SingleThreadedAgentRuntime(*, intervention_handlers: List[InterventionHandler] | None = None, tracer_provider: TracerProvider | None = None)[source]#
Bases:
AgentRuntime
- async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None) Any [source]#
Send a message to an agent and get a response.
- Parameters:
message (Any) – The message to send.
recipient (AgentId) – The agent to send the message to.
sender (AgentId | None, optional) – Agent which sent the message. Should only be None if this was sent from no agent, such as directly to the runtime externally. Defaults to None.
cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.
- Raises:
CantHandleException – If the recipient cannot handle the message.
UndeliverableException – If the message cannot be delivered.
Other – Any other exception raised by the recipient.
- Returns:
Any – The response from the agent.
- async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None [source]#
Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.
No responses are expected from publishing.
- Parameters:
message (Any) – The message to publish.
topic_id (TopicId) – The topic to publish the message to.
sender (AgentId | None, optional) – The agent which sent the message. Defaults to None.
cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.
message_id (str | None, optional) – The message id. If None, a new message id will be generated. Defaults to None. The message id must be unique, and a UUID is recommended.
- Raises:
UndeliverableException – If the message cannot be delivered.
- async save_state() Mapping[str, Any] [source]#
Save the state of the entire runtime, including all hosted agents. The only way to restore the state is to pass it to load_state().
The structure of the state is implementation defined and can be any JSON serializable object.
- Returns:
Mapping[str, Any] – The saved state.
- async load_state(state: Mapping[str, Any]) None [source]#
Load the state of the entire runtime, including all hosted agents. The state should be the same as the one returned by save_state().
- Parameters:
state (Mapping[str, Any]) – The saved state.
- start() None [source]#
Start the runtime message processing loop. This runs in a background task.
Example:
from autogen_core import SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()
runtime.start()
- async stop() None [source]#
Immediately stop the runtime message processing loop. The currently processing message will be completed, but all others following it will be discarded.
- async stop_when_idle() None [source]#
Stop the runtime message processing loop when there is no outstanding message being processed or queued. This is the most common way to stop the runtime.
- async stop_when(condition: Callable[[], bool]) None [source]#
Stop the runtime message processing loop when the condition is met.
Caution
This method is not recommended to be used, and is here for legacy reasons. It will spawn a busy loop to continually check the condition. It is much more efficient to call stop_when_idle or stop instead. If you need to stop the runtime based on a condition, consider using a background task and asyncio.Event to signal when the condition is met and the background task should call stop.
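The alternative suggested in the caution, a background task that waits on an asyncio.Event and then calls stop, might look like the sketch below. RuntimeSketch is a hypothetical stand-in with only a stop() coroutine so the example runs without autogen_core; with a real runtime, the same stop_on_event task would be passed the runtime object instead.

```python
import asyncio


class RuntimeSketch:
    """Hypothetical stand-in exposing only the stop() coroutine."""

    def __init__(self) -> None:
        self.stopped = False

    async def stop(self) -> None:
        self.stopped = True


async def stop_on_event(runtime: RuntimeSketch, done: asyncio.Event) -> None:
    # Sleeps until the event is set; no busy loop polling a condition.
    await done.wait()
    await runtime.stop()


async def demo() -> bool:
    runtime = RuntimeSketch()
    done = asyncio.Event()
    stopper = asyncio.create_task(stop_on_event(runtime, done))
    await asyncio.sleep(0)  # let the background task start waiting
    assert not runtime.stopped
    done.set()  # whatever code detects the condition signals it here
    await stopper
    return runtime.stopped


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The code that detects the condition only needs a reference to the event, which keeps the stopping logic in one place.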
- async agent_metadata(agent: AgentId) AgentMetadata [source]#
Get the metadata for an agent.
- Parameters:
agent (AgentId) – The agent id.
- Returns:
AgentMetadata – The agent metadata.
- async agent_save_state(agent: AgentId) Mapping[str, Any] [source]#
Save the state of a single agent.
The structure of the state is implementation defined and can be any JSON serializable object.
- Parameters:
agent (AgentId) – The agent id.
- Returns:
Mapping[str, Any] – The saved state.
- async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None [source]#
Load the state of a single agent.
- async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType [source]#
Register an agent factory with the runtime associated with a specific type. The type must be unique. This API does not add any subscriptions.
Note
This is a low level API and usually the agent class’s register method should be used instead, as this also handles subscriptions automatically.
Example:
from dataclasses import dataclass

from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def my_agent_factory():
    return MyAgent()


async def main() -> None:
    runtime: AgentRuntime = ...  # type: ignore
    await runtime.register_factory("my_agent", lambda: MyAgent())


import asyncio

asyncio.run(main())
- Parameters:
type (str) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.
agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.AgentInstantiationContext to access variables like the current runtime and agent ID.
expected_class (type[T] | None, optional) – The expected class of the agent, used for runtime validation of the factory. Defaults to None.
- async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T [source]#
Try to get the underlying agent instance by name and namespace. This is generally discouraged (hence the long name), but can be useful in some cases.
If the underlying agent is not accessible, this will raise an exception.
- Parameters:
id (AgentId) – The agent id.
type (Type[T], optional) – The expected type of the agent. Defaults to Agent.
- Returns:
T – The concrete agent instance.
- Raises:
LookupError – If the agent is not found.
NotAccessibleError – If the agent is not accessible, for example if it is located remotely.
TypeError – If the agent is not of the expected type.
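The lookup and type-check behaviour listed above might look roughly like the following sketch. It is an illustration under assumptions: get_instance and the (type, key) dictionary are hypothetical, and the NotAccessibleError case for remote agents is not modelled.

```python
from typing import Any, Dict, Tuple, Type, TypeVar

T = TypeVar("T")


def get_instance(
    instances: Dict[Tuple[str, str], Any],
    agent_type: str,
    key: str,
    expected: Type[T],
) -> T:
    """Return the stored instance or raise, mirroring the documented errors."""
    try:
        agent = instances[(agent_type, key)]
    except KeyError:
        # Agent not found.
        raise LookupError(f"Agent {agent_type}/{key} not found") from None
    if not isinstance(agent, expected):
        # Agent exists but is not of the expected type.
        raise TypeError(
            f"Agent {agent_type}/{key} is {type(agent).__name__}, "
            f"expected {expected.__name__}"
        )
    return agent
```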
- async add_subscription(subscription: Subscription) None [source]#
Add a new subscription that the runtime should fulfill when processing published messages
- Parameters:
subscription (Subscription) – The subscription to add
- async remove_subscription(id: str) None [source]#
Remove a subscription from the runtime
- Parameters:
id (str) – id of the subscription to remove
- Raises:
LookupError – If the subscription does not exist
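As a rough sketch of the add/remove contract above, the following toy manager keys subscriptions by id and raises LookupError when asked to remove an unknown id. ToySubscription and ToySubscriptionManager are hypothetical names, and the topic_type and agent_type fields are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class ToySubscription:
    """Hypothetical subscription: routes one topic type to one agent type."""

    id: str
    topic_type: str
    agent_type: str


class ToySubscriptionManager:
    def __init__(self) -> None:
        self._subscriptions: Dict[str, ToySubscription] = {}

    def add_subscription(self, subscription: ToySubscription) -> None:
        self._subscriptions[subscription.id] = subscription

    def remove_subscription(self, id: str) -> None:
        if id not in self._subscriptions:
            raise LookupError(f"Subscription {id!r} does not exist")
        del self._subscriptions[id]

    def recipients(self, topic_type: str) -> List[str]:
        # Agent types that should receive a message published to topic_type.
        return sorted(
            s.agent_type
            for s in self._subscriptions.values()
            if s.topic_type == topic_type
        )
```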
- async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId [source]#
- add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None [source]#
Add a new message serializer to the runtime.
Note: This will deduplicate serializers based on the type_name and data_content_type properties.
- Parameters:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – The serializer or serializers to add.
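The deduplication rule in the note can be pictured as a registry keyed by the pair (type_name, data_content_type). The sketch below assumes that the first registration for a pair is kept; ToySerializer and ToySerializerRegistry are hypothetical and only mimic the two properties named above.

```python
from dataclasses import dataclass
from typing import Dict, Sequence, Tuple, Union


@dataclass(frozen=True)
class ToySerializer:
    """Hypothetical serializer exposing the two properties used for deduplication."""

    type_name: str
    data_content_type: str


class ToySerializerRegistry:
    def __init__(self) -> None:
        self._serializers: Dict[Tuple[str, str], ToySerializer] = {}

    def add_message_serializer(
        self, serializer: Union[ToySerializer, Sequence[ToySerializer]]
    ) -> None:
        # Accept a single serializer or a sequence of them.
        items = [serializer] if isinstance(serializer, ToySerializer) else list(serializer)
        for item in items:
            key = (item.type_name, item.data_content_type)
            # Assumption: an existing entry for the same key is kept.
            self._serializers.setdefault(key, item)

    def __len__(self) -> int:
        return len(self._serializers)
```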
- ROOT_LOGGER_NAME = 'autogen_core'#
The name of the root logger.
- EVENT_LOGGER_NAME = 'autogen_core.events'#
The name of the logger used for structured events.
- TRACE_LOGGER_NAME = 'autogen_core.trace'#
Logger name used for developer-intended trace logging. The content and format of this log should not be depended upon.
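Because the three names form a dotted hierarchy, they can be configured with the standard logging module alone. The sketch below repeats the constant values from above as string literals so it runs without the package installed; configure_logging, the chosen levels and the format string are illustrative choices, not part of the library.

```python
import io
import logging
from typing import TextIO

# Values copied from the constants documented above.
ROOT_LOGGER_NAME = "autogen_core"
EVENT_LOGGER_NAME = "autogen_core.events"
TRACE_LOGGER_NAME = "autogen_core.trace"


def configure_logging(stream: TextIO) -> logging.Handler:
    """Attach one handler to the library's root logger and enable structured events."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))

    root = logging.getLogger(ROOT_LOGGER_NAME)
    root.setLevel(logging.WARNING)
    root.addHandler(handler)

    # Child loggers propagate to the handler above; only their own level is raised here.
    logging.getLogger(EVENT_LOGGER_NAME).setLevel(logging.INFO)
    # The trace logger is left unset, so it inherits WARNING from the parent.
    return handler


buffer = io.StringIO()
configure_logging(buffer)
logging.getLogger(EVENT_LOGGER_NAME).info("message delivered")
logging.getLogger(TRACE_LOGGER_NAME).debug("not emitted at WARNING level")
```

Attaching the handler once on the parent logger avoids duplicate output, since records from the event and trace loggers propagate upward.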
- class Component(*args, **kwargs)[source]#
Bases:
ComponentConfigImpl[ConfigT], ComponentLoader, Generic[ConfigT]
To create a component class, inherit from this class. Then implement two class variables:
component_config_schema - A Pydantic model class which represents the configuration of the component. This is also the type parameter of Component.
component_type - The logical type of the component.
Example:
    from __future__ import annotations

    from pydantic import BaseModel
    from autogen_core import Component


    class Config(BaseModel):
        value: str


    class MyComponent(Component[Config]):
        component_type = "custom"
        component_config_schema = Config

        def __init__(self, value: str):
            self.value = value

        def _to_config(self) -> Config:
            return Config(value=self.value)

        @classmethod
        def _from_config(cls, config: Config) -> MyComponent:
            return cls(value=config.value)
- required_class_vars = ['component_config_schema', 'component_type']#
- dump_component() ComponentModel [source]#
Dump the component to a model that can be loaded back in.
- Raises:
TypeError – If the component is a local class.
- Returns:
ComponentModel – The model representing the component.
- classmethod _from_config_past_version(config: Dict[str, Any], version: int) Self [source]#
Create a new instance of the component from a previous version of the configuration object.
This is only called when the version of the configuration object is less than the current version, since in this case the schema is not known.
- class ComponentLoader[source]#
Bases:
object
- classmethod load_component(model: ComponentModel | Dict[str, Any], expected: None = None) Self [source]#
- classmethod load_component(model: ComponentModel | Dict[str, Any], expected: Type[ExpectedType]) ExpectedType
Load a component from a model. Intended to be used with the return type of autogen_core.ComponentConfig.dump_component().
Example:
    from autogen_core import ComponentModel
    from autogen_core.models import ChatCompletionClient

    component: ComponentModel = ...  # type: ignore

    model_client = ChatCompletionClient.load_component(component)
- Parameters:
model (ComponentModel) – The model to load the component from.
expected (Type[ExpectedType] | None, optional) – Explicit type only if used directly on ComponentLoader. Defaults to None.
- Raises:
ValueError – If the provider string is invalid.
TypeError – If the provider is not a subclass of ComponentConfigImpl, or the expected type does not match.
- Returns:
Self | ExpectedType – The loaded component.
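One way to picture the ValueError and TypeError cases above is a loader that resolves a provider string to a class and checks it against an expected type. The sketch assumes the provider string is a dotted import path of the form module.ClassName, which is not stated in this section; resolve_provider is a hypothetical helper, not a library function.

```python
import importlib
from typing import Optional


def resolve_provider(provider: str, expected: Optional[type] = None) -> type:
    """Resolve 'package.module.ClassName' to a class, validating the result."""
    module_path, separator, class_name = provider.rpartition(".")
    if not separator or not module_path or not class_name:
        raise ValueError(f"Invalid provider string: {provider!r}")

    module = importlib.import_module(module_path)
    resolved = getattr(module, class_name, None)
    if not isinstance(resolved, type):
        raise ValueError(f"Provider {provider!r} does not name a class")

    # Mirror the documented TypeError when the expected type does not match.
    if expected is not None and not issubclass(resolved, expected):
        raise TypeError(f"{provider!r} is not a subclass of {expected.__name__}")
    return resolved
```

For instance, resolve_provider("collections.OrderedDict", expected=dict) returns the OrderedDict class, while passing expected=list raises TypeError.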
- class ComponentConfigImpl(*args, **kwargs)[source]#
Bases:
Protocol[ConfigT]
- component_config_schema: Type[ConfigT]#
The Pydantic model class which represents the configuration of the component.
- component_type: ClassVar[Literal['model', 'agent', 'tool', 'termination', 'token_provider'] | str]#
The logical type of the component.
- component_version: ClassVar[int] = 1#
The version of the component. If schema incompatibilities are introduced, this should be updated.
- component_provider_override: ClassVar[str | None] = None#
Override the provider string for the component. This should be used to prevent internal module names from becoming part of the provider string.
- _to_config() ConfigT [source]#
Dump the configuration that would be required to create a new instance of a component matching the configuration of this instance.
- Returns:
ConfigT – The configuration of the component.
- classmethod _from_config(config: ConfigT) Self [source]#
Create a new instance of the component from a configuration object.
- Parameters:
config (ConfigT) – The configuration object.
- Returns:
Self – The new instance of the component.
- classmethod _from_config_past_version(config: Dict[str, Any], version: int) Self [source]#
Create a new instance of the component from a previous version of the configuration object.
This is only called when the version of the configuration object is less than the current version, since in this case the schema is not known.
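The dispatch between _from_config and _from_config_past_version can be sketched as follows. This is an illustration under assumptions: ToyComponent, ConfigV2, load and the default of 3 retries are hypothetical, and a dataclass stands in for the Pydantic schema so the example needs only the standard library.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class ConfigV2:
    """Hypothetical current schema (version 2)."""

    value: str
    retries: int


class ToyComponent:
    component_version = 2

    def __init__(self, value: str, retries: int) -> None:
        self.value = value
        self.retries = retries

    @classmethod
    def _from_config(cls, config: ConfigV2) -> "ToyComponent":
        return cls(value=config.value, retries=config.retries)

    @classmethod
    def _from_config_past_version(
        cls, config: Dict[str, Any], version: int
    ) -> "ToyComponent":
        # Old configs arrive as raw dicts because their schema is not known.
        if version == 1:
            # Assumption: version 1 had no 'retries' field, so a default is filled in.
            return cls(value=config["value"], retries=3)
        raise ValueError(f"Unsupported config version: {version}")


def load(raw_config: Dict[str, Any], version: int) -> ToyComponent:
    # Only configs older than the current version take the migration path.
    if version < ToyComponent.component_version:
        return ToyComponent._from_config_past_version(raw_config, version)
    return ToyComponent._from_config(ConfigV2(**raw_config))
```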
- pydantic model ComponentModel[source]#
Bases:
BaseModel
Model class for a component. Contains all information required to instantiate a component.
JSON schema:
    {
      "title": "ComponentModel",
      "description": "Model class for a component. Contains all information required to instantiate a component.",
      "type": "object",
      "properties": {
        "provider": { "title": "Provider", "type": "string" },
        "component_type": {
          "anyOf": [
            {
              "enum": [ "model", "agent", "tool", "termination", "token_provider" ],
              "type": "string"
            },
            { "type": "string" },
            { "type": "null" }
          ],
          "default": null,
          "title": "Component Type"
        },
        "version": {
          "anyOf": [ { "type": "integer" }, { "type": "null" } ],
          "default": null,
          "title": "Version"
        },
        "component_version": {
          "anyOf": [ { "type": "integer" }, { "type": "null" } ],
          "default": null,
          "title": "Component Version"
        },
        "description": {
          "anyOf": [ { "type": "string" }, { "type": "null" } ],
          "default": null,
          "title": "Description"
        },
        "config": { "title": "Config", "type": "object" }
      },
      "required": [ "provider", "config" ]
    }
- Fields:
component_type (Literal['model', 'agent', 'tool', 'termination', 'token_provider'] | str | None)
component_version (int | None)
config (dict[str, Any])
description (str | None)
provider (str)
version (int | None)
- field component_type: ComponentType | None = None#
Logical type of the component. If missing, the component assumes the default type of the provider.
- field version: int | None = None#
Version of the component specification. If missing, the component assumes whatever is the current version of the library used to load it. This is obviously dangerous and should only be used for user-authored ephemeral config. For all other configs, the version should be specified.
- field component_version: int | None = None#
Version of the component. If missing, the component assumes the default version of the provider.
- field config: dict[str, Any] [Required]#
The schema-validated config field is passed to a given class’s implementation of autogen_core.ComponentConfigImpl._from_config() to create a new instance of the component class.
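Per the JSON schema above, only provider and config are required, and the remaining fields default to null. A minimal standard-library sketch of checking a serialized component against those rules follows; normalize_component_model and the provider value my_package.MyComponent are hypothetical, and the real class performs its validation through Pydantic.

```python
import json
from typing import Any, Dict

REQUIRED_FIELDS = ("provider", "config")
OPTIONAL_FIELDS = ("component_type", "version", "component_version", "description")


def normalize_component_model(raw: str) -> Dict[str, Any]:
    """Parse a serialized component and apply the defaults from the schema."""
    data = json.loads(raw)
    missing = [name for name in REQUIRED_FIELDS if name not in data]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")
    if not isinstance(data["provider"], str) or not isinstance(data["config"], dict):
        raise TypeError("provider must be a string and config must be an object")
    # Optional fields default to None (null in JSON).
    for name in OPTIONAL_FIELDS:
        data.setdefault(name, None)
    return data


example = normalize_component_model(
    '{"provider": "my_package.MyComponent", "config": {"value": "hello"}}'
)
```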
- class InterventionHandler(*args, **kwargs)[source]#
Bases:
Protocol
An intervention handler is a class that can be used to modify, log or drop messages that are being processed by the autogen_core.base.AgentRuntime.
Note: Returning None from any of the intervention handler methods will result in a warning being issued and treated as “no change”. If you intend to drop a message, you should return DropMessage explicitly.
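The difference between returning None and returning DropMessage can be shown with a toy pipeline. The sketch is not the runtime's code: apply_handlers is hypothetical, handlers are reduced to plain callables, and the DropMessage class defined here is a local stand-in used as a sentinel.

```python
import warnings
from typing import Any, Callable, Sequence


class DropMessage:
    """Local stand-in sentinel: return this class to drop a message."""


def apply_handlers(message: Any, handlers: Sequence[Callable[[Any], Any]]) -> Any:
    """Run each handler in order; return the final message or DropMessage."""
    for handler in handlers:
        result = handler(message)
        if result is None:
            # None is treated as "no change", with a warning.
            warnings.warn(
                "Intervention handler returned None; treating as no change",
                RuntimeWarning,
            )
            continue
        if result is DropMessage:
            # An explicit drop stops processing immediately.
            return DropMessage
        message = result
    return message
```

A handler that only logs should therefore still return the message it received, so that no warning is raised.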
- class DefaultInterventionHandler(*args, **kwargs)[source]#
Bases:
InterventionHandler
Simple class that provides a default implementation for all intervention handler methods, each of which simply returns the message unchanged. This makes it easy to subclass and override only the desired methods.