Agent-lightning Trainer¶
agentlightning.Trainer
¶
Bases: TrainerLegacy
High-level orchestration layer that wires Algorithm <-> Runner <-> Store.
A Trainer
packages the moving parts of Agent-Lightning's
training loop into a single entry point:
- Algorithm lifecycle: Instantiates or accepts an Algorithm, attaches the current LightningStore, adapter, and initial resources, then executes the algorithm role inside the configured execution strategy.
- Runner fleet: Spawns one or more Runner instances (defaulting to LitAgentRunner) that hydrate a LitAgent, claim rollouts, stream spans, and respect graceful termination signals from the execution strategy.
- Execution strategy: Delegates process management to an ExecutionStrategy (shared memory, client/server, etc.), so advanced users can swap orchestration backends without changing trainer code.
- Telemetry plumbing: Ensures tracers, adapters, and an optional LLMProxy are wired into both the algorithm and the runners so telemetry flows back into the store.
The trainer exposes two convenience entry points: fit() for full training and dev() for fast, reproducible dry-runs. See the Train the First Agent and Write the First Algorithm tutorials for the broader context.
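For orientation, a minimal usage sketch based on the signatures documented on this page; `MyAlgorithm`, `MyAgent`, and the dataset variables are hypothetical placeholders, not part of agentlightning:

```python
def train():
    # Deferred import so the sketch stays self-contained; requires agentlightning.
    from agentlightning import Trainer

    trainer = Trainer(
        n_runners=4,              # four parallel agent runners
        algorithm=MyAlgorithm(),  # hypothetical; also accepts a class, factory,
                                  # registry string, or dict spec (see build_component)
        max_rollouts=100,         # each runner exits after at most 100 rollouts
    )
    # Full training loop; dev() is the fast dry-run counterpart.
    trainer.fit(MyAgent(), train_dataset=train_data, val_dataset=val_data)
```
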
adapter = self._make_adapter(adapter_spec)
instance-attribute
¶
An instance of TraceAdapter
to convert traces into data consumable by algorithms.
algorithm = self._make_algorithm(algorithm)
instance-attribute
¶
An instance of Algorithm
to use for training.
daemon = daemon
instance-attribute
¶
Whether worker processes should be daemons. Daemon processes
are terminated automatically when the main process exits. Deprecated;
only has an effect with fit_v0
.
hooks = self._normalize_hooks(hooks)
instance-attribute
¶
A sequence of Hook instances to be called at various lifecycle stages (e.g., on_trace_start, on_trace_end, on_rollout_start, on_rollout_end).
initial_resources = initial_resources
instance-attribute
¶
An instance of NamedResources
to use for bootstrapping the fit/dev process.
The resources will be handed over to the algorithm. Note that not all algorithms support seeding resources.
llm_proxy = self._make_llm_proxy(llm_proxy, store=(self.store))
instance-attribute
¶
An instance of LLMProxy
to use for intercepting LLM calls.
If not provided, the algorithm may create one on its own.
max_rollouts = max_rollouts
instance-attribute
¶
Maximum number of rollouts to process per runner. If None, workers run until no more rollouts are available.
max_tasks = max_tasks if max_tasks is not None else max_rollouts
instance-attribute
¶
Maximum number of tasks to process per runner. Deprecated in favor of max_rollouts
.
n_runners = n_runners
instance-attribute
¶
Number of agent runners to run in parallel.
n_workers = n_runners
instance-attribute
¶
Number of agent workers to run in parallel. Deprecated in favor of n_runners
.
runner = self._make_runner(runner)
instance-attribute
¶
An instance of Runner
to use for running the agent.
store = self._make_store(store)
instance-attribute
¶
An instance of LightningStore
to use for storing tasks and traces.
strategy = self._make_strategy(strategy, n_runners=(self.n_runners))
instance-attribute
¶
An instance of ExecutionStrategy
to use for spawning the algorithm and runners.
tracer = self._make_tracer(tracer)
instance-attribute
¶
A tracer instance, a string giving the fully qualified class name, or a dictionary with a 'type' key
that specifies the class name plus other initialization parameters.
If None, a default AgentOpsTracer
will be created with the current settings.
triplet_exporter = self.adapter
instance-attribute
¶
An instance of TracerTraceToTriplet
to export triplets from traces,
or a dictionary with the initialization parameters for the exporter.
Deprecated. Use adapter
instead.
__init__(*, dev=False, n_runners=None, max_rollouts=None, initial_resources=None, tracer=None, adapter=None, store=None, runner=None, strategy=None, algorithm=None, llm_proxy=None, n_workers=None, max_tasks=None, daemon=True, triplet_exporter=None, hooks=None)
¶
Configure the trainer and resolve user-provided component specifications.
Each keyword accepts either a concrete instance, a class, a callable factory, a
registry string, or a lightweight configuration dictionary (see
build_component()
).
dev(agent, train_dataset=None, *, val_dataset=None)
¶
Exercise the infrastructure using a fast, synchronous algorithm.
Trainer.dev
mirrors fit()
but
insists on an Algorithm
subtype that also derives from
FastAlgorithm
. This keeps the loop responsive for
debugging while still touching the same store, runners, hooks, and tracer plumbing.
If no algorithm is provided, a default Baseline
algorithm will be used.
Parameters:
- agent (LitAgent[T_co]) – LitAgent implementation to execute.
- train_dataset (Optional[Dataset[T_co]], default: None) – Optional iterable passed to the algorithm.
- val_dataset (Optional[Dataset[T_co]], default: None) – Optional iterable passed to the algorithm.
Raises:
- TypeError – If the configured algorithm does not inherit from FastAlgorithm.
fit(agent, train_dataset=None, *, val_dataset=None)
¶
Execute the full algorithm/runner training loop.
Trainer.fit
packages the algorithm and runner bundles,
then hands them to the active ExecutionStrategy
.
The strategy does not return until one of the following occurs:
- The algorithm exhausts the dataset(s) and stops enqueuing rollouts.
- max_rollouts causes individual runners to exit.
- An exception or interrupt cancels the shared ExecutionEvent.
Parameters:
- agent (LitAgent[T_co]) – LitAgent implementation to execute.
- train_dataset (Optional[Dataset[T_co]], default: None) – Optional iterable passed to the algorithm.
- val_dataset (Optional[Dataset[T_co]], default: None) – Optional iterable passed to the algorithm.
agentlightning.build_component(spec, *, expected_type, spec_name, default_factory=None, allow_none=False, optional_defaults=None, dict_requires_type=True, dict_default_cls=None, type_error_fmt=None, invalid_spec_error_fmt=None, registry=None)
¶
build_component(
spec: Union[
T,
str,
Dict[str, Any],
type[T],
Callable[[], T],
None,
],
*,
expected_type: type[T],
spec_name: str,
default_factory: Callable[[], T],
allow_none: bool = ...,
optional_defaults: Optional[OptionalDefaults] = ...,
dict_requires_type: bool = ...,
dict_default_cls: type[T] | None = ...,
type_error_fmt: str | None = ...,
invalid_spec_error_fmt: str | None = ...,
registry: Optional[Dict[str, str]] = ...
) -> T
build_component(
spec: Union[
T,
str,
Dict[str, Any],
type[T],
Callable[[], T],
None,
],
*,
expected_type: type[T],
spec_name: str,
default_factory: None = ...,
allow_none: bool,
optional_defaults: Optional[OptionalDefaults] = ...,
dict_requires_type: bool = ...,
dict_default_cls: type[T] | None = ...,
type_error_fmt: str | None = ...,
invalid_spec_error_fmt: str | None = ...,
registry: Optional[Dict[str, str]] = ...
) -> T | None
build_component(
spec: Union[
T,
str,
Dict[str, Any],
type[T],
Callable[[], T],
None,
],
*,
expected_type: type[T],
spec_name: str,
default_factory: None = ...,
allow_none: bool = ...,
optional_defaults: Optional[OptionalDefaults] = ...,
dict_requires_type: bool = ...,
dict_default_cls: type[T] | None = ...,
type_error_fmt: str | None = ...,
invalid_spec_error_fmt: str | None = ...,
registry: Optional[Dict[str, str]] = ...
) -> T | None
Build and return a component instance from a flexible specification.
This function provides a flexible way to create component instances from various input formats including direct instances, class types, factory functions, import paths, or configuration dictionaries.
Parameters:
- spec (Union[T, str, Dict[str, Any], type[T], Callable[[], T], None]) – The component specification. Can be:
  - An instance of expected_type (returned as-is)
  - A string import path (e.g., 'module.Class') or registry key
  - A dict with a 'type' key (import path or registry key) plus constructor kwargs
  - A class type (will be instantiated)
  - A factory function (will be called)
  - None (uses default_factory, or returns None if allow_none=True)
- expected_type (type[T]) – The type that the resulting instance must be or inherit from.
- spec_name (str) – Descriptive name for the spec, used in error messages.
- default_factory (Callable[[], T] | None, default: None) – Optional factory function called when spec is None.
- allow_none (bool, default: False) – If True, allows None to be returned when spec is None and no default_factory is provided.
- optional_defaults (Optional[OptionalDefaults], default: None) – Dict mapping parameter names to default values or factory functions that will be injected if the constructor accepts them.
- dict_requires_type (bool, default: True) – If True, dict specs must include a 'type' key.
- dict_default_cls (type[T] | None, default: None) – Default class to use for dict specs without a 'type' key (only used when dict_requires_type=False).
- type_error_fmt (str | None, default: None) – Custom format string for type validation errors. Should include {type_name} and {expected_type} placeholders.
- invalid_spec_error_fmt (str | None, default: None) – Custom format string for invalid spec type errors. Should include {actual_type} and {expected_type} placeholders.
- registry (Optional[Dict[str, str]], default: None) – Optional mapping of short names to fully qualified import paths. When provided, string specs or dict 'type'/'name' entries are first resolved through this registry before attempting to import.
Returns:
- T | None – An instance of expected_type, or None if allow_none=True and spec is None without a default_factory.
Raises:
- TypeError – If the instantiated object is not an instance of expected_type.
- ValueError – If spec is None and neither default_factory nor allow_none is set, or if the spec type is invalid, or if a dict spec is invalid.
Examples:
>>> # Direct instance
>>> optimizer = build_component(AdamW(), expected_type=Optimizer, spec_name='optimizer')
>>>
>>> # String import path
>>> optimizer = build_component('torch.optim.AdamW', expected_type=Optimizer, spec_name='optimizer')
>>>
>>> # Dict with type and kwargs
>>> spec = {'type': 'torch.optim.AdamW', 'lr': 0.001}
>>> optimizer = build_component(spec, expected_type=Optimizer, spec_name='optimizer')
>>>
>>> # Class type
>>> optimizer = build_component(AdamW, expected_type=Optimizer, spec_name='optimizer')
>>>
>>> # Factory function
>>> optimizer = build_component(lambda: AdamW(lr=0.001), expected_type=Optimizer,
... spec_name='optimizer')
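The resolution order can be made concrete with a simplified stand-in (an illustration of the documented behavior, not the real implementation; it omits optional_defaults, dict_default_cls, and the custom error-format hooks):

```python
import importlib


def resolve_spec(spec, expected_type, *, default_factory=None, registry=None):
    """Simplified sketch of build_component's resolution order."""

    def from_path(path, **kwargs):
        path = (registry or {}).get(path, path)  # registry key -> import path
        module, _, name = path.rpartition(".")
        return getattr(importlib.import_module(module), name)(**kwargs)

    if spec is None:
        if default_factory is None:
            raise ValueError("no spec and no default_factory")
        obj = default_factory()
    elif isinstance(spec, expected_type):
        obj = spec                                     # instance: returned as-is
    elif isinstance(spec, str):
        obj = from_path(spec)                          # import path or registry key
    elif isinstance(spec, dict):
        kwargs = dict(spec)
        obj = from_path(kwargs.pop("type"), **kwargs)  # dict_requires_type=True
    elif callable(spec):
        obj = spec()                                   # class type or factory function
    else:
        raise ValueError(f"invalid spec type: {type(spec).__name__}")
    if not isinstance(obj, expected_type):
        raise TypeError(f"{type(obj).__name__} is not a {expected_type.__name__}")
    return obj
```
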
Execution Strategy¶
agentlightning.ExecutionStrategy
¶
Coordinate algorithm and runner bundles within a single process abstraction.
Strategies decide how many worker bundles to launch, whether to communicate through shared memory or an HTTP boundary, and how to react to shutdown signals. They intentionally avoid inspecting the bundle internals; instead, each bundle remains responsible for its own scheduling semantics.
Note
Implementations must honor the execute()
contract by propagating KeyboardInterrupt
and ensuring resources are
released when an error occurs on either side of the algorithm/runner
pair.
execute(algorithm, runner, store)
¶
Run the provided bundles using the configured orchestration model.
Parameters:
- algorithm (AlgorithmBundle) – Callable bundle responsible for algorithm execution.
- runner (RunnerBundle) – Callable bundle for runner workers.
- store (LightningStore) – Concrete LightningStore shared across bundles.
Raises:
- NotImplementedError – Subclasses must provide the orchestration implementation.
agentlightning.ClientServerExecutionStrategy
¶
Bases: ExecutionStrategy
Run algorithm and runner bundles as separate processes over HTTP.
Execution Roles:
- "algorithm": Start a LightningStoreServer in-process and execute the algorithm bundle against it.
- "runner": Connect to an existing server with LightningStoreClient and run the runner bundle locally (spawning multiple processes when requested).
- "both": Spawn runner processes first, then execute the algorithm and server on the same machine. This mode orchestrates the full loop locally.
When role == "both"
you may choose which side runs on the main process
via main_process
. The runner-on-main option is limited to
n_runners == 1
because each additional runner requires its own event
loop and process.
Warning
When main_process == "runner"
the algorithm and HTTP server execute
in a child process. Store mutations remain isolated inside that process,
so the original store instance passed to
execute() is not updated.
Abort Model (four-step escalation):
- Cooperative stop. Every bundle receives a shared MultiprocessingEvent (stop_evt). Any failure flips the event so peers can exit cleanly. Ctrl+C on the main process also sets the flag.
- KeyboardInterrupt synthesis. Remaining subprocesses receive SIGINT to trigger KeyboardInterrupt handlers.
- Termination. Stubborn processes are asked to terminate() (SIGTERM on POSIX).
- Kill. As a last resort, kill() is invoked (SIGKILL on POSIX).
This mirrors the semantics implemented in
SharedMemoryExecutionStrategy
but adapts them to multiple processes and the HTTP client/server boundary.
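The escalation ladder can be sketched in isolation (a simplified stand-in operating on plain multiprocessing.Process objects; the real strategy adds logging, per-side bookkeeping, and the HTTP boundary):

```python
import os
import signal
import time


def escalate(procs, stop_evt, graceful_timeout=5.0, terminate_timeout=5.0):
    """Four-step shutdown: cooperative stop -> SIGINT -> terminate() -> kill()."""

    def join_all(targets, timeout):
        deadline = time.monotonic() + timeout
        for p in targets:
            p.join(max(0.0, deadline - time.monotonic()))

    stop_evt.set()                                # 1. cooperative stop via the shared event
    join_all(procs, graceful_timeout)
    for step in ("sigint", "terminate", "kill"):
        live = [p for p in procs if p.is_alive()]
        if not live:
            return
        for p in live:
            if step == "sigint":
                os.kill(p.pid, signal.SIGINT)     # 2. synthesize KeyboardInterrupt
            elif step == "terminate":
                p.terminate()                     # 3. SIGTERM on POSIX
            else:
                p.kill()                          # 4. SIGKILL as a last resort
        join_all(live, terminate_timeout)
```
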
__init__(role=None, server_host=None, server_port=None, n_runners=1, graceful_timeout=5.0, terminate_timeout=5.0, main_process='algorithm', managed_store=None)
¶
Configure the strategy.
Parameters:
- role (Literal['algorithm', 'runner', 'both'] | None, default: None) – Which side(s) to run in this process. When omitted, the AGL_CURRENT_ROLE environment variable is used.
- server_host (str | None, default: None) – Interface the HTTP server binds to when running the algorithm bundle locally. Defaults to AGL_SERVER_HOST or "localhost" if unset.
- server_port (int | None, default: None) – Port for the HTTP server in "algorithm"/"both" modes. Defaults to AGL_SERVER_PORT or 4747 if unset.
- n_runners (int, default: 1) – Number of runner processes to spawn in "runner"/"both" modes.
- graceful_timeout (float, default: 5.0) – How long to wait (seconds) after setting the stop event before escalating to signals.
- terminate_timeout (float, default: 5.0) – How long to wait between escalation steps beyond the cooperative phase (re-used for SIGINT, terminate, and kill).
- main_process (Literal['algorithm', 'runner'], default: 'algorithm') – Which bundle runs on the main process when role == "both". "runner" requires n_runners == 1 and is primarily intended for debugging.
- managed_store (bool | None, default: None) – When True (default), the strategy constructs LightningStore client/server wrappers automatically. When False, the provided store is passed directly to the bundles, allowing callers to manage store wrappers manually.
agentlightning.SharedMemoryExecutionStrategy
¶
Bases: ExecutionStrategy
Execute bundles in a single process with cooperative worker threads.
Stop Model:
- All bundles share one ThreadingEvent named stop_evt.
- Only the main thread receives KeyboardInterrupt. When Ctrl+C occurs, stop_evt is set.
- Any exception raised inside a bundle sets stop_evt so other threads can unwind cooperatively.
- Once the bundle running on the main thread exits successfully, the treatment depends on main_thread:
  - "algorithm": the runners are asked to stop by setting stop_evt.
  - "runner": the algorithm keeps running until it exits naturally.
- Background threads are marked as daemons. We join them briefly and log any stragglers before shutting down.
Note
Signals other than SIGINT
(such as SIGTERM
) are not intercepted;
Python's default behavior for those signals is preserved.
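The stop model can be illustrated with a self-contained sketch (bundles modeled as plain callables taking the shared event; the real strategy adds store, tracer, and logging plumbing):

```python
import threading


def run_bundles(algorithm, runners, stop_evt, join_timeout=1.0):
    """Run the algorithm on the main thread and runner bundles on daemon threads."""

    def guarded(bundle):
        try:
            bundle(stop_evt)
        except Exception:
            stop_evt.set()           # any bundle failure asks peers to unwind

    threads = [
        threading.Thread(target=guarded, args=(r,), daemon=True) for r in runners
    ]
    for t in threads:
        t.start()
    try:
        algorithm(stop_evt)          # only the main thread sees KeyboardInterrupt
    except BaseException:
        stop_evt.set()               # Ctrl+C or algorithm failure: signal stop
        raise
    else:
        stop_evt.set()               # main thread ran the algorithm: stop runners
    for t in threads:
        t.join(join_timeout)         # brief join; stragglers would be logged
```
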
Events¶
agentlightning.ExecutionEvent
¶
Bases: Protocol
Protocol capturing the cooperative stop contract shared by strategies.
Implementations mirror the API of threading.Event
and
multiprocessing.Event
so the rest of the execution layer can remain
agnostic to the underlying concurrency primitive.
Methods:
set: Signal cancellation. The call must be idempotent.
clear: Reset the event to the unsignaled state.
is_set: Return ``True`` when cancellation has been requested.
wait: Block until the event is signaled or an optional timeout elapses.
agentlightning.ThreadingEvent
¶
Thread-safe implementation of ExecutionEvent
.
agentlightning.MultiprocessingEvent
¶
Process-safe implementation of ExecutionEvent
.
CLI Builder¶
agentlightning.lightning_cli(*classes)
¶
Parses command-line arguments to configure and instantiate the provided CliConfigurable classes.
Parameters:
- *classes (Type[CliConfigurable], default: ()) – One or more classes that inherit from CliConfigurable. Each class's __init__ parameters will be exposed as command-line arguments.
Returns:
- CliConfigurable | Tuple[CliConfigurable, ...] – A tuple of instantiated objects corresponding to the input classes in order (or a single instance when one class is given).
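The mechanism can be illustrated with a simplified stand-in (not the real implementation; it ignores type annotations, bool flags, and required parameters, and infers each flag's type from its default value):

```python
import argparse
import inspect


def cli_build(classes, argv):
    """Expose each class's __init__ keyword parameters as --<class>.<param> flags."""
    parser = argparse.ArgumentParser()
    for cls in classes:
        for name, param in inspect.signature(cls.__init__).parameters.items():
            if name == "self":
                continue
            default = param.default
            parser.add_argument(
                f"--{cls.__name__.lower()}.{name}",
                dest=f"{cls.__name__}.{name}",
                default=default,
                # naive coercion via the default's type (bools would need care)
                type=type(default) if default is not None else str,
            )
    args = vars(parser.parse_args(argv))
    instances = tuple(
        cls(**{
            name: args[f"{cls.__name__}.{name}"]
            for name in inspect.signature(cls.__init__).parameters
            if name != "self"
        })
        for cls in classes
    )
    return instances[0] if len(instances) == 1 else instances
```
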