Extracting Results with an Agent

When running a multi-agent system to solve some task, you may want to extract the result of the system once it has reached termination. This guide showcases one way to achieve this. Given that agent instances are not directly accessible from the outside, we will use an agent to publish the final result to an accessible location.

If you model your system to publish a FinalResult type, you can create an agent whose sole job is to subscribe to messages of that type and make them available externally. For simple agents like this, the ClosureAgent is an option that reduces the amount of boilerplate code: you define a function that becomes the agent’s message handler. In this example, we’re going to use a queue shared between the agent and the external code to pass the result.

Note

When deciding how to extract results from a multi-agent system, always consider which topics the result-collecting agent subscribes to and which topics the other agents publish to. An agent only receives messages from topics it is subscribed to.
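To make the subscription rule concrete, here is a minimal standard-library sketch. It does not use the autogen_core API: `ToyBus`, `subscribe`, `publish`, and the topic names are hypothetical stand-ins, chosen only to show that a handler never sees messages published to a topic it has not subscribed to.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Awaitable, Callable, DefaultDict, List


@dataclass
class FinalResult:
    value: str


Handler = Callable[[FinalResult], Awaitable[None]]


class ToyBus:
    """Hypothetical stand-in for a runtime: routes messages by topic name."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    async def publish(self, topic: str, message: FinalResult) -> None:
        # Only handlers subscribed to this exact topic are invoked.
        for handler in self._subscribers.get(topic, []):
            await handler(message)


async def demo() -> List[str]:
    received: List[str] = []

    async def collect(message: FinalResult) -> None:
        received.append(message.value)

    bus = ToyBus()
    bus.subscribe("results", collect)
    await bus.publish("results", FinalResult("seen"))
    # Nobody is subscribed to "other", so this message is dropped.
    await bus.publish("other", FinalResult("missed"))
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If the collecting agent and the publishing agents disagree on the topic, the result is silently lost rather than raising an error, which is why the subscriptions deserve a check first.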

import asyncio
from dataclasses import dataclass

from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, AgentRuntime, MessageContext
from autogen_core.components import ClosureAgent, DefaultSubscription, DefaultTopicId

Define a dataclass for the final result.

@dataclass
class FinalResult:
    value: str

Create a queue to pass the result from the agent to the external code.

queue = asyncio.Queue[FinalResult]()

Create a function closure for outputting the final result to the queue. The function must follow the signature Callable[[AgentRuntime, AgentId, T, MessageContext], Awaitable[Any]] where T is the type of the message the agent will receive. You can use union types to handle multiple message types.

async def output_result(_runtime: AgentRuntime, id: AgentId, message: FinalResult, ctx: MessageContext) -> None:
    await queue.put(message)
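As a rough illustration of the union-type remark, the sketch below shows one handler accepting two message types and the external code telling them apart with isinstance. It is standard-library only and deliberately drops the runtime, agent id, and context parameters from the signature; `ErrorResult` is a hypothetical second message type that does not appear in this guide.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Union


@dataclass
class FinalResult:
    value: str


@dataclass
class ErrorResult:  # hypothetical second message type, for illustration only
    reason: str


Message = Union[FinalResult, ErrorResult]


async def demo() -> List[str]:
    queue: "asyncio.Queue[Message]" = asyncio.Queue()

    # Simplified handler: the real closure also takes the runtime,
    # the agent id, and the message context.
    async def output_result(message: Message) -> None:
        await queue.put(message)

    await output_result(FinalResult("Result 1"))
    await output_result(ErrorResult("timed out"))

    lines: List[str] = []
    while not queue.empty():
        item = queue.get_nowait()
        # External code branches on the concrete type it pulled off the queue.
        if isinstance(item, FinalResult):
            lines.append(f"ok: {item.value}")
        else:
            lines.append(f"error: {item.reason}")
    return lines


if __name__ == "__main__":
    print(asyncio.run(demo()))
```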

Let’s create a runtime and register a ClosureAgent that will put each final result it receives on the queue.

runtime = SingleThreadedAgentRuntime()
await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()])
AgentType(type='output_result')

We can simulate the collection of final results by publishing them directly to the runtime.

runtime.start()
await runtime.publish_message(FinalResult("Result 1"), DefaultTopicId())
await runtime.publish_message(FinalResult("Result 2"), DefaultTopicId())
await runtime.stop_when_idle()

We can then drain the queue to see the final results.

while not queue.empty():
    print((result := await queue.get()).value)
Result 1
Result 2
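Draining with queue.empty() works here because the runtime has already gone idle. If the external code instead needs to wait while results are still being produced, one option is to await the queue with a timeout. The following is a minimal standard-library sketch under that assumption; `wait_for_result` and `producer` are hypothetical helpers, not part of autogen_core.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class FinalResult:
    value: str


async def wait_for_result(
    queue: "asyncio.Queue[FinalResult]", timeout: float
) -> Optional[FinalResult]:
    """Return the next result, or None if nothing arrives within `timeout` seconds."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def demo() -> Tuple[Optional[FinalResult], Optional[FinalResult]]:
    queue: "asyncio.Queue[FinalResult]" = asyncio.Queue()

    async def producer() -> None:
        # Stand-in for an agent that publishes a single result after some work.
        await asyncio.sleep(0.01)
        await queue.put(FinalResult("Result 1"))

    task = asyncio.create_task(producer())
    first = await wait_for_result(queue, timeout=1.0)    # arrives in time
    second = await wait_for_result(queue, timeout=0.05)  # nothing more is produced
    await task
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Returning None on timeout keeps the caller in control of how long to wait, at the cost of having to distinguish "no result yet" from "no result ever".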