Agent-lightning Trainer

agentlightning.Trainer

Bases: TrainerLegacy

High-level orchestration layer that wires Algorithm <-> Runner <-> Store.

A Trainer packages the moving parts of Agent-Lightning's training loop into a single entry point:

  • Algorithm lifecycle: Instantiates or accepts an Algorithm, attaches the current LightningStore, adapter, and initial resources, then executes the algorithm role inside the configured execution strategy.
  • Runner fleet: Spawns one or more Runner instances (defaulting to LitAgentRunner) that hydrate a LitAgent, claim rollouts, stream spans, and respect graceful termination signals from the execution strategy.
  • Execution strategy: Delegates process management to an ExecutionStrategy (shared memory, client/server, etc.), so advanced users can swap orchestration backends without changing trainer code.
  • Telemetry plumbing: Ensures tracers, adapters, and optional LLMProxy are wired into both algorithm and runners so telemetry flows back into the store.

The trainer exposes two convenience entry points: fit() for full training and dev() for fast, reproducible dry-runs. See the Train the First Agent and Write the First Algorithm tutorials for the broader context.

adapter = self._make_adapter(adapter_spec) instance-attribute

An instance of TraceAdapter that exports trace data into a form consumable by algorithms.

algorithm = self._make_algorithm(algorithm) instance-attribute

An instance of Algorithm to use for training.

daemon = daemon instance-attribute

Whether worker processes should be daemons. Daemon processes are terminated automatically when the main process exits. Deprecated; only has an effect with fit_v0.

hooks = self._normalize_hooks(hooks) instance-attribute

A sequence of Hook instances to be called at various lifecycle stages (e.g., on_trace_start, on_trace_end, on_rollout_start, on_rollout_end).

initial_resources = initial_resources instance-attribute

An instance of NamedResources to use for bootstrapping the fit/dev process.

The resources will be handed over to the algorithm. Note that not all algorithms support seeding resources.

llm_proxy = self._make_llm_proxy(llm_proxy, store=self.store) instance-attribute

An instance of LLMProxy used to intercept LLM calls. If not provided, the algorithm may create one on its own.

max_rollouts = max_rollouts instance-attribute

Maximum number of rollouts to process per runner. If None, workers run until no more rollouts are available.

max_tasks = max_tasks if max_tasks is not None else max_rollouts instance-attribute

Maximum number of tasks to process per runner. Deprecated in favor of max_rollouts.

n_runners = n_runners instance-attribute

Number of agent runners to run in parallel.

n_workers = n_runners instance-attribute

Number of agent workers to run in parallel. Deprecated in favor of n_runners.

runner = self._make_runner(runner) instance-attribute

An instance of Runner to use for running the agent.

store = self._make_store(store) instance-attribute

An instance of LightningStore to use for storing tasks and traces.

strategy = self._make_strategy(strategy, n_runners=self.n_runners) instance-attribute

An instance of ExecutionStrategy to use for spawning the algorithm and runners.

tracer = self._make_tracer(tracer) instance-attribute

A tracer instance; a string giving the fully qualified class name; or a dictionary with a 'type' key that specifies the fully qualified class name along with other initialization parameters. If None, a default AgentOpsTracer is created with the current settings.

triplet_exporter = self.adapter instance-attribute

An instance of TracerTraceToTriplet to export triplets from traces, or a dictionary with the initialization parameters for the exporter. Deprecated; use adapter instead.

__init__(*, dev=False, n_runners=None, max_rollouts=None, initial_resources=None, tracer=None, adapter=None, store=None, runner=None, strategy=None, algorithm=None, llm_proxy=None, n_workers=None, max_tasks=None, daemon=True, triplet_exporter=None, hooks=None)

Configure the trainer and resolve user-provided component specifications.

Each keyword accepts either a concrete instance, a class, a callable factory, a registry string, or a lightweight configuration dictionary (see build_component()).

dev(agent, train_dataset=None, *, val_dataset=None)

Exercise the infrastructure using a fast, synchronous algorithm.

Trainer.dev mirrors fit() but insists on an Algorithm subtype that also derives from FastAlgorithm. This keeps the loop responsive for debugging while still touching the same store, runners, hooks, and tracer plumbing.

If no algorithm is provided, a default Baseline algorithm will be used.

Parameters:

  • agent (LitAgent[T_co]) –

    LitAgent implementation to execute.

  • train_dataset (Optional[Dataset[T_co]], default: None ) –

    Optional iterable passed to the algorithm.

  • val_dataset (Optional[Dataset[T_co]], default: None ) –

    Optional iterable passed to the algorithm.

Raises:

  • TypeError

    If the configured algorithm does not inherit from FastAlgorithm.

fit(agent, train_dataset=None, *, val_dataset=None)

Execute the full algorithm/runner training loop.

Trainer.fit packages the algorithm and runner bundles, then hands them to the active ExecutionStrategy. The strategy typically does not return until one of the following occurs:

  • The algorithm exhausts the dataset(s) and stops enqueuing rollouts.
  • max_rollouts causes individual runners to exit.
  • An exception or interrupt cancels the shared ExecutionEvent.

Parameters:

  • agent (LitAgent[T_co]) –

    LitAgent implementation executed by runners.

  • train_dataset (Optional[Dataset[T_co]], default: None ) –

    Optional iterable of rollout inputs consumed by the algorithm.

  • val_dataset (Optional[Dataset[T_co]], default: None ) –

    Optional iterable consumed by validation passes.

agentlightning.build_component(spec, *, expected_type, spec_name, default_factory=None, allow_none=False, optional_defaults=None, dict_requires_type=True, dict_default_cls=None, type_error_fmt=None, invalid_spec_error_fmt=None, registry=None)

build_component(
    spec: Union[
        T,
        str,
        Dict[str, Any],
        type[T],
        Callable[[], T],
        None,
    ],
    *,
    expected_type: type[T],
    spec_name: str,
    default_factory: Callable[[], T],
    allow_none: bool = ...,
    optional_defaults: Optional[OptionalDefaults] = ...,
    dict_requires_type: bool = ...,
    dict_default_cls: type[T] | None = ...,
    type_error_fmt: str | None = ...,
    invalid_spec_error_fmt: str | None = ...,
    registry: Optional[Dict[str, str]] = ...
) -> T
build_component(
    spec: Union[
        T,
        str,
        Dict[str, Any],
        type[T],
        Callable[[], T],
        None,
    ],
    *,
    expected_type: type[T],
    spec_name: str,
    default_factory: None = ...,
    allow_none: bool,
    optional_defaults: Optional[OptionalDefaults] = ...,
    dict_requires_type: bool = ...,
    dict_default_cls: type[T] | None = ...,
    type_error_fmt: str | None = ...,
    invalid_spec_error_fmt: str | None = ...,
    registry: Optional[Dict[str, str]] = ...
) -> T | None
build_component(
    spec: Union[
        T,
        str,
        Dict[str, Any],
        type[T],
        Callable[[], T],
        None,
    ],
    *,
    expected_type: type[T],
    spec_name: str,
    default_factory: None = ...,
    allow_none: bool = ...,
    optional_defaults: Optional[OptionalDefaults] = ...,
    dict_requires_type: bool = ...,
    dict_default_cls: type[T] | None = ...,
    type_error_fmt: str | None = ...,
    invalid_spec_error_fmt: str | None = ...,
    registry: Optional[Dict[str, str]] = ...
) -> T | None

Build and return a component instance from a flexible specification.

This function provides a flexible way to create component instances from various input formats including direct instances, class types, factory functions, import paths, or configuration dictionaries.

Parameters:

  • spec (Union[T, str, Dict[str, Any], type[T], Callable[[], T], None]) –

    The component specification. Can be:

      • An instance of expected_type (returned as-is)
      • A string import path (e.g., 'module.Class') or registry key
      • A dict with a 'type' key (an import path or registry key) plus constructor kwargs
      • A class type (will be instantiated)
      • A factory function (will be called)
      • None (uses default_factory, or returns None if allow_none=True)

  • expected_type (type[T]) –

    The type that the resulting instance must be or inherit from.

  • spec_name (str) –

    Descriptive name for the spec, used in error messages.

  • default_factory (Callable[[], T] | None, default: None ) –

    Optional factory function called when spec is None.

  • allow_none (bool, default: False ) –

    If True, allows None to be returned when spec is None and no default_factory is provided.

  • optional_defaults (Optional[OptionalDefaults], default: None ) –

    Dict mapping parameter names to default values or factory functions that will be injected if the constructor accepts them.

  • dict_requires_type (bool, default: True ) –

    If True, dict specs must include a 'type' key.

  • dict_default_cls (type[T] | None, default: None ) –

    Default class to use for dict specs without a 'type' key (only used when dict_requires_type=False).

  • type_error_fmt (str | None, default: None ) –

    Custom format string for type validation errors. Should include {type_name} and {expected_type} placeholders.

  • invalid_spec_error_fmt (str | None, default: None ) –

    Custom format string for invalid spec type errors. Should include {actual_type} and {expected_type} placeholders.

  • registry (Optional[Dict[str, str]], default: None ) –

    Optional mapping of short names to fully qualified import paths. When provided, string specs or dict 'type'/'name' entries are first resolved through this registry before attempting to import.

Returns:

  • T | None

    An instance of expected_type, or None if allow_none=True and spec is None without a default_factory.

Raises:

  • TypeError

    If the instantiated object is not an instance of expected_type.

  • ValueError

    If spec is None and neither default_factory nor allow_none is set, or if spec type is invalid, or if dict spec is invalid.

Examples:

>>> # Direct instance
>>> optimizer = build_component(AdamW(), expected_type=Optimizer, spec_name='optimizer')
>>>
>>> # String import path
>>> optimizer = build_component('torch.optim.AdamW', expected_type=Optimizer, spec_name='optimizer')
>>>
>>> # Dict with type and kwargs
>>> spec = {'type': 'torch.optim.AdamW', 'lr': 0.001}
>>> optimizer = build_component(spec, expected_type=Optimizer, spec_name='optimizer')
>>>
>>> # Class type
>>> optimizer = build_component(AdamW, expected_type=Optimizer, spec_name='optimizer')
>>>
>>> # Factory function
>>> optimizer = build_component(lambda: AdamW(lr=0.001), expected_type=Optimizer,
...                            spec_name='optimizer')

Execution Strategy

agentlightning.ExecutionStrategy

Coordinate algorithm and runner bundles within a single process abstraction.

Strategies decide how many worker bundles to launch, whether to communicate through shared memory or an HTTP boundary, and how to react to shutdown signals. They intentionally avoid inspecting the bundle internals; instead, each bundle remains responsible for its own scheduling semantics.

Note

Implementations must honor the execute() contract by propagating KeyboardInterrupt and ensuring resources are released when an error occurs on either side of the algorithm/runner pair.

execute(algorithm, runner, store)

Run the provided bundles using the configured orchestration model.

Parameters:

  • algorithm (AlgorithmBundle) –

    Callable bundle responsible for algorithm execution.

  • runner (RunnerBundle) –

    Callable bundle for runner workers.

  • store (LightningStore) –

    Concrete LightningStore shared across bundles.

Raises:

  • NotImplementedError

    Subclasses must provide the orchestration implementation.

agentlightning.ClientServerExecutionStrategy

Bases: ExecutionStrategy

Run algorithm and runner bundles as separate processes over HTTP.

Execution Roles:

  • "algorithm": Start LightningStoreServer in-process and execute the algorithm bundle against it.
  • "runner": Connect to an existing server with LightningStoreClient and run the runner bundle locally (spawning multiple processes when requested).
  • "both": Spawn runner processes first, then execute the algorithm and server on the same machine. This mode orchestrates the full loop locally.

When role == "both" you may choose which side runs on the main process via main_process. The runner-on-main option is limited to n_runners == 1 because each additional runner requires its own event loop and process.

Warning

When main_process == "runner" the algorithm and HTTP server execute in a child process. Store mutations remain isolated inside that process, so the original store instance passed to execute() is not updated.

Abort Model (four-step escalation):

  1. Cooperative stop. Every bundle receives a shared MultiprocessingEvent (stop_evt). Any failure flips the event so peers can exit cleanly. Ctrl+C on the main process also sets the flag.
  2. KeyboardInterrupt synthesis. Remaining subprocesses receive SIGINT to trigger KeyboardInterrupt handlers.
  3. Termination. Stubborn processes are asked to terminate() (SIGTERM on POSIX).
  4. Kill. As a last resort kill() is invoked (SIGKILL on POSIX).

This mirrors the semantics implemented in SharedMemoryExecutionStrategy but adapts them to multiple processes and the HTTP client/server boundary.

__init__(role=None, server_host=None, server_port=None, n_runners=1, graceful_timeout=5.0, terminate_timeout=5.0, main_process='algorithm', managed_store=None)

Configure the strategy.

Parameters:

  • role (Literal['algorithm', 'runner', 'both'] | None, default: None ) –

    Which side(s) to run in this process. When omitted, the AGL_CURRENT_ROLE environment variable is used.

  • server_host (str | None, default: None ) –

    Interface the HTTP server binds to when running the algorithm bundle locally. Defaults to AGL_SERVER_HOST or "localhost" if unset.

  • server_port (int | None, default: None ) –

    Port for the HTTP server in "algorithm"/"both" modes. Defaults to AGL_SERVER_PORT or 4747 if unset.

  • n_runners (int, default: 1 ) –

    Number of runner processes to spawn in "runner"/"both".

  • graceful_timeout (float, default: 5.0 ) –

    How long to wait (seconds) after setting the stop event before escalating to signals.

  • terminate_timeout (float, default: 5.0 ) –

    How long to wait between escalation steps beyond the cooperative phase (re-used for SIGINT, terminate, and kill).

  • main_process (Literal['algorithm', 'runner'], default: 'algorithm' ) –

    Which bundle runs on the main process when role == "both". "runner" requires n_runners == 1 and is primarily intended for debugging.

  • managed_store (bool | None, default: None ) –

    When True (default) the strategy constructs LightningStore client/server wrappers automatically. When False the provided store is passed directly to the bundles, allowing callers to manage store wrappers manually.
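
The documented host/port defaults can be reproduced with a small helper. This is a sketch of the resolution rules stated above, not the class's actual code:

```python
import os
from typing import Optional, Tuple

def resolve_server_address(server_host: Optional[str] = None,
                           server_port: Optional[int] = None) -> Tuple[str, int]:
    """Explicit arguments win, then the AGL_SERVER_HOST / AGL_SERVER_PORT
    environment variables, then the defaults "localhost" and 4747."""
    host = server_host if server_host is not None else os.environ.get("AGL_SERVER_HOST", "localhost")
    port = server_port if server_port is not None else int(os.environ.get("AGL_SERVER_PORT", "4747"))
    return host, port
```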

agentlightning.SharedMemoryExecutionStrategy

Bases: ExecutionStrategy

Execute bundles in a single process with cooperative worker threads.

Stop Model:

  • All bundles share one ThreadingEvent named stop_evt.
  • Only the main thread receives KeyboardInterrupt. When Ctrl+C occurs we set stop_evt.
  • Any exception raised inside a bundle sets stop_evt so other threads can unwind cooperatively.
  • Once the bundle running on the main thread exits successfully, subsequent behavior depends on main_thread:
    • "algorithm": the runners are asked to stop by setting stop_evt.
    • "runner": the algorithm keeps running until it exits naturally.
  • Background threads are marked as daemons. We join them briefly and log any stragglers before shutting down.

Note

Signals other than SIGINT (such as SIGTERM) are not intercepted; Python's default behavior for those signals is preserved.
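
The stop model can be illustrated with a self-contained analogue. Here run_cooperatively and its bundle signature are hypothetical; the real strategy works with AlgorithmBundle/RunnerBundle objects:

```python
import threading
from typing import Callable, Sequence

Bundle = Callable[[threading.Event], None]  # hypothetical stand-in for a bundle

def run_cooperatively(algorithm: Bundle, runners: Sequence[Bundle],
                      stop_evt: threading.Event) -> None:
    """Run the algorithm on the main thread and runners on daemon threads,
    all sharing one stop event, per the stop model above."""
    def guarded(bundle: Bundle) -> None:
        try:
            bundle(stop_evt)
        except BaseException:
            stop_evt.set()            # any failure asks every peer to unwind
            raise
    threads = [threading.Thread(target=guarded, args=(r,), daemon=True)
               for r in runners]
    for t in threads:
        t.start()
    try:
        guarded(algorithm)            # only the main thread sees KeyboardInterrupt
    finally:
        stop_evt.set()                # main_thread == "algorithm": stop the runners
        for t in threads:
            t.join(timeout=1.0)       # brief join; stragglers would be logged
```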

Events

agentlightning.ExecutionEvent

Bases: Protocol

Protocol capturing the cooperative stop contract shared by strategies.

Implementations mirror the API of threading.Event and multiprocessing.Event so the rest of the execution layer can remain agnostic to the underlying concurrency primitive.

Methods:

set: Signal cancellation. The call must be idempotent.
clear: Reset the event to the unsignaled state.
is_set: Return True when cancellation has been requested.
wait: Block until the event is signaled or an optional timeout elapses.
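
Because the protocol mirrors threading.Event and multiprocessing.Event, either can be passed wherever an ExecutionEvent is expected. A quick structural sketch (ExecutionEventLike is a local stand-in defined here for illustration, not the library's class):

```python
import threading
from typing import Optional, Protocol, runtime_checkable

@runtime_checkable
class ExecutionEventLike(Protocol):
    """Local stand-in mirroring the documented ExecutionEvent methods."""
    def set(self) -> None: ...
    def clear(self) -> None: ...
    def is_set(self) -> bool: ...
    def wait(self, timeout: Optional[float] = None) -> bool: ...

def request_stop(evt: ExecutionEventLike) -> None:
    """Signal cancellation; set() must be idempotent, so calling it twice is safe."""
    evt.set()
    evt.set()
```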

agentlightning.ThreadingEvent

Thread-safe implementation of ExecutionEvent.

agentlightning.MultiprocessingEvent

Process-safe implementation of ExecutionEvent.

CLI Builder

agentlightning.lightning_cli(*classes)

lightning_cli(cls1: Type[_C1]) -> _C1
lightning_cli(
    cls1: Type[_C1], cls2: Type[_C2]
) -> Tuple[_C1, _C2]
lightning_cli(
    cls1: Type[_C1], cls2: Type[_C2], cls3: Type[_C3]
) -> Tuple[_C1, _C2, _C3]
lightning_cli(
    cls1: Type[_C1],
    cls2: Type[_C2],
    cls3: Type[_C3],
    cls4: Type[_C4],
) -> Tuple[_C1, _C2, _C3, _C4]
lightning_cli(
    *classes: Type[CliConfigurable],
) -> Tuple[CliConfigurable, ...]

Parse command-line arguments to configure and instantiate the provided CliConfigurable classes.

Parameters:

  • *classes (Type[CliConfigurable], default: () ) –

    One or more classes that inherit from CliConfigurable. Each class's init parameters will be exposed as command-line arguments.

Returns:

  • CliConfigurable | Tuple[CliConfigurable, ...]

    The instantiated objects, corresponding to the input classes in order. A single instance is returned when one class is given; otherwise a tuple.
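
The underlying pattern, turning a class's __init__ signature into command-line flags, can be sketched with inspect and argparse. cli_from_init below is a simplified hypothetical that handles only basic annotated parameters, unlike the real lightning_cli:

```python
import argparse
import inspect
from typing import List, Optional, Type

def cli_from_init(cls: Type, argv: Optional[List[str]] = None):
    """Expose a class's __init__ parameters as CLI flags, then
    instantiate the class from the parsed arguments."""
    parser = argparse.ArgumentParser()
    sig = inspect.signature(cls.__init__)
    for name, param in sig.parameters.items():
        if name == "self":
            continue
        required = param.default is inspect.Parameter.empty
        kwargs = {"required": required}
        if not required:
            kwargs["default"] = param.default
        if param.annotation is not inspect.Parameter.empty:
            kwargs["type"] = param.annotation   # assumes simple types like int/float/str
        parser.add_argument(f"--{name.replace('_', '-')}", **kwargs)
    args = parser.parse_args(argv)
    return cls(**vars(args))
```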