Module Development

This guide covers creating custom modules for Amplifier: providers, tools, hooks, orchestrators, and context managers.

Module Structure

Every module follows the same structure:

amplifier-module-{type}-{name}/
├── amplifier_module_{type}_{name}/
│   └── __init__.py         # Module code with mount() function
├── tests/
│   └── test_module.py      # Tests
├── pyproject.toml          # Package configuration with entry point
├── README.md               # Documentation
└── LICENSE

The Mount Function

Every module exposes a mount function:

from typing import Any, Callable

async def mount(
    coordinator: "ModuleCoordinator",
    config: dict
) -> Callable | None:
    """
    Mount the module.

    Args:
        coordinator: Infrastructure context from kernel
        config: Configuration from Mount Plan

    Returns:
        Optional cleanup function (async callable) or None for graceful degradation
    """
    config = config or {}

    # Create your module instance
    module = MyModule(config)

    # Mount to appropriate mount point
    await coordinator.mount("tools", module, name="my-tool")

    # Optional: return cleanup function
    async def cleanup():
        await module.close()

    return cleanup

Source of Truth

Protocols are in code, not docs:

  • Protocol definitions: amplifier_core/interfaces.py
  • Data models: amplifier_core/models.py
  • Message models: amplifier_core/message_models.py (Pydantic models for request/response envelopes)
  • Content models: amplifier_core/content_models.py (dataclass types for events and streaming)
  • Rust traits: crates/amplifier-core/src/traits.rs (Rust-side trait definitions)
  • Rust/Python type mapping: CONTRACTS.md (authoritative cross-boundary reference)

Always read the code docstrings first - they are authoritative.

Creating a Tool

Tools provide capabilities to agents.

Tool Contract

Protocol definition: amplifier_core/interfaces.py, class Tool(Protocol)

from amplifier_core.models import ToolResult
from typing import runtime_checkable, Protocol, Any

@runtime_checkable
class Tool(Protocol):
    @property
    def name(self) -> str:
        """Tool name for invocation."""
        ...

    @property
    def description(self) -> str:
        """Human-readable tool description."""
        ...

    @property
    def input_schema(self) -> dict[str, Any]:
        """JSON Schema describing the tool's input parameters.

        Returns an empty dict by default for backward compatibility
        with tools that predate this convention.
        """
        return {}

    async def execute(self, input: dict[str, Any]) -> ToolResult:
        """
        Execute tool with given input.

        Args:
            input: Tool-specific input parameters

        Returns:
            Tool execution result
        """
        ...

Note: input_schema has a concrete default (return {}) and is excluded from isinstance() structural checks so that tools written before this field was introduced continue to satisfy the protocol without modification. Callers that need the schema should always use getattr(tool, "input_schema", {}) for maximum compatibility.
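A caller-side sketch of that defensive pattern (the LegacyEchoTool below is hypothetical, standing in for any tool written before input_schema existed):

```python
# Hypothetical tool written before input_schema existed: it satisfies
# the protocol via name/description/execute only.
class LegacyEchoTool:
    name = "echo"
    description = "Echo the input back unchanged"

    async def execute(self, input):
        return input

# Callers read the schema defensively; pre-schema tools fall back to {}.
tool = LegacyEchoTool()
schema = getattr(tool, "input_schema", {})
assert schema == {}
```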

Data models:

  • ToolCall - Input model from amplifier_core/message_models.py
  • ToolResult - Output model from amplifier_core/models.py

Reference implementation: amplifier-module-tool-filesystem

Example Tool

from amplifier_core import ToolResult
from typing import Any
import logging

logger = logging.getLogger(__name__)

class GreetTool:
    """Simple greeting tool."""

    @property
    def name(self) -> str:
        return "greet"

    @property
    def description(self) -> str:
        return "Greet a person by name"

    @property
    def input_schema(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "Name to greet"
                }
            },
            "required": ["name"]
        }

    async def execute(self, input_data: dict[str, Any]) -> ToolResult:
        """Execute greeting."""
        try:
            name = input_data.get("name", "World")
            message = f"Hello, {name}!"

            return ToolResult(
                success=True,
                output={"message": message}
            )
        except Exception as e:
            logger.error(f"Greeting failed: {e}")
            return ToolResult(
                success=False,
                error={"message": str(e)}
            )

Mount Function for Tool

async def mount(coordinator, config: dict | None = None):
    """Mount the greet tool."""
    config = config or {}
    tool = GreetTool()
    await coordinator.mount("tools", tool, name="greet")
    logger.info("Mounted GreetTool")
    return None  # No cleanup needed

Testing Tools

Use test utilities from amplifier_core/testing.py:

from amplifier_core.testing import MockTool

# Create mock tool for testing orchestrators
mock_tool = MockTool(
    name="test_tool",
    description="Test tool",
    return_value="mock result"
)

# After use
assert mock_tool.call_count == 1
assert mock_tool.last_input == {...}

Creating a Provider

Providers integrate LLM APIs.

Provider Contract

Protocol definition: amplifier_core/interfaces.py, class Provider(Protocol)

Detailed specification: See PROVIDER_SPECIFICATION.md for complete implementation guidance including:

  • Content block preservation requirements
  • Role conversion patterns
  • Auto-continuation handling
  • Debug levels and observability

from amplifier_core.message_models import ChatRequest, ChatResponse, ToolCall
from amplifier_core.models import ProviderInfo, ModelInfo
from typing import Protocol, runtime_checkable

@runtime_checkable
class Provider(Protocol):
    @property
    def name(self) -> str:
        """Provider identifier."""
        ...

    def get_info(self) -> ProviderInfo:
        """Provider metadata."""
        ...

    async def list_models(self) -> list[ModelInfo]:
        """List available models."""
        ...

    async def complete(
        self,
        request: ChatRequest,
        **kwargs
    ) -> ChatResponse:
        """Generate completion from ChatRequest."""
        ...

    def parse_tool_calls(self, response: ChatResponse) -> list[ToolCall]:
        """Parse tool calls from response."""
        ...

Data models:

  • ChatRequest, ChatResponse - From amplifier_core/message_models.py
  • ProviderInfo, ModelInfo - From amplifier_core/models.py
  • ToolCall - From amplifier_core/message_models.py

Reference implementation: amplifier-module-provider-anthropic

ModelInfo Extensions

The list_models() method returns list[ModelInfo]. Beyond the required fields (id, display_name, context_window, max_output_tokens), ModelInfo supports optional extension fields for model class routing and cost-aware selection:

| Field | Type | Default | Description |
|---|---|---|---|
| cost_per_input_token | float \| None | None | Cost per input token in USD (e.g., 3e-6 for $3/MTok) |
| cost_per_output_token | float \| None | None | Cost per output token in USD |
| metadata | dict[str, Any] | {} | Extensible metadata bag for cost tier, model class, provider-specific tags |

Cost Fields: Providers SHOULD populate cost_per_input_token and cost_per_output_token when pricing information is available. These enable cost-aware model selection and budget tracking.

Metadata cost_tier: Providers SHOULD set metadata["cost_tier"] to one of the well-known cost-tier strings:

| Tier | Description |
|---|---|
| free | No-cost models (local, free-tier) |
| low | Budget-friendly models (e.g., Haiku-class) |
| medium | Standard pricing (e.g., Sonnet-class) |
| high | Premium pricing (e.g., Opus-class) |
| extreme | Highest-cost models (e.g., deep research) |

Capabilities: Providers SHOULD populate the capabilities list using well-known constants from amplifier_core.capabilities. See the Capabilities Taxonomy in the Provider Specification for the full list.

Backward Compatibility: All extension fields are optional with sensible defaults. Existing providers that do not populate these fields continue to work unchanged — they simply won't participate in cost-aware or capability-based routing.

Example Provider

from amplifier_core.message_models import ChatRequest, ChatResponse, ToolCall, Usage
from amplifier_core.models import ProviderInfo, ModelInfo

class MockProvider:
    """Simple mock provider for testing."""

    name = "mock"

    def __init__(self, config: dict):
        self.config = config
        self.default_model = config.get("default_model", "mock-model")

    def get_info(self) -> ProviderInfo:
        """Return provider metadata."""
        return ProviderInfo(
            name=self.name,
            version="1.0.0",
            supported_features=["chat"]
        )

    async def complete(
        self,
        request: ChatRequest,
        **kwargs
    ) -> ChatResponse:
        """Return mock response."""
        return ChatResponse(
            content="Mock response",
            usage=Usage(
                input_tokens=10,
                output_tokens=5,
                total_tokens=15
            ),
            finish_reason="stop"
        )

    async def list_models(self) -> list[ModelInfo]:
        """Return mock model list."""
        return [
            ModelInfo(
                id="mock-model",
                display_name="Mock Model",
                context_window=8192,
                max_output_tokens=4096
            )
        ]

    def parse_tool_calls(self, response: ChatResponse) -> list[ToolCall]:
        """Parse tool calls from response."""
        return []  # Mock provider doesn't support tools

Creating a Hook

Hooks intercept events for observability and modification.

Hook Contract

Protocol definition: amplifier_core/interfaces.py, class HookHandler(Protocol)

Detailed API reference: See HOOKS_API.md for complete documentation including:

  • HookResult actions and fields
  • Registration patterns
  • Common patterns with examples
  • Best practices

from amplifier_core.models import HookResult
from typing import runtime_checkable, Protocol, Any

@runtime_checkable
class HookHandler(Protocol):
    async def __call__(self, event: str, data: dict[str, Any]) -> HookResult:
        """
        Handle a lifecycle event.

        Args:
            event: Event name (e.g., "tool:pre", "execution:start")
            data: Event-specific data

        Returns:
            HookResult indicating action to take
        """
        ...

HookResult actions:

  • continue - Proceed normally
  • deny - Block operation
  • modify - Transform data
  • inject_context - Add to agent's context
  • ask_user - Request approval

Common events:

| Event | Trigger | Data Includes |
|---|---|---|
| execution:start | Orchestrator execution begins | prompt |
| execution:end | Orchestrator execution completes | response |
| prompt:submit | User input | prompt text |
| tool:pre | Before tool execution | tool_name, tool_input |
| tool:post | After tool execution | tool_name, tool_result |
| tool:error | Tool failed | tool_name, error |
| provider:request | LLM call starting | provider, messages |
| provider:response | LLM call complete | provider, response, usage |

Reference implementation: amplifier-module-hooks-logging

Example Hook

from amplifier_core.models import HookResult
import logging

logger = logging.getLogger(__name__)

class LoggingHook:
    """Log all events."""

    def __init__(self, config: dict):
        self.config = config
        self.verbose = config.get("verbose", False)

    async def __call__(self, event: str, data: dict) -> HookResult:
        """Log event."""
        if self.verbose:
            logger.info(f"Event: {event}, Data: {data}")
        else:
            logger.info(f"Event: {event}")

        return HookResult(action="continue")
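Hooks can also gate operations rather than just observe them. A sketch of a deny hook for tool:pre follows, using a reduced stand-in for HookResult (the real model lives in amplifier_core/models.py and is documented in HOOKS_API.md):

```python
from dataclasses import dataclass

# Stand-in for amplifier_core.models.HookResult, reduced to the
# `action` field used here; see HOOKS_API.md for the full model.
@dataclass
class HookResult:
    action: str = "continue"

class BlockToolsHook:
    """Deny tool:pre events for a configurable set of tool names."""

    def __init__(self, config: dict):
        self.blocked = set(config.get("blocked_tools", []))

    async def __call__(self, event: str, data: dict) -> HookResult:
        # Only tool:pre events are gated; everything else passes through.
        if event == "tool:pre" and data.get("tool_name") in self.blocked:
            return HookResult(action="deny")
        return HookResult(action="continue")
```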

Testing Hooks

Use test utilities from amplifier_core/testing.py:

from amplifier_core.testing import EventRecorder

# Record events for testing
recorder = EventRecorder()
await recorder.record("tool:pre", {"tool_name": "Write"})

# Assert
events = recorder.get_events()
assert len(events) == 1
assert events[0][0] == "tool:pre"  # events are (event_name, data) tuples

Creating an Orchestrator

Orchestrators control the agent loop.

Orchestrator Contract

from typing import Protocol, Any, runtime_checkable

@runtime_checkable
class Orchestrator(Protocol):
    async def execute(
        self,
        prompt: str,
        context: ContextManager,
        providers: dict[str, Provider],
        tools: dict[str, Tool],
        hooks: HookRegistry,
        **kwargs: Any,
    ) -> str:
        """
        Execute the agent loop with given prompt.

        Args:
            prompt: User input prompt
            context: Context manager for conversation state
            providers: Available LLM providers (keyed by name)
            tools: Available tools (keyed by name)
            hooks: Hook registry for lifecycle events
            **kwargs: Additional kernel-injected arguments (see note below)

        Returns:
            Final response string
        """
        ...

coordinator injection: The kernel (session.py) passes coordinator=<ModuleCoordinator> via kwargs at runtime so orchestrators can process hook results and coordinate module interactions. Implementations may accept coordinator as an explicit keyword argument or simply absorb it through **kwargs.

Required events: All orchestrators MUST emit:

  • execution:start with {"prompt": prompt} at the very beginning of execute()
  • execution:end with {"response": ..., "status": ...} on all exit paths (success, error, cancellation)
  • orchestrator:complete with {"orchestrator": name, "turn_count": n, "status": "success"|"incomplete"|"cancelled"} at the end of execute()
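A minimal single-turn sketch that satisfies these requirements. Two things here are assumptions for brevity: the hooks.emit(event, data) call (check HOOKS_API.md for the actual registry API) and passing the raw message list to complete() instead of building a ChatRequest:

```python
from typing import Any

async def execute(
    prompt: str,
    context: Any,
    providers: dict[str, Any],
    tools: dict[str, Any],
    hooks: Any,
    **kwargs: Any,  # absorbs kernel-injected arguments such as coordinator
) -> str:
    """Single-turn loop that emits the three required events."""
    await hooks.emit("execution:start", {"prompt": prompt})
    status = "success"
    response_text = ""
    turn_count = 0
    try:
        await context.add_message({"role": "user", "content": prompt})
        provider = next(iter(providers.values()))
        messages = await context.get_messages_for_request(provider=provider)
        # Assumption: a real orchestrator wraps `messages` in a ChatRequest.
        response = await provider.complete(messages)
        response_text = getattr(response, "content", response)
        turn_count = 1
        await context.add_message({"role": "assistant", "content": response_text})
        return response_text
    except Exception:
        status = "incomplete"
        raise
    finally:
        # Emitted on every exit path: success, error, or cancellation.
        await hooks.emit("execution:end", {"response": response_text, "status": status})
        await hooks.emit(
            "orchestrator:complete",
            {"orchestrator": "minimal", "turn_count": turn_count, "status": status},
        )
```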

Reference implementations:

  • amplifier-module-loop-basic
  • amplifier-module-loop-streaming
  • amplifier-module-loop-events

Creating a Context Manager

Context managers handle conversation history.

Context Contract

from typing import Protocol, Any, runtime_checkable

@runtime_checkable
class ContextManager(Protocol):
    async def add_message(self, message: dict[str, Any]) -> None:
        """Add a message to the context."""
        ...

    async def get_messages_for_request(
        self,
        token_budget: int | None = None,
        provider: Any | None = None,
    ) -> list[dict[str, Any]]:
        """
        Get messages ready for an LLM request.

        The context manager handles any compaction needed internally.
        Returns messages that fit within the token budget.

        Args:
            token_budget: Optional explicit token limit (deprecated, prefer provider).
            provider: Optional provider instance for dynamic budget calculation.
                If provided, budget = context_window - max_output_tokens - safety_margin.

        Returns:
            Messages ready for LLM request, compacted if necessary.
        """
        ...

    async def get_messages(self) -> list[dict[str, Any]]:
        """Get all messages (raw, uncompacted) for transcripts/debugging."""
        ...

    async def set_messages(self, messages: list[dict[str, Any]]) -> None:
        """Set messages directly (for session resume)."""
        ...

    async def clear(self) -> None:
        """Clear all messages."""
        ...

Reference implementation: amplifier-module-context-simple

Package Configuration

pyproject.toml

[project]
name = "amplifier-module-tool-greet"
version = "0.1.0"
description = "Greeting tool for Amplifier"
requires-python = ">=3.11"
dependencies = []  # amplifier-core is a peer dependency

[project.entry-points."amplifier.modules"]
tool-greet = "amplifier_module_tool_greet:mount"

Important: Don't declare amplifier-core as a runtime dependency. It's a peer dependency provided by the runtime environment.

Testing

import pytest
from amplifier_module_tool_greet import GreetTool, mount
from amplifier_core.testing import TestCoordinator

@pytest.mark.asyncio
async def test_greet():
    """Test greeting tool."""
    tool = GreetTool()
    result = await tool.execute({"name": "Alice"})

    assert result.success
    assert result.output["message"] == "Hello, Alice!"

@pytest.mark.asyncio
async def test_mount():
    """Test tool mounting."""
    coordinator = TestCoordinator()
    cleanup = await mount(coordinator, {})

    assert "greet" in coordinator.get_mounted("tools")

    if cleanup:
        await cleanup()

Best Practices

  1. Single Responsibility: Each module does one thing well
  2. Clear Contracts: Use type hints and protocols
  3. Fail Gracefully: Return errors, don't crash
  4. Async By Default: Use async/await for I/O
  5. Minimal Dependencies: Depend only on what you need (amplifier-core is peer dependency)
  6. Test Coverage: Unit tests for core functionality
  7. Documentation: Clear README with examples

Publishing

# Build package
uv build

# Publish to PyPI
uv publish

# Or install from git
uv pip install git+https://github.com/user/amplifier-module-tool-greet@main

Module Types Reference

| Module Type | Contract | Purpose |
|---|---|---|
| Provider | PROVIDER_CONTRACT.md | LLM backend integration |
| Tool | TOOL_CONTRACT.md | Agent capabilities |
| Hook | HOOK_CONTRACT.md | Lifecycle observation and control |
| Orchestrator | ORCHESTRATOR_CONTRACT.md | Agent loop execution strategy |
| Context | CONTEXT_CONTRACT.md | Conversation memory management |

Examples

See official modules for reference implementations:

Resources