autogen_agentchat.teams#
This module provides implementations of various pre-defined multi-agent teams. Each team inherits from the BaseGroupChat class.
- class BaseGroupChat(participants: List[ChatAgent], group_chat_manager_name: str, group_chat_manager_class: type[SequentialRoutedAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#
Bases: Team, ABC, ComponentBase[BaseModel]

The base class for group chat teams.

To implement a group chat team, first create a subclass of BaseGroupChatManager and then create a subclass of BaseGroupChat that uses the group chat manager.

- component_type: ClassVar[ComponentType] = 'team'#
The logical type of the component.
- async load_state(state: Mapping[str, Any]) -> None [source]#

Load an external state and overwrite the current state of the group chat team.

The state is loaded by calling the agent_load_state() method on each participant and the group chat manager with their internal agent ID. See save_state() for the expected format of the state.
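As an illustration, a minimal sketch of a save/load round-trip (the agents and tasks are illustrative; the state is keyed by agent name, so the restored team uses newly constructed participants with the same names):

    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        agent1 = AssistantAgent("Assistant1", model_client=model_client)
        agent2 = AssistantAgent("Assistant2", model_client=model_client)
        termination = MaxMessageTermination(3)
        team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

        await team.run(task="Write a haiku about autumn.")
        # Save the state after the run completes.
        state = await team.save_state()

        # Build a fresh team with newly constructed participants (same names)
        # and restore the saved state into it.
        new_agent1 = AssistantAgent("Assistant1", model_client=model_client)
        new_agent2 = AssistantAgent("Assistant2", model_client=model_client)
        new_team = RoundRobinGroupChat([new_agent1, new_agent2], termination_condition=termination)
        await new_team.load_state(state)

        # The new team continues from where the previous one left off.
        result = await new_team.run(task="Now translate the haiku to French.")
        print(result)


    asyncio.run(main())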
- async pause() -> None [source]#

Pause its participants while the team is running by calling their on_pause() method via direct RPC calls.

Attention

This is an experimental feature introduced in v0.4.9 and may be subject to change or removal in the future.
The team must be initialized before it can be paused.
Different from termination, pausing the team does not cause the run() or run_stream() method to return. It calls the on_pause() method on each participant; if a participant does not implement the method, the call is a no-op.

Note

It is the responsibility of the agent class to handle the pause and ensure that the agent can be resumed later. Make sure to implement the on_pause() method in your agent class for custom pause behavior. By default, the agent will not do anything when called.

- Raises:
RuntimeError – If the team has not been initialized. Exceptions raised by the participants' implementations of on_pause are propagated to this method and re-raised.
- async reset() -> None [source]#
Reset the team and its participants to their initial state.
The team must be stopped before it can be reset.
- Raises:
RuntimeError – If the team has not been initialized or is currently running.
Example using the RoundRobinGroupChat team:

    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        agent1 = AssistantAgent("Assistant1", model_client=model_client)
        agent2 = AssistantAgent("Assistant2", model_client=model_client)
        termination = MaxMessageTermination(3)
        team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

        stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
        async for message in stream:
            print(message)

        # Reset the team.
        await team.reset()

        stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
        async for message in stream:
            print(message)


    asyncio.run(main())
- async resume() -> None [source]#

Resume its participants while the team is running and paused by calling their on_resume() method via direct RPC calls.

Attention

This is an experimental feature introduced in v0.4.9 and may be subject to change or removal in the future.
The team must be initialized before it can be resumed.
Different from termination and restarting with a new task, resuming the team does not cause the run() or run_stream() method to return. It calls the on_resume() method on each participant; if a participant does not implement the method, the call is a no-op.

Note

It is the responsibility of the agent class to handle the resume and ensure that the agent continues from where it was paused. Make sure to implement the on_resume() method in your agent class for custom resume behavior.

- Raises:
RuntimeError – If the team has not been initialized. Exceptions raised by the participants' implementations of on_resume are propagated to this method and re-raised.
- async run(*, task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None, cancellation_token: CancellationToken | None = None) -> TaskResult [source]#

Run the team and return the result. The base implementation uses run_stream() to run the team and then returns the final result. Once the team is stopped, the termination condition is reset.

- Parameters:
task (str | BaseChatMessage | Sequence[BaseChatMessage] | None) – The task to run the team with. Can be a string, a single BaseChatMessage, or a list of BaseChatMessage.

cancellation_token (CancellationToken | None) – The cancellation token to kill the task immediately. Setting the cancellation token may put the team in an inconsistent state, and it may not reset the termination condition. To gracefully stop the team, use ExternalTermination instead.
- Returns:
result – The result of the task as TaskResult. The result contains the messages produced by the team and the stop reason.
Example using the RoundRobinGroupChat team:

    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        agent1 = AssistantAgent("Assistant1", model_client=model_client)
        agent2 = AssistantAgent("Assistant2", model_client=model_client)
        termination = MaxMessageTermination(3)
        team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

        result = await team.run(task="Count from 1 to 10, respond one at a time.")
        print(result)

        # Run the team again without a task to continue the previous task.
        result = await team.run()
        print(result)


    asyncio.run(main())
Example using the CancellationToken to cancel the task:

    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_core import CancellationToken
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        agent1 = AssistantAgent("Assistant1", model_client=model_client)
        agent2 = AssistantAgent("Assistant2", model_client=model_client)
        termination = MaxMessageTermination(3)
        team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

        cancellation_token = CancellationToken()

        # Create a task to run the team in the background.
        run_task = asyncio.create_task(
            team.run(
                task="Count from 1 to 10, respond one at a time.",
                cancellation_token=cancellation_token,
            )
        )

        # Wait for 1 second and then cancel the task.
        await asyncio.sleep(1)
        cancellation_token.cancel()

        # This will raise a cancellation error.
        await run_task


    asyncio.run(main())
- async run_stream(*, task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None, cancellation_token: CancellationToken | None = None) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None] [source]#

Run the team and produce a stream of messages, with the final result of type TaskResult as the last item in the stream. Once the team is stopped, the termination condition is reset.

Note

If an agent produces ModelClientStreamingChunkEvent, the message will be yielded in the stream but it will not be included in the messages of the final TaskResult.

- Parameters:
task (str | BaseChatMessage | Sequence[BaseChatMessage] | None) – The task to run the team with. Can be a string, a single BaseChatMessage, or a list of BaseChatMessage.

cancellation_token (CancellationToken | None) – The cancellation token to kill the task immediately. Setting the cancellation token may put the team in an inconsistent state, and it may not reset the termination condition. To gracefully stop the team, use ExternalTermination instead.
- Returns:
stream – an AsyncGenerator that yields BaseAgentEvent, BaseChatMessage, and the final result TaskResult as the last item in the stream.
Example using the RoundRobinGroupChat team:

    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        agent1 = AssistantAgent("Assistant1", model_client=model_client)
        agent2 = AssistantAgent("Assistant2", model_client=model_client)
        termination = MaxMessageTermination(3)
        team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

        stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
        async for message in stream:
            print(message)

        # Run the team again without a task to continue the previous task.
        stream = team.run_stream()
        async for message in stream:
            print(message)


    asyncio.run(main())
Example using the CancellationToken to cancel the task:

    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.ui import Console
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_core import CancellationToken
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        agent1 = AssistantAgent("Assistant1", model_client=model_client)
        agent2 = AssistantAgent("Assistant2", model_client=model_client)
        termination = MaxMessageTermination(3)
        team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

        cancellation_token = CancellationToken()

        # Create a task to run the team in the background.
        run_task = asyncio.create_task(
            Console(
                team.run_stream(
                    task="Count from 1 to 10, respond one at a time.",
                    cancellation_token=cancellation_token,
                )
            )
        )

        # Wait for 1 second and then cancel the task.
        await asyncio.sleep(1)
        cancellation_token.cancel()

        # This will raise a cancellation error.
        await run_task


    asyncio.run(main())
- async save_state() -> Mapping[str, Any] [source]#

Save the state of the group chat team.

The state is saved by calling the agent_save_state() method on each participant and the group chat manager with their internal agent ID. The state is returned as a nested dictionary: a dictionary with the key "agent_states", whose value is a dictionary with the agent names as keys and their states as values.

    {
        "agent_states": {
            "agent1": ...,
            "agent2": ...,
            "RoundRobinGroupChatManager": ...
        }
    }
Note

Starting from v0.4.9, the state uses the agent name as the key instead of the agent ID, and the team_id field is removed from the state. This allows the state to be portable across different teams and runtimes. States saved in the old format may not be compatible with the new format in the future.
Caution

When calling save_state() on a team while it is running, the state may not be consistent and may result in an unexpected state. It is recommended to call this method when the team is not running or after it is stopped.
- pydantic model DiGraph[source]#

Bases: BaseModel

Defines a directed graph structure with nodes and edges. GraphFlow uses this to determine execution order and conditions.

Warning

This is an experimental feature, and the API will change in future releases.
- Fields:
default_start_node (str | None)
nodes (Dict[str, autogen_agentchat.teams._group_chat._graph._digraph_group_chat.DiGraphNode])
- field nodes: Dict[str, DiGraphNode] [Required]#
- get_has_cycles() -> bool [source]#
Indicates if the graph has at least one cycle (with valid exit conditions).
- has_cycles_with_exit() -> bool [source]#
Check if the graph has any cycles and validate that each cycle has at least one conditional edge.
- Returns:
bool – True if there is at least one cycle and all cycles have an exit condition. False if there are no cycles.
- Raises:
ValueError – If there is a cycle without any conditional edge.
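As an illustration, a minimal sketch of the cycle checks on a hand-built graph (assuming DiGraph, DiGraphNode, and DiGraphEdge are importable from autogen_agentchat.teams, as documented in this module):

    from autogen_agentchat.teams import DiGraph, DiGraphEdge, DiGraphNode

    # A two-node loop A -> B -> A where the back edge carries a condition,
    # so the cycle has a valid exit.
    graph = DiGraph(
        nodes={
            "A": DiGraphNode(name="A", edges=[DiGraphEdge(target="B")]),
            "B": DiGraphNode(name="B", edges=[DiGraphEdge(target="A", condition="loop")]),
        }
    )
    print(graph.get_has_cycles())        # True: A -> B -> A is a cycle.
    print(graph.has_cycles_with_exit())  # True: the cycle has a conditional edge.

    # If the back edge had no condition, has_cycles_with_exit() would raise ValueError.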
- class DiGraphBuilder[source]#

Bases: object

A fluent builder for constructing DiGraph execution graphs used in GraphFlow.

Warning

This is an experimental feature, and the API will change in future releases.
This utility provides a convenient way to programmatically build a graph of agent interactions, including complex execution flows such as:
Sequential chains
Parallel fan-outs
Conditional branching
Cyclic loops with safe exits
Each node in the graph represents an agent. Edges define execution paths between agents, and can optionally be conditioned on message content.
The builder is compatible with the Graph runner and supports both standard and filtered agents.
- add_node(agent, activation)
Add an agent node to the graph.
- add_edge(source, target, condition)
Connect two nodes, optionally with a condition.
- add_conditional_edges(source, condition_to_target)
Add multiple conditional edges from a source.
- set_entry_point(agent)
Define the default start node (optional).
- build()
Generate a validated DiGraph.
- get_participants()
Return the list of added agents.
- Example — Sequential Flow A → B → C:

    >>> builder = DiGraphBuilder()
    >>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    >>> builder.add_edge(agent_a, agent_b).add_edge(agent_b, agent_c)
    >>> team = GraphFlow(
    ...     participants=builder.get_participants(),
    ...     graph=builder.build(),
    ...     termination_condition=MaxMessageTermination(5),
    ... )

- Example — Parallel Fan-out A → (B, C):

    >>> builder = DiGraphBuilder()
    >>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    >>> builder.add_edge(agent_a, agent_b).add_edge(agent_a, agent_c)

- Example — Conditional Branching A → B ("yes"), A → C ("no"):

    >>> builder = DiGraphBuilder()
    >>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    >>> builder.add_conditional_edges(agent_a, {"yes": agent_b, "no": agent_c})

- Example — Loop: A → B → A ("loop"), B → C ("exit"):

    >>> builder = DiGraphBuilder()
    >>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    >>> builder.add_edge(agent_a, agent_b)
    >>> builder.add_conditional_edges(agent_b, {"loop": agent_a, "exit": agent_c})
- add_conditional_edges(source: str | ChatAgent, condition_to_target: Dict[str, str | ChatAgent]) -> DiGraphBuilder [source]#

Add multiple conditional edges from a source node based on condition strings.

- add_edge(source: str | ChatAgent, target: str | ChatAgent, condition: str | None = None) -> DiGraphBuilder [source]#

Add a directed edge from source to target, optionally with a condition.

- add_node(agent: ChatAgent, activation: Literal['all', 'any'] = 'all') -> DiGraphBuilder [source]#

Add a node to the graph and register its agent.

- get_participants() -> list[ChatAgent] [source]#

Return the list of agents in the builder, in insertion order.

- set_entry_point(name: str | ChatAgent) -> DiGraphBuilder [source]#

Set the default start node of the graph.
- pydantic model DiGraphEdge[source]#

Bases: BaseModel

Represents a directed edge in a DiGraph, with an optional execution condition.

Warning

This is an experimental feature, and the API will change in future releases.
- Fields:
condition (str | None)
target (str)
- field condition: str | None = None#
(Experimental) Condition to execute this edge. If None, the edge is unconditional. If a string, the edge is conditional on the presence of that string in the last agent chat message. NOTE: This is an experimental feature that will change in future releases to allow for better specification of branching conditions, similar to the TerminationCondition class.
- pydantic model DiGraphNode[source]#

Bases: BaseModel

Represents a node (agent) in a DiGraph, with its outgoing edges and activation type.

Warning

This is an experimental feature, and the API will change in future releases.
- Fields:
activation (Literal['all', 'any'])
edges (List[autogen_agentchat.teams._group_chat._graph._digraph_group_chat.DiGraphEdge])
name (str)
- field edges: List[DiGraphEdge] = []#
- class GraphFlow(participants: List[ChatAgent], graph: DiGraph, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None)[source]#
Bases: BaseGroupChat, Component[GraphFlowConfig]

A team that runs a group chat following a directed graph execution pattern.

Warning

This is an experimental feature, and the API will change in future releases.

This group chat executes agents based on a directed graph (DiGraph) structure, allowing complex workflows such as sequential execution, parallel fan-out, conditional branching, join patterns, and loops with explicit exit conditions.

The execution order is determined by the edges defined in the DiGraph. Each node in the graph corresponds to an agent, and edges define the flow of messages between agents. Nodes can be configured to activate when:

All parent nodes have completed (activation="all") → default
Any parent node completes (activation="any")
Conditional branching is supported using edge conditions, where the next agent(s) are selected based on content in the chat history. Loops are permitted as long as there is a condition that eventually exits the loop.
Note

Use the DiGraphBuilder class to create a DiGraph easily. It provides a fluent API for adding nodes and edges, setting entry points, and validating the graph structure. See the DiGraphBuilder documentation for more details. The GraphFlow class is designed to be used with the DiGraphBuilder for creating complex workflows.

- Parameters:
participants (List[ChatAgent]) – The participants in the group chat.
termination_condition (TerminationCondition, optional) – Termination condition for the chat.
max_turns (int, optional) – Maximum number of turns before forcing termination.
graph (DiGraph) – Directed execution graph defining node flow and conditions.
- Raises:
ValueError – If participant names are not unique, or if graph validation fails (e.g., cycles without exit).
Examples
Sequential Flow: A → B → C
    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main():
        # Initialize agents with OpenAI model clients.
        model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
        agent_a = AssistantAgent("A", model_client=model_client, system_message="You are a helpful assistant.")
        agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to Chinese.")
        agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to English.")

        # Create a directed graph with sequential flow A -> B -> C.
        builder = DiGraphBuilder()
        builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
        builder.add_edge(agent_a, agent_b).add_edge(agent_b, agent_c)
        graph = builder.build()

        # Create a GraphFlow team with the directed graph.
        team = GraphFlow(
            participants=[agent_a, agent_b, agent_c],
            graph=graph,
            termination_condition=MaxMessageTermination(5),
        )

        # Run the team and print the events.
        async for event in team.run_stream(task="Write a short story about a cat."):
            print(event)


    asyncio.run(main())
Parallel Fan-out: A → (B, C)
    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main():
        # Initialize agents with OpenAI model clients.
        model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
        agent_a = AssistantAgent("A", model_client=model_client, system_message="You are a helpful assistant.")
        agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to Chinese.")
        agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to Japanese.")

        # Create a directed graph with fan-out flow A -> (B, C).
        builder = DiGraphBuilder()
        builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
        builder.add_edge(agent_a, agent_b).add_edge(agent_a, agent_c)
        graph = builder.build()

        # Create a GraphFlow team with the directed graph.
        team = GraphFlow(
            participants=[agent_a, agent_b, agent_c],
            graph=graph,
            termination_condition=MaxMessageTermination(5),
        )

        # Run the team and print the events.
        async for event in team.run_stream(task="Write a short story about a cat."):
            print(event)


    asyncio.run(main())
Conditional Branching: A → B (if ‘yes’) or C (if ‘no’)
    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main():
        # Initialize agents with OpenAI model clients.
        model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
        agent_a = AssistantAgent(
            "A",
            model_client=model_client,
            system_message="Detect if the input is in Chinese. If it is, say 'yes', else say 'no', and nothing else.",
        )
        agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to English.")
        agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to Chinese.")

        # Create a directed graph with conditional branching flow A -> B ("yes"), A -> C ("no").
        builder = DiGraphBuilder()
        builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
        builder.add_edge(agent_a, agent_b, condition="yes")
        builder.add_edge(agent_a, agent_c, condition="no")
        graph = builder.build()

        # Create a GraphFlow team with the directed graph.
        team = GraphFlow(
            participants=[agent_a, agent_b, agent_c],
            graph=graph,
            termination_condition=MaxMessageTermination(5),
        )

        # Run the team and print the events.
        async for event in team.run_stream(task="AutoGen is a framework for building AI agents."):
            print(event)


    asyncio.run(main())
Loop with exit condition: A → B → C (if ‘APPROVE’) or A (if ‘REJECT’)
    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main():
        # Initialize agents with OpenAI model clients.
        model_client = OpenAIChatCompletionClient(model="gpt-4.1")
        agent_a = AssistantAgent(
            "A",
            model_client=model_client,
            system_message="You are a helpful assistant.",
        )
        agent_b = AssistantAgent(
            "B",
            model_client=model_client,
            system_message="Provide feedback on the input, if your feedback has been addressed, "
            "say 'APPROVE', else say 'REJECT' and provide a reason.",
        )
        agent_c = AssistantAgent(
            "C", model_client=model_client, system_message="Translate the final product to Korean."
        )

        # Create a loop graph with conditional exit: A -> B -> C ("APPROVE"), B -> A ("REJECT").
        builder = DiGraphBuilder()
        builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
        builder.add_edge(agent_a, agent_b)
        builder.add_conditional_edges(agent_b, {"APPROVE": agent_c, "REJECT": agent_a})
        builder.set_entry_point(agent_a)
        graph = builder.build()

        # Create a GraphFlow team with the directed graph.
        team = GraphFlow(
            participants=[agent_a, agent_b, agent_c],
            graph=graph,
            termination_condition=MaxMessageTermination(20),  # Max 20 messages to avoid infinite loop.
        )

        # Run the team and print the events.
        async for event in team.run_stream(task="Write a short poem about AI Agents."):
            print(event)


    asyncio.run(main())
- component_config_schema#
alias of GraphFlowConfig
- class MagenticOneGroupChat(participants: List[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: TerminationCondition | None = None, max_turns: int | None = 20, runtime: AgentRuntime | None = None, max_stalls: int = 3, final_answer_prompt: str = ORCHESTRATOR_FINAL_ANSWER_PROMPT, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#
Bases: BaseGroupChat, Component[MagenticOneGroupChatConfig]

A team that runs a group chat with participants managed by the MagenticOneOrchestrator.
The orchestrator handles the conversation flow, ensuring that the task is completed efficiently by managing the participants’ interactions.
The orchestrator is based on the Magentic-One architecture, which is a generalist multi-agent system for solving complex tasks (see references below).
- Parameters:
participants (List[ChatAgent]) – The participants in the group chat.
model_client (ChatCompletionClient) – The model client used for generating responses.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run based on the orchestrator logic or until the maximum number of turns is reached.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to 20.
max_stalls (int, optional) – The maximum number of stalls allowed before re-planning. Defaults to 3.
final_answer_prompt (str, optional) – The LLM prompt used to generate the final answer or response from the team’s transcript. A default (sensible for GPT-4o class models) is provided.
custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – A list of custom message types that will be used in the group chat. If you are using custom message types or your agents produce custom message types, you need to specify them here. Make sure your custom message types are subclasses of BaseAgentEvent or BaseChatMessage.

emit_team_events (bool, optional) – Whether to emit team events through BaseGroupChat.run_stream(). Defaults to False.
- Raises:
ValueError – In orchestration logic if progress ledger does not have required keys or if next speaker is not valid.
Examples:
MagenticOneGroupChat with one assistant agent:
    import asyncio

    from autogen_ext.models.openai import OpenAIChatCompletionClient
    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.teams import MagenticOneGroupChat
    from autogen_agentchat.ui import Console


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        assistant = AssistantAgent(
            "Assistant",
            model_client=model_client,
        )
        team = MagenticOneGroupChat([assistant], model_client=model_client)
        await Console(team.run_stream(task="Provide a different proof of Fermat's last theorem"))


    asyncio.run(main())
References
If you use the MagenticOneGroupChat in your work, please cite the following paper:
    @article{fourney2024magentic,
      title={Magentic-one: A generalist multi-agent system for solving complex tasks},
      author={Fourney, Adam and Bansal, Gagan and Mozannar, Hussein and Tan, Cheng and Salinas, Eduardo and Niedtner, Friederike and Proebsting, Grace and Bassman, Griffin and Gerrits, Jack and Alber, Jacob and others},
      journal={arXiv preprint arXiv:2411.04468},
      year={2024}
    }
- classmethod _from_config(config: MagenticOneGroupChatConfig) -> Self [source]#
Create a new instance of the component from a configuration object.
- Parameters:
config (T) – The configuration object.
- Returns:
Self – The new instance of the component.
- _to_config() -> MagenticOneGroupChatConfig [source]#

Dump the configuration that would be required to create a new instance of a component matching the configuration of this instance.
- Returns:
T – The configuration of the component.
- component_config_schema#
alias of MagenticOneGroupChatConfig
- class RoundRobinGroupChat(participants: List[ChatAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#
Bases: BaseGroupChat, Component[RoundRobinGroupChatConfig]

A team that runs a group chat with participants taking turns in a round-robin fashion to publish a message to all.
If a single participant is in the team, the participant will be the only speaker.
- Parameters:
participants (List[BaseChatAgent]) – The participants in the group chat.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit.
custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – A list of custom message types that will be used in the group chat. If you are using custom message types or your agents produce custom message types, you need to specify them here. Make sure your custom message types are subclasses of BaseAgentEvent or BaseChatMessage.

emit_team_events (bool, optional) – Whether to emit team events through BaseGroupChat.run_stream(). Defaults to False. See the sketch after the examples below.
- Raises:
ValueError – If no participants are provided or if participant names are not unique.
Examples:
A team with one participant with tools:
    import asyncio

    from autogen_ext.models.openai import OpenAIChatCompletionClient
    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_agentchat.conditions import TextMentionTermination
    from autogen_agentchat.ui import Console


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        async def get_weather(location: str) -> str:
            return f"The weather in {location} is sunny."

        assistant = AssistantAgent(
            "Assistant",
            model_client=model_client,
            tools=[get_weather],
        )
        termination = TextMentionTermination("TERMINATE")
        team = RoundRobinGroupChat([assistant], termination_condition=termination)
        await Console(team.run_stream(task="What's the weather in New York?"))


    asyncio.run(main())
A team with multiple participants:
    import asyncio

    from autogen_ext.models.openai import OpenAIChatCompletionClient
    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_agentchat.conditions import TextMentionTermination
    from autogen_agentchat.ui import Console


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        agent1 = AssistantAgent("Assistant1", model_client=model_client)
        agent2 = AssistantAgent("Assistant2", model_client=model_client)
        termination = TextMentionTermination("TERMINATE")
        team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
        await Console(team.run_stream(task="Tell me some jokes."))


    asyncio.run(main())
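A minimal sketch of the emit_team_events option (the task is illustrative, and the exact event types yielded alongside chat messages are version-dependent):

    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        agent1 = AssistantAgent("Assistant1", model_client=model_client)
        agent2 = AssistantAgent("Assistant2", model_client=model_client)
        team = RoundRobinGroupChat(
            [agent1, agent2],
            termination_condition=MaxMessageTermination(3),
            emit_team_events=True,  # Also yield team events in run_stream().
        )
        async for message in team.run_stream(task="Tell me a joke."):
            # With emit_team_events=True the stream contains team events
            # (e.g. speaker selection) in addition to chat messages.
            print(type(message).__name__, "->", message)


    asyncio.run(main())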
- classmethod _from_config(config: RoundRobinGroupChatConfig) -> Self [source]#
Create a new instance of the component from a configuration object.
- Parameters:
config (T) – The configuration object.
- Returns:
Self – The new instance of the component.
- _to_config() -> RoundRobinGroupChatConfig [source]#

Dump the configuration that would be required to create a new instance of a component matching the configuration of this instance.
- Returns:
T – The configuration of the component.
- component_config_schema#
alias of RoundRobinGroupChatConfig
- class SelectorGroupChat(participants: List[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, selector_prompt: str = 'You are in a role play game. The following roles are available:\n{roles}.\nRead the following conversation. Then select the next role from {participants} to play. Only return the role.\n\n{history}\n\nRead the above conversation. Then select the next role from {participants} to play. Only return the role.\n', allow_repeated_speaker: bool = False, max_selector_attempts: int = 3, selector_func: Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], str | None] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[str | None]] | None = None, candidate_func: Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], List[str]] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[List[str]]] | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False, model_client_streaming: bool = False)[source]#
Bases: BaseGroupChat, Component[SelectorGroupChatConfig]

A group chat team in which participants take turns publishing a message to all, using a ChatCompletion model to select the next speaker after each message.
- Parameters:
participants (List[ChatAgent]) – The participants in the group chat, must have unique names and at least two participants.
model_client (ChatCompletionClient) – The ChatCompletion model client used to select the next speaker.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit.
selector_prompt (str, optional) – The prompt template to use for selecting the next speaker. Available fields: '{roles}', '{participants}', and '{history}'. {participants} is the names of candidates for selection; the format is ["<name1>", "<name2>", ...]. {roles} is a newline-separated list of names and descriptions of the candidate agents; the format for each line is: "<name> : <description>". {history} is the conversation history formatted as a double-newline-separated list of names and message contents; the format for each message is: "<name> : <message content>".
allow_repeated_speaker (bool, optional) – Whether to include the previous speaker in the list of candidates to be selected for the next turn. Defaults to False. The model may still select the previous speaker – a warning will be logged if this happens.
max_selector_attempts (int, optional) – The maximum number of attempts to select a speaker using the model. Defaults to 3. If the model fails to select a speaker after the maximum number of attempts, the previous speaker will be used if available, otherwise the first participant will be used.
selector_func (Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], str | None], Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[str | None]], optional) – A custom selector function that takes the conversation history and returns the name of the next speaker. If provided, this function will be used to override the model to select the next speaker. If the function returns None, the model will be used to select the next speaker.
candidate_func (Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], List[str]], Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[List[str]]], optional) – A custom function that takes the conversation history and returns a filtered list of candidates for the next speaker selection using the model. If the function returns an empty list or None, SelectorGroupChat will raise a ValueError. This function is only used if selector_func is not set. allow_repeated_speaker will be ignored if this is set. See the sketch after the examples below.
custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – A list of custom message types that will be used in the group chat. If you are using custom message types or your agents produce custom message types, you need to specify them here. Make sure your custom message types are subclasses of BaseAgentEvent or BaseChatMessage.

emit_team_events (bool, optional) – Whether to emit team events through BaseGroupChat.run_stream(). Defaults to False.

model_client_streaming (bool, optional) – Whether to use streaming for the model client. (This is useful for reasoning models like QwQ.) Defaults to False.
- Raises:
ValueError – If the number of participants is less than two or if the selector prompt is invalid.
Examples:
A team with multiple participants:
    import asyncio

    from autogen_ext.models.openai import OpenAIChatCompletionClient
    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.teams import SelectorGroupChat
    from autogen_agentchat.conditions import TextMentionTermination
    from autogen_agentchat.ui import Console


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        async def lookup_hotel(location: str) -> str:
            return f"Here are some hotels in {location}: hotel1, hotel2, hotel3."

        async def lookup_flight(origin: str, destination: str) -> str:
            return f"Here are some flights from {origin} to {destination}: flight1, flight2, flight3."

        async def book_trip() -> str:
            return "Your trip is booked!"

        travel_advisor = AssistantAgent(
            "Travel_Advisor",
            model_client,
            tools=[book_trip],
            description="Helps with travel planning.",
        )
        hotel_agent = AssistantAgent(
            "Hotel_Agent",
            model_client,
            tools=[lookup_hotel],
            description="Helps with hotel booking.",
        )
        flight_agent = AssistantAgent(
            "Flight_Agent",
            model_client,
            tools=[lookup_flight],
            description="Helps with flight booking.",
        )
        termination = TextMentionTermination("TERMINATE")
        team = SelectorGroupChat(
            [travel_advisor, hotel_agent, flight_agent],
            model_client=model_client,
            termination_condition=termination,
        )
        await Console(team.run_stream(task="Book a 3-day trip to new york."))


    asyncio.run(main())
A team with a custom selector function:
    import asyncio
    from typing import Sequence

    from autogen_ext.models.openai import OpenAIChatCompletionClient
    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.teams import SelectorGroupChat
    from autogen_agentchat.conditions import TextMentionTermination
    from autogen_agentchat.ui import Console
    from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        def check_calculation(x: int, y: int, answer: int) -> str:
            if x + y == answer:
                return "Correct!"
            else:
                return "Incorrect!"

        agent1 = AssistantAgent(
            "Agent1",
            model_client,
            description="For calculation",
            system_message="Calculate the sum of two numbers",
        )
        agent2 = AssistantAgent(
            "Agent2",
            model_client,
            tools=[check_calculation],
            description="For checking calculation",
            system_message="Check the answer and respond with 'Correct!' or 'Incorrect!'",
        )

        def selector_func(messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> str | None:
            if len(messages) == 1 or messages[-1].to_text() == "Incorrect!":
                return "Agent1"
            if messages[-1].source == "Agent1":
                return "Agent2"
            return None

        termination = TextMentionTermination("Correct!")
        team = SelectorGroupChat(
            [agent1, agent2],
            model_client=model_client,
            selector_func=selector_func,
            termination_condition=termination,
        )

        await Console(team.run_stream(task="What is 1 + 1?"))


    asyncio.run(main())
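A sketch of the candidate_func parameter described above (the agent names, descriptions, and task are illustrative):

    import asyncio
    from typing import List, Sequence

    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
    from autogen_agentchat.teams import SelectorGroupChat
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        researcher = AssistantAgent("Researcher", model_client, description="Gathers information.")
        writer = AssistantAgent("Writer", model_client, description="Writes the final answer.")
        reviewer = AssistantAgent("Reviewer", model_client, description="Reviews drafts.")

        def candidate_func(messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> List[str]:
            # Once the Researcher has spoken, let the model choose only
            # between Writer and Reviewer; otherwise the Researcher goes first.
            if any(m.source == "Researcher" for m in messages):
                return ["Writer", "Reviewer"]
            return ["Researcher"]

        team = SelectorGroupChat(
            [researcher, writer, reviewer],
            model_client=model_client,
            candidate_func=candidate_func,
            termination_condition=MaxMessageTermination(6),
        )
        result = await team.run(task="Summarize the benefits of unit testing.")
        print(result)


    asyncio.run(main())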
- classmethod _from_config(config: SelectorGroupChatConfig) -> Self [source]#
Create a new instance of the component from a configuration object.
- Parameters:
config (T) – The configuration object.
- Returns:
Self – The new instance of the component.
- _to_config() -> SelectorGroupChatConfig [source]#

Dump the configuration that would be required to create a new instance of a component matching the configuration of this instance.
- Returns:
T – The configuration of the component.
- component_config_schema#
alias of SelectorGroupChatConfig
- class Swarm(participants: List[ChatAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#
Bases: BaseGroupChat, Component[SwarmConfig]

A group chat team that selects the next speaker based solely on handoff messages.
The first participant in the list of participants is the initial speaker. The next speaker is selected based on the HandoffMessage sent by the current speaker. If no handoff message is sent, the current speaker continues to be the speaker.

- Parameters:
participants (List[ChatAgent]) – The agents participating in the group chat. The first agent in the list is the initial speaker.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit.
custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – A list of custom message types that will be used in the group chat. If you are using custom message types or your agents produce custom message types, you need to specify them here. Make sure your custom message types are subclasses of BaseAgentEvent or BaseChatMessage.

emit_team_events (bool, optional) – Whether to emit team events through BaseGroupChat.run_stream(). Defaults to False.
Basic example:
    import asyncio

    from autogen_ext.models.openai import OpenAIChatCompletionClient
    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.teams import Swarm
    from autogen_agentchat.conditions import MaxMessageTermination


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        agent1 = AssistantAgent(
            "Alice",
            model_client=model_client,
            handoffs=["Bob"],
            system_message="You are Alice and you only answer questions about yourself.",
        )
        agent2 = AssistantAgent(
            "Bob", model_client=model_client, system_message="You are Bob and your birthday is on 1st January."
        )

        termination = MaxMessageTermination(3)
        team = Swarm([agent1, agent2], termination_condition=termination)

        stream = team.run_stream(task="What is bob's birthday?")
        async for message in stream:
            print(message)


    asyncio.run(main())
Using the HandoffTermination for human-in-the-loop handoff:

    import asyncio

    from autogen_ext.models.openai import OpenAIChatCompletionClient
    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.teams import Swarm
    from autogen_agentchat.conditions import HandoffTermination, MaxMessageTermination
    from autogen_agentchat.ui import Console
    from autogen_agentchat.messages import HandoffMessage


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        agent = AssistantAgent(
            "Alice",
            model_client=model_client,
            handoffs=["user"],
            system_message="You are Alice and you only answer questions about yourself, ask the user for help if needed.",
        )
        termination = HandoffTermination(target="user") | MaxMessageTermination(3)
        team = Swarm([agent], termination_condition=termination)

        # Start the conversation.
        await Console(team.run_stream(task="What is bob's birthday?"))

        # Resume with user feedback.
        await Console(
            team.run_stream(
                task=HandoffMessage(source="user", target="Alice", content="Bob's birthday is on 1st January.")
            )
        )


    asyncio.run(main())
- classmethod _from_config(config: SwarmConfig) -> Swarm [source]#
Create a new instance of the component from a configuration object.
- Parameters:
config (T) – The configuration object.
- Returns:
Self – The new instance of the component.
- _to_config() -> SwarmConfig [source]#

Dump the configuration that would be required to create a new instance of a component matching the configuration of this instance.
- Returns:
T – The configuration of the component.
- component_config_schema#
alias of SwarmConfig