Skip to content

Orchestrator Contract

Orchestrators implement the agent execution loop strategy.


Purpose

Orchestrators control how agents execute: - Basic loops - Simple prompt → response → tool → response cycles - Streaming - Real-time response delivery - Event-driven - Complex multi-step workflows - Custom strategies - Domain-specific execution patterns

Key principle: The orchestrator is policy, not mechanism. Swap orchestrators to change agent behavior without modifying the kernel.


Protocol Definition

Source: amplifier_core/interfaces.pyclass Orchestrator(Protocol)

class Orchestrator(Protocol):
    async def execute(
        self,
        prompt: str,
        context: ContextManager,
        providers: dict[str, Provider],
        tools: dict[str, Tool],
        hooks: HookRegistry,
        **kwargs: Any,
    ) -> str:
        """
        Execute the agent loop with given prompt.

        Args:
            prompt: User input prompt
            context: Context manager for conversation state
            providers: Available LLM providers (keyed by name)
            tools: Available tools (keyed by name)
            hooks: Hook registry for lifecycle events
            **kwargs: Additional kernel-injected arguments (see note below)

        Returns:
            Final response string
        """
        ...

coordinator injection: The kernel (session.py) passes coordinator=<ModuleCoordinator> via kwargs at runtime so orchestrators can process hook results and coordinate module interactions. Implementations may accept coordinator as an explicit keyword argument or simply absorb it through **kwargs.


Execution Flow

A typical orchestrator implements this flow:

User Prompt
Add to Context
┌─────────────────────────────────────┐
│  LOOP until response has no tools   │
│                                     │
│  1. emit("provider:request")        │
│  2. provider.complete(messages)     │
│  3. emit("provider:response")       │
│  4. Add response to context         │
│                                     │
│  If tool_calls:                     │
│    for each tool_call:              │
│      5. emit("tool:pre")            │
│      6. tool.execute(input)         │
│      7. emit("tool:post")           │
│      8. Add result to context       │
│                                     │
│  Continue loop...                   │
└─────────────────────────────────────┘
Return final text response

Entry Point Pattern

mount() Function

async def mount(coordinator: ModuleCoordinator, config: dict) -> Orchestrator | Callable | None:
    """
    Initialize and return orchestrator instance.

    Returns:
        - Orchestrator instance
        - Cleanup callable
        - None for graceful degradation
    """
    orchestrator = MyOrchestrator(config=config)
    await coordinator.mount("session", orchestrator, name="orchestrator")
    return orchestrator

pyproject.toml

[project.entry-points."amplifier.modules"]
my-orchestrator = "my_orchestrator:mount"

Implementation Requirements

Event Emission

Orchestrators must emit lifecycle events for observability:

async def execute(self, prompt, context, providers, tools, hooks):
    # Before LLM call
    await hooks.emit("provider:request", {
        "provider": provider_name,
        "messages": messages,
        "model": model_name
    })

    response = await provider.complete(request)

    # After LLM call
    await hooks.emit("provider:response", {
        "provider": provider_name,
        "response": response,
        "usage": response.usage
    })

    # Before tool execution
    await hooks.emit("tool:pre", {
        "tool_name": tool_call.name,
        "tool_input": tool_call.arguments
    })

    result = await tool.execute(tool_call.arguments)

    # After tool execution
    await hooks.emit("tool:post", {
        "tool_name": tool_call.name,
        "tool_input": tool_call.arguments,
        "tool_result": result
    })

    # REQUIRED: At the end of execute(), emit orchestrator:complete
    await hooks.emit("orchestrator:complete", {
        "orchestrator": "my-orchestrator",  # Your orchestrator name
        "turn_count": iteration_count,       # Number of LLM turns
        "status": "success"                  # "success", "incomplete", or "cancelled"
    })

Required: orchestrator:complete Event

All orchestrators MUST emit orchestrator:complete at the end of their execute() method. This event enables: - Session analytics and debugging - Hooks that trigger on turn completion (e.g., session naming) - Observability and monitoring

Field Type Description
orchestrator string Name of the orchestrator module
turn_count int Number of LLM call iterations
status string Exit status: "success", "incomplete", or "cancelled"

Required: execution:start and execution:end Events

All orchestrators MUST emit execution:start and execution:end to mark the boundaries of every execute() invocation. These events are the primary observability signal used by the kernel for session lifecycle tracking, metrics, and tracing.

  • execution:start MUST be emitted at the very beginning of execute(), before any other work
  • execution:end MUST be emitted on ALL exit paths — normal completion, error, and cancellation
async def execute(self, prompt, context, providers, tools, hooks, **kwargs):
    # REQUIRED: Emit at the very start of execute()
    await hooks.emit("execution:start", {"prompt": prompt})

    try:
        # ... agent loop logic ...

        # REQUIRED: Emit on successful completion
        await hooks.emit("execution:end", {
            "response": final_response,
            "status": "completed"
        })
        return final_response

    except CancelledError:
        # REQUIRED: Emit on cancellation
        await hooks.emit("execution:end", {
            "response": "",
            "status": "cancelled"
        })
        raise

    except Exception:
        # REQUIRED: Emit on error
        await hooks.emit("execution:end", {
            "response": "",
            "status": "error"
        })
        raise
Event Field Type Description
execution:start prompt string The user prompt passed to execute()
execution:end response string Final response string (empty on error/cancellation)
execution:end status string "completed", "cancelled", or "error"

Constants: execution:start and execution:end are defined in amplifier_core.events (Python) and amplifier_core::events (Rust). Use the constants rather than string literals.

Hook Processing

Handle HookResult actions:

# Before tool execution
pre_result = await hooks.emit("tool:pre", data)

if pre_result.action == "deny":
    # Don't execute tool
    return ToolResult(success=False, output=pre_result.reason)

if pre_result.action == "modify":
    # Use modified data
    data = pre_result.data

if pre_result.action == "inject_context":
    # Add feedback to context
    await context.add_message({
        "role": pre_result.context_injection_role,
        "content": pre_result.context_injection
    })

if pre_result.action == "ask_user":
    # Request approval (requires approval provider)
    approved = await request_approval(pre_result)
    if not approved:
        return ToolResult(success=False, output="User denied")

Context Management

Manage conversation state:

# Add user message
await context.add_message({"role": "user", "content": prompt})

# Add assistant response
await context.add_message({"role": "assistant", "content": response.content})

# Add tool result
await context.add_message({
    "role": "tool",
    "tool_call_id": tool_call.id,
    "content": result.output
})

# Get messages for LLM request (context handles compaction internally)
messages = await context.get_messages_for_request()

Provider Selection

Handle multiple providers:

# Get default or configured provider
provider_name = config.get("default_provider", list(providers.keys())[0])
provider = providers[provider_name]

# Or allow per-request provider selection
provider_name = request_options.get("provider", default_provider_name)

Configuration

Orchestrators receive configuration via Mount Plan:

session:
  orchestrator: my-orchestrator
  context: context-simple

# Orchestrator-specific config can be passed via providers/tools config

See MOUNT_PLAN_SPECIFICATION.md for full schema.


Observability

Orchestrators MUST register the custom events they emit via the observability.events contribution channel. This enables runtime introspection of which events are available and allows tooling, dashboards, and other modules to discover orchestrator-specific signals.

coordinator.register_contributor(
    "observability.events",
    "my-orchestrator",
    lambda: [
        "my-orchestrator:loop_started",
        "my-orchestrator:loop_iteration",
        "my-orchestrator:loop_completed"
    ]
)

Note: The standard execution:start, execution:end, and orchestrator:complete events are registered by the kernel and do not need to be re-registered. Only register events that are unique to your orchestrator module.

See CONTRIBUTION_CHANNELS.md for the pattern.


Canonical Example

Reference implementation: amplifier-module-loop-basic

Study this module for: - Complete execute() implementation - Event emission patterns - Hook result handling - Context management

Additional examples: - amplifier-module-loop-streaming - Real-time streaming - amplifier-module-loop-events - Event-driven patterns


Validation Checklist

Required

  • Implements execute(prompt, context, providers, tools, hooks, **kwargs) -> str
  • mount() function with entry point in pyproject.toml
  • Emits execution:start with {prompt} at the very beginning of execute()
  • Emits execution:end with {response, status} on ALL exit paths (success, error, cancellation)
  • Emits standard events (provider:request/response, tool:pre/post)
  • Emits orchestrator:complete at the end of execute()
  • Handles HookResult actions appropriately
  • Manages context (add messages, check compaction)
  • Registers custom observability events via coordinator.register_contributor("observability.events", ...)
  • Supports multiple providers
  • Implements max iterations limit (prevent infinite loops)
  • Handles provider errors gracefully
  • Supports streaming via async generators

Testing

Use test utilities from amplifier_core/testing.py:

from amplifier_core.testing import (
    MockCoordinator,
    MockTool,
    MockContextManager,
    ScriptedOrchestrator,
    EventRecorder
)

@pytest.mark.asyncio
async def test_orchestrator_basic():
    orchestrator = MyOrchestrator(config={})
    context = MockContextManager()
    providers = {"test": MockProvider()}
    tools = {"test_tool": MockTool()}
    hooks = HookRegistry()

    result = await orchestrator.execute(
        prompt="Test prompt",
        context=context,
        providers=providers,
        tools=tools,
        hooks=hooks
    )

    assert isinstance(result, str)
    assert len(context.messages) > 0

ScriptedOrchestrator for Testing

from amplifier_core.testing import ScriptedOrchestrator

# For testing components that use orchestrators
orchestrator = ScriptedOrchestrator(responses=["Response 1", "Response 2"])

result = await orchestrator.execute(...)
assert result == "Response 1"

Quick Validation Command

# Structural validation
amplifier module validate ./my-orchestrator --type orchestrator

Related: README.md | CONTEXT_CONTRACT.md