class BaseGroupChat(Team, ABC, ComponentBase[BaseModel]):
    """The base class for group chat teams.

    To implement a group chat team, first create a subclass of :class:`BaseGroupChatManager` and then
    create a subclass of :class:`BaseGroupChat` that uses the group chat manager.
    """

    component_type = "team"

    def __init__(
        self,
        participants: List[ChatAgent],
        group_chat_manager_class: type[SequentialRoutedAgent],
        termination_condition: TerminationCondition | None = None,
        max_turns: int | None = None,
    ):
        if len(participants) == 0:
            raise ValueError("At least one participant is required.")
        if len(participants) != len(set(participant.name for participant in participants)):
            raise ValueError("The participant names must be unique.")
        self._participants = participants
        self._base_group_chat_manager_class = group_chat_manager_class
        self._termination_condition = termination_condition
        self._max_turns = max_turns

        # Constants for the group chat.
        self._team_id = str(uuid.uuid4())
        self._group_topic_type = "group_topic"
        self._output_topic_type = "output_topic"
        self._group_chat_manager_topic_type = "group_chat_manager"
        self._participant_topic_types: List[str] = [participant.name for participant in participants]
        self._participant_descriptions: List[str] = [participant.description for participant in participants]
        self._collector_agent_type = "collect_output_messages"

        # Constants for the closure agent to collect the output messages.
        self._stop_reason: str | None = None
        self._output_message_queue: asyncio.Queue[AgentEvent | ChatMessage | None] = asyncio.Queue()

        # Create a runtime for the team.
        # TODO: The runtime should be created by a managed context.
        self._runtime = SingleThreadedAgentRuntime()

        # Flag to track if the group chat has been initialized.
        self._initialized = False

        # Flag to track if the group chat is running.
        self._is_running = False

    @abstractmethod
    def _create_group_chat_manager_factory(
        self,
        group_topic_type: str,
        output_topic_type: str,
        participant_topic_types: List[str],
        participant_descriptions: List[str],
        termination_condition: TerminationCondition | None,
        max_turns: int | None,
    ) -> Callable[[], SequentialRoutedAgent]: ...

    def _create_participant_factory(
        self,
        parent_topic_type: str,
        output_topic_type: str,
        agent: ChatAgent,
    ) -> Callable[[], ChatAgentContainer]:
        def _factory() -> ChatAgentContainer:
            id = AgentInstantiationContext.current_agent_id()
            assert id == AgentId(type=agent.name, key=self._team_id)
            container = ChatAgentContainer(parent_topic_type, output_topic_type, agent)
            assert container.id == id
            return container

        return _factory

    async def _init(self, runtime: AgentRuntime) -> None:
        # Constants for the group chat manager.
        group_chat_manager_agent_type = AgentType(self._group_chat_manager_topic_type)

        # Register participants.
        for participant, participant_topic_type in zip(self._participants, self._participant_topic_types, strict=False):
            # Use the participant topic type as the agent type.
            agent_type = participant_topic_type
            # Register the participant factory.
            await ChatAgentContainer.register(
                runtime,
                type=agent_type,
                factory=self._create_participant_factory(self._group_topic_type, self._output_topic_type, participant),
            )
            # Add subscriptions for the participant.
            await runtime.add_subscription(TypeSubscription(topic_type=participant_topic_type, agent_type=agent_type))
            await runtime.add_subscription(TypeSubscription(topic_type=self._group_topic_type, agent_type=agent_type))

        # Register the group chat manager.
        await self._base_group_chat_manager_class.register(
            runtime,
            type=group_chat_manager_agent_type.type,
            factory=self._create_group_chat_manager_factory(
                group_topic_type=self._group_topic_type,
                output_topic_type=self._output_topic_type,
                participant_topic_types=self._participant_topic_types,
                participant_descriptions=self._participant_descriptions,
                termination_condition=self._termination_condition,
                max_turns=self._max_turns,
            ),
        )
        # Add subscriptions for the group chat manager.
        await runtime.add_subscription(
            TypeSubscription(
                topic_type=self._group_chat_manager_topic_type, agent_type=group_chat_manager_agent_type.type
            )
        )
        await runtime.add_subscription(
            TypeSubscription(topic_type=self._group_topic_type, agent_type=group_chat_manager_agent_type.type)
        )

        async def collect_output_messages(
            _runtime: ClosureContext,
            message: GroupChatStart | GroupChatMessage | GroupChatTermination,
            ctx: MessageContext,
        ) -> None:
            """Collect output messages from the group chat."""
            if isinstance(message, GroupChatStart):
                if message.messages is not None:
                    for msg in message.messages:
                        event_logger.info(msg)
                        await self._output_message_queue.put(msg)
            elif isinstance(message, GroupChatMessage):
                event_logger.info(message.message)
                await self._output_message_queue.put(message.message)
            elif isinstance(message, GroupChatTermination):
                event_logger.info(message.message)
                self._stop_reason = message.message.content

        await ClosureAgent.register_closure(
            runtime,
            type=self._collector_agent_type,
            closure=collect_output_messages,
            subscriptions=lambda: [
                TypeSubscription(topic_type=self._output_topic_type, agent_type=self._collector_agent_type),
            ],
        )

        self._initialized = True
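    # --- Illustrative sketch, not part of the original source ---
    # The class docstring says a concrete team pairs a BaseGroupChatManager
    # subclass with a BaseGroupChat subclass. The commented-out sketch below
    # shows the shape of such a subclass; MyGroupChat and MyGroupChatManager
    # are hypothetical names, and the manager constructor is assumed to accept
    # the same arguments that _create_group_chat_manager_factory receives.
    #
    #     class MyGroupChat(BaseGroupChat):
    #         def __init__(
    #             self,
    #             participants: List[ChatAgent],
    #             termination_condition: TerminationCondition | None = None,
    #             max_turns: int | None = None,
    #         ) -> None:
    #             super().__init__(
    #                 participants,
    #                 group_chat_manager_class=MyGroupChatManager,
    #                 termination_condition=termination_condition,
    #                 max_turns=max_turns,
    #             )
    #
    #         def _create_group_chat_manager_factory(
    #             self,
    #             group_topic_type: str,
    #             output_topic_type: str,
    #             participant_topic_types: List[str],
    #             participant_descriptions: List[str],
    #             termination_condition: TerminationCondition | None,
    #             max_turns: int | None,
    #         ) -> Callable[[], SequentialRoutedAgent]:
    #             def _factory() -> SequentialRoutedAgent:
    #                 return MyGroupChatManager(
    #                     group_topic_type,
    #                     output_topic_type,
    #                     participant_topic_types,
    #                     participant_descriptions,
    #                     termination_condition,
    #                     max_turns,
    #                 )
    #
    #             return _factory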
    async def run(
        self,
        *,
        task: str | ChatMessage | Sequence[ChatMessage] | None = None,
        cancellation_token: CancellationToken | None = None,
    ) -> TaskResult:
        """Run the team and return the result. The base implementation uses
        :meth:`run_stream` to run the team and then returns the final result.
        Once the team is stopped, the termination condition is reset.

        Args:
            task (str | ChatMessage | Sequence[ChatMessage] | None): The task to run the team with. Can be a string, a single :class:`ChatMessage`, or a list of :class:`ChatMessage`.
            cancellation_token (CancellationToken | None): The cancellation token to kill the task immediately.
                Setting the cancellation token potentially puts the team in an inconsistent state,
                and it may not reset the termination condition.
                To gracefully stop the team, use :class:`~autogen_agentchat.conditions.ExternalTermination` instead.

        Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.conditions import MaxMessageTermination
            from autogen_agentchat.teams import RoundRobinGroupChat
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(model="gpt-4o")

                agent1 = AssistantAgent("Assistant1", model_client=model_client)
                agent2 = AssistantAgent("Assistant2", model_client=model_client)
                termination = MaxMessageTermination(3)
                team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

                result = await team.run(task="Count from 1 to 10, respond one at a time.")
                print(result)

                # Run the team again without a task to continue the previous task.
                result = await team.run()
                print(result)


            asyncio.run(main())

        Example using the :class:`~autogen_core.CancellationToken` to cancel the task:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.conditions import MaxMessageTermination
            from autogen_agentchat.teams import RoundRobinGroupChat
            from autogen_core import CancellationToken
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(model="gpt-4o")

                agent1 = AssistantAgent("Assistant1", model_client=model_client)
                agent2 = AssistantAgent("Assistant2", model_client=model_client)
                termination = MaxMessageTermination(3)
                team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

                cancellation_token = CancellationToken()

                # Create a task to run the team in the background.
                run_task = asyncio.create_task(
                    team.run(
                        task="Count from 1 to 10, respond one at a time.",
                        cancellation_token=cancellation_token,
                    )
                )

                # Wait for 1 second and then cancel the task.
                await asyncio.sleep(1)
                cancellation_token.cancel()

                # This will raise a cancellation error.
                await run_task


            asyncio.run(main())
        """
        result: TaskResult | None = None
        async for message in self.run_stream(
            task=task,
            cancellation_token=cancellation_token,
        ):
            if isinstance(message, TaskResult):
                result = message
        if result is not None:
            return result
        raise AssertionError("The stream should have returned the final result.")
    async def run_stream(
        self,
        *,
        task: str | ChatMessage | Sequence[ChatMessage] | None = None,
        cancellation_token: CancellationToken | None = None,
    ) -> AsyncGenerator[AgentEvent | ChatMessage | TaskResult, None]:
        """Run the team and produce a stream of messages, with the final result
        of type :class:`TaskResult` as the last item in the stream. Once the
        team is stopped, the termination condition is reset.

        Args:
            task (str | ChatMessage | Sequence[ChatMessage] | None): The task to run the team with. Can be a string, a single :class:`ChatMessage`, or a list of :class:`ChatMessage`.
            cancellation_token (CancellationToken | None): The cancellation token to kill the task immediately.
                Setting the cancellation token potentially puts the team in an inconsistent state,
                and it may not reset the termination condition.
                To gracefully stop the team, use :class:`~autogen_agentchat.conditions.ExternalTermination` instead.

        Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.conditions import MaxMessageTermination
            from autogen_agentchat.teams import RoundRobinGroupChat
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(model="gpt-4o")

                agent1 = AssistantAgent("Assistant1", model_client=model_client)
                agent2 = AssistantAgent("Assistant2", model_client=model_client)
                termination = MaxMessageTermination(3)
                team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
                stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
                async for message in stream:
                    print(message)

                # Run the team again without a task to continue the previous task.
                stream = team.run_stream()
                async for message in stream:
                    print(message)


            asyncio.run(main())

        Example using the :class:`~autogen_core.CancellationToken` to cancel the task:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.conditions import MaxMessageTermination
            from autogen_agentchat.ui import Console
            from autogen_agentchat.teams import RoundRobinGroupChat
            from autogen_core import CancellationToken
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(model="gpt-4o")

                agent1 = AssistantAgent("Assistant1", model_client=model_client)
                agent2 = AssistantAgent("Assistant2", model_client=model_client)
                termination = MaxMessageTermination(3)
                team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

                cancellation_token = CancellationToken()

                # Create a task to run the team in the background.
                run_task = asyncio.create_task(
                    Console(
                        team.run_stream(
                            task="Count from 1 to 10, respond one at a time.",
                            cancellation_token=cancellation_token,
                        )
                    )
                )

                # Wait for 1 second and then cancel the task.
                await asyncio.sleep(1)
                cancellation_token.cancel()

                # This will raise a cancellation error.
                await run_task


            asyncio.run(main())
        """
        # Create the messages list if the task is a string or a chat message.
        messages: List[ChatMessage] | None = None
        if task is None:
            pass
        elif isinstance(task, str):
            messages = [TextMessage(content=task, source="user")]
        elif isinstance(task, BaseChatMessage):
            messages = [task]
        else:
            if not task:
                raise ValueError("Task list cannot be empty.")
            messages = []
            for msg in task:
                if not isinstance(msg, BaseChatMessage):
                    raise ValueError("All messages in task list must be valid ChatMessage types")
                messages.append(msg)

        if self._is_running:
            raise ValueError("The team is already running, it cannot run again until it is stopped.")
        self._is_running = True
        # Start the runtime.
        # TODO: The runtime should be started by a managed context.
        self._runtime.start()

        if not self._initialized:
            await self._init(self._runtime)

        # Start a coroutine to stop the runtime and signal the output message queue is complete.
        async def stop_runtime() -> None:
            await self._runtime.stop_when_idle()
            await self._output_message_queue.put(None)

        shutdown_task = asyncio.create_task(stop_runtime())

        try:
            # Run the team by sending the start message to the group chat manager.
            # The group chat manager will start the group chat by relaying the message to the participants
            # and the closure agent.
            await self._runtime.send_message(
                GroupChatStart(messages=messages),
                recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
                cancellation_token=cancellation_token,
            )

            # Collect the output messages in order.
            output_messages: List[AgentEvent | ChatMessage] = []
            # Yield the messages until the queue is empty.
            while True:
                message_future = asyncio.ensure_future(self._output_message_queue.get())
                if cancellation_token is not None:
                    cancellation_token.link_future(message_future)
                # Wait for the next message, this will raise an exception if the task is cancelled.
                message = await message_future
                if message is None:
                    break
                yield message
                output_messages.append(message)

            # Yield the final result.
            yield TaskResult(messages=output_messages, stop_reason=self._stop_reason)

        finally:
            # Wait for the shutdown task to finish.
            await shutdown_task

            # Clear the output message queue.
            while not self._output_message_queue.empty():
                self._output_message_queue.get_nowait()

            # Indicate that the team is no longer running.
            self._is_running = False
    async def reset(self) -> None:
        """Reset the team and its participants to their initial state.

        The team must be stopped before it can be reset.

        Raises:
            RuntimeError: If the team has not been initialized or is currently running.

        Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.conditions import MaxMessageTermination
            from autogen_agentchat.teams import RoundRobinGroupChat
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(model="gpt-4o")

                agent1 = AssistantAgent("Assistant1", model_client=model_client)
                agent2 = AssistantAgent("Assistant2", model_client=model_client)
                termination = MaxMessageTermination(3)
                team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
                stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
                async for message in stream:
                    print(message)

                # Reset the team.
                await team.reset()
                stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
                async for message in stream:
                    print(message)


            asyncio.run(main())
        """
        if not self._initialized:
            raise RuntimeError("The group chat has not been initialized. It must be run before it can be reset.")

        if self._is_running:
            raise RuntimeError("The group chat is currently running. It must be stopped before it can be reset.")
        self._is_running = True

        # Start the runtime.
        self._runtime.start()

        try:
            # Send a reset message to all participants.
            for participant_topic_type in self._participant_topic_types:
                await self._runtime.send_message(
                    GroupChatReset(),
                    recipient=AgentId(type=participant_topic_type, key=self._team_id),
                )
            # Send a reset message to the group chat manager.
            await self._runtime.send_message(
                GroupChatReset(),
                recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
            )
        finally:
            # Stop the runtime.
            await self._runtime.stop_when_idle()

            # Reset the output message queue.
            self._stop_reason = None
            while not self._output_message_queue.empty():
                self._output_message_queue.get_nowait()

            # Indicate that the team is no longer running.
            self._is_running = False
    async def save_state(self) -> Mapping[str, Any]:
        """Save the state of the group chat team."""
        if not self._initialized:
            raise RuntimeError("The group chat has not been initialized. It must be run before it can be saved.")

        if self._is_running:
            raise RuntimeError("The team cannot be saved while it is running.")
        self._is_running = True

        try:
            # Save the state of the runtime. This will save the state of the participants and the group chat manager.
            agent_states = await self._runtime.save_state()
            return TeamState(agent_states=agent_states, team_id=self._team_id).model_dump()
        finally:
            # Indicate that the team is no longer running.
            self._is_running = False
    async def load_state(self, state: Mapping[str, Any]) -> None:
        """Load the state of the group chat team."""
        if not self._initialized:
            await self._init(self._runtime)
        if self._is_running:
            raise RuntimeError("The team cannot be loaded while it is running.")
        self._is_running = True

        try:
            # Load the state of the runtime. This will load the state of the participants and the group chat manager.
            team_state = TeamState.model_validate(state)
            self._team_id = team_state.team_id
            await self._runtime.load_state(team_state.agent_states)
        finally:
            # Indicate that the team is no longer running.
            self._is_running = False
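# --- Illustrative sketch, not part of the original source ---
# Unlike run, run_stream, and reset, the save_state and load_state docstrings
# carry no usage example. The commented-out sketch below shows one plausible
# save/load round trip, reusing the RoundRobinGroupChat setup from the
# docstring examples above; the task strings and model name are placeholders.
#
#     import asyncio
#
#     from autogen_agentchat.agents import AssistantAgent
#     from autogen_agentchat.conditions import MaxMessageTermination
#     from autogen_agentchat.teams import RoundRobinGroupChat
#     from autogen_ext.models.openai import OpenAIChatCompletionClient
#
#
#     async def main() -> None:
#         model_client = OpenAIChatCompletionClient(model="gpt-4o")
#         agent1 = AssistantAgent("Assistant1", model_client=model_client)
#         agent2 = AssistantAgent("Assistant2", model_client=model_client)
#         termination = MaxMessageTermination(3)
#         team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
#
#         # Run once so the team is initialized, then snapshot its state.
#         await team.run(task="Count from 1 to 10, respond one at a time.")
#         state = await team.save_state()
#
#         # Reset the team, restore the snapshot, and continue the previous task.
#         await team.reset()
#         await team.load_state(state)
#         result = await team.run()
#         print(result)
#
#
#     asyncio.run(main())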