Orchestrator Contract¶
Orchestrators implement the agent execution loop strategy.
Purpose¶
Orchestrators control how agents execute:

- **Basic loops** - Simple prompt → response → tool → response cycles
- **Streaming** - Real-time response delivery
- **Event-driven** - Complex multi-step workflows
- **Custom strategies** - Domain-specific execution patterns
Key principle: The orchestrator is policy, not mechanism. Swap orchestrators to change agent behavior without modifying the kernel.
Protocol Definition¶
Source: `amplifier_core/interfaces.py` → `class Orchestrator(Protocol)`
```python
class Orchestrator(Protocol):
    async def execute(
        self,
        prompt: str,
        context: ContextManager,
        providers: dict[str, Provider],
        tools: dict[str, Tool],
        hooks: HookRegistry,
        **kwargs: Any,
    ) -> str:
        """
        Execute the agent loop with given prompt.

        Args:
            prompt: User input prompt
            context: Context manager for conversation state
            providers: Available LLM providers (keyed by name)
            tools: Available tools (keyed by name)
            hooks: Hook registry for lifecycle events
            **kwargs: Additional kernel-injected arguments (see note below)

        Returns:
            Final response string
        """
        ...
```
**Coordinator injection:** The kernel (`session.py`) passes `coordinator=<ModuleCoordinator>` via kwargs at runtime so orchestrators can process hook results and coordinate module interactions. Implementations may accept `coordinator` as an explicit keyword argument or simply absorb it through `**kwargs`.
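For example, a minimal shape that tolerates the injected argument might look like this (the class name and the empty return value are illustrative, not part of the contract):

```python
from typing import Any


class MyOrchestrator:
    """Illustrative orchestrator that absorbs kernel-injected kwargs."""

    async def execute(self, prompt, context, providers, tools, hooks, **kwargs: Any) -> str:
        # The kernel injects coordinator=<ModuleCoordinator> at runtime;
        # it may be absent when the orchestrator is driven directly in tests.
        coordinator = kwargs.get("coordinator")
        if coordinator is not None:
            ...  # use coordinator to process hook results
        return ""
```

Because the extra argument flows through `**kwargs`, the same orchestrator works both under the kernel and in tests that call `execute()` directly.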
Execution Flow¶
A typical orchestrator implements this flow:
```text
User Prompt
    ↓
Add to Context
    ↓
┌─────────────────────────────────────┐
│ LOOP until response has no tools    │
│                                     │
│ 1. emit("provider:request")         │
│ 2. provider.complete(messages)      │
│ 3. emit("provider:response")        │
│ 4. Add response to context          │
│                                     │
│ If tool_calls:                      │
│   for each tool_call:               │
│     5. emit("tool:pre")             │
│     6. tool.execute(input)          │
│     7. emit("tool:post")            │
│     8. Add result to context        │
│                                     │
│ Continue loop...                    │
└─────────────────────────────────────┘
    ↓
Return final text response
```
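The flow above can be sketched as a plain async function. This is a minimal sketch, not the reference implementation: the function name, the single-provider signature, and the shape of the provider response (`content`, `tool_calls` with `name`/`arguments`/`id`) are assumptions for illustration.

```python
async def run_basic_loop(prompt, context, provider, tools, hooks):
    """Illustrative agent loop following the flow diagram above."""
    await context.add_message({"role": "user", "content": prompt})

    while True:
        messages = await context.get_messages_for_request()

        await hooks.emit("provider:request", {"messages": messages})
        response = await provider.complete(messages)  # assumed: has .content / .tool_calls
        await hooks.emit("provider:response", {"response": response})

        await context.add_message({"role": "assistant", "content": response.content})

        # No tool calls: the loop terminates with the final text response
        if not getattr(response, "tool_calls", None):
            return response.content

        for call in response.tool_calls:
            await hooks.emit("tool:pre", {"tool_name": call.name, "tool_input": call.arguments})
            result = await tools[call.name].execute(call.arguments)
            await hooks.emit("tool:post", {"tool_name": call.name, "tool_result": result})
            await context.add_message({
                "role": "tool",
                "tool_call_id": call.id,
                "content": result.output,
            })
```

A production orchestrator would additionally emit `execution:start`/`execution:end` and `orchestrator:complete`, handle `HookResult` actions, and cap iterations, as described in the sections below.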
Entry Point Pattern¶
mount() Function¶
```python
async def mount(coordinator: ModuleCoordinator, config: dict) -> Orchestrator | Callable | None:
    """
    Initialize and return orchestrator instance.

    Returns:
        - Orchestrator instance
        - Cleanup callable
        - None for graceful degradation
    """
    orchestrator = MyOrchestrator(config=config)
    await coordinator.mount("session", orchestrator, name="orchestrator")
    return orchestrator
```
pyproject.toml¶
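The module's `mount()` function is exposed via an entry point in `pyproject.toml`. A sketch of what the registration typically looks like follows; the entry-point group name and module path shown here are assumptions, so check the kernel's module-loading documentation for the exact group:

```toml
# Hypothetical registration; the group name "amplifier.modules" is an assumption
[project.entry-points."amplifier.modules"]
my-orchestrator = "my_orchestrator:mount"
```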
Implementation Requirements¶
Event Emission¶
Orchestrators must emit lifecycle events for observability:
```python
async def execute(self, prompt, context, providers, tools, hooks, **kwargs):
    # Before LLM call
    await hooks.emit("provider:request", {
        "provider": provider_name,
        "messages": messages,
        "model": model_name
    })

    response = await provider.complete(request)

    # After LLM call
    await hooks.emit("provider:response", {
        "provider": provider_name,
        "response": response,
        "usage": response.usage
    })

    # Before tool execution
    await hooks.emit("tool:pre", {
        "tool_name": tool_call.name,
        "tool_input": tool_call.arguments
    })

    result = await tool.execute(tool_call.arguments)

    # After tool execution
    await hooks.emit("tool:post", {
        "tool_name": tool_call.name,
        "tool_input": tool_call.arguments,
        "tool_result": result
    })

    # REQUIRED: At the end of execute(), emit orchestrator:complete
    await hooks.emit("orchestrator:complete", {
        "orchestrator": "my-orchestrator",  # Your orchestrator name
        "turn_count": iteration_count,      # Number of LLM turns
        "status": "success"                 # "success", "incomplete", or "cancelled"
    })
```
Required: orchestrator:complete Event¶
All orchestrators MUST emit `orchestrator:complete` at the end of their `execute()` method. This event enables:

- Session analytics and debugging
- Hooks that trigger on turn completion (e.g., session naming)
- Observability and monitoring
| Field | Type | Description |
|---|---|---|
| `orchestrator` | string | Name of the orchestrator module |
| `turn_count` | int | Number of LLM call iterations |
| `status` | string | Exit status: `"success"`, `"incomplete"`, or `"cancelled"` |
Required: execution:start and execution:end Events¶
All orchestrators MUST emit `execution:start` and `execution:end` to mark the boundaries of every `execute()` invocation. These events are the primary observability signal used by the kernel for session lifecycle tracking, metrics, and tracing.

- `execution:start` MUST be emitted at the very beginning of `execute()`, before any other work
- `execution:end` MUST be emitted on ALL exit paths: normal completion, error, and cancellation
```python
import asyncio


async def execute(self, prompt, context, providers, tools, hooks, **kwargs):
    # REQUIRED: Emit at the very start of execute()
    await hooks.emit("execution:start", {"prompt": prompt})

    try:
        # ... agent loop logic ...

        # REQUIRED: Emit on successful completion
        await hooks.emit("execution:end", {
            "response": final_response,
            "status": "completed"
        })
        return final_response
    except asyncio.CancelledError:
        # REQUIRED: Emit on cancellation
        await hooks.emit("execution:end", {
            "response": "",
            "status": "cancelled"
        })
        raise
    except Exception:
        # REQUIRED: Emit on error
        await hooks.emit("execution:end", {
            "response": "",
            "status": "error"
        })
        raise
```
| Event | Field | Type | Description |
|---|---|---|---|
| `execution:start` | `prompt` | string | The user prompt passed to `execute()` |
| `execution:end` | `response` | string | Final response string (empty on error/cancellation) |
| `execution:end` | `status` | string | `"completed"`, `"cancelled"`, or `"error"` |
Constants: `execution:start` and `execution:end` are defined in `amplifier_core.events` (Python) and `amplifier_core::events` (Rust). Use the constants rather than string literals.
Hook Processing¶
Handle HookResult actions:
```python
# Before tool execution
pre_result = await hooks.emit("tool:pre", data)

if pre_result.action == "deny":
    # Don't execute tool
    return ToolResult(success=False, output=pre_result.reason)

if pre_result.action == "modify":
    # Use modified data
    data = pre_result.data

if pre_result.action == "inject_context":
    # Add feedback to context
    await context.add_message({
        "role": pre_result.context_injection_role,
        "content": pre_result.context_injection
    })

if pre_result.action == "ask_user":
    # Request approval (requires approval provider)
    approved = await request_approval(pre_result)
    if not approved:
        return ToolResult(success=False, output="User denied")
```
Context Management¶
Manage conversation state:
```python
# Add user message
await context.add_message({"role": "user", "content": prompt})

# Add assistant response
await context.add_message({"role": "assistant", "content": response.content})

# Add tool result
await context.add_message({
    "role": "tool",
    "tool_call_id": tool_call.id,
    "content": result.output
})

# Get messages for LLM request (context handles compaction internally)
messages = await context.get_messages_for_request()
```
Provider Selection¶
Handle multiple providers:
```python
# Get default or configured provider
provider_name = config.get("default_provider", list(providers.keys())[0])
provider = providers[provider_name]

# Or allow per-request provider selection
provider_name = request_options.get("provider", default_provider_name)
```
Configuration¶
Orchestrators receive configuration via Mount Plan:
```yaml
session:
  orchestrator: my-orchestrator
  context: context-simple
  # Orchestrator-specific config can be passed via providers/tools config
```
See MOUNT_PLAN_SPECIFICATION.md for full schema.
Observability¶
Orchestrators MUST register the custom events they emit via the observability.events contribution channel. This enables runtime introspection of which events are available and allows tooling, dashboards, and other modules to discover orchestrator-specific signals.
```python
coordinator.register_contributor(
    "observability.events",
    "my-orchestrator",
    lambda: [
        "my-orchestrator:loop_started",
        "my-orchestrator:loop_iteration",
        "my-orchestrator:loop_completed"
    ]
)
```
Note: The standard `execution:start`, `execution:end`, and `orchestrator:complete` events are registered by the kernel and do not need to be re-registered. Only register events that are unique to your orchestrator module.
See CONTRIBUTION_CHANNELS.md for the pattern.
Canonical Example¶
Reference implementation: `amplifier-module-loop-basic`

Study this module for:

- Complete `execute()` implementation
- Event emission patterns
- Hook result handling
- Context management

Additional examples:

- `amplifier-module-loop-streaming` - Real-time streaming
- `amplifier-module-loop-events` - Event-driven patterns
Validation Checklist¶
Required¶
- Implements `execute(prompt, context, providers, tools, hooks, **kwargs) -> str`
- `mount()` function with entry point in pyproject.toml
- Emits `execution:start` with `{prompt}` at the very beginning of `execute()`
- Emits `execution:end` with `{response, status}` on ALL exit paths (success, error, cancellation)
- Emits standard events (`provider:request`/`provider:response`, `tool:pre`/`tool:post`)
- Emits `orchestrator:complete` at the end of `execute()`
- Handles HookResult actions appropriately
- Manages context (add messages, check compaction)
- Registers custom observability events via `coordinator.register_contributor("observability.events", ...)`
Recommended¶
- Supports multiple providers
- Implements max iterations limit (prevent infinite loops)
- Handles provider errors gracefully
- Supports streaming via async generators
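The recommended iteration cap can be sketched as follows. This is an illustrative helper, not part of the contract: the function name, the `step` callable (assumed to return the final response string, or `None` when the turn ended in tool calls), and the default cap of 10 are all assumptions.

```python
async def run_with_cap(step, hooks, orchestrator_name="my-orchestrator", max_iterations=10):
    """Run the per-turn loop body `step` at most max_iterations times."""
    for turn in range(1, max_iterations + 1):
        final = await step()
        if final is not None:
            # Normal termination: the last turn produced a text-only response
            await hooks.emit("orchestrator:complete", {
                "orchestrator": orchestrator_name,
                "turn_count": turn,
                "status": "success",
            })
            return final

    # Cap reached without a final answer: report it honestly rather than spinning
    await hooks.emit("orchestrator:complete", {
        "orchestrator": orchestrator_name,
        "turn_count": max_iterations,
        "status": "incomplete",
    })
    return ""
```

Note how the cap maps naturally onto the `"incomplete"` status defined for `orchestrator:complete`.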
Testing¶
Use test utilities from amplifier_core/testing.py:
```python
import pytest

from amplifier_core.testing import (
    MockCoordinator,
    MockTool,
    MockContextManager,
    MockProvider,
    ScriptedOrchestrator,
    EventRecorder
)


@pytest.mark.asyncio
async def test_orchestrator_basic():
    orchestrator = MyOrchestrator(config={})
    context = MockContextManager()
    providers = {"test": MockProvider()}
    tools = {"test_tool": MockTool()}
    hooks = HookRegistry()

    result = await orchestrator.execute(
        prompt="Test prompt",
        context=context,
        providers=providers,
        tools=tools,
        hooks=hooks
    )

    assert isinstance(result, str)
    assert len(context.messages) > 0
```
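It is also worth asserting that the required lifecycle events are emitted in order. `EventRecorder` from `amplifier_core.testing` likely serves this purpose; since its exact API is not shown here, this sketch uses a local stub recorder instead:

```python
class RecordingHooks:
    """Minimal stand-in for a hook registry that records emitted event names.

    A local stub is used because EventRecorder's exact API is not documented
    in this contract; substitute the real utility where available.
    """

    def __init__(self):
        self.events = []

    async def emit(self, name, data):
        self.events.append(name)
        return None  # a real registry would return a HookResult


def assert_lifecycle_events(event_names):
    """Check the ordering the contract requires of every execute() call."""
    assert event_names[0] == "execution:start"
    assert "orchestrator:complete" in event_names
    assert event_names[-1] == "execution:end"
```

Pass a `RecordingHooks` instance as `hooks` to `execute()`, then call `assert_lifecycle_events(hooks.events)` after it returns.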
ScriptedOrchestrator for Testing¶
```python
from amplifier_core.testing import ScriptedOrchestrator

# For testing components that use orchestrators
orchestrator = ScriptedOrchestrator(responses=["Response 1", "Response 2"])

result = await orchestrator.execute(...)
assert result == "Response 1"
```
Quick Validation Command¶
Related: README.md | CONTEXT_CONTRACT.md