Skip to content

Orchestrator Contract

Orchestrators control how agents execute: the loop structure, tool handling, and response generation.

Purpose

Orchestrators define the execution strategy:

  • Basic: Simple request/response loop
  • Streaming: Real-time token streaming
  • Events: Event-driven architecture

Protocol

from typing import Protocol, runtime_checkable

@runtime_checkable
class Orchestrator(Protocol):
    async def execute(
        self,
        prompt: str,
        context: ContextManager,
        providers: dict[str, Provider],
        tools: dict[str, Tool],
        hooks: HookRegistry,
    ) -> str:
        """Execute the orchestration loop."""
        ...

Mount Function

async def mount(coordinator, config=None):
    config = config or {}
    orchestrator = MyOrchestrator(config)
    await coordinator.mount("orchestrator", orchestrator)
    return cleanup_function  # or None

Execution Flow

Typical orchestrator flow:

1. Add user prompt to context
2. Loop:
   a. Get messages from context
   b. Emit provider:request
   c. Call provider.complete()
   d. Emit provider:response
   e. Add response to context
   f. Parse tool calls
   g. If no tools: return response
   h. For each tool:
      - Emit tool:pre
      - Execute tool
      - Emit tool:post
      - Add result to context
   i. Continue loop
3. Return final response

Events

Orchestrators should emit:

Event When Data
prompt:submit Received prompt prompt
provider:request Before LLM call messages, model
provider:response After LLM response response, usage
tool:pre Before tool execution tool_name, input
tool:post After tool execution tool_name, result
prompt:complete Final response response
orchestrator:complete Loop finished iterations, tokens

Configuration

Common configuration:

Option Type Description
max_iterations int Maximum loop iterations
parallel_tools bool Execute tools in parallel

Example Implementation

class BasicOrchestrator:
    def __init__(self, config):
        self.max_iterations = config.get("max_iterations", 10)

    async def execute(self, prompt, context, providers, tools, hooks):
        await context.add_message({"role": "user", "content": prompt})
        provider = list(providers.values())[0]

        for iteration in range(self.max_iterations):
            messages = await context.get_messages()

            await hooks.emit("provider:request", {
                "messages": messages,
                "iteration": iteration
            })

            response = await provider.complete({"messages": messages})

            await hooks.emit("provider:response", {
                "response": response
            })

            await context.add_message({
                "role": "assistant",
                "content": response["content"]
            })

            tool_calls = provider.parse_tool_calls(response)

            if not tool_calls:
                return self._extract_text(response)

            for call in tool_calls:
                tool = tools.get(call["name"])
                if tool:
                    await hooks.emit("tool:pre", {
                        "tool_name": call["name"],
                        "input": call["input"]
                    })

                    result = await tool.execute(call["input"])

                    await hooks.emit("tool:post", {
                        "tool_name": call["name"],
                        "result": result
                    })

                    await context.add_message({
                        "role": "tool",
                        "tool_call_id": call["id"],
                        "content": result.get("output", "")
                    })

        return "Max iterations reached"

    def _extract_text(self, response):
        for block in response.get("content", []):
            if block.get("type") == "text":
                return block.get("text", "")
        return ""

Authoritative Reference

Orchestrator Contract - Complete specification