autogen_agentchat.teams#
- class BaseGroupChat(participants: List[ChatAgent], group_chat_manager_class: type[SequentialRoutedAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None)[source]#
-
The base class for group chat teams.
To implement a group chat team, first create a subclass of
BaseGroupChatManager
and then create a subclass of BaseGroupChat
that uses the group chat manager.- async reset() None [source]#
Reset the team and its participants to their initial state.
The team must be stopped before it can be reset.
- Raises:
RuntimeError – If the team has not been initialized or is currently running.
Example using the
RoundRobinGroupChat
team:import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = MaxMessageTermination(3) team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) stream = team.run_stream(task="Count from 1 to 10, respond one at a time.") async for message in stream: print(message) # Reset the team. await team.reset() stream = team.run_stream(task="Count from 1 to 10, respond one at a time.") async for message in stream: print(message) asyncio.run(main())
- async run(*, task: str | Annotated[TextMessage | MultiModalMessage | StopMessage | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | None = None, cancellation_token: CancellationToken | None = None) TaskResult [source]#
Run the team and return the result. The base implementation uses
run_stream()
to run the team and then returns the final result. Once the team is stopped, the termination condition is reset.- Parameters:
task (str | ChatMessage | None) – The task to run the team with.
cancellation_token (CancellationToken | None) – The cancellation token to kill the task immediately. Setting the cancellation token may put the team in an inconsistent state, and it may not reset the termination condition. To gracefully stop the team, use
ExternalTermination
instead.
Example using the
RoundRobinGroupChat
team:import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = MaxMessageTermination(3) team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) result = await team.run(task="Count from 1 to 10, respond one at a time.") print(result) # Run the team again without a task to continue the previous task. result = await team.run() print(result) asyncio.run(main())
Example using the
CancellationToken
to cancel the task:import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_core import CancellationToken from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = MaxMessageTermination(3) team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) cancellation_token = CancellationToken() # Create a task to run the team in the background. run_task = asyncio.create_task( team.run( task="Count from 1 to 10, respond one at a time.", cancellation_token=cancellation_token, ) ) # Wait for 1 second and then cancel the task. await asyncio.sleep(1) cancellation_token.cancel() # This will raise a cancellation error. await run_task asyncio.run(main())
- async run_stream(*, task: str | Annotated[TextMessage | MultiModalMessage | StopMessage | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | None = None, cancellation_token: CancellationToken | None = None) AsyncGenerator[Annotated[TextMessage | MultiModalMessage | StopMessage | HandoffMessage | ToolCallMessage | ToolCallResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | TaskResult, None] [source]#
Run the team and produce a stream of messages and the final result of the type
TaskResult
as the last item in the stream. Once the team is stopped, the termination condition is reset.- Parameters:
task (str | ChatMessage | None) – The task to run the team with.
cancellation_token (CancellationToken | None) – The cancellation token to kill the task immediately. Setting the cancellation token may put the team in an inconsistent state, and it may not reset the termination condition. To gracefully stop the team, use
ExternalTermination
instead.
Example using the
RoundRobinGroupChat
team:import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = MaxMessageTermination(3) team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) stream = team.run_stream(task="Count from 1 to 10, respond one at a time.") async for message in stream: print(message) # Run the team again without a task to continue the previous task. stream = team.run_stream() async for message in stream: print(message) asyncio.run(main())
Example using the
CancellationToken
to cancel the task:import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.ui import Console from autogen_agentchat.teams import RoundRobinGroupChat from autogen_core import CancellationToken from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = MaxMessageTermination(3) team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) cancellation_token = CancellationToken() # Create a task to run the team in the background. run_task = asyncio.create_task( Console( team.run_stream( task="Count from 1 to 10, respond one at a time.", cancellation_token=cancellation_token, ) ) ) # Wait for 1 second and then cancel the task. await asyncio.sleep(1) cancellation_token.cancel() # This will raise a cancellation error. await run_task asyncio.run(main())
- class MagenticOneGroupChat(participants: List[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: TerminationCondition | None = None, max_turns: int | None = 20, max_stalls: int = 3, final_answer_prompt: str = '\nWe are working on the following task:\n{task}\n\nWe have completed the task.\n\nThe above messages contain the conversation that took place to complete the task.\n\nBased on the information gathered, provide the final answer to the original request.\nThe answer should be phrased as if you were speaking to the user.\n')[source]#
Bases:
BaseGroupChat
A team that runs a group chat with participants managed by the MagenticOneOrchestrator.
The orchestrator handles the conversation flow, ensuring that the task is completed efficiently by managing the participants’ interactions.
- Parameters:
participants (List[ChatAgent]) – The participants in the group chat.
model_client (ChatCompletionClient) – The model client used for generating responses.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run based on the orchestrator logic or until the maximum number of turns is reached.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to 20.
max_stalls (int, optional) – The maximum number of stalls allowed before re-planning. Defaults to 3.
final_answer_prompt (str, optional) – The LLM prompt used to generate the final answer or response from the team’s transcript. A default (sensible for GPT-4o class models) is provided.
- Raises:
ValueError – In orchestration logic if progress ledger does not have required keys or if next speaker is not valid.
Examples:
MagenticOneGroupChat with one assistant agent:
import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import MagenticOneGroupChat from autogen_agentchat.ui import Console async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") assistant = AssistantAgent( "Assistant", model_client=model_client, ) team = MagenticOneGroupChat([assistant], model_client=model_client) await Console(team.run_stream(task="Provide a different proof to Fermat last theorem")) asyncio.run(main())
- class RoundRobinGroupChat(participants: List[ChatAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None)[source]#
Bases:
BaseGroupChat
A team that runs a group chat with participants taking turns in a round-robin fashion to publish a message to all.
If a single participant is in the team, the participant will be the only speaker.
- Parameters:
participants (List[BaseChatAgent]) – The participants in the group chat.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit.
- Raises:
ValueError – If no participants are provided or if participant names are not unique.
Examples:
A team with one participant with tools:
import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.ui import Console async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") async def get_weather(location: str) -> str: return f"The weather in {location} is sunny." assistant = AssistantAgent( "Assistant", model_client=model_client, tools=[get_weather], ) termination = TextMentionTermination("TERMINATE") team = RoundRobinGroupChat([assistant], termination_condition=termination) await Console(team.run_stream(task="What's the weather in New York?")) asyncio.run(main())
A team with multiple participants:
import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.ui import Console async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = TextMentionTermination("TERMINATE") team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) await Console(team.run_stream(task="Tell me some jokes.")) asyncio.run(main())
- class SelectorGroupChat(participants: List[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, selector_prompt: str = 'You are in a role play game. The following roles are available:\n{roles}.\nRead the following conversation. Then select the next role from {participants} to play. Only return the role.\n\n{history}\n\nRead the above conversation. Then select the next role from {participants} to play. Only return the role.\n', allow_repeated_speaker: bool = False, selector_func: Callable[[Sequence[Annotated[TextMessage | MultiModalMessage | StopMessage | HandoffMessage | ToolCallMessage | ToolCallResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]]], str | None] | None = None)[source]#
Bases:
BaseGroupChat
A group chat team in which participants take turns publishing a message to all, using a ChatCompletion model to select the next speaker after each message.
- Parameters:
participants (List[ChatAgent]) – The participants in the group chat, must have unique names and at least two participants.
model_client (ChatCompletionClient) – The ChatCompletion model client used to select the next speaker.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit.
selector_prompt (str, optional) – The prompt template to use for selecting the next speaker. Must contain ‘{roles}’, ‘{participants}’, and ‘{history}’ to be filled in.
allow_repeated_speaker (bool, optional) – Whether to allow the same speaker to be selected consecutively. Defaults to False.
selector_func (Callable[[Sequence[AgentMessage]], str | None], optional) – A custom selector function that takes the conversation history and returns the name of the next speaker. If provided, this function will be used to override the model to select the next speaker. If the function returns None, the model will be used to select the next speaker.
- Raises:
ValueError – If the number of participants is less than two or if the selector prompt is invalid.
Examples:
A team with multiple participants:
import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import SelectorGroupChat from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.ui import Console async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") async def lookup_hotel(location: str) -> str: return f"Here are some hotels in {location}: hotel1, hotel2, hotel3." async def lookup_flight(origin: str, destination: str) -> str: return f"Here are some flights from {origin} to {destination}: flight1, flight2, flight3." async def book_trip() -> str: return "Your trip is booked!" travel_advisor = AssistantAgent( "Travel_Advisor", model_client, tools=[book_trip], description="Helps with travel planning.", ) hotel_agent = AssistantAgent( "Hotel_Agent", model_client, tools=[lookup_hotel], description="Helps with hotel booking.", ) flight_agent = AssistantAgent( "Flight_Agent", model_client, tools=[lookup_flight], description="Helps with flight booking.", ) termination = TextMentionTermination("TERMINATE") team = SelectorGroupChat( [travel_advisor, hotel_agent, flight_agent], model_client=model_client, termination_condition=termination, ) await Console(team.run_stream(task="Book a 3-day trip to new york.")) asyncio.run(main())
A team with a custom selector function:
import asyncio from typing import Sequence from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import SelectorGroupChat from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.ui import Console from autogen_agentchat.messages import AgentMessage async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") def check_calculation(x: int, y: int, answer: int) -> str: if x + y == answer: return "Correct!" else: return "Incorrect!" agent1 = AssistantAgent( "Agent1", model_client, description="For calculation", system_message="Calculate the sum of two numbers", ) agent2 = AssistantAgent( "Agent2", model_client, tools=[check_calculation], description="For checking calculation", system_message="Check the answer and respond with 'Correct!' or 'Incorrect!'", ) def selector_func(messages: Sequence[AgentMessage]) -> str | None: if len(messages) == 1 or messages[-1].content == "Incorrect!": return "Agent1" if messages[-1].source == "Agent1": return "Agent2" return None termination = TextMentionTermination("Correct!") team = SelectorGroupChat( [agent1, agent2], model_client=model_client, selector_func=selector_func, termination_condition=termination, ) await Console(team.run_stream(task="What is 1 + 1?")) asyncio.run(main())
- class Swarm(participants: List[ChatAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None)[source]#
Bases:
BaseGroupChat
A group chat team that selects the next speaker based on handoff message only.
The first participant in the list of participants is the initial speaker. The next speaker is selected based on the
HandoffMessage
message sent by the current speaker. If no handoff message is sent, the current speaker continues to be the speaker.- Parameters:
participants (List[ChatAgent]) – The agents participating in the group chat. The first agent in the list is the initial speaker.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit.
Basic example:
import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import Swarm from autogen_agentchat.conditions import MaxMessageTermination async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent( "Alice", model_client=model_client, handoffs=["Bob"], system_message="You are Alice and you only answer questions about yourself.", ) agent2 = AssistantAgent( "Bob", model_client=model_client, system_message="You are Bob and your birthday is on 1st January." ) termination = MaxMessageTermination(3) team = Swarm([agent1, agent2], termination_condition=termination) stream = team.run_stream(task="What is bob's birthday?") async for message in stream: print(message) asyncio.run(main())
Using the
HandoffTermination
for human-in-the-loop handoff:import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import Swarm from autogen_agentchat.conditions import HandoffTermination, MaxMessageTermination from autogen_agentchat.ui import Console from autogen_agentchat.messages import HandoffMessage async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent = AssistantAgent( "Alice", model_client=model_client, handoffs=["user"], system_message="You are Alice and you only answer questions about yourself, ask the user for help if needed.", ) termination = HandoffTermination(target="user") | MaxMessageTermination(3) team = Swarm([agent], termination_condition=termination) # Start the conversation. await Console(team.run_stream(task="What is bob's birthday?")) # Resume with user feedback. await Console( team.run_stream( task=HandoffMessage(source="user", target="Alice", content="Bob's birthday is on 1st January.") ) ) asyncio.run(main())