{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Sequential Workflow\n", "\n", "Sequential Workflow is a multi-agent design pattern where agents respond in a deterministic sequence. Each agent in the workflow performs a specific task by processing a message, generating a response, and then passing it to the next agent. This pattern is useful for creating deterministic workflows where each agent contributes to a pre-specified sub-task." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example, we demonstrate a sequential workflow where multiple agents collaborate to transform a basic product description into a polished marketing copy.\n", "\n", "The pipeline consists of four specialized agents:\n", "- **Concept Extractor Agent**: Analyzes the initial product description to extract key features, target audience, and unique selling points (USPs). The output is a structured analysis in a single text block.\n", "- **Writer Agent**: Crafts compelling marketing copy based on the extracted concepts. This agent transforms the analytical insights into engaging promotional content, delivering a cohesive narrative in a single text block.\n", "- **Format & Proof Agent**: Polishes the draft copy by refining grammar, enhancing clarity, and maintaining consistent tone. This agent ensures professional quality and delivers a well-formatted final version.\n", "- **User Agent**: Presents the final, refined marketing copy to the user, completing the workflow." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following diagram illustrates the sequential workflow in this example:\n", "\n", "![Sequential Workflow](sequential-workflow.svg)\n", "\n", "We will implement this workflow using publish-subscribe messaging.\n", "Please read about [Topic and Subscription](../core-concepts/topic-and-subscription.md) for the core concepts\n", "and [Broadcast Messaging](../framework/message-and-communication.ipynb#broadcast) for the the API usage.\n", "\n", "In this pipeline, agents communicate with each other by publishing their completed work as messages to the topic of the \n", "next agent in the sequence. For example, when the `ConceptExtractor` finishes analyzing the product description, it \n", "publishes its findings to the `\"WriterAgent\"` topic, which the `WriterAgent` is subscribed to. This pattern continues through \n", "each step of the pipeline, with each agent publishing to the topic that the next agent in line subscribed to." ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "from autogen_core import (\n", " MessageContext,\n", " RoutedAgent,\n", " SingleThreadedAgentRuntime,\n", " TopicId,\n", " TypeSubscription,\n", " message_handler,\n", " type_subscription,\n", ")\n", "from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessage\n", "from autogen_ext.models.openai import OpenAIChatCompletionClient" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Message Protocol\n", "\n", "The message protocol for this example workflow is a simple text message that agents will use to relay their work." ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "@dataclass\n", "class Message:\n", " content: str" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Topics\n", "\n", "Each agent in the workflow will be subscribed to a specific topic type. The topic types are named after the agents in the sequence,\n", "This allows each agent to publish its work to the next agent in the sequence." ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "concept_extractor_topic_type = \"ConceptExtractorAgent\"\n", "writer_topic_type = \"WriterAgent\"\n", "format_proof_topic_type = \"FormatProofAgent\"\n", "user_topic_type = \"User\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Agents\n", "\n", "Each agent class is defined with a {py:class}`~autogen_core.type_subscription` decorator to specify the topic type it is subscribed to.\n", "Alternative to the decorator, you can also use the {py:meth}`~autogen_core.AgentRuntime.add_subscription` method to subscribe to a topic through runtime directly.\n", "\n", "The concept extractor agent comes up with the initial bullet points for the product description." ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "@type_subscription(topic_type=concept_extractor_topic_type)\n", "class ConceptExtractorAgent(RoutedAgent):\n", " def __init__(self, model_client: ChatCompletionClient) -> None:\n", " super().__init__(\"A concept extractor agent.\")\n", " self._system_message = SystemMessage(\n", " content=(\n", " \"You are a marketing analyst. Given a product description, identify:\\n\"\n", " \"- Key features\\n\"\n", " \"- Target audience\\n\"\n", " \"- Unique selling points\\n\\n\"\n", " )\n", " )\n", " self._model_client = model_client\n", "\n", " @message_handler\n", " async def handle_user_description(self, message: Message, ctx: MessageContext) -> None:\n", " prompt = f\"Product description: {message.content}\"\n", " llm_result = await self._model_client.create(\n", " messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],\n", " cancellation_token=ctx.cancellation_token,\n", " )\n", " response = llm_result.content\n", " assert isinstance(response, str)\n", " print(f\"{'-'*80}\\n{self.id.type}:\\n{response}\")\n", "\n", " await self.publish_message(Message(response), topic_id=TopicId(writer_topic_type, source=self.id.key))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The writer agent performs writing." ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "@type_subscription(topic_type=writer_topic_type)\n", "class WriterAgent(RoutedAgent):\n", " def __init__(self, model_client: ChatCompletionClient) -> None:\n", " super().__init__(\"A writer agent.\")\n", " self._system_message = SystemMessage(\n", " content=(\n", " \"You are a marketing copywriter. Given a block of text describing features, audience, and USPs, \"\n", " \"compose a compelling marketing copy (like a newsletter section) that highlights these points. \"\n", " \"Output should be short (around 150 words), output just the copy as a single text block.\"\n", " )\n", " )\n", " self._model_client = model_client\n", "\n", " @message_handler\n", " async def handle_intermediate_text(self, message: Message, ctx: MessageContext) -> None:\n", " prompt = f\"Below is the info about the product:\\n\\n{message.content}\"\n", "\n", " llm_result = await self._model_client.create(\n", " messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],\n", " cancellation_token=ctx.cancellation_token,\n", " )\n", " response = llm_result.content\n", " assert isinstance(response, str)\n", " print(f\"{'-'*80}\\n{self.id.type}:\\n{response}\")\n", "\n", " await self.publish_message(Message(response), topic_id=TopicId(format_proof_topic_type, source=self.id.key))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The format proof agent performs the formatting." ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "@type_subscription(topic_type=format_proof_topic_type)\n", "class FormatProofAgent(RoutedAgent):\n", " def __init__(self, model_client: ChatCompletionClient) -> None:\n", " super().__init__(\"A format & proof agent.\")\n", " self._system_message = SystemMessage(\n", " content=(\n", " \"You are an editor. Given the draft copy, correct grammar, improve clarity, ensure consistent tone, \"\n", " \"give format and make it polished. Output the final improved copy as a single text block.\"\n", " )\n", " )\n", " self._model_client = model_client\n", "\n", " @message_handler\n", " async def handle_intermediate_text(self, message: Message, ctx: MessageContext) -> None:\n", " prompt = f\"Draft copy:\\n{message.content}.\"\n", " llm_result = await self._model_client.create(\n", " messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],\n", " cancellation_token=ctx.cancellation_token,\n", " )\n", " response = llm_result.content\n", " assert isinstance(response, str)\n", " print(f\"{'-'*80}\\n{self.id.type}:\\n{response}\")\n", "\n", " await self.publish_message(Message(response), topic_id=TopicId(user_topic_type, source=self.id.key))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example, the user agent simply prints the final marketing copy to the console.\n", "In a real-world application, this could be replaced by storing the result to a database, sending an email, or any other desired action." ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "@type_subscription(topic_type=user_topic_type)\n", "class UserAgent(RoutedAgent):\n", " def __init__(self) -> None:\n", " super().__init__(\"A user agent that outputs the final copy to the user.\")\n", "\n", " @message_handler\n", " async def handle_final_copy(self, message: Message, ctx: MessageContext) -> None:\n", " print(f\"\\n{'-'*80}\\n{self.id.type} received final copy:\\n{message.content}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Workflow\n", "\n", "Now we can register the agents to the runtime.\n", "Because we used the {py:class}`~autogen_core.type_subscription` decorator, the runtime will automatically subscribe the agents to the correct topics." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_client = OpenAIChatCompletionClient(\n", " model=\"gpt-4o-mini\",\n", " # api_key=\"YOUR_API_KEY\"\n", ")\n", "\n", "runtime = SingleThreadedAgentRuntime()\n", "\n", "await ConceptExtractorAgent.register(\n", " runtime, type=concept_extractor_topic_type, factory=lambda: ConceptExtractorAgent(model_client=model_client)\n", ")\n", "\n", "await WriterAgent.register(runtime, type=writer_topic_type, factory=lambda: WriterAgent(model_client=model_client))\n", "\n", "await FormatProofAgent.register(\n", " runtime, type=format_proof_topic_type, factory=lambda: FormatProofAgent(model_client=model_client)\n", ")\n", "\n", "await UserAgent.register(runtime, type=user_topic_type, factory=lambda: UserAgent())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run the Workflow\n", "\n", "Finally, we can run the workflow by publishing a message to the first agent in the sequence." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------------------------------------------\n", "ConceptExtractorAgent:\n", "**Key Features:**\n", "- Made from eco-friendly stainless steel\n", "- Can keep drinks cold for up to 24 hours\n", "- Durable and reusable design\n", "- Lightweight and portable\n", "- BPA-free and non-toxic materials\n", "- Sleek, modern aesthetic available in various colors\n", "\n", "**Target Audience:**\n", "- Environmentally conscious consumers\n", "- Health and fitness enthusiasts\n", "- Outdoor adventurers (hikers, campers, etc.)\n", "- Urban dwellers looking for sustainable alternatives\n", "- Individuals seeking stylish and functional drinkware\n", "\n", "**Unique Selling Points:**\n", "- Eco-friendly design minimizes plastic waste and supports sustainability\n", "- Superior insulation technology that maintains cold temperatures for a full day\n", "- Durable construction ensures long-lasting use, offering a great return on investment\n", "- Attractive design that caters to fashion-forward individuals \n", "- Versatile use for both everyday hydration and outdoor activities\n", "--------------------------------------------------------------------------------\n", "WriterAgent:\n", "🌍🌿 Stay Hydrated, Stay Sustainable! 🌿🌍 \n", "\n", "Introducing our eco-friendly stainless steel drinkware, the perfect companion for the environmentally conscious and style-savvy individuals. With superior insulation technology, our bottles keep your beverages cold for an impressive 24 hours—ideal for hiking, camping, or just tackling a busy day in the city. Made from lightweight, BPA-free materials, this durable and reusable design not only helps reduce plastic waste but also ensures you’re making a responsible choice for our planet.\n", "\n", "Available in a sleek, modern aesthetic with various colors to match your personality, this drinkware isn't just functional—it’s fashionable! Whether you’re hitting the trails or navigating urban life, equip yourself with a stylish hydration solution that supports your active and sustainable lifestyle. Join the movement today and make a positive impact without compromising on style! 🌟🥤\n", "--------------------------------------------------------------------------------\n", "FormatProofAgent:\n", "🌍🌿 Stay Hydrated, Stay Sustainable! 🌿🌍 \n", "\n", "Introducing our eco-friendly stainless steel drinkware—the perfect companion for environmentally conscious and style-savvy individuals. With superior insulation technology, our bottles keep your beverages cold for an impressive 24 hours, making them ideal for hiking, camping, or simply tackling a busy day in the city. Crafted from lightweight, BPA-free materials, this durable and reusable design not only helps reduce plastic waste but also ensures that you’re making a responsible choice for our planet.\n", "\n", "Our drinkware features a sleek, modern aesthetic available in a variety of colors to suit your personality. It’s not just functional; it’s also fashionable! Whether you’re exploring the trails or navigating urban life, equip yourself with a stylish hydration solution that supports your active and sustainable lifestyle. Join the movement today and make a positive impact without compromising on style! 🌟🥤\n", "\n", "--------------------------------------------------------------------------------\n", "User received final copy:\n", "🌍🌿 Stay Hydrated, Stay Sustainable! 🌿🌍 \n", "\n", "Introducing our eco-friendly stainless steel drinkware—the perfect companion for environmentally conscious and style-savvy individuals. With superior insulation technology, our bottles keep your beverages cold for an impressive 24 hours, making them ideal for hiking, camping, or simply tackling a busy day in the city. Crafted from lightweight, BPA-free materials, this durable and reusable design not only helps reduce plastic waste but also ensures that you’re making a responsible choice for our planet.\n", "\n", "Our drinkware features a sleek, modern aesthetic available in a variety of colors to suit your personality. It’s not just functional; it’s also fashionable! Whether you’re exploring the trails or navigating urban life, equip yourself with a stylish hydration solution that supports your active and sustainable lifestyle. Join the movement today and make a positive impact without compromising on style! 🌟🥤\n" ] } ], "source": [ "runtime.start()\n", "\n", "await runtime.publish_message(\n", " Message(content=\"An eco-friendly stainless steel water bottle that keeps drinks cold for 24 hours\"),\n", " topic_id=TopicId(concept_extractor_topic_type, source=\"default\"),\n", ")\n", "\n", "await runtime.stop_when_idle()" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.5" } }, "nbformat": 4, "nbformat_minor": 2 }