
Algorithm-side References

Note

This reference covers APIs designed to be used on the algorithm side. For built-in algorithms, see Algorithm Zoo.

Base Class and Decorators

agentlightning.Algorithm

Algorithm is the strategy, or tuner, that trains the agent.

get_adapter()

Retrieve the adapter this algorithm uses to collect and convert traces from the runners.

get_client()

Get the client to communicate with the algorithm.

If the algorithm does not require server-client communication, it can instead create a mock client that performs no real communication.

Deprecated and will be removed in a future version.

get_initial_resources()

Get the initial resources for this algorithm.

get_llm_proxy()

Retrieve the configured LLM proxy instance, if one has been set.

Returns:

  • Optional[LLMProxy]

    The active LLMProxy instance or None when not configured.

get_store()

Retrieve the store for this algorithm to communicate with the runners.

get_trainer()

Get the trainer for this algorithm.

Returns:

  • Trainer

    The Trainer instance associated with this algorithm.

is_async()

Return True if the algorithm is asynchronous.

run(train_dataset=None, val_dataset=None)

Subclasses should override this method to implement the algorithm's logic.

Parameters:

  • train_dataset (Optional[Dataset[Any]], default: None ) –

    The dataset to train on. Not all algorithms require a training dataset.

  • val_dataset (Optional[Dataset[Any]], default: None ) –

    The dataset to validate on. Not all algorithms require a validation dataset.

Returns:

  • Union[None, Awaitable[None]]

    The algorithm should not return anything; it should simply run to completion.
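
For orientation, a minimal sketch of a subclass is shown below. It assumes Algorithm and Dataset are importable from the package root and that the store exposes the enqueue_rollout method used in the decorator examples later on this page:

from typing import Any, Optional

from agentlightning import Algorithm, Dataset


class EnqueueEverything(Algorithm):
    """Hypothetical algorithm that enqueues each training sample once."""

    def run(
        self,
        train_dataset: Optional[Dataset[Any]] = None,
        val_dataset: Optional[Dataset[Any]] = None,
    ) -> None:
        store = self.get_store()  # the trainer sets the store before run()
        if train_dataset is None:
            return
        for sample in train_dataset:
            # Hand each sample to the runners as a training rollout.
            store.enqueue_rollout(input=sample, mode="train")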

set_adapter(adapter)

Set the adapter for this algorithm to collect and convert traces.

set_initial_resources(resources)

Set the initial resources for this algorithm.
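
A sketch of seeding resources, assuming the LLM resource type (with endpoint and model fields, as used by LLMProxy.as_resource() later on this page) is importable from the package root:

from agentlightning import LLM

# Hypothetical named-resource map: point every runner at one fixed
# OpenAI-compatible endpoint before training starts.
algorithm.set_initial_resources(
    {"main_llm": LLM(endpoint="http://localhost:11434/v1", model="llama3")}
)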

set_llm_proxy(llm_proxy)

Set the LLM proxy for this algorithm to reuse when available.

Parameters:

  • llm_proxy (LLMProxy | None) –

    The LLMProxy instance configured by the trainer, if any.

set_store(store)

Set the store for this algorithm to communicate with the runners.

The store is held as a direct reference rather than a weakref because the algorithm is expected to keep its copy for its entire lifecycle.

set_trainer(trainer)

Set the trainer for this algorithm.

Parameters:

  • trainer (Trainer) –

    The Trainer instance that will handle training and validation.

agentlightning.algo(func)

algo(
    func: AlgorithmFuncAsync,
) -> FunctionalAlgorithm[Literal[True]]
algo(
    func: AlgorithmFuncAsyncFallback,
) -> FunctionalAlgorithm[Any]
algo(
    func: AlgorithmFuncSync,
) -> FunctionalAlgorithm[Literal[False]]
algo(
    func: AlgorithmFuncSyncFallback,
) -> FunctionalAlgorithm[Any]

Convert a callable into a FunctionalAlgorithm.

The decorator inspects the callable signature to decide which dependencies to inject at runtime, enabling concise algorithm definitions that still leverage the full training runtime.

Parameters:

  • func (Union[AlgorithmFuncSync, AlgorithmFuncAsync, AlgorithmFuncSyncFallback, AlgorithmFuncAsyncFallback]) –

    Function implementing the algorithm logic. May be synchronous or asynchronous. The function can expect all of, or a subset of, the following parameters (as illustrated in the examples below): store, train_dataset, and val_dataset.

    If the function does not declare a parameter, the wrapper will not inject it into the call. Using *args and **kwargs does not work; in that case, no parameters are injected.

Returns:

  • FunctionalAlgorithm –

    The wrapped function as a FunctionalAlgorithm, ready to be passed to a trainer like any other algorithm.

Examples:

from agentlightning.algorithm.decorator import algo

@algo
def batching_algorithm(*, store, train_dataset, val_dataset):
    for sample in train_dataset:
        store.enqueue_rollout(input=sample, mode="train")

@algo
async def async_algorithm(*, store, train_dataset=None, val_dataset=None):
    await store.enqueue_rollout(input={"prompt": "hello"}, mode="train")

Fast Algorithms (for Debugging)

agentlightning.FastAlgorithm

Bases: Algorithm

Base class for lightweight algorithms optimised for developer workflows.

Fast algorithms prioritise short feedback loops so an agent developer can run small-scale experiments without waiting for long-running training jobs to finish.

agentlightning.Baseline

Bases: FastAlgorithm

Reference implementation that streams the full dataset through the rollout queue.

The baseline algorithm batches task submissions, waits for each rollout to finish, and logs every collected span and reward. It is primarily useful as a smoke test for the platform plumbing rather than a performant trainer.

Parameters:

  • n_epochs (int, default: 1 ) –

    Number of dataset passes to execute for both the train and val splits during developer experiments.

  • train_split (float, default: 0.5 ) –

    Fraction of the concatenated dataset to treat as training data. Must be strictly between 0 and 1.

  • polling_interval (float, default: 5.0 ) –

    Interval, in seconds, to poll the store for queue depth and rollout completion.

  • max_queue_length (int, default: 4 ) –

    Number of rollouts allowed to wait in the queue before throttling additional submissions.

  • span_verbosity (Literal['keys', 'key_values', 'none'], default: 'keys' ) –

    Level of detail to include when logging span metadata.

Raises:

  • ValueError

    If train_split falls outside the (0, 1) interval.

Examples:

from agentlightning.algorithm.fast import Baseline

algorithm = Baseline(n_epochs=2, train_split=0.8, span_verbosity="key_values")
trainer.fit(algorithm, train_dataset=my_train, val_dataset=my_val)

run(train_dataset=None, val_dataset=None) async

Execute the baseline loop across the provided datasets.

Adapter

agentlightning.Adapter

Bases: Generic[T_from, T_to]

Base class for synchronous adapters that convert data from one format to another.

The class defines a minimal protocol so that adapters can be treated like callables while still allowing subclasses to supply the concrete transformation logic.

Note

Subclasses must override adapt() to provide the actual conversion.

Type Variables:

T_from: Source data type supplied to the adapter.

T_to: Target data type produced by the adapter.

Examples:

>>> class IntToStrAdapter(Adapter[int, str]):
...     def adapt(self, source: int) -> str:
...         return str(source)
...
>>> adapter = IntToStrAdapter()
>>> adapter(42)
'42'

__call__(source)

Convert the data to the target format.

This method delegates to adapt() so that an instance of Adapter can be used like a standard function.

Parameters:

  • source (T_from) –

    Input data in the source format.

Returns:

  • T_to

    Data converted to the target format.

adapt(source)

Convert the data to the target format.

Subclasses must override this method with the concrete transformation logic. The base implementation raises NotImplementedError to make the requirement explicit.

Parameters:

  • source (T_from) –

    Input data in the source format.

Returns:

  • T_to

    Data converted to the target format.

agentlightning.TraceAdapter

Bases: Adapter[List[Span], T_to], Generic[T_to]

Base class for adapters that convert trace spans into other formats.

This class specializes Adapter for working with Span instances emitted by Agent Lightning instrumentation. Subclasses receive entire trace slices and return a format suited for the downstream consumer, for example reinforcement learning training data or observability metrics.
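
As a minimal sketch (assuming Span and TraceAdapter are importable from the package root), a subclass only needs to override adapt():

from typing import List

from agentlightning import Span, TraceAdapter


class SpanCounter(TraceAdapter[int]):
    """Toy adapter that reduces a trace slice to its span count."""

    def adapt(self, source: List[Span]) -> int:
        return len(source)


adapter = SpanCounter()
assert adapter([]) == 0  # instances are callable, like any Adapter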

agentlightning.OtelTraceAdapter

Bases: Adapter[List[ReadableSpan], T_to], Generic[T_to]

Base class for adapters that convert OpenTelemetry trace spans into other formats.

This specialization of Adapter expects a list of opentelemetry.sdk.trace.ReadableSpan instances and produces any target format, such as reinforcement learning trajectories, structured logs, or analytics-ready payloads.

Examples:

>>> class TraceToDictAdapter(OtelTraceAdapter[dict]):
...     def adapt(self, spans: List[ReadableSpan]) -> dict:
...         return {"count": len(spans)}
...
>>> adapter = TraceToDictAdapter()
>>> adapter([span1, span2])
{'count': 2}

agentlightning.TraceToTripletBase

Bases: TraceAdapter[List[Triplet]]

Base class for adapters that emit Triplet trajectories.

agentlightning.TracerTraceToTriplet

Bases: TraceToTripletBase

Convert tracer-emitted spans into triplet trajectories.

Attributes:

  • repair_hierarchy

    When True, repair the span tree using TraceTree.repair_hierarchy() before matching calls and rewards.

  • llm_call_match

    Regular expression pattern that selects LLM call span names.

  • agent_match

    Optional regular expression pattern for agent span names. When omitted, spans from any agent are considered.

  • exclude_llm_call_in_reward

    When True, ignore matches under reward spans while searching for rewards.

  • reward_match

    Strategy used to associate rewards with LLM calls.

adapt(source)

Convert tracer spans into Triplet trajectories.

Parameters:

  • source (Union[List[Span], List[ReadableSpan]]) –

    Agent Lightning spans or raw OpenTelemetry spans that form a trace.

Returns:

  • List[Triplet]

    Ordered list of trajectory transitions with prompt, response, and reward information.
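
A usage sketch is shown below; treating the attributes above as constructor keyword arguments is an assumption, and the regular expression is a placeholder:

from agentlightning import TracerTraceToTriplet

adapter = TracerTraceToTriplet(
    repair_hierarchy=True,
    llm_call_match=r"openai\.chat\.completion",  # placeholder pattern
    agent_match=None,  # consider spans from any agent
)
triplets = adapter.adapt(spans)  # spans collected from a finished rollout
for triplet in triplets:
    print(triplet.prompt, triplet.response, triplet.reward)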

visualize(source, /, filename='trace_tree', interested_span_match=None)

Visualize the trace tree built from the supplied spans.

Parameters:

  • source (Union[List[Span], List[ReadableSpan]]) –

    Collection of Agent Lightning Span objects or raw opentelemetry.sdk.trace.ReadableSpan instances.

  • filename (str, default: 'trace_tree' ) –

    Base filename for the generated image; .png is appended automatically.

  • interested_span_match (str | None, default: None ) –

    Optional regular expression used to highlight a subset of spans.


agentlightning.LlmProxyTraceToTriplet

Bases: TraceToTripletBase

Convert telemetry emitted by the LLM Proxy into triplet trajectories.

Warning

This adapter is experimental and might be merged with TracerTraceToTriplet in the future.

Danger

Do not rely on timestamps when using this adapter. Proxy spans can originate on different machines with unsynchronised clocks, so sequence_id is treated as the sole source of ordering.

Strategy:

  1. Sort spans by (sequence_id, start_time) for deterministic processing.
  2. Extract token identifiers from litellm_request or raw_gen_ai_request spans.
  3. Extract rewards from spans exposing AgentOps-style payloads or explicit reward spans.
  4. Match each reward to the most recent unmatched LLM call whose sequence_id is smaller (see the sketch below).
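
The sketch below illustrates step 4's matching rule in isolation; it is standalone pseudocode, not the adapter's actual implementation:

from typing import Dict, List, Optional, Tuple


def match_rewards(
    call_seqs: List[int],  # sequence_ids of LLM call spans, ascending
    rewards: List[Tuple[int, float]],  # (sequence_id, value) of reward spans
) -> Dict[int, Optional[float]]:
    """Assign each reward to the latest unmatched call with a smaller sequence_id."""
    assigned: Dict[int, Optional[float]] = {seq: None for seq in call_seqs}
    for reward_seq, value in rewards:
        # Scan calls newest-first and take the first unmatched one
        # that occurred strictly before this reward.
        for call_seq in reversed(call_seqs):
            if call_seq < reward_seq and assigned[call_seq] is None:
                assigned[call_seq] = value
                break
    return assigned


# The reward at sequence 5 pairs with the call at 4, not the one at 2.
print(match_rewards([2, 4], [(5, 1.0)]))  # {2: None, 4: 1.0}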

adapt(source)

Convert LLM Proxy spans into Triplet trajectories.

Parameters:

  • source (List[Span]) –

    Spans emitted by the LLM Proxy containing prompt, response, and reward data.

Returns:

  • List[Triplet]

    Ordered trajectory transitions matched purely by sequence_id.

agentlightning.TraceToMessages

Bases: TraceAdapter[List[OpenAIMessages]]

Convert trace spans into OpenAI-compatible conversation messages.

The adapter reconstructs prompts, completions, tool calls, and function definitions from gen_ai.* span attributes. The resulting objects match the JSONL structure expected by the OpenAI fine-tuning pipeline.

Warning

The adapter assumes all spans share a common trace and that tool call spans are direct children of the associated completion span.

adapt(source)

Transform trace spans into OpenAI chat payloads.

Parameters:

  • source (List[Span]) –

    Spans containing gen_ai.* attributes emitted by the tracing pipeline.

Returns:

  • List[OpenAIMessages] –

    Conversation payloads in the OpenAI chat format, one per reconstructed conversation.
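
A sketch of writing the result to a fine-tuning JSONL file, assuming each returned conversation is a plain JSON-serialisable dictionary:

import json

from agentlightning import TraceToMessages

adapter = TraceToMessages()
conversations = adapter(spans)  # spans from a completed rollout

# One conversation per line, matching the JSONL layout expected by
# the OpenAI fine-tuning pipeline.
with open("train.jsonl", "w") as f:
    for conversation in conversations:
        f.write(json.dumps(conversation) + "\n")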

get_tool_calls(completion, all_spans)

Yield tool call payloads for a completion span.

Parameters:

  • completion (Span) –

    The completion span whose descendants should be inspected.

  • all_spans (List[Span]) –

    The complete span list belonging to the trace.

Yields:

  • Iterable[Dict[str, Any]]

    Dictionaries describing tool calls with identifiers, names, and arguments.

Raises:

  • ValueError

    If a candidate tool span cannot be converted into a dictionary.

LLM Proxy

agentlightning.LLMProxy

Host a LiteLLM OpenAI-compatible proxy bound to a LightningStore.

The proxy:

  • Serves an OpenAI-compatible API via uvicorn.
  • Adds rollout/attempt routing and headers via middleware.
  • Registers OTEL export and token-id callbacks.
  • Writes a LiteLLM worker config file with model_list and settings.

Lifecycle:

  • start() writes config, starts uvicorn server in a thread, and waits until ready.
  • stop() tears down the server and removes the temp config file.
  • restart() convenience wrapper to stop then start.

Usage note: because the LLM Proxy sets up its own OpenTelemetry tracer, it is recommended to run it in a process separate from the main runner (i.e., separate from the agents' tracer).

Warning

The LLM Proxy does support streaming, but tracing remains problematic while streaming is enabled.

Parameters:

  • port (int) –

    TCP port to bind.

  • model_list (List[ModelConfig] | None, default: None ) –

    LiteLLM model_list entries.

  • store (Optional[LightningStore], default: None ) –

    LightningStore used for span sequence and persistence.

  • host (str | None, default: None ) –

    Publicly reachable host used in resource endpoints. Defaults to a best-guess IPv4 address.

  • litellm_config (Dict[str, Any] | None, default: None ) –

    Extra LiteLLM proxy config merged with model_list.

  • num_retries (int, default: 0 ) –

    Default LiteLLM retry count injected into litellm_settings.
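
A lifecycle sketch is shown below; the model_list entries follow LiteLLM's proxy configuration format, and the exact ModelConfig shape is an assumption:

from agentlightning import LLMProxy

proxy = LLMProxy(
    port=4000,
    model_list=[
        {
            "model_name": "gpt-4o-mini",  # logical name exposed to clients
            "litellm_params": {"model": "openai/gpt-4o-mini"},
        }
    ],
)
proxy.set_store(store)  # a LightningStore created elsewhere
proxy.start()  # writes config, launches uvicorn, waits for readiness
try:
    ...  # serve OpenAI-compatible traffic while the algorithm runs
finally:
    proxy.stop()  # graceful shutdown and temp-config cleanup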

as_resource(rollout_id=None, attempt_id=None, model=None, sampling_parameters=None)

Create an LLM resource pointing at this proxy with rollout context.

The returned endpoint is

http://{host}:{port}/rollout/{rollout_id}/attempt/{attempt_id}

Parameters:

  • rollout_id (str | None, default: None ) –

    Rollout identifier used for span attribution. If None, a ProxyLLM resource is instantiated instead.

  • attempt_id (str | None, default: None ) –

    Attempt identifier used for span attribution. If None, a ProxyLLM resource is instantiated instead.

  • model (str | None, default: None ) –

    Logical model name to use. If omitted and exactly one model is configured, that model is used.

  • sampling_parameters (Dict[str, Any] | None, default: None ) –

    Optional default sampling parameters.

Returns:

  • LLM –

    Configured resource ready for OpenAI-compatible calls.

Raises:

  • ValueError

    If model is omitted and zero or multiple models are configured.
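
A sketch of consuming the returned resource with the official OpenAI client; the endpoint and model attribute names on the LLM resource are assumptions:

from openai import OpenAI

# Identifiers below are placeholders for a real rollout attempt.
llm = proxy.as_resource(rollout_id="rollout-123", attempt_id="attempt-0")

client = OpenAI(base_url=llm.endpoint, api_key="dummy")
response = client.chat.completions.create(
    model=llm.model,
    messages=[{"role": "user", "content": "hello"}],
)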

is_running()

Return whether the uvicorn server is active.

Returns:

  • bool –

    True if the server was started and has not signalled exit.

restart(*, _port=None)

Restart the proxy if running, else start it.

Convenience wrapper calling stop() followed by start().

set_store(store)

Set the store for the proxy.

Parameters:

  • store (LightningStore) –

    The LightningStore the proxy should use for span sequencing and persistence.

start()

Start the proxy server thread and initialize global wiring.

Side effects:

  • Sets the module-level global store for middleware/exporter access.
  • Calls initialize() once to register middleware and callbacks.
  • Writes a temporary YAML config consumed by LiteLLM worker.
  • Launches uvicorn in a daemon thread and waits for readiness.

stop()

Stop the proxy server and clean up temporary artifacts.

This is a best-effort graceful shutdown with a bounded join timeout.

update_model_list(model_list)

Replace the in-memory model list and hot-restart if running.

Parameters:

  • model_list (List[ModelConfig]) –

    New list of model entries.

update_port(port)

Update the port for the proxy.

Parameters:

  • port (int) –

    The new port to use for the proxy.