Internal API References

Danger

The following APIs should be used with extra caution because they are very likely to change in the future.

agentlightning.adapter.messages.OpenAIMessages

Bases: TypedDict

OpenAI-style chat messages with optional tool definitions.

Attributes:

  • messages (List[ChatCompletionMessageParam]) –

    Ordered chat messages that describe the conversation.

  • tools (Optional[List[ChatCompletionFunctionToolParam]]) –

    Tool specifications available to the assistant, if any.
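As an illustration of the shape described above, the following sketch builds a payload with plain dictionaries. `OpenAIMessagesSketch` is a local stand-in rather than the library class, and the `get_weather` tool is hypothetical.

```python
from typing import Any, Dict, List, Optional, TypedDict


class OpenAIMessagesSketch(TypedDict):
    """Local stand-in mirroring the documented attributes (not the library class)."""

    messages: List[Dict[str, Any]]
    tools: Optional[List[Dict[str, Any]]]


payload: OpenAIMessagesSketch = {
    # Ordered chat messages that describe the conversation.
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is the weather in Paris?"},
    ],
    # Tool specifications in the OpenAI function-tool layout; may also be None.
    "tools": [
        {
            "type": "function",
            "function": {
                "name": "get_weather",  # hypothetical tool name
                "description": "Look up the current weather for a city.",
                "parameters": {
                    "type": "object",
                    "properties": {"city": {"type": "string"}},
                    "required": ["city"],
                },
            },
        }
    ],
}
```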

agentlightning.adapter.triplet.TraceTree

Tree representation of a trace span and its descendants.

Attributes:

  • id

    Unique identifier for the span node.

  • span

    Span backing this node.

  • children

    Child nodes connected to the current span.

agent_name()

Return the agent name associated with the span, if any.

Returns:

  • Optional[str]

    Agent name extracted from known attributes, otherwise None.

find_llm_calls(*, llm_call_match, agent_match, within_matching_subtree=None, within_reward=None, within_llm_call=None, existing_llm_call_response_ids=None)

Find LLM call spans matching the supplied filters.

Parameters:

  • llm_call_match (str) –

    Regular expression used to match span names that qualify as LLM calls.

  • agent_match (Optional[str]) –

    Optional regular expression that must match the enclosing agent span name.

  • within_matching_subtree (str | None, default: None ) –

    Marker propagated through recursive calls to record matching agents.

  • within_reward (Optional[bool], default: None ) –

    When True, suppresses LLM matches under reward spans.

  • within_llm_call (Optional[bool], default: None ) –

    When True, prevents duplicate matches for nested LLM calls.

  • existing_llm_call_response_ids (Optional[set[str]], default: None ) –

    Known response identifiers used to deduplicate spans.

Returns:

  • List[Tuple['TraceTree', str]]

    A list of tuples pairing the matching node with the agent subtree label that triggered the match.

from_spans(spans) classmethod

Construct a tree from a flat list of spans.

Parameters:

  • spans (List[Span]) –

    Spans that collectively form a single trace segment.

Returns:

  • 'TraceTree'

    A TraceTree rooted at either the discovered root span or a synthetic root when multiple roots are present.

Raises:

  • ValueError

    If the span list is empty or no root span can be inferred.
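The behaviour described for from_spans can be sketched roughly as follows. This is a minimal illustration rather than the library's implementation: `FakeSpan`, `Node`, and `build_tree` are hypothetical names, and the sketch assumes each span exposes a span id and an optional parent id.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class FakeSpan:
    """Hypothetical minimal span; the real Span type carries more fields."""

    span_id: str
    parent_id: Optional[str]
    name: str


@dataclass
class Node:
    id: str
    span: FakeSpan
    children: List["Node"] = field(default_factory=list)


def build_tree(spans: List[FakeSpan]) -> Node:
    if not spans:
        raise ValueError("span list is empty")
    nodes: Dict[str, Node] = {s.span_id: Node(s.span_id, s) for s in spans}
    roots: List[Node] = []
    for s in spans:
        parent = nodes.get(s.parent_id) if s.parent_id is not None else None
        if parent is None:
            # No known parent in this segment: treat the span as a root.
            roots.append(nodes[s.span_id])
        else:
            parent.children.append(nodes[s.span_id])
    if not roots:
        raise ValueError("no root span can be inferred")
    if len(roots) == 1:
        return roots[0]
    # Several roots: hang them under a synthetic root node.
    synthetic = FakeSpan("synthetic-root", None, "root")
    return Node(synthetic.span_id, synthetic, roots)
```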

is_reward_span()

Return whether the span explicitly encodes a reward.

Returns:

  • bool

    True when the span payload describes a reward, otherwise False.

match_rewards(reward_match, llm_calls)

Assign rewards to previously matched LLM calls.

Parameters:

  • reward_match (str) –

    Strategy identifier from RewardMatchPolicy.

  • llm_calls (List['TraceTree']) –

    Trace nodes representing LLM call spans.

Returns:

  • dict[str, Optional[float]]

    Mapping from span identifier to reward value or None when no reward is available.

maybe_reward_dict()

Return a reward payload if the span encodes one.

Returns:

  • dict[str, Any]

    Dictionary containing reward metadata, or an empty dictionary when no reward is found.

names_tuple()

Return the span name alongside nested child names.

Returns:

  • Tuple[str, List[Any]]

    A tuple of the current span name and a list of tuples for each child containing the child name and its descendants.

repair_hierarchy()

Repair missing parent-child relationships introduced by mixed tracing systems.

Some agent frameworks emit spans via multiple subsystems, which can cause LLM completion spans to float directly under the root span instead of being nested under the correct agent. The method re-parents those spans to the closest ancestor that fully envelops the child in time.

Without this repair, selecting LLM completion spans with an agent filter would never return the correct spans, because they would not sit underneath the agent span.
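The re-parenting idea can be illustrated with a small time-interval sketch. It is not the library's code: `TimedNode`, `envelops`, `deepest_enveloping`, and `repair` are hypothetical names, and the sketch assumes each node carries start and end timestamps.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(eq=False)  # identity-based equality so list.remove targets the exact node
class TimedNode:
    name: str
    start: float
    end: float
    children: List["TimedNode"] = field(default_factory=list)


def envelops(outer: TimedNode, inner: TimedNode) -> bool:
    return outer.start <= inner.start and inner.end <= outer.end


def deepest_enveloping(node: TimedNode, target: TimedNode) -> Optional[TimedNode]:
    """Return the deepest node in node's subtree that envelops target in time."""
    if node is target or not envelops(node, target):
        return None
    for child in node.children:
        found = deepest_enveloping(child, target)
        if found is not None:
            return found
    return node


def repair(root: TimedNode) -> None:
    """Move spans floating under the root beneath the closest enveloping span."""
    for orphan in list(root.children):
        for sibling in root.children:
            new_parent = deepest_enveloping(sibling, orphan)
            if new_parent is not None:
                root.children.remove(orphan)
                new_parent.children.append(orphan)
                break
```

With an agent span covering 0 to 10 seconds and an LLM span covering 2 to 5 seconds, both directly under the root, `repair` moves the LLM span under the agent span.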

to_json()

Convert the tree node into a JSON-serialisable structure.

to_trajectory(llm_call_match='openai\\.chat\\.completion', agent_match=None, exclude_llm_call_in_reward=True, dedup_llm_call=True, reward_match=RewardMatchPolicy.FIRST_OCCURRENCE, final_reward=None)

Convert the trace tree into a trajectory of Triplet items.

Parameters:

  • llm_call_match (str, default: 'openai\\.chat\\.completion' ) –

    Regular expression for LLM call span names.

  • agent_match (Optional[str], default: None ) –

    Optional regular expression for agent span names.

  • exclude_llm_call_in_reward (bool, default: True ) –

    When True, prevents searching for rewards under the LLM call subtree.

  • dedup_llm_call (bool, default: True ) –

    When True, deduplicates spans using the LLM response identifier.

  • reward_match (RewardMatchPolicy, default: FIRST_OCCURRENCE ) –

    Reward matching policy used to associate reward spans with LLM calls.

  • final_reward (Optional[float], default: None ) –

    Optional reward appended to the final transition when provided.

Returns:

  • List[Triplet]

    A list of Triplet objects ordered by call sequence.

traverse()

Traverse the tree depth first and return every node.

visualize(filename, interested_span_match=None)

Render the trace tree with Graphviz for debugging purposes.

Parameters:

  • filename (str) –

    Base filename for the generated .png diagram.

  • interested_span_match (str | None, default: None ) –

    Optional regular expression used to keep only matching spans (and their ancestors) in the output.

Note

The method requires the optional graphviz dependency to be available in the runtime environment.

agentlightning.adapter.triplet.Transition

Bases: BaseModel

A single transition within a reinforcement learning trajectory.

Attributes:

  • state (List[int]) –

    Token identifiers describing the model input state.

  • action (List[int]) –

    Token identifiers representing the model output.

  • response_id (Optional[str]) –

    Identifier of the LLM response used to deduplicate spans.

  • agent_name (str) –

    Human-readable agent name captured from the trace.

  • reward (Optional[float]) –

    Scalar reward associated with the transition, if available.

agentlightning.adapter.triplet.RewardMatchPolicy

Bases: str, Enum

Strategies for matching rewards to LLM call spans.

Note

Each reward span must expose a payload shaped like {"type": "reward", "value": <float>|None} as described in reward.py.

FIRST_OCCURRENCE = 'first_occurrence' class-attribute instance-attribute

Use the first reward encountered in chronological order after the current LLM call match.

FIRST_SIBLING = 'first_sibling' class-attribute instance-attribute

Use the first sibling in the current trace subtree as the reward unless another LLM call match is found.
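To make the FIRST_OCCURRENCE wording concrete, the sketch below pairs each LLM call with the first reward payload that follows it chronologically. It reads the description literally and is an assumption about the behaviour, not the library's implementation; `RewardMatchPolicySketch`, `Event`, and `first_occurrence` are hypothetical names, and the flattened event list stands in for the trace tree.

```python
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple


class RewardMatchPolicySketch(str, Enum):
    """Local mirror of the documented members (not the library enum)."""

    FIRST_OCCURRENCE = "first_occurrence"
    FIRST_SIBLING = "first_sibling"


# (span id, start time, payload): a hypothetical flattened view of the trace.
Event = Tuple[str, float, Dict[str, Any]]


def first_occurrence(
    events: List[Event], llm_call_ids: List[str]
) -> Dict[str, Optional[float]]:
    ordered = sorted(events, key=lambda e: e[1])
    rewards: Dict[str, Optional[float]] = {}
    for index, (span_id, _, _) in enumerate(ordered):
        if span_id not in llm_call_ids:
            continue
        rewards[span_id] = None  # stays None when no reward follows the call
        for _, _, payload in ordered[index + 1:]:
            # Reward payloads follow the documented {"type": "reward", "value": ...} shape.
            if payload.get("type") == "reward":
                rewards[span_id] = payload.get("value")
                break
    return rewards
```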

agentlightning.algorithm.decorator.FunctionalAlgorithm

Bases: Algorithm, Generic[AF]

An algorithm wrapper built from a callable implementation.

Functional algorithms let you provide an ordinary function instead of subclassing Algorithm. The wrapper inspects the callable signature to supply optional dependencies such as the store, adapter, and LLM proxy.

__init__(algorithm_func)

__init__(algorithm_func: AlgorithmFuncSyncLike) -> None
__init__(algorithm_func: AlgorithmFuncAsyncLike) -> None

Wrap a function that implements algorithm behaviour.

Parameters:

  • algorithm_func (Union[AlgorithmFuncSyncLike, AlgorithmFuncAsyncLike]) –

    Sync or async callable implementing the algorithm contract. Arguments are detected automatically based on the function signature.

run(train_dataset=None, val_dataset=None)

run(
    train_dataset: Optional[Dataset[Any]] = None,
    val_dataset: Optional[Dataset[Any]] = None,
) -> None
run(
    train_dataset: Optional[Dataset[Any]] = None,
    val_dataset: Optional[Dataset[Any]] = None,
) -> Awaitable[None]

Execute the wrapped function with injected dependencies.

Parameters:

  • train_dataset (Optional[Dataset[Any]], default: None ) –

    Optional training dataset passed through when the callable declares a train_dataset parameter.

  • val_dataset (Optional[Dataset[Any]], default: None ) –

    Optional validation dataset passed through when the callable declares a val_dataset parameter.

Returns:

  • Union[None, Awaitable[None]]

    None for sync callables or an awaitable when the callable is async.

Raises:

  • TypeError

    If a dataset is provided but the function signature does not accept the corresponding argument.
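Signature-based injection of this kind can be sketched with the standard `inspect` module. The helper below is a hypothetical illustration under the assumption that dependencies are matched by parameter name; it is not the wrapper's actual code.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def call_with_injection(
    func: Callable[..., Any],
    available: Dict[str, Any],
    train_dataset: Optional[Any] = None,
    val_dataset: Optional[Any] = None,
) -> Any:
    params = inspect.signature(func).parameters
    # Only pass the optional dependencies the callable actually declares.
    kwargs = {name: value for name, value in available.items() if name in params}
    for name, dataset in (("train_dataset", train_dataset), ("val_dataset", val_dataset)):
        if name in params:
            kwargs[name] = dataset
        elif dataset is not None:
            # A dataset was supplied but the callable has no matching parameter.
            raise TypeError(f"{name} was provided but {func.__name__} does not accept it")
    return func(**kwargs)
```

For example, a function declared as `def algo(store, train_dataset)` receives only those two arguments, even when an adapter is also available.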

agentlightning.litagent.decorator.FunctionalLitAgent

Bases: LitAgent[T]

Adapter that turns plain rollout functions into LitAgent instances.

The helper inspects the wrapped function to determine which resources to inject, allowing both synchronous and asynchronous callables to participate in the training loop without writing a dedicated subclass.

__call__(*args, **kwargs)

Make the agent instance callable, preserving the original function behavior.

__init__(rollout_func, *, strip_proxy=True)

Initialize the wrapper around a rollout function.

Parameters:

  • rollout_func (FunctionalLitAgentFunc[T]) –

    Callable that implements the rollout. It may be synchronous or asynchronous and can optionally receive a Rollout alongside resources such as llm or prompt_template.

  • strip_proxy (bool, default: True ) –

    When True, convert ProxyLLM inputs into LLM instances before calling the rollout function. Defaults to True.

rollout(task, resources, rollout)

Execute a synchronous rollout using the wrapped function.

Parameters:

  • task (T) –

    Task input data.

  • resources (NamedResources) –

    Mapping of named resources available to the agent.

  • rollout (Rollout) –

    Rollout metadata provided by the runtime.

Returns:

Raises:

  • RuntimeError

    If the wrapped function is asynchronous.

rollout_async(task, resources, rollout) async

Execute an asynchronous rollout using the wrapped function.

Parameters:

  • task (T) –

    Task input data.

  • resources (NamedResources) –

    Mapping of named resources available to the agent.

  • rollout (Rollout) –

    Rollout metadata provided by the runtime.

Returns:

Raises:

  • RuntimeError

    If the wrapped function is synchronous.
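The sync/async split between rollout and rollout_async can be sketched as follows. `RolloutWrapperSketch` is a hypothetical, simplified stand-in that ignores resources and rollout metadata; it only shows how the RuntimeError cases described above might arise.

```python
import asyncio
import inspect
from typing import Any, Callable


class RolloutWrapperSketch:
    def __init__(self, func: Callable[..., Any]) -> None:
        self._func = func
        # Decide once whether the wrapped callable is a coroutine function.
        self._is_async = inspect.iscoroutinefunction(func)

    def rollout(self, task: Any) -> Any:
        if self._is_async:
            raise RuntimeError("wrapped function is asynchronous; use rollout_async")
        return self._func(task)

    async def rollout_async(self, task: Any) -> Any:
        if not self._is_async:
            raise RuntimeError("wrapped function is synchronous; use rollout")
        return await self._func(task)


def sync_agent(task: str) -> str:
    return task.upper()


async def async_agent(task: str) -> str:
    await asyncio.sleep(0)
    return task.lower()
```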

agentlightning.litagent.decorator.llm_rollout(func=None, *, strip_proxy=True)

llm_rollout(
    func: LlmRolloutFunc[T],
) -> FunctionalLitAgent[T]
llm_rollout(
    *, strip_proxy: bool = True
) -> Callable[[LlmRolloutFunc[T]], FunctionalLitAgent[T]]

Create a FunctionalLitAgent for LLM-based rollouts.

Parameters:

  • func (LlmRolloutFunc[T] | None, default: None ) –

    Callable defining the agent's behaviour. Supported signatures include:

    • (task, llm) -> result
    • (task, llm, rollout) -> result
    • async (task, llm) -> result
    • async (task, llm, rollout) -> result
  • strip_proxy (bool, default: True ) –

    When True, convert proxy resources into concrete LLM instances before calling the function. Defaults to True.

Returns:

Examples:

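from agentlightning.litagent.decorator import llm_rollout

# Bare decorator form: strip_proxy keeps its default of True.
@llm_rollout
def my_agent(task, llm):
    return llm.endpoint

# Factory form: proxy resources are passed through unchanged.
@llm_rollout(strip_proxy=False)
def my_agent_no_strip(task, llm):
    return llm.model

# The decorated agent stays directly callable and also exposes rollout().
# task, llm, resources, and rollout are supplied by the caller or runtime.
result = my_agent(task, llm)
result = my_agent.rollout(task, resources, rollout)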

agentlightning.litagent.decorator.prompt_rollout(func=None)

prompt_rollout(
    func: PromptRolloutFunc[T],
) -> FunctionalLitAgent[T]
prompt_rollout() -> (
    Callable[[PromptRolloutFunc[T]], FunctionalLitAgent[T]]
)

Create a FunctionalLitAgent for prompt-based rollouts.

This decorator is designed for agents that work with tunable prompt templates. It enables a workflow where algorithms manage and optimize the prompt template, while agents consume the template to perform rollouts. This is particularly useful for prompt optimization scenarios.

Parameters:

  • func (PromptRolloutFunc[T] | None, default: None ) –

    Callable defining the agent's behavior. Supported signatures include:

    • (task, prompt_template) -> result
    • (task, prompt_template, rollout) -> result
    • async (task, prompt_template) -> result
    • async (task, prompt_template, rollout) -> result

Returns:

Examples:

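from agentlightning.litagent.decorator import prompt_rollout

@prompt_rollout
def my_agent(task, prompt_template):
    # Fill the tunable template with the task input.
    messages = prompt_template.format(task=task.input)
    return messages

# The decorated agent stays directly callable and also exposes rollout().
# task, prompt_template, resources, and rollout are supplied by the caller or runtime.
result = my_agent(task, prompt_template)
result = my_agent.rollout(task, resources, rollout)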

agentlightning.llm_proxy.ModelConfig

Bases: TypedDict

LiteLLM model registration entry.

This mirrors the items in LiteLLM's model_list section.

Attributes:

  • model_name (str) –

    Logical model name exposed by the proxy.

  • litellm_params (Dict[str, Any]) –

    Parameters passed to LiteLLM for this model (e.g., backend model id, api_base, additional options).
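A single entry might look like the sketch below. All values are placeholders: the logical name, the backend model id, and the URL are assumptions chosen for illustration.

```python
from typing import Any, Dict, List

model_list: List[Dict[str, Any]] = [
    {
        # Logical name that clients of the proxy will request (placeholder).
        "model_name": "my-policy-model",
        # Parameters handed to LiteLLM for this model (placeholder values).
        "litellm_params": {
            "model": "openai/my-backend-model",
            "api_base": "http://localhost:8000/v1",
        },
    }
]
```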

agentlightning.llm_proxy.LightningSpanExporter

Bases: SpanExporter

Buffered OTEL span exporter with subtree flushing and training-store sink.

Design:

  • Spans are buffered until a root span's entire subtree is available.
  • A private event loop on a daemon thread runs async flush logic.
  • Rollout/attempt/sequence metadata is reconstructed by merging headers from any span within a subtree.

Thread-safety:

  • Buffer access is protected by a re-entrant lock.
  • Export is synchronous to the caller yet schedules an async flush on the internal loop, then waits for completion.

Parameters:

  • store (Optional[LightningStore], default: None ) –

    Optional explicit LightningStore. If None, uses get_global_store().

export(spans)

Export spans via buffered subtree flush.

Appends spans to the internal buffer, then triggers an async flush on the private event loop. Blocks until that flush completes.

Parameters:

  • spans (Sequence[ReadableSpan]) –

    Sequence of spans to export.

Returns:

  • SpanExportResult –

    SUCCESS on flush success, else FAILURE.
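The pattern of a synchronous export that schedules an async flush on a private loop and waits for it can be sketched with the standard library alone. `BufferedExporterSketch` is a hypothetical simplification: it flushes the whole buffer on every call, omits the subtree bookkeeping, and uses strings in place of spans.

```python
import asyncio
import threading
from typing import List


class BufferedExporterSketch:
    def __init__(self) -> None:
        self._buffer: List[str] = []
        self._flushed: List[str] = []
        self._lock = threading.RLock()  # re-entrant lock guarding the buffer
        self._loop = asyncio.new_event_loop()  # private loop, not the caller's
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    async def _flush(self) -> None:
        with self._lock:
            pending, self._buffer = self._buffer, []
        await asyncio.sleep(0)  # stand-in for an async write to the store
        self._flushed.extend(pending)

    def export(self, spans: List[str]) -> bool:
        with self._lock:
            self._buffer.extend(spans)
        # Schedule the flush on the private loop, then block until it finishes.
        future = asyncio.run_coroutine_threadsafe(self._flush(), self._loop)
        try:
            future.result(timeout=5.0)
            return True
        except Exception:
            return False

    def shutdown(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=5.0)
        self._loop.close()
```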

shutdown()

Shut down the exporter event loop.

Safe to call at process exit.

agentlightning.llm_proxy.LightningOpenTelemetry

Bases: OpenTelemetry

OpenTelemetry integration that exports spans to the Lightning store.

Responsibilities:

  • Ensures each request is annotated with a per-attempt sequence id so spans are ordered deterministically even with clock skew across nodes.
  • Uses LightningSpanExporter to persist spans for analytics and training.

Parameters:

  • store (LightningStore | None, default: None ) –

    Optional explicit LightningStore for the exporter.

agentlightning.llm_proxy.AddReturnTokenIds

Bases: CustomLogger

LiteLLM logger hook to request token ids from vLLM.

This mutates the outgoing request payload to include return_token_ids=True for backends that support token id return (e.g., vLLM).

See

https://github.com/vllm-project/vllm/pull/22587

async_pre_call_hook(*args, **kwargs) async

Async pre-call hook to adjust request payload.

Parameters:

  • args (Any, default: () ) –

    Positional args from LiteLLM.

  • kwargs (Any, default: {} ) –

    Keyword args from LiteLLM.

Returns:

  • Optional[Union[Exception, str, Dict[str, Any]]]

    Either an updated payload dict or an Exception to short-circuit.
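The effect of the hook on a request payload can be sketched as a standalone coroutine. This sketch does not subclass CustomLogger, and the way LiteLLM hands the payload to the hook is not shown; `add_return_token_ids` and the sample payload are hypothetical.

```python
import asyncio
from typing import Any, Dict


async def add_return_token_ids(data: Dict[str, Any]) -> Dict[str, Any]:
    # Mutate the outgoing payload so the backend is asked to return token ids.
    data["return_token_ids"] = True
    return data


request = {"model": "my-policy-model", "messages": [{"role": "user", "content": "hi"}]}
updated = asyncio.run(add_return_token_ids(request))
```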

agentlightning.store.base.UNSET = _UnsetType() module-attribute

agentlightning.store.utils.propagate_status(update_rollout_status, attempt, config) async

Propagate the status of an attempt to the rollout.

The caller must ensure that the rollout is in a state that may be out of date. The rollout is requeued if it should be retried.

This operation is completely unlocked. The caller is responsible for locking the store.

agentlightning.tracer.agentops.LightningSpanProcessor

Bases: SpanProcessor

Span processor that subclasses OpenTelemetry's SpanProcessor and adds support to dump traces to a LightningStore.

on_end(span)

Process a span when it ends.

Parameters:

  • span (ReadableSpan) –

    The span that has ended.

spans()

Get the list of spans collected by this processor. This is useful for debugging and testing purposes.

Returns:

  • List[ReadableSpan]

    List of ReadableSpan objects collected during tracing.

Deprecated APIs

agentlightning.server.AgentLightningServer

High-level controller for the legacy Agent Lightning FastAPI server.

The controller orchestrates server start-up, task queueing, resource updates, and retrieval of client rollouts. It is primarily used by existing systems that still rely on the HTTP-based workflow.

Deprecated

AgentLightningServer is part of the legacy client/server stack. Prefer the store-based runtime for new integrations.

__init__(host='127.0.0.1', port=8000, task_timeout_seconds=300.0)

Initialize the controller.

Parameters:

  • host (str, default: '127.0.0.1' ) –

    Hostname or IP address to bind the HTTP server to.

  • port (int, default: 8000 ) –

    TCP port exposed by the server.

  • task_timeout_seconds (float, default: 300.0 ) –

    Seconds before a claimed task is considered stale and re-queued.

get_completed_rollout(rollout_id) async

Retrieve a specific completed rollout by identifier.

poll_completed_rollout(rollout_id, timeout=None) async

Poll for a completed rollout until it becomes available or a timeout expires.

Parameters:

  • rollout_id (str) –

    Identifier of the rollout to wait for.

  • timeout (Optional[float], default: None ) –

    Maximum number of seconds to wait. None waits indefinitely.

Returns:

  • Optional[RolloutLegacy]

    Retrieved rollout, or None when the timeout is reached without success.
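Polling with an optional timeout can be sketched as below. `poll_until` and its `interval` parameter are hypothetical; the sketch only illustrates the documented contract that None is returned once the timeout expires and that a timeout of None waits indefinitely.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional


async def poll_until(
    fetch: Callable[[], Awaitable[Optional[Any]]],
    timeout: Optional[float] = None,
    interval: float = 0.01,
) -> Optional[Any]:
    # A timeout of None means there is no deadline.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        result = await fetch()
        if result is not None:
            return result
        if deadline is not None and time.monotonic() >= deadline:
            return None
        await asyncio.sleep(interval)
```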

queue_task(sample, mode=None, resources_id=None, metadata=None) async

Add a task to the queue for a client to process.

retrieve_completed_rollouts() async

Return every completed rollout and clear the internal buffer.

run_forever() async

Run the server indefinitely until stop() is invoked.

start() async

Start the FastAPI server in the background.

stop() async

Stop the FastAPI server and wait for a graceful shutdown.

update_resources(resources) async

Publish a new resource bundle and return its generated identifier.

agentlightning.server.ServerDataStore

Async-safe container for in-memory server state.

The store tracks queued tasks, claimed tasks, uploaded rollouts, and the currently published resources. All interactions are guarded by asyncio locks so that the FastAPI handlers can safely run in parallel.
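A lock-guarded in-memory queue of this kind can be sketched as follows. `TaskStoreSketch` is a hypothetical simplification that tracks only queued and claimed tasks, and the rollout id format is made up for the example.

```python
import asyncio
from collections import deque
from typing import Any, Deque, Dict, Optional


class TaskStoreSketch:
    def __init__(self) -> None:
        self._queue: Deque[Dict[str, Any]] = deque()
        self._processing: Dict[str, Dict[str, Any]] = {}
        self._lock = asyncio.Lock()
        self._counter = 0

    async def add_task(self, sample: Any) -> str:
        async with self._lock:
            self._counter += 1
            rollout_id = f"rollout-{self._counter}"  # made-up id format
            self._queue.append({"rollout_id": rollout_id, "sample": sample})
            return rollout_id

    async def get_next_task(self) -> Optional[Dict[str, Any]]:
        async with self._lock:
            if not self._queue:
                return None  # non-blocking: empty queue yields None
            task = self._queue.popleft()
            # Remember the claimed task so a timeout check could requeue it.
            self._processing[task["rollout_id"]] = task
            return task
```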

Deprecated

ServerDataStore is part of the legacy client/server stack. Use LightningStore instead.

add_task(sample, mode=None, resources_id=None, metadata=None) async

Enqueue a new task and return the generated rollout identifier.

Parameters:

  • sample (Any) –

    Payload that describes the task input.

  • mode (Literal['train', 'val', 'test'] | None, default: None ) –

    Phase in which the sample should be executed ("train", "val", or "test").

  • resources_id (str | None, default: None ) –

    Identifier of a resource bundle that the executor should load before running the task.

  • metadata (Dict[str, Any] | None, default: None ) –

    Optional metadata forwarded to the executor.

Returns:

  • str

    Unique rollout identifier assigned to the task.

get_latest_resources() async

Return the most recent resource bundle, if one exists.

get_next_task() async

Retrieve the next task from the queue without blocking.

Returns:

  • Optional[Task]

    Next Task ready to execute, or None when the queue is empty.

get_processing_tasks()

Return a copy of currently processing tasks for timeout checking.

get_resources_by_id(resources_id) async

Retrieve a specific resource bundle by identifier.

Parameters:

  • resources_id (str) –

    Identifier that was previously published to the store.

Returns:

requeue_task(task) async

Requeue a task that timed out while being processed.

retrieve_completed_rollouts() async

Return all completed rollouts and clear the internal buffer.

retrieve_rollout(rollout_id) async

Retrieve and remove a stored rollout by identifier.

Parameters:

  • rollout_id (str) –

    Identifier of the rollout to fetch.

Returns:

store_rollout(rollout) async

Persist a completed rollout for later inspection.

Parameters:

update_resources(update) async

Persist a new resource bundle and mark it as the latest version.

Parameters:

agentlightning.client.AgentLightningClient

Client wrapper for the legacy version-aware Agent Lightning server.

The client exposes synchronous and asynchronous helpers for polling tasks, retrieving resource bundles, and submitting rollouts. It also maintains a simple in-memory cache keyed by the server-provided resource identifier to avoid redundant network requests.

Deprecated

AgentLightningClient is part of the legacy client/server stack. New code should rely on the store-based APIs implemented in agentlightning.store.

Attributes:

  • endpoint

    Base URL of the Agent Lightning server.

  • poll_interval

    Delay in seconds between polling attempts when no task is available.

  • timeout

    Timeout in seconds applied to HTTP requests.

  • task_count

    Number of tasks claimed during the lifetime of this client.

__init__(endpoint, poll_interval=5.0, timeout=10.0)

Initialize the client.

Parameters:

  • endpoint (str) –

    Root URL of the Agent Lightning server.

  • poll_interval (float, default: 5.0 ) –

    Seconds to wait between polling attempts.

  • timeout (float, default: 10.0 ) –

    Seconds before a request to the server is considered timed out.

get_latest_resources()

Fetch the most recent resource bundle advertised by the server.

Returns:

get_latest_resources_async() async

Fetch the most recent resource bundle advertised by the server.

Returns:

get_resources_by_id(resource_id)

Fetch a specific resource bundle by identifier.

Parameters:

  • resource_id (str) –

    Identifier sourced from the task metadata.

Returns:

get_resources_by_id_async(resource_id) async

Fetch a specific resource bundle by identifier.

Parameters:

  • resource_id (str) –

    Identifier sourced from the task metadata.

Returns:

poll_next_task()

Poll the server synchronously until a task becomes available.

Returns:

  • Optional[Task]

    The next Task available for execution, or None if polling fails.

poll_next_task_async() async

Poll the server asynchronously until a task becomes available.

Returns:

  • Optional[Task]

    The next Task exposed by the server, or None if polling fails.

post_rollout(rollout)

Submit a completed rollout back to the server.

Parameters:

  • rollout (RolloutLegacy) –

    Legacy rollout payload produced by the executor.

Returns:

  • Optional[Dict[str, Any]]

    Parsed JSON response returned by the server, or None when the request fails.

post_rollout_async(rollout) async

Submit a completed rollout back to the server.

Parameters:

  • rollout (RolloutLegacy) –

    Legacy rollout payload produced by the executor.

Returns:

  • Optional[Dict[str, Any]]

    Parsed JSON response returned by the server, or None when the request fails.

agentlightning.client.DevTaskLoader

Bases: AgentLightningClient

In-memory task loader used for development and integration tests.

The loader mimics the behavior of the legacy HTTP server by storing tasks and resources locally. Polling methods simply iterate over the provided collection, allowing rapid iteration without provisioning any external infrastructure.

Deprecated

DevTaskLoader is a compatibility shim. Prefer Trainer.dev for new code.

rollouts property

Return the rollouts posted back to the loader during development runs.

__init__(tasks, resources, **kwargs)

Initialize the loader with predefined tasks and resources.

Parameters:

  • tasks (Union[List[TaskInput], List[Task]]) –

    Sequence of task inputs or preconstructed tasks that will be served in order.

  • resources (Union[NamedResources, ResourcesUpdate]) –

    Static resources returned for any resources_id query.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments forwarded to the parent client.

Raises:

  • ValueError

    If no tasks are provided or both Task and TaskInput instances are mixed.

poll_next_task()

Return the next task from the local queue.

If TaskInput instances were provided, they are converted into Task objects on the fly. Otherwise, the preconstructed tasks are returned in sequence.

Returns:

  • Optional[Task]

    Next task to execute.

agentlightning.Task

Bases: BaseModel

Rollout request served to client agents.

Deprecated

The legacy HTTP client/server stack still uses this model. Prefer LightningStore APIs for new workflows.

agentlightning.TaskInput = Any module-attribute

Task input type. Accepts arbitrary payloads.

agentlightning.TaskIfAny

Bases: BaseModel

A task or indication that no task is available.

Deprecated

Use LightningStore APIs for new workflows.

is_available instance-attribute

Indication that a task is available.

agentlightning.RolloutRawResultLegacy = Union[None, float, List[Triplet], List[Dict[str, Any]], List[ReadableSpan], RolloutLegacy] module-attribute

Legacy rollout result type.

Deprecated

Use RolloutRawResult instead.

agentlightning.RolloutLegacy

Bases: BaseModel

Legacy reporting payload exchanged with the deprecated HTTP server.

Deprecated

Use Rollout instead.