Topic and Subscription Example Scenarios#
Introduction#
In this cookbook, we explore how broadcasting works for agent communication in AutoGen using four different broadcasting scenarios. These scenarios illustrate various ways to handle and distribute messages among agents. We’ll use a consistent example of a tax management company processing client requests to demonstrate each scenario.
Scenario Overview#
Imagine a tax management company that offers various services to clients, such as tax planning, dispute resolution, compliance, and preparation. The company employs a team of tax specialists, each with expertise in one of these areas, and a tax system manager who oversees the operations.
Clients submit requests that need to be processed by the appropriate specialists. The communication between the clients, the tax system manager, and the tax specialists is handled through broadcasting in this system.
We’ll explore how different broadcasting scenarios affect the way messages are distributed among agents and how they can be used to tailor the communication flow to specific needs.
Broadcasting Scenarios Overview#
We will cover the following broadcasting scenarios:
Single-Tenant, Single Scope of Publishing
Multi-Tenant, Single Scope of Publishing
Single-Tenant, Multiple Scopes of Publishing
Multi-Tenant, Multiple Scopes of Publishing
Each scenario represents a different approach to message distribution and agent interaction within the system. By understanding these scenarios, you can design agent communication strategies that best fit your application’s requirements.
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import List
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import MessageContext, TopicId
from autogen_core.components import RoutedAgent, TypeSubscription, message_handler
from autogen_core.components._default_subscription import DefaultSubscription
from autogen_core.components._default_topic import DefaultTopicId
from autogen_core.components.models import (
SystemMessage,
)
class TaxSpecialty(str, Enum):
PLANNING = "planning"
DISPUTE_RESOLUTION = "dispute_resolution"
COMPLIANCE = "compliance"
PREPARATION = "preparation"
@dataclass
class ClientRequest:
content: str
@dataclass
class RequestAssessment:
content: str
class TaxSpecialist(RoutedAgent):
def __init__(
self,
description: str,
specialty: TaxSpecialty,
system_messages: List[SystemMessage],
) -> None:
super().__init__(description)
self.specialty = specialty
self._system_messages = system_messages
self._memory: List[ClientRequest] = []
@message_handler
async def handle_message(self, message: ClientRequest, ctx: MessageContext) -> None:
# Process the client request.
print(f"\n{'='*50}\nTax specialist {self.id} with specialty {self.specialty}:\n{message.content}")
# Send a response back to the manager
if ctx.topic_id is None:
raise ValueError("Topic ID is required for broadcasting")
await self.publish_message(
message=RequestAssessment(content=f"I can handle this request in {self.specialty}."),
topic_id=ctx.topic_id,
)
1. Single-Tenant, Single Scope of Publishing#
Scenarios Explanation#
In the single-tenant, single scope of publishing scenario:
All agents operate within a single tenant (e.g., one client or user session).
Messages are published to a single topic, and all agents subscribe to this topic.
Every agent receives every message that gets published to the topic.
This scenario is suitable for situations where all agents need to be aware of all messages, and there’s no need to isolate communication between different groups of agents or sessions.
Application in the Tax Specialist Company#
In our tax specialist company, this scenario implies:
All tax specialists receive every client request and internal message.
All agents collaborate closely, with full visibility of all communications.
Useful for tasks or teams where all agents need to be aware of all messages.
How the Scenario Works#
Subscriptions: All agents use the default subscription(e.g., “default”).
Publishing: Messages are published to the default topic.
Message Handling: Each agent decides whether to act on a message based on its content and available handlers.
Benefits#
Simplicity: Easy to set up and understand.
Collaboration: Promotes transparency and collaboration among agents.
Flexibility: Agents can dynamically decide which messages to process.
Considerations#
Scalability: May not scale well with a large number of agents or messages.
Efficiency: Agents may receive many irrelevant messages, leading to unnecessary processing.
async def run_single_tenant_single_scope() -> None:
# Create the runtime.
runtime = SingleThreadedAgentRuntime()
# Register TaxSpecialist agents for each specialty
specialist_agent_type_1 = "TaxSpecialist_1"
specialist_agent_type_2 = "TaxSpecialist_2"
await TaxSpecialist.register(
runtime=runtime,
type=specialist_agent_type_1,
factory=lambda: TaxSpecialist(
description="A tax specialist 1",
specialty=TaxSpecialty.PLANNING,
system_messages=[SystemMessage("You are a tax specialist.")],
),
)
await TaxSpecialist.register(
runtime=runtime,
type=specialist_agent_type_2,
factory=lambda: TaxSpecialist(
description="A tax specialist 2",
specialty=TaxSpecialty.DISPUTE_RESOLUTION,
system_messages=[SystemMessage("You are a tax specialist.")],
),
)
# Add default subscriptions for each agent type
await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_1))
await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_2))
# Start the runtime and send a message to agents on default topic
runtime.start()
await runtime.publish_message(ClientRequest("I need to have my tax for 2024 prepared."), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
await run_single_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_1:default with specialty TaxSpecialty.PLANNING:
I need to have my tax for 2024 prepared.
==================================================
Tax specialist TaxSpecialist_2:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need to have my tax for 2024 prepared.
2. Multi-Tenant, Single Scope of Publishing#
Scenario Explanation#
In the multi-tenant, single scope of publishing scenario:
There are multiple tenants (e.g., multiple clients or user sessions).
Each tenant has its own isolated topic through the topic source.
All agents within a tenant subscribe to the tenant’s topic. If needed, new agent instances are created for each tenant.
Messages are only visible to agents within the same tenant.
This scenario is useful when you need to isolate communication between different tenants but want all agents within a tenant to be aware of all messages.
Application in the Tax Specialist Company#
In this scenario:
The company serves multiple clients (tenants) simultaneously.
For each client, a dedicated set of agent instances is created.
Each client’s communication is isolated from others.
All agents for a client receive messages published to that client’s topic.
How the Scenario Works#
Subscriptions: Agents subscribe to topics based on the tenant’s identity.
Publishing: Messages are published to the tenant-specific topic.
Message Handling: Agents only receive messages relevant to their tenant.
Benefits#
Tenant Isolation: Ensures data privacy and separation between clients.
Collaboration Within Tenant: Agents can collaborate freely within their tenant.
Considerations#
Complexity: Requires managing multiple sets of agents and topics.
Resource Usage: More agent instances may consume additional resources.
async def run_multi_tenant_single_scope() -> None:
# Create the runtime
runtime = SingleThreadedAgentRuntime()
# List of clients (tenants)
tenants = ["ClientABC", "ClientXYZ"]
# Initialize sessions and map the topic type to each TaxSpecialist agent type
for specialty in TaxSpecialty:
specialist_agent_type = f"TaxSpecialist_{specialty.value}"
await TaxSpecialist.register(
runtime=runtime,
type=specialist_agent_type,
factory=lambda specialty=specialty: TaxSpecialist( # type: ignore
description=f"A tax specialist in {specialty.value}.",
specialty=specialty,
system_messages=[SystemMessage(f"You are a tax specialist in {specialty.value}.")],
),
)
specialist_subscription = DefaultSubscription(agent_type=specialist_agent_type)
await runtime.add_subscription(specialist_subscription)
# Start the runtime
runtime.start()
# Publish client requests to their respective topics
for tenant in tenants:
topic_source = tenant # The topic source is the client name
topic_id = DefaultTopicId(source=topic_source)
await runtime.publish_message(
ClientRequest(f"{tenant} requires tax services."),
topic_id=topic_id,
)
# Allow time for message processing
await asyncio.sleep(1)
# Stop the runtime when idle
await runtime.stop_when_idle()
await run_multi_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC requires tax services.
==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC requires tax services.
==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC requires tax services.
==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC requires tax services.
==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ requires tax services.
==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ requires tax services.
==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ requires tax services.
==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ requires tax services.
3. Single-Tenant, Multiple Scopes of Publishing#
Scenario Explanation#
In the single-tenant, multiple scopes of publishing scenario:
All agents operate within a single tenant.
Messages are published to different topics.
Agents subscribe to specific topics relevant to their role or specialty.
Messages are directed to subsets of agents based on the topic.
This scenario allows for targeted communication within a tenant, enabling more granular control over message distribution.
Application in the Tax Management Company#
In this scenario:
The tax system manager communicates with specific specialists based on their specialties.
Different topics represent different specialties (e.g., “planning”, “compliance”).
Specialists subscribe only to the topic that matches their specialty.
The manager publishes messages to specific topics to reach the intended specialists.
How the Scenario Works#
Subscriptions: Agents subscribe to topics corresponding to their specialties.
Publishing: Messages are published to topics based on the intended recipients.
Message Handling: Only agents subscribed to a topic receive its messages.
Benefits#
Targeted Communication: Messages reach only the relevant agents.
Efficiency: Reduces unnecessary message processing by agents.
Considerations#
Setup Complexity: Requires careful management of topics and subscriptions.
Flexibility: Changes in communication scenarios may require updating subscriptions.
async def run_single_tenant_multiple_scope() -> None:
# Create the runtime
runtime = SingleThreadedAgentRuntime()
# Register TaxSpecialist agents for each specialty and add subscriptions
for specialty in TaxSpecialty:
specialist_agent_type = f"TaxSpecialist_{specialty.value}"
await TaxSpecialist.register(
runtime=runtime,
type=specialist_agent_type,
factory=lambda specialty=specialty: TaxSpecialist( # type: ignore
description=f"A tax specialist in {specialty.value}.",
specialty=specialty,
system_messages=[SystemMessage(f"You are a tax specialist in {specialty.value}.")],
),
)
specialist_subscription = TypeSubscription(topic_type=specialty.value, agent_type=specialist_agent_type)
await runtime.add_subscription(specialist_subscription)
# Start the runtime
runtime.start()
# Publish a ClientRequest to each specialist's topic
for specialty in TaxSpecialty:
topic_id = TopicId(type=specialty.value, source="default")
await runtime.publish_message(
ClientRequest(f"I need assistance with {specialty.value} taxes."),
topic_id=topic_id,
)
# Allow time for message processing
await asyncio.sleep(1)
# Stop the runtime when idle
await runtime.stop_when_idle()
await run_single_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:default with specialty TaxSpecialty.PLANNING:
I need assistance with planning taxes.
==================================================
Tax specialist TaxSpecialist_dispute_resolution:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need assistance with dispute_resolution taxes.
==================================================
Tax specialist TaxSpecialist_compliance:default with specialty TaxSpecialty.COMPLIANCE:
I need assistance with compliance taxes.
==================================================
Tax specialist TaxSpecialist_preparation:default with specialty TaxSpecialty.PREPARATION:
I need assistance with preparation taxes.
4. Multi-Tenant, Multiple Scopes of Publishing#
Scenario Explanation#
In the multi-tenant, multiple scopes of publishing scenario:
There are multiple tenants, each with their own set of agents.
Messages are published to multiple topics within each tenant.
Agents subscribe to tenant-specific topics relevant to their role.
Combines tenant isolation with targeted communication.
This scenario provides the highest level of control over message distribution, suitable for complex systems with multiple clients and specialized communication needs.
Application in the Tax Management Company#
In this scenario:
The company serves multiple clients, each with dedicated agent instances.
Within each client, agents communicate using multiple topics based on specialties.
For example, Client A’s planning specialist subscribes to the “planning” topic with source “ClientA”.
The tax system manager for each client communicates with their specialists using tenant-specific topics.
How the Scenario Works#
Subscriptions: Agents subscribe to topics based on both tenant identity and specialty.
Publishing: Messages are published to tenant-specific and specialty-specific topics.
Message Handling: Only agents matching the tenant and topic receive messages.
Benefits#
Complete Isolation: Ensures both tenant and communication isolation.
Granular Control: Enables precise routing of messages to intended agents.
Considerations#
Complexity: Requires careful management of topics, tenants, and subscriptions.
Resource Usage: Increased number of agent instances and topics may impact resources.
async def run_multi_tenant_multiple_scope() -> None:
# Create the runtime
runtime = SingleThreadedAgentRuntime()
# Define TypeSubscriptions for each specialty and tenant
tenants = ["ClientABC", "ClientXYZ"]
# Initialize agents for all specialties and add type subscriptions
for specialty in TaxSpecialty:
specialist_agent_type = f"TaxSpecialist_{specialty.value}"
await TaxSpecialist.register(
runtime=runtime,
type=specialist_agent_type,
factory=lambda specialty=specialty: TaxSpecialist( # type: ignore
description=f"A tax specialist in {specialty.value}.",
specialty=specialty,
system_messages=[SystemMessage(f"You are a tax specialist in {specialty.value}.")],
),
)
for tenant in tenants:
specialist_subscription = TypeSubscription(
topic_type=f"{tenant}_{specialty.value}", agent_type=specialist_agent_type
)
await runtime.add_subscription(specialist_subscription)
# Start the runtime
runtime.start()
# Send messages for each tenant to each specialty
for tenant in tenants:
for specialty in TaxSpecialty:
topic_id = TopicId(type=f"{tenant}_{specialty.value}", source=tenant)
await runtime.publish_message(
ClientRequest(f"{tenant} needs assistance with {specialty.value} taxes."),
topic_id=topic_id,
)
# Allow time for message processing
await asyncio.sleep(1)
# Stop the runtime when idle
await runtime.stop_when_idle()
await run_multi_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC needs assistance with planning taxes.
==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC needs assistance with dispute_resolution taxes.
==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC needs assistance with compliance taxes.
==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC needs assistance with preparation taxes.
==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ needs assistance with planning taxes.
==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ needs assistance with dispute_resolution taxes.
==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ needs assistance with compliance taxes.
==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ needs assistance with preparation taxes.