Extracting Results with an Agent#
When running a multi-agent system to solve some task, you may want to extract the result of the system once it has reached termination. This guide showcases one way to achieve this. Given that agent instances are not directly accessible from the outside, we will use an agent to publish the final result to an accessible location.
If you model your system to publish some FinalResult
type then you can create an agent whose sole job is to subscribe to this and make it available externally. For simple agents like this the ClosureAgent
is an option to reduce the amount of boilerplate code. This allows you to define a function that will be associated as the agent’s message handler. In this example, we’re going to use a queue shared between the agent and the external code to pass the result.
Note
When considering how to extract results from a multi-agent system, you must always consider the subscriptions of the agent and the topics they publish to. This is because the agent will only receive messages from topics it is subscribed to.
import asyncio
from dataclasses import dataclass
from autogen_core import (
ClosureAgent,
ClosureContext,
DefaultSubscription,
DefaultTopicId,
MessageContext,
SingleThreadedAgentRuntime,
)
Define a dataclass for the final result.
@dataclass
class FinalResult:
value: str
Create a queue to pass the result from the agent to the external code.
queue = asyncio.Queue[FinalResult]()
Create a function closure for outputting the final result to the queue.
The function must follow the signature
Callable[[AgentRuntime, AgentId, T, MessageContext], Awaitable[Any]]
where T
is the type of the message the agent will receive.
You can use union types to handle multiple message types.
async def output_result(_agent: ClosureContext, message: FinalResult, ctx: MessageContext) -> None:
await queue.put(message)
Let’s create a runtime and register a ClosureAgent
that will publish the final result to the queue.
runtime = SingleThreadedAgentRuntime()
await ClosureAgent.register_closure(
runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
)
AgentType(type='output_result')
We can simulate the collection of final results by publishing them directly to the runtime.
runtime.start()
await runtime.publish_message(FinalResult("Result 1"), DefaultTopicId())
await runtime.publish_message(FinalResult("Result 2"), DefaultTopicId())
await runtime.stop_when_idle()
We can take a look at the queue to see the final result.
while not queue.empty():
print((result := await queue.get()).value)
Result 1
Result 2