Source code for autogen_agentchat.agents._society_of_mind_agent

from typing import Any, AsyncGenerator, List, Mapping, Sequence, Tuple

from autogen_core import CancellationToken
from autogen_core.models import ChatCompletionClient, LLMMessage, SystemMessage, UserMessage

from autogen_agentchat.base import Response
from autogen_agentchat.state import SocietyOfMindAgentState

from ..base import TaskResult, Team
from ..messages import (
    AgentEvent,
    BaseChatMessage,
    ChatMessage,
    TextMessage,
)
from ._base_chat_agent import BaseChatAgent


class SocietyOfMindAgent(BaseChatAgent):
    """An agent that uses an inner team of agents to generate responses.

    Each time the agent's :meth:`on_messages` or :meth:`on_messages_stream`
    method is called, it runs the inner team of agents and then uses the model
    client to generate a response based on the inner team's messages. Once the
    response is generated, the agent resets the inner team by calling
    :meth:`Team.reset`.

    Args:
        name (str): The name of the agent.
        team (Team): The team of agents to use.
        model_client (ChatCompletionClient): The model client to use for preparing responses.
        description (str, optional): The description of the agent.
        instruction (str, optional): The instruction to use when generating a response using the inner team's messages.
            Defaults to :attr:`DEFAULT_INSTRUCTION`. It assumes the role of 'system'.
        response_prompt (str, optional): The response prompt to use when generating a response using the inner team's messages.
            Defaults to :attr:`DEFAULT_RESPONSE_PROMPT`. It assumes the role of 'system'.

    Example:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.ui import Console
        from autogen_agentchat.agents import AssistantAgent, SocietyOfMindAgent
        from autogen_ext.models.openai import OpenAIChatCompletionClient
        from autogen_agentchat.teams import RoundRobinGroupChat
        from autogen_agentchat.conditions import TextMentionTermination


        async def main() -> None:
            model_client = OpenAIChatCompletionClient(model="gpt-4o")

            agent1 = AssistantAgent("assistant1", model_client=model_client, system_message="You are a writer, write well.")
            agent2 = AssistantAgent(
                "assistant2",
                model_client=model_client,
                system_message="You are an editor, provide critical feedback. Respond with 'APPROVE' if the text addresses all feedbacks.",
            )
            inner_termination = TextMentionTermination("APPROVE")
            inner_team = RoundRobinGroupChat([agent1, agent2], termination_condition=inner_termination)

            society_of_mind_agent = SocietyOfMindAgent("society_of_mind", team=inner_team, model_client=model_client)

            agent3 = AssistantAgent(
                "assistant3", model_client=model_client, system_message="Translate the text to Spanish."
            )
            team = RoundRobinGroupChat([society_of_mind_agent, agent3], max_turns=2)

            stream = team.run_stream(task="Write a short story with a surprising ending.")
            await Console(stream)


        asyncio.run(main())
    """

    DEFAULT_INSTRUCTION = "Earlier you were asked to fulfill a request. You and your team worked diligently to address that request. Here is a transcript of that conversation:"
    """str: The default instruction to use when generating a response using the
    inner team's messages. The instruction will be prepended to the inner team's
    messages when generating a response using the model. It assumes the role of
    'system'."""

    DEFAULT_RESPONSE_PROMPT = (
        "Output a standalone response to the original request, without mentioning any of the intermediate discussion."
    )
    """str: The default response prompt to use when generating a response using
    the inner team's messages. It assumes the role of 'system'."""

    def __init__(
        self,
        name: str,
        team: Team,
        model_client: ChatCompletionClient,
        *,
        description: str = "An agent that uses an inner team of agents to generate responses.",
        instruction: str = DEFAULT_INSTRUCTION,
        response_prompt: str = DEFAULT_RESPONSE_PROMPT,
    ) -> None:
        super().__init__(name=name, description=description)
        self._team = team
        self._model_client = model_client
        self._instruction = instruction
        self._response_prompt = response_prompt

    @property
    def produced_message_types(self) -> Tuple[type[ChatMessage], ...]:
        return (TextMessage,)
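
    # Illustrative note (not part of the original source): both prompts can be
    # overridden at construction time, and both are sent to the model with the
    # 'system' role. For example (names and prompt strings are hypothetical):
    #
    #   agent = SocietyOfMindAgent(
    #       "som",
    #       team=inner_team,
    #       model_client=model_client,
    #       instruction="Summarize the transcript below for the user.",
    #       response_prompt="Reply with a single concise paragraph.",
    #   )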

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        # Call the stream method and collect the messages.
        response: Response | None = None
        async for msg in self.on_messages_stream(messages, cancellation_token):
            if isinstance(msg, Response):
                response = msg
        assert response is not None
        return response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        # Prepare the task for the team of agents.
        task = list(messages)

        # Run the team of agents.
        result: TaskResult | None = None
        inner_messages: List[AgentEvent | ChatMessage] = []
        count = 0
        async for inner_msg in self._team.run_stream(task=task, cancellation_token=cancellation_token):
            if isinstance(inner_msg, TaskResult):
                result = inner_msg
            else:
                count += 1
                if count <= len(task):
                    # Skip the task messages.
                    continue
                yield inner_msg
                inner_messages.append(inner_msg)
        assert result is not None

        if len(inner_messages) == 0:
            yield Response(
                chat_message=TextMessage(source=self.name, content="No response."), inner_messages=inner_messages
            )
        else:
            # Generate a response using the model client.
            llm_messages: List[LLMMessage] = [SystemMessage(content=self._instruction)]
            llm_messages.extend(
                [
                    UserMessage(content=message.content, source=message.source)
                    for message in inner_messages
                    if isinstance(message, BaseChatMessage)
                ]
            )
            llm_messages.append(SystemMessage(content=self._response_prompt))
            completion = await self._model_client.create(messages=llm_messages, cancellation_token=cancellation_token)
            assert isinstance(completion.content, str)
            yield Response(
                chat_message=TextMessage(source=self.name, content=completion.content, models_usage=completion.usage),
                inner_messages=inner_messages,
            )

        # Reset the team.
        await self._team.reset()
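
    # Illustrative note (not part of the original source): with the default
    # prompts, the model client in on_messages_stream sees roughly this layout:
    #
    #   SystemMessage(DEFAULT_INSTRUCTION)
    #   UserMessage(<inner chat message 1 content>, source=<inner agent name>)
    #   ...
    #   UserMessage(<inner chat message N content>, source=<inner agent name>)
    #   SystemMessage(DEFAULT_RESPONSE_PROMPT)
    #
    # Only BaseChatMessage instances from the inner run are included; agent
    # events are yielded to the caller but not forwarded to the model.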

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        await self._team.reset()

    async def save_state(self) -> Mapping[str, Any]:
        team_state = await self._team.save_state()
        state = SocietyOfMindAgentState(inner_team_state=team_state)
        return state.model_dump()

    async def load_state(self, state: Mapping[str, Any]) -> None:
        society_of_mind_state = SocietyOfMindAgentState.model_validate(state)
        await self._team.load_state(society_of_mind_state.inner_team_state)
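

The following is a minimal usage sketch (not part of the module above) showing how the agent can be driven directly through :meth:`on_messages` and persisted with :meth:`save_state` / :meth:`load_state`. It assumes an OpenAI-compatible model client is available (``OPENAI_API_KEY`` set); the agent names, prompts, and the ``som_state.json`` file name are illustrative only.

.. code-block:: python

    import asyncio
    import json

    from autogen_core import CancellationToken

    from autogen_agentchat.agents import AssistantAgent, SocietyOfMindAgent
    from autogen_agentchat.conditions import MaxMessageTermination
    from autogen_agentchat.messages import TextMessage
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        writer = AssistantAgent("writer", model_client=model_client, system_message="Draft a short answer.")
        reviewer = AssistantAgent("reviewer", model_client=model_client, system_message="Review and improve the draft.")
        inner_team = RoundRobinGroupChat([writer, reviewer], termination_condition=MaxMessageTermination(4))
        agent = SocietyOfMindAgent("society_of_mind", team=inner_team, model_client=model_client)

        # Drive the agent directly; the inner team runs and is reset afterwards.
        response = await agent.on_messages(
            [TextMessage(content="Explain why unit tests are useful.", source="user")],
            CancellationToken(),
        )
        print(response.chat_message.content)

        # Persist the agent's state (a dump of the inner team's state) and restore it later.
        state = await agent.save_state()
        with open("som_state.json", "w") as f:
            json.dump(state, f)
        with open("som_state.json") as f:
            await agent.load_state(json.load(f))


    asyncio.run(main())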