Algorithm-side References¶
Note
This reference covers APIs designed to be used on the algorithm side. For built-in algorithms, see Algorithm Zoo.
Base Class and Decorators¶
agentlightning.Algorithm
¶
Algorithm is the strategy, or tuner, used to train the agent.
get_adapter()
¶
Retrieve the adapter this algorithm uses to collect and convert traces.
get_client()
¶
Get the client used to communicate with the algorithm.
If the algorithm does not require server-client communication, it can also return a mock client that never actually communicates.
Deprecated and will be removed in a future version.
Returns:
- AgentLightningClient – The AgentLightningClient instance associated with this algorithm.
get_initial_resources()
¶
Get the initial resources for this algorithm.
get_llm_proxy()
¶
Retrieve the configured LLM proxy instance, if one has been set.
Returns:
- Optional[LLMProxy] – The active LLMProxy instance, or None when not configured.
get_store()
¶
Retrieve the store for this algorithm to communicate with the runners.
get_trainer()
¶
Get the trainer for this algorithm.
Returns:
- Trainer – The Trainer instance associated with this algorithm.
is_async()
¶
Return True if the algorithm is asynchronous.
run(train_dataset=None, val_dataset=None)
¶
Subclasses should override this method to implement the algorithm.
Parameters:
- train_dataset (Optional[Dataset[Any]], default: None) – The dataset to train on. Not all algorithms require a training dataset.
- val_dataset (Optional[Dataset[Any]], default: None) – The dataset to validate on. Not all algorithms require a validation dataset.
Returns:
- Union[None, Awaitable[None]] – The algorithm should not return anything; it should simply run.
set_adapter(adapter)
¶
Set the adapter for this algorithm to collect and convert traces.
set_initial_resources(resources)
¶
Set the initial resources for this algorithm.
set_llm_proxy(llm_proxy)
¶
Set the LLM proxy for this algorithm to reuse when available.
Parameters:
- llm_proxy (LLMProxy | None) – The LLMProxy instance configured by the trainer, if any.
set_store(store)
¶
Set the store for this algorithm to communicate with the runners.
The store is set directly rather than via a weakref because the algorithm's copy of it is meant to be maintained throughout the algorithm's lifecycle.
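Putting the accessors above together, here is a minimal sketch of a custom Algorithm subclass (the class name is hypothetical); the enqueue_rollout call mirrors the decorator examples below, so verify the exact store API against your installed version:

from agentlightning import Algorithm

class EnqueueEverything(Algorithm):
    # Toy algorithm: submit every training sample as a rollout, then return.
    async def run(self, train_dataset=None, val_dataset=None):
        store = self.get_store()  # LightningStore shared with the runners
        for sample in train_dataset or []:
            await store.enqueue_rollout(input=sample, mode="train")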
agentlightning.algo(func)
¶
Convert a callable into a FunctionalAlgorithm.
The decorator inspects the callable signature to decide which dependencies to inject at runtime, enabling concise algorithm definitions that still leverage the full training runtime.
Parameters:
- func (Union[AlgorithmFuncSync, AlgorithmFuncAsync, AlgorithmFuncSyncFallback, AlgorithmFuncAsyncFallback]) – Function implementing the algorithm logic. May be synchronous or asynchronous. The function can expect all of, or a subset of, the following parameters: store: LightningStore, train_dataset: Dataset, val_dataset: Dataset, llm_proxy: LLMProxy, adapter: TraceAdapter, initial_resources: NamedResources. If the function does not expect a parameter, the wrapper will not inject it into the call. Using *args and **kwargs will not work; no parameters will be injected.
Returns:
- Union[FunctionalAlgorithm[Literal[False]], FunctionalAlgorithm[Literal[True]]] – A FunctionalAlgorithm that proxies the callable while exposing the Algorithm interface.
Examples:
from agentlightning.algorithm.decorator import algo

@algo
def batching_algorithm(*, store, train_dataset, val_dataset):
    for sample in train_dataset:
        store.enqueue_rollout(input=sample, mode="train")

@algo
async def async_algorithm(*, store, train_dataset=None, val_dataset=None):
    await store.enqueue_rollout(input={"prompt": "hello"}, mode="train")
Fast Algorithms (for Debugging)¶
agentlightning.FastAlgorithm
¶
Bases: Algorithm
Base class for lightweight algorithms optimised for developer workflows.
Fast algorithms prioritise short feedback loops so an agent developer can run small-scale experiments without waiting for long-running training jobs to finish.
agentlightning.Baseline
¶
Bases: FastAlgorithm
Reference implementation that streams the full dataset through the rollout queue.
The baseline algorithm batches task submissions, waits for each rollout to finish, and logs every collected span and reward. It is primarily useful as a smoke test for the platform plumbing rather than a performant trainer.
Parameters:
- n_epochs (int, default: 1) – Number of dataset passes to execute for both the train and val splits during developer experiments.
- train_split (float, default: 0.5) – Fraction of the concatenated dataset to treat as training data. Must be strictly between 0 and 1.
- polling_interval (float, default: 5.0) – Interval, in seconds, to poll the store for queue depth and rollout completion.
- max_queue_length (int, default: 4) – Number of rollouts allowed to wait in the queue before throttling additional submissions.
- span_verbosity (Literal['keys', 'key_values', 'none'], default: 'keys') – Level of detail to include when logging span metadata.
Raises:
- ValueError – If train_split falls outside the (0, 1) interval.
Examples:
from agentlightning.algorithm.fast import Baseline
algorithm = Baseline(n_epochs=2, train_split=0.8, span_verbosity="key_values")
trainer.fit(algorithm, train_dataset=my_train, val_dataset=my_val)
run(train_dataset=None, val_dataset=None)
async
¶
Execute the baseline loop across the provided datasets.
Adapter¶
agentlightning.Adapter
¶
Bases: Generic[T_from, T_to]
Base class for synchronous adapters that convert data from one format to another.
The class defines a minimal protocol so that adapters can be treated like callables while still allowing subclasses to supply the concrete transformation logic.
Note
Subclasses must override adapt() to provide
the actual conversion.
Type Variables:
T_from: Source data type supplied to the adapter.
T_to: Target data type produced by the adapter.
Examples:
>>> class IntToStrAdapter(Adapter[int, str]):
... def adapt(self, source: int) -> str:
... return str(source)
...
>>> adapter = IntToStrAdapter()
>>> adapter(42)
'42'
__call__(source)
¶
Delegate to adapt(), allowing the adapter instance to be invoked as a callable.
adapt(source)
¶
Convert the data to the target format.
Subclasses must override this method with the concrete transformation logic. The base
implementation raises NotImplementedError to make the requirement explicit.
Parameters:
- source (T_from) – Input data in the source format.
Returns:
- T_to – Data converted to the target format.
agentlightning.TraceAdapter
¶
Bases: Adapter[List[Span], T_to], Generic[T_to]
Base class for adapters that convert trace spans into other formats.
This class specializes Adapter for working with
Span instances emitted by Agent Lightning instrumentation.
Subclasses receive entire trace slices and return a format suited for the downstream consumer,
for example reinforcement learning training data or observability metrics.
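For illustration, a minimal sketch of a TraceAdapter subclass (the class name and logic are hypothetical; it merely counts the spans it receives):
>>> class SpanCountAdapter(TraceAdapter[int]):
...     def adapt(self, source: List[Span]) -> int:
...         return len(source)
...
>>> adapter = SpanCountAdapter()
>>> adapter([])
0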
agentlightning.OtelTraceAdapter
¶
Bases: Adapter[List[ReadableSpan], T_to], Generic[T_to]
Base class for adapters that convert OpenTelemetry trace spans into other formats.
This specialization of Adapter expects a list of
opentelemetry.sdk.trace.ReadableSpan instances and produces any target format, such as
reinforcement learning trajectories, structured logs, or analytics-ready payloads.
Examples:
>>> class TraceToDictAdapter(OtelTraceAdapter[dict]):
... def adapt(self, spans: List[ReadableSpan]) -> dict:
... return {"count": len(spans)}
...
>>> adapter = TraceToDictAdapter()
>>> adapter([span1, span2])
{'count': 2}
agentlightning.TraceToTripletBase
¶
Base class for adapters that convert collected traces into triplet trajectories; see TracerTraceToTriplet and LlmProxyTraceToTriplet for concrete implementations.
agentlightning.TracerTraceToTriplet
¶
Bases: TraceToTripletBase
Convert tracer-emitted spans into triplet trajectories.
Attributes:
- repair_hierarchy – When True, repair the span tree using TraceTree.repair_hierarchy() before matching calls and rewards.
- llm_call_match – Regular expression pattern that selects LLM call span names.
- agent_match – Optional regular expression pattern for agent span names. When omitted, spans from any agent are considered.
- exclude_llm_call_in_reward – When True, ignore matches under reward spans while searching for rewards.
- reward_match – Strategy used to associate rewards with LLM calls.
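A hedged usage sketch, assuming the attributes above are accepted as constructor keyword arguments and that adapt() receives the spans collected for a single rollout:

from agentlightning import TracerTraceToTriplet

adapter = TracerTraceToTriplet(
    repair_hierarchy=True,
    llm_call_match=r"openai\.chat\.completion",  # assumption: adjust to your tracer's LLM span names
)
triplets = adapter.adapt(spans)  # spans: List[Span] collected for one rollout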
adapt(source)
¶
Convert the supplied tracer spans into triplet trajectories.
visualize(source, /, filename='trace_tree', interested_span_match=None)
¶
Visualize the trace tree built from the supplied spans.
Parameters:
- source (Union[List[Span], List[ReadableSpan]]) – Collection of Agent Lightning Span objects or raw opentelemetry.sdk.trace.ReadableSpan instances.
- filename (str, default: 'trace_tree') – Base filename for the generated image; .png is appended automatically.
- interested_span_match (str | None, default: None) – Optional regular expression used to highlight a subset of spans.
agentlightning.LlmProxyTraceToTriplet
¶
Bases: TraceToTripletBase
Convert telemetry emitted by the LLM Proxy into triplet trajectories.
Warning
This adapter is experimental and might be merged with
TracerTraceToTriplet in the future.
Danger
Do not rely on timestamps when using this adapter. Proxy spans can originate on different
machines with unsynchronised clocks, so sequence_id is treated as the sole source of
ordering.
Strategy:
- Sort spans by (sequence_id, start_time) for deterministic processing.
- Extract token identifiers from litellm_request or raw_gen_ai_request spans.
- Extract rewards from spans exposing AgentOps-style payloads or explicit reward spans.
- Match each reward to the most recent unmatched LLM call whose sequence is smaller.
agentlightning.TraceToMessages
¶
Bases: TraceAdapter[List[OpenAIMessages]]
Convert trace spans into OpenAI-compatible conversation messages.
The adapter reconstructs prompts, completions, tool calls, and function definitions from
gen_ai.* span attributes. The resulting objects match the JSONL structure expected by the
OpenAI fine-tuning pipeline.
Warning
The adapter assumes all spans share a common trace and that tool call spans are direct children of the associated completion span.
adapt(source)
¶
Transform trace spans into OpenAI chat payloads.
Parameters:
- source (List[Span]) – Spans containing gen_ai.* attributes emitted by the tracing pipeline.
Returns:
- List[OpenAIMessages] – A list of OpenAIMessages entries that capture prompts, completions, tools, and metadata.
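A hedged sketch that writes the adapted conversations to a fine-tuning JSONL file; it assumes each OpenAIMessages entry serializes to JSON as-is (if the entries are pydantic models, call model_dump() on them first):

import json

from agentlightning import TraceToMessages

adapter = TraceToMessages()
conversations = adapter.adapt(spans)  # spans: List[Span] carrying gen_ai.* attributes
with open("finetune.jsonl", "w") as f:
    for conversation in conversations:
        f.write(json.dumps(conversation) + "\n")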
get_tool_calls(completion, all_spans)
¶
Yield tool call payloads for a completion span.
Parameters:
- completion (Span) – The completion span whose descendants should be inspected.
- all_spans (List[Span]) – The complete span list belonging to the trace.
Yields:
- Iterable[Dict[str, Any]] – Dictionaries describing tool calls with identifiers, names, and arguments.
Raises:
- ValueError – If a candidate tool span cannot be converted into a dictionary.
LLM Proxy¶
agentlightning.LLMProxy
¶
Host a LiteLLM OpenAI-compatible proxy bound to a LightningStore.
The proxy:
- Serves an OpenAI-compatible API via uvicorn.
- Adds rollout/attempt routing and headers via middleware.
- Registers OTEL export and token-id callbacks.
- Writes a LiteLLM worker config file with model_list and settings.
Lifecycle:
- start() writes the config, starts the uvicorn server in a thread, and waits until ready.
- stop() tears down the server and removes the temporary config file.
- restart() is a convenience wrapper that stops and then starts.
Usage note: Because the LLM Proxy sets up its own OpenTelemetry tracer, it is recommended to run it in a different process from the main runner (i.e., separate from the tracer used by the agents).
Warning
The LLM Proxy does support streaming, but the tracing is still problematic when streaming is enabled.
Parameters:
- port (int) – TCP port to bind.
- model_list (List[ModelConfig] | None, default: None) – LiteLLM model_list entries.
- store (Optional[LightningStore], default: None) – LightningStore used for span sequence and persistence.
- host (str | None, default: None) – Publicly reachable host used in resource endpoints. Defaults to best-guess IPv4.
- litellm_config (Dict[str, Any] | None, default: None) – Extra LiteLLM proxy config merged with model_list.
- num_retries (int, default: 0) – Default LiteLLM retry count injected into litellm_settings.
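A hedged construction sketch, assuming ModelConfig entries follow LiteLLM's model_list schema (a logical model_name plus litellm_params):

from agentlightning import LLMProxy

proxy = LLMProxy(
    port=4000,
    model_list=[
        {
            "model_name": "gpt-4o-mini",  # logical name exposed by the proxy
            "litellm_params": {"model": "openai/gpt-4o-mini"},
        }
    ],
    store=store,  # an existing LightningStore instance
)
proxy.start()  # launches uvicorn in a background thread and waits for readiness
# ... serve traffic ...
proxy.stop()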
as_resource(rollout_id=None, attempt_id=None, model=None, sampling_parameters=None)
¶
Create an LLM resource pointing at this proxy with rollout context.
The returned endpoint is
http://{host}:{port}/rollout/{rollout_id}/attempt/{attempt_id}
Parameters:
- rollout_id (str | None, default: None) – Rollout identifier used for span attribution. If None, will instantiate a ProxyLLM resource.
- attempt_id (str | None, default: None) – Attempt identifier used for span attribution. If None, will instantiate a ProxyLLM resource.
- model (str | None, default: None) – Logical model name to use. If omitted and exactly one model is configured, that model is used.
- sampling_parameters (Dict[str, Any] | None, default: None) – Optional default sampling parameters.
Returns:
- LLM (LLM) – Configured resource ready for OpenAI-compatible calls.
Raises:
- ValueError – If model is omitted and zero or multiple models are configured.
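A hedged usage sketch; the endpoint attribute name on the returned LLM resource is an assumption, and the rollout/attempt identifiers come from wherever your algorithm obtains them:

llm = proxy.as_resource(rollout_id=rollout_id, attempt_id=attempt_id, model="gpt-4o-mini")
# The resource points at http://{host}:{port}/rollout/{rollout_id}/attempt/{attempt_id},
# so any OpenAI-compatible client can be pointed at it, for example:
# client = OpenAI(base_url=llm.endpoint, api_key="dummy")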
is_running()
¶
Return whether the uvicorn server is active.
Returns:
- bool (bool) – True if server was started and did not signal exit.
restart(*, _port=None)
¶
Restart the proxy if running, else start it.
Convenience wrapper calling stop() followed by start().
set_store(store)
¶
Set the LightningStore used by the proxy for span sequencing and persistence.
start()
¶
Start the proxy server thread and initialize global wiring.
Side effects:
- Sets the module-level global store for middleware/exporter access.
- Calls initialize() once to register middleware and callbacks.
- Writes a temporary YAML config consumed by the LiteLLM worker.
- Launches uvicorn in a daemon thread and waits for readiness.
stop()
¶
Stop the proxy server and clean up temporary artifacts.
This is a best-effort graceful shutdown with a bounded join timeout.
update_model_list(model_list)
¶
Replace the in-memory model list and hot-restart if running.
Parameters:
- model_list (List[ModelConfig]) – New list of model entries.
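For example, a hedged sketch of hot-swapping the served models, reusing the assumed LiteLLM entry schema from the constructor sketch above:

proxy.update_model_list([
    {"model_name": "gpt-4o", "litellm_params": {"model": "openai/gpt-4o"}},
])  # triggers a hot restart if the proxy is currently running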
update_port(port)
¶
Update the port for the proxy.
Parameters:
- port (int) – The new port to use for the proxy.