Coordinator API¶
The ModuleCoordinator manages module lifecycle and provides access to loaded modules.
Source: amplifier_core/coordinator.py
ModuleCoordinator¶
The coordinator is created by AmplifierSession and provides infrastructure context to all modules including:
- Mount points for module attachment
- Infrastructure context (session ID, parent ID, config)
- Capability registry for inter-module communication
- Contribution channels for pull-based aggregation
- Event system with hook registry
Properties¶
| Property | Type | Description |
|---|---|---|
hooks | HookRegistry | Hook registry for event emission |
cancellation | CancellationToken | Cancellation token for graceful shutdown |
Methods¶
mount(mount_point, module, name=None)¶
Mount a module at a specific mount point.
await coordinator.mount("tools", my_tool, name="my-tool")
await coordinator.mount("providers", my_provider, name="anthropic")
await coordinator.mount("orchestrator", my_orchestrator) # Single module, no name needed
get_capability(name)¶
Query a registered capability.
events = coordinator.get_capability("observability.events") or []
working_dir = coordinator.get_capability("session.working_dir")
register_capability(name, value)¶
Register a capability that other modules can query.
register_contributor(channel, source, fn)¶
Register a contributor function for pull-based aggregation.
coordinator.register_contributor(
"observability.events",
"my-module",
lambda: ["my:event:1", "my:event:2"]
)
process_hook_result(result, event, source)¶
Process hook results and apply actions.
result = await coordinator.process_hook_result(
hook_result, "tool:pre", tool_name
)
if result.action == "deny":
return f"Operation denied: {result.reason}"
Capability Registry¶
The capability registry enables loose coupling between modules:
# Provider registers model info
coordinator.register_capability("provider.model_info", {
"context_window": 200_000,
"max_output_tokens": 8192
})
# Context manager queries it
model_info = coordinator.get_capability("provider.model_info")
if model_info:
budget = model_info["context_window"] - model_info["max_output_tokens"]
Contribution Channels¶
Contribution channels enable pull-based aggregation:
# Multiple modules register contributors
coordinator.register_contributor("observability.events", "mod-a",
lambda: ["event:a1", "event:a2"])
coordinator.register_contributor("observability.events", "mod-b",
lambda: ["event:b1"])
# Consumer pulls all contributions
all_events = coordinator.get_capability("observability.events")
# Result: ["event:a1", "event:a2", "event:b1"]