autogen_core.application
The autogen_core.application module provides implementations of core components that are used to compose an application.
- class autogen_core.application.SingleThreadedAgentRuntime(*, intervention_handlers: List[InterventionHandler] | None = None, tracer_provider: TracerProvider | None = None)
Bases: AgentRuntime
- add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -> None
Add a new message serializer to the runtime.
Note: Serializers are deduplicated based on their type_name and data_content_type properties.
- Parameters:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – The serializer or serializers to add.
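The deduplication rule in the note above can be illustrated with a minimal sketch. SketchSerializer and SerializerRegistry are hypothetical stand-ins, not library classes, and the "first registration wins" policy is an assumption; the documentation only states which two properties form the deduplication key.

```python
from dataclasses import dataclass
from typing import Dict, Sequence, Tuple, Union


@dataclass(frozen=True)
class SketchSerializer:
    """Hypothetical stand-in exposing only the two properties used as the key."""

    type_name: str
    data_content_type: str


class SerializerRegistry:
    """Keeps at most one serializer per (type_name, data_content_type) pair."""

    def __init__(self) -> None:
        self._by_key: Dict[Tuple[str, str], SketchSerializer] = {}

    def add(self, serializer: Union[SketchSerializer, Sequence[SketchSerializer]]) -> None:
        # Accept a single serializer or a sequence, like the documented parameter.
        items = [serializer] if isinstance(serializer, SketchSerializer) else list(serializer)
        for item in items:
            key = (item.type_name, item.data_content_type)
            # Assumption: the first registration wins; the real runtime may differ.
            self._by_key.setdefault(key, item)

    def __len__(self) -> int:
        return len(self._by_key)


registry = SerializerRegistry()
registry.add(SketchSerializer("Greeting", "application/json"))
registry.add(
    [
        SketchSerializer("Greeting", "application/json"),  # duplicate key, ignored
        SketchSerializer("Greeting", "application/x-protobuf"),  # new content type, kept
    ]
)
```

After both calls the registry holds two entries, one per distinct key.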
- async add_subscription(subscription: Subscription) -> None
Add a new subscription that the runtime should fulfill when processing published messages.
- Parameters:
subscription (Subscription) – The subscription to add.
- async agent_load_state(agent: AgentId, state: Mapping[str, Any]) -> None
Load the state of a single agent.
- async agent_metadata(agent: AgentId) -> AgentMetadata
Get the metadata for an agent.
- Parameters:
agent (AgentId) – The agent id.
- Returns:
AgentMetadata – The agent metadata.
- async agent_save_state(agent: AgentId) -> Mapping[str, Any]
Save the state of a single agent.
The structure of the state is implementation defined and can be any JSON serializable object.
- Parameters:
agent (AgentId) – The agent id.
- Returns:
Mapping[str, Any] – The saved state.
- async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) -> AgentId
- async load_state(state: Mapping[str, Any]) -> None
Load the state of the entire runtime, including all hosted agents. The state should be the same as the one returned by save_state().
- Parameters:
state (Mapping[str, Any]) – The saved state.
- async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None) -> None
Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.
No responses are expected from publishing.
- Parameters:
message (Any) – The message to publish.
topic_id (TopicId) – The topic to publish the message to.
sender (AgentId | None, optional) – The agent which sent the message. Defaults to None.
cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.
- Raises:
UndeliverableException – If the message cannot be delivered.
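The fire-and-forget behaviour described above (every matching subscriber receives the message, and the publisher gets nothing back) can be sketched with a tiny in-memory bus. TinyBus and its plain string topics are hypothetical simplifications; the real runtime matches messages through Subscription objects and TopicId values.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List

Handler = Callable[[Any], Awaitable[None]]


class TinyBus:
    """Hypothetical in-memory bus: topic name -> list of async handlers."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    async def publish(self, message: Any, topic: str) -> None:
        # Fan out to every subscriber of the topic; handler results are discarded,
        # so the publisher never receives a response.
        await asyncio.gather(*(handler(message) for handler in self._handlers.get(topic, [])))


async def demo() -> List[str]:
    seen: List[str] = []

    async def logger(message: Any) -> None:
        seen.append(f"logger:{message}")

    async def auditor(message: Any) -> None:
        seen.append(f"auditor:{message}")

    bus = TinyBus()
    bus.subscribe("news", logger)
    bus.subscribe("news", auditor)
    bus.subscribe("other", logger)  # not triggered by a "news" publish
    result = await bus.publish("hello", "news")
    assert result is None  # publishing yields no response
    return sorted(seen)
```

Calling `asyncio.run(demo())` delivers the message to both "news" subscribers and to nobody else.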
- register(type: str, agent_factory: Callable[[], T | Awaitable[T]] | Callable[[AgentRuntime, AgentId], T | Awaitable[T]], subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | list[Subscription] | None = None) -> AgentType
Register an agent factory with the runtime associated with a specific type. The type must be unique.
- Parameters:
type (str) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.
agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.base.AgentInstantiationContext to access variables like the current runtime and agent ID.
subscriptions (Callable[[], list[Subscription]] | list[Subscription] | None, optional) – The subscriptions that the agent should be subscribed to. Defaults to None.
Example
    runtime.register(
        "chat_agent",
        lambda: ChatCompletionAgent(
            description="A generic chat agent.",
            system_messages=[SystemMessage("You are a helpful assistant")],
            model_client=OpenAIChatCompletionClient(model="gpt-4o"),
            memory=BufferedChatMemory(buffer_size=10),
        ),
    )
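To make the role of the type string and the factory more concrete, here is a minimal, library-free sketch of a factory registry. FactoryRegistry, get_instance, and EchoAgent are hypothetical names. Raising ValueError on a duplicate type and creating one instance per (type, key) pair on first use are assumptions made for illustration, not confirmed runtime behaviour.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict, Tuple, Union

Factory = Callable[[], Union[Any, Awaitable[Any]]]


class FactoryRegistry:
    """Hypothetical registry mapping a unique type string to an agent factory."""

    def __init__(self) -> None:
        self._factories: Dict[str, Factory] = {}
        self._instances: Dict[Tuple[str, str], Any] = {}

    def register(self, type: str, agent_factory: Factory) -> str:
        if type in self._factories:
            # Assumption: the exception class is illustrative only.
            raise ValueError(f"Agent type {type!r} is already registered")
        self._factories[type] = agent_factory
        return type

    async def get_instance(self, type: str, key: str = "default") -> Any:
        ident = (type, key)
        if ident not in self._instances:
            created = self._factories[type]()
            # Factories may be sync or async, matching T | Awaitable[T].
            if inspect.isawaitable(created):
                created = await created
            self._instances[ident] = created
        return self._instances[ident]


class EchoAgent:
    """Placeholder agent class for the sketch."""


async def make_async_agent() -> EchoAgent:
    return EchoAgent()


async def demo() -> Tuple[bool, bool]:
    registry = FactoryRegistry()
    registry.register("echo", lambda: EchoAgent())
    registry.register("echo_async", make_async_agent)
    first = await registry.get_instance("echo")
    again = await registry.get_instance("echo")
    other = await registry.get_instance("echo", key="session-2")
    async_agent = await registry.get_instance("echo_async")
    same_key_reused = first is again and first is not other
    return same_key_reused, isinstance(async_agent, EchoAgent)
```

The sketch shows why the type string identifies a factory rather than a class: two different type strings could both produce EchoAgent instances.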
- async register_factory(*, type: AgentType, agent_factory: Callable[[], T | Awaitable[T]], expected_class: type[T]) -> AgentType
Register an agent factory with the runtime associated with a specific type. The type must be unique.
- Parameters:
type (AgentType) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.
agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.base.AgentInstantiationContext to access variables like the current runtime and agent ID.
Example
    runtime.register(
        "chat_agent",
        lambda: ChatCompletionAgent(
            description="A generic chat agent.",
            system_messages=[SystemMessage("You are a helpful assistant")],
            model_client=OpenAIChatCompletionClient(model="gpt-4o"),
            memory=BufferedChatMemory(buffer_size=10),
        ),
    )
- async remove_subscription(id: str) -> None
Remove a subscription from the runtime.
- Parameters:
id (str) – ID of the subscription to remove.
- Raises:
LookupError – If the subscription does not exist.
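The add/remove contract above can be sketched as a table keyed by subscription ID. SketchSubscription and its fields are hypothetical; the only behaviour taken from the documentation is that removing an unknown ID raises LookupError.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class SketchSubscription:
    """Hypothetical subscription record; field names are assumptions."""

    topic_type: str
    agent_type: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


class SubscriptionTable:
    def __init__(self) -> None:
        self._subscriptions: Dict[str, SketchSubscription] = {}

    def add(self, subscription: SketchSubscription) -> None:
        self._subscriptions[subscription.id] = subscription

    def remove(self, id: str) -> None:
        if id not in self._subscriptions:
            # Mirrors the documented error for a missing subscription.
            raise LookupError(f"Subscription {id!r} does not exist")
        del self._subscriptions[id]

    def __len__(self) -> int:
        return len(self._subscriptions)


table = SubscriptionTable()
subscription = SketchSubscription(topic_type="news", agent_type="chat_agent")
table.add(subscription)
table.remove(subscription.id)  # succeeds once; a second call would raise LookupError
```

Keeping the generated ID on the subscription object is what lets a caller remove it later without searching by topic or agent type.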
- async save_state() -> Mapping[str, Any]
Save the state of the entire runtime, including all hosted agents. The only way to restore the state is to pass it to load_state().
The structure of the state is implementation defined and can be any JSON serializable object.
- Returns:
Mapping[str, Any] – The saved state.
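Because the saved state is JSON serializable, it can be written to disk or sent over the network and later passed back to load_state(). The sketch below shows that round trip with hypothetical CounterAgent and TinyRuntime classes. It is synchronous for brevity, whereas the documented methods are async, and the per-agent dictionary layout is an assumption since the real structure is implementation defined.

```python
import json
from typing import Any, Dict, Mapping


class CounterAgent:
    """Hypothetical agent whose whole state is one integer."""

    def __init__(self) -> None:
        self.count = 0

    def save_state(self) -> Mapping[str, Any]:
        return {"count": self.count}

    def load_state(self, state: Mapping[str, Any]) -> None:
        self.count = int(state["count"])


class TinyRuntime:
    """Hypothetical runtime that aggregates the state of its hosted agents."""

    def __init__(self, agents: Dict[str, CounterAgent]) -> None:
        self._agents = agents

    def save_state(self) -> Mapping[str, Any]:
        # Assumption: one entry per hosted agent, keyed by a string id.
        return {name: dict(agent.save_state()) for name, agent in self._agents.items()}

    def load_state(self, state: Mapping[str, Any]) -> None:
        for name, agent_state in state.items():
            self._agents[name].load_state(agent_state)


original = TinyRuntime({"counter/default": CounterAgent()})
original._agents["counter/default"].count = 7

# Serialize to JSON text, as one might before writing to a file.
payload = json.dumps(original.save_state())

restored = TinyRuntime({"counter/default": CounterAgent()})
restored.load_state(json.loads(payload))
```

After loading, the restored runtime's agent carries the same counter value as the original.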
- async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None) -> Any
Send a message to an agent and get a response.
- Parameters:
message (Any) – The message to send.
recipient (AgentId) – The agent to send the message to.
sender (AgentId | None, optional) – Agent which sent the message. Should only be None if this was sent from no agent, such as directly to the runtime externally. Defaults to None.
cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.
- Raises:
CantHandleException – If the recipient cannot handle the message.
UndeliverableException – If the message cannot be delivered.
Other – Any other exception raised by the recipient.
- Returns:
Any – The response from the agent.
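The three outcomes listed above (a response, an undeliverable message, or an exception raised by the recipient) can be sketched without the library. SketchUndeliverable and SketchCantHandle are local stand-ins for the documented exception names, and TinyDirectory with its string agent IDs is a hypothetical simplification.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Any], Awaitable[Any]]


class SketchUndeliverable(Exception):
    """Local stand-in for the documented UndeliverableException."""


class SketchCantHandle(Exception):
    """Local stand-in for the documented CantHandleException."""


class TinyDirectory:
    """Hypothetical mapping from agent id to an async message handler."""

    def __init__(self) -> None:
        self._agents: Dict[str, Handler] = {}

    def add_agent(self, agent_id: str, handler: Handler) -> None:
        self._agents[agent_id] = handler

    async def send(self, message: Any, recipient: str) -> Any:
        handler = self._agents.get(recipient)
        if handler is None:
            raise SketchUndeliverable(f"No agent with id {recipient!r}")
        # Exceptions raised by the handler propagate to the sender unchanged.
        return await handler(message)


async def shout(message: Any) -> str:
    if not isinstance(message, str):
        raise SketchCantHandle(f"Unsupported message type: {type(message).__name__}")
    return message.upper()


async def demo() -> str:
    directory = TinyDirectory()
    directory.add_agent("shouter/default", shout)
    return await directory.send("hello", recipient="shouter/default")
```

Unlike publishing, the sender awaits the handler directly, so both the return value and any exception reach the caller.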
- async stop_when(condition: Callable[[], bool]) -> None
Stop the runtime message processing loop when the condition is met.
- async stop_when_idle() -> None
Stop the runtime message processing loop when there is no outstanding message being processed or queued.
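The difference between the two stop methods can be sketched with an asyncio queue. TinyLoop is a hypothetical stand-in for the runtime's processing loop. Polling the condition at a fixed interval and using Queue.join() to detect idleness are illustrative choices, not a description of how the runtime is implemented.

```python
import asyncio
from typing import Callable, List, Optional, Tuple


class TinyLoop:
    """Hypothetical message loop backed by an asyncio.Queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task: Optional[asyncio.Task] = None
        self.processed: List[int] = []

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    def enqueue(self, item: int) -> None:
        self._queue.put_nowait(item)

    async def _run(self) -> None:
        while True:
            item = await self._queue.get()
            self.processed.append(item)
            self._queue.task_done()

    async def stop(self) -> None:
        assert self._task is not None, "start() must be called first"
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass

    async def stop_when(self, condition: Callable[[], bool], poll_interval: float = 0.01) -> None:
        # Assumption: the condition is polled periodically.
        while not condition():
            await asyncio.sleep(poll_interval)
        await self.stop()

    async def stop_when_idle(self) -> None:
        # join() returns once every queued item has been marked done.
        await self._queue.join()
        await self.stop()


async def demo() -> Tuple[List[int], bool]:
    idle_loop = TinyLoop()
    idle_loop.start()
    for i in range(3):
        idle_loop.enqueue(i)
    await idle_loop.stop_when_idle()

    cond_loop = TinyLoop()
    cond_loop.start()
    for i in range(5):
        cond_loop.enqueue(i)
    await cond_loop.stop_when(lambda: len(cond_loop.processed) >= 2)
    return idle_loop.processed, len(cond_loop.processed) >= 2
```

stop_when_idle suits batch-style runs that should end when the work is drained, while stop_when lets the caller define completion in terms of application state.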
- async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) -> T
Try to get the underlying agent instance by name and namespace. This is generally discouraged (hence the long name), but can be useful in some cases.
If the underlying agent is not accessible, this will raise an exception.
- Parameters:
id (AgentId) – The agent id.
type (Type[T], optional) – The expected type of the agent. Defaults to Agent.
- Returns:
T – The concrete agent instance.
- Raises:
LookupError – If the agent is not found.
NotAccessibleError – If the agent is not accessible, for example if it is located remotely.
TypeError – If the agent is not of the expected type.
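The lookup-then-type-check order implied by the Raises list can be sketched as follows. BaseAgent, ChatAgent, and try_get_instance are hypothetical names, and the "not accessible" case for remote agents is omitted because it has no local equivalent.

```python
from typing import Dict, Type, TypeVar

T = TypeVar("T")


class BaseAgent:
    """Hypothetical base class standing in for Agent."""


class ChatAgent(BaseAgent):
    """Hypothetical concrete agent."""


class ToolAgent(BaseAgent):
    """Second hypothetical concrete agent, used to trigger the type mismatch."""


def try_get_instance(instances: Dict[str, BaseAgent], agent_id: str, expected: Type[T]) -> T:
    if agent_id not in instances:
        raise LookupError(f"Agent {agent_id!r} not found")
    instance = instances[agent_id]
    if not isinstance(instance, expected):
        raise TypeError(
            f"Agent {agent_id!r} is {type(instance).__name__}, expected {expected.__name__}"
        )
    return instance


instances: Dict[str, BaseAgent] = {"chat/default": ChatAgent()}
chat = try_get_instance(instances, "chat/default", ChatAgent)
```

Passing the expected class lets a type checker infer the concrete return type, which is the main benefit of the `type` parameter over a plain lookup.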
- class autogen_core.application.WorkerAgentRuntime(host_address: str, tracer_provider: TracerProvider | None = None, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None)
Bases: AgentRuntime
- add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -> None
Add a new message serializer to the runtime.
Note: Serializers are deduplicated based on their type_name and data_content_type properties.
- Parameters:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – The serializer or serializers to add.
- async add_subscription(subscription: Subscription) -> None
Add a new subscription that the runtime should fulfill when processing published messages.
- Parameters:
subscription (Subscription) – The subscription to add.
- async agent_load_state(agent: AgentId, state: Mapping[str, Any]) -> None
Load the state of a single agent.
- async agent_metadata(agent: AgentId) -> AgentMetadata
Get the metadata for an agent.
- Parameters:
agent (AgentId) – The agent id.
- Returns:
AgentMetadata – The agent metadata.
- async agent_save_state(agent: AgentId) -> Mapping[str, Any]
Save the state of a single agent.
The structure of the state is implementation defined and can be any JSON serializable object.
- Parameters:
agent (AgentId) – The agent id.
- Returns:
Mapping[str, Any] – The saved state.
- async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) -> AgentId
- async load_state(state: Mapping[str, Any]) -> None
Load the state of the entire runtime, including all hosted agents. The state should be the same as the one returned by save_state().
- Parameters:
state (Mapping[str, Any]) – The saved state.
- async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None) -> None
Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.
No responses are expected from publishing.
- Parameters:
message (Any) – The message to publish.
topic_id (TopicId) – The topic to publish the message to.
sender (AgentId | None, optional) – The agent which sent the message. Defaults to None.
cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.
- Raises:
UndeliverableException – If the message cannot be delivered.
- register(type: str, agent_factory: Callable[[], T | Awaitable[T]], subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | list[Subscription] | None = None) -> AgentType
Register an agent factory with the runtime associated with a specific type. The type must be unique.
- Parameters:
type (str) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.
agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.base.AgentInstantiationContext to access variables like the current runtime and agent ID.
subscriptions (Callable[[], list[Subscription]] | list[Subscription] | None, optional) – The subscriptions that the agent should be subscribed to. Defaults to None.
Example
    runtime.register(
        "chat_agent",
        lambda: ChatCompletionAgent(
            description="A generic chat agent.",
            system_messages=[SystemMessage("You are a helpful assistant")],
            model_client=OpenAIChatCompletionClient(model="gpt-4o"),
            memory=BufferedChatMemory(buffer_size=10),
        ),
    )
- async register_factory(*, type: AgentType, agent_factory: Callable[[], T | Awaitable[T]], expected_class: type[T]) -> AgentType
Register an agent factory with the runtime associated with a specific type. The type must be unique.
- Parameters:
type (AgentType) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.
agent_factory (Callable[[], T]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use autogen_core.base.AgentInstantiationContext to access variables like the current runtime and agent ID.
Example
    runtime.register(
        "chat_agent",
        lambda: ChatCompletionAgent(
            description="A generic chat agent.",
            system_messages=[SystemMessage("You are a helpful assistant")],
            model_client=OpenAIChatCompletionClient(model="gpt-4o"),
            memory=BufferedChatMemory(buffer_size=10),
        ),
    )
- async remove_subscription(id: str) -> None
Remove a subscription from the runtime.
- Parameters:
id (str) – ID of the subscription to remove.
- Raises:
LookupError – If the subscription does not exist.
- async save_state() -> Mapping[str, Any]
Save the state of the entire runtime, including all hosted agents. The only way to restore the state is to pass it to load_state().
The structure of the state is implementation defined and can be any JSON serializable object.
- Returns:
Mapping[str, Any] – The saved state.
- async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None) -> Any
Send a message to an agent and get a response.
- Parameters:
message (Any) – The message to send.
recipient (AgentId) – The agent to send the message to.
sender (AgentId | None, optional) – Agent which sent the message. Should only be None if this was sent from no agent, such as directly to the runtime externally. Defaults to None.
cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.
- Raises:
CantHandleException – If the recipient cannot handle the message.
UndeliverableException – If the message cannot be delivered.
Other – Any other exception raised by the recipient.
- Returns:
Any – The response from the agent.
- async stop_when_signal(signals: Sequence[Signals] = (Signals.SIGTERM, Signals.SIGINT)) -> None
Stop the runtime when a signal is received.
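Waiting for a termination signal inside an asyncio program is commonly done with loop.add_signal_handler, which is available on Unix event loops only. The sketch below shows that general pattern. wait_for_signal is a hypothetical helper, not the runtime's implementation, and the demo raises SIGTERM in-process to simulate an external `kill -TERM`.

```python
import asyncio
import signal
from typing import Sequence


async def wait_for_signal(
    signals: Sequence[signal.Signals] = (signal.SIGTERM, signal.SIGINT),
) -> signal.Signals:
    """Block until one of the given signals arrives, then return it."""
    loop = asyncio.get_running_loop()
    received = loop.create_future()

    def on_signal(sig: signal.Signals) -> None:
        # Record only the first signal; later ones are ignored.
        if not received.done():
            received.set_result(sig)

    for sig in signals:
        # Unix only; must be called from the main thread's event loop.
        loop.add_signal_handler(sig, on_signal, sig)
    try:
        return await received
    finally:
        # Restore default handling so the sketch leaves no handlers behind.
        for sig in signals:
            loop.remove_signal_handler(sig)


async def demo() -> signal.Signals:
    loop = asyncio.get_running_loop()
    # Simulate an external termination request shortly after start.
    loop.call_later(0.05, signal.raise_signal, signal.SIGTERM)
    sig = await wait_for_signal()
    # A real worker would stop its message loop and close connections here.
    return sig
```

Handling both SIGTERM and SIGINT by default covers the two usual shutdown paths: an orchestrator stopping the process and a developer pressing Ctrl+C.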
- async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) -> T
Try to get the underlying agent instance by name and namespace. This is generally discouraged (hence the long name), but can be useful in some cases.
If the underlying agent is not accessible, this will raise an exception.
- Parameters:
id (AgentId) – The agent id.
type (Type[T], optional) – The expected type of the agent. Defaults to Agent.
- Returns:
T – The concrete agent instance.
- Raises:
LookupError – If the agent is not found.
NotAccessibleError – If the agent is not accessible, for example if it is located remotely.
TypeError – If the agent is not of the expected type.