Internal API References¶
Danger
The following APIs should be used with extra caution because they are very likely to change in the future.
agentlightning.adapter.messages.OpenAIMessages
¶
Bases: TypedDict
OpenAI-style chat messages with optional tool definitions.
Attributes:
- messages (List[ChatCompletionMessageParam]) – Ordered chat messages that describe the conversation.
- tools (Optional[List[ChatCompletionFunctionToolParam]]) – Tool specifications available to the assistant, if any.
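A minimal sketch of constructing an OpenAIMessages payload. The field names come from the attributes above; the tool schema itself is a hypothetical example.

```python
from agentlightning.adapter.messages import OpenAIMessages

payload: OpenAIMessages = {
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What's the weather in Paris?"},
    ],
    "tools": [
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "Look up the current weather for a city.",
                "parameters": {
                    "type": "object",
                    "properties": {"city": {"type": "string"}},
                    "required": ["city"],
                },
            },
        }
    ],
}
```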
agentlightning.adapter.triplet.TraceTree
¶
Tree representation of a trace span and its descendants.
Attributes:
- id – Unique identifier for the span node.
- span – Span backing this node.
- children – Child nodes connected to the current span.
agent_name()
¶
Return the agent name associated with the span, if any.
Returns:
- Optional[str] – Agent name extracted from known attributes, otherwise None.
find_llm_calls(*, llm_call_match, agent_match, within_matching_subtree=None, within_reward=None, within_llm_call=None, existing_llm_call_response_ids=None)
¶
Find LLM call spans matching the supplied filters.
Parameters:
- llm_call_match (str) – Regular expression used to match span names that qualify as LLM calls.
- agent_match (Optional[str]) – Optional regular expression that must match the enclosing agent span name.
- within_matching_subtree (str | None, default: None) – Marker propagated through recursive calls to record matching agents.
- within_reward (Optional[bool], default: None) – When True, suppresses LLM matches under reward spans.
- within_llm_call (Optional[bool], default: None) – When True, prevents duplicate matches for nested LLM calls.
- existing_llm_call_response_ids (Optional[set[str]], default: None) – Known response identifiers used to deduplicate spans.
Returns:
- List[Tuple['TraceTree', str]] – A list of tuples pairing each matching node with the agent subtree label that triggered the match.
from_spans(spans)
classmethod
¶
Construct a tree from a flat list of spans.
Parameters:
- spans (List[Span]) – Spans that collectively form a single trace segment.
Returns:
- 'TraceTree' – A TraceTree rooted at either the discovered root span or a synthetic root when multiple roots are present.
Raises:
- ValueError – If the span list is empty or no root span can be inferred.
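A minimal sketch of building and querying a trace tree, assuming spans is a non-empty list of Span objects collected from one trace:

```python
from agentlightning.adapter.triplet import TraceTree

tree = TraceTree.from_spans(spans)   # raises ValueError on an empty span list
tree.repair_hierarchy()              # re-parent spans emitted by mixed tracing systems

# Find LLM-call spans, optionally restricted to a particular agent span name.
for node, agent_label in tree.find_llm_calls(
    llm_call_match=r"openai\.chat\.completion",
    agent_match=None,
):
    print(node.agent_name(), agent_label)
```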
is_reward_span()
¶
Return whether the span explicitly encodes a reward.
Returns:
- bool – True when the span payload describes a reward, otherwise False.
match_rewards(reward_match, llm_calls)
¶
Assign rewards to previously matched LLM calls.
Parameters:
- reward_match (str) – Strategy identifier from RewardMatchPolicy.
- llm_calls (List['TraceTree']) – Trace nodes representing LLM call spans.
Returns:
- dict[str, Optional[float]] – Mapping from span identifier to reward value, or None when no reward is available.
maybe_reward_dict()
¶
Return a reward payload if the span encodes one.
Returns:
- dict[str, Any] – Dictionary containing reward metadata, or an empty dictionary when no reward is found.
names_tuple()
¶
Return the span name alongside nested child names.
Returns:
- Tuple[str, List[Any]] – A tuple of the current span name and a list of tuples for each child containing the child name and its descendants.
repair_hierarchy()
¶
Repair missing parent-child relationships introduced by mixed tracing systems.
Some agent frameworks emit spans via multiple subsystems, which can cause LLM completion spans to float directly under the root span instead of being nested under the correct agent. The method re-parents those spans to the closest ancestor that fully envelopes the child in time.
Without this repair, selecting LLM completion spans with an agent filter would never find the correct spans, because they would not sit underneath the agent span.
to_json()
¶
Convert the tree node into a JSON-serialisable structure.
to_trajectory(llm_call_match='openai\\.chat\\.completion', agent_match=None, exclude_llm_call_in_reward=True, dedup_llm_call=True, reward_match=RewardMatchPolicy.FIRST_OCCURRENCE, final_reward=None)
¶
Convert the trace tree into a trajectory of Triplet items.
Parameters:
- llm_call_match (str, default: 'openai\\.chat\\.completion') – Regular expression for LLM call span names.
- agent_match (Optional[str], default: None) – Optional regular expression for agent span names.
- exclude_llm_call_in_reward (bool, default: True) – When True, prevents searching for rewards under the LLM call subtree.
- dedup_llm_call (bool, default: True) – When True, deduplicates spans using the LLM response identifier.
- reward_match (RewardMatchPolicy, default: FIRST_OCCURRENCE) – Reward matching policy used to associate reward spans with LLM calls.
- final_reward (Optional[float], default: None) – Optional reward appended to the final transition when provided.
Returns:
- List[Triplet] – Trajectory of Triplet items extracted from the trace tree.
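A minimal sketch of producing a trajectory, assuming tree was built with TraceTree.from_spans(...) and repaired with repair_hierarchy() as in the earlier sketch; the final_reward value is illustrative.

```python
trajectory = tree.to_trajectory(
    llm_call_match=r"openai\.chat\.completion",
    agent_match=None,
    final_reward=1.0,  # optionally attach a terminal reward
)
for item in trajectory:
    print(item)
```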
traverse()
¶
Traverse the tree depth first and return every node.
visualize(filename, interested_span_match=None)
¶
Render the trace tree with Graphviz for debugging purposes.
Parameters:
- filename (str) – Base filename for the generated .png diagram.
- interested_span_match (str | None, default: None) – Optional regular expression used to keep only matching spans (and their ancestors) in the output.
Note
The method requires the optional graphviz dependency to be available in the runtime environment.
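For example, the call below would write trace_debug.png, keeping only spans whose names match the pattern plus their ancestors; it assumes graphviz is installed and tree was built as in the earlier sketch.

```python
tree.visualize("trace_debug", interested_span_match=r"openai\..*")
```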
agentlightning.adapter.triplet.Transition
¶
Bases: BaseModel
A single transition within a reinforcement learning trajectory.
Attributes:
- state (List[int]) – Token identifiers describing the model input state.
- action (List[int]) – Token identifiers representing the model output.
- response_id (Optional[str]) – Identifier of the LLM response used to deduplicate spans.
- agent_name (str) – Human-readable agent name captured from the trace.
- reward (Optional[float]) – Scalar reward associated with the transition, if available.
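A minimal sketch of constructing a Transition directly; the token ids, names, and reward are illustrative values.

```python
from agentlightning.adapter.triplet import Transition

transition = Transition(
    state=[101, 2023, 2003],        # token ids of the model input
    action=[1996, 3437, 102],       # token ids of the model output
    response_id="chatcmpl-abc123",  # hypothetical LLM response identifier
    agent_name="planner",
    reward=0.5,
)
```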
agentlightning.adapter.triplet.RewardMatchPolicy
¶
Bases: str, Enum
Strategies for matching rewards to LLM call spans.
Note
Each reward span must expose a payload shaped like {"type": "reward", "value": <float>|None}, as described in reward.py.
FIRST_OCCURRENCE = 'first_occurrence'
class-attribute
instance-attribute
¶
Use the first reward encountered in chronological order after the current LLM call match.
FIRST_SIBLING = 'first_sibling'
class-attribute
instance-attribute
¶
Use the first sibling in the current trace subtree as the reward unless another LLM call match is found.
agentlightning.algorithm.decorator.FunctionalAlgorithm
¶
Bases: Algorithm, Generic[AF]
An algorithm wrapper built from a callable implementation.
Functional algorithms let you provide an ordinary function instead of subclassing Algorithm. The wrapper inspects the callable signature to supply optional dependencies such as the store, adapter, and LLM proxy.
__init__(algorithm_func)
¶
Wrap a function that implements algorithm behaviour.
Parameters:
- algorithm_func (Union[AlgorithmFuncSyncLike, AlgorithmFuncAsyncLike]) – Sync or async callable implementing the algorithm contract. Arguments are detected automatically based on the function signature.
run(train_dataset=None, val_dataset=None)
¶
Execute the wrapped function with injected dependencies.
Parameters:
- train_dataset (Optional[Dataset[Any]], default: None) – Optional training dataset passed through when the callable declares a train_dataset parameter.
- val_dataset (Optional[Dataset[Any]], default: None) – Optional validation dataset passed through when the callable declares a val_dataset parameter.
Returns:
- Union[None, Awaitable[None]] – None for sync callables, or an awaitable when the callable is async.
Raises:
- TypeError – If a dataset is provided but the function signature does not accept the corresponding argument.
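A minimal sketch of wrapping a plain function. The store parameter is one of the injectable dependencies mentioned above; the body is a hypothetical placeholder, and calling run() directly assumes the injected dependencies are available (normally the algorithm is hosted by the runtime).

```python
from agentlightning.algorithm.decorator import FunctionalAlgorithm

def my_algorithm(store, train_dataset=None):
    # Hypothetical body: enqueue rollouts for each sample and consume
    # the results through the injected store.
    for sample in (train_dataset or []):
        ...

algo = FunctionalAlgorithm(my_algorithm)
# run() forwards the datasets; a TypeError is raised if a dataset is passed
# but the callable does not declare the matching parameter.
algo.run(train_dataset=["sample-1", "sample-2"])
```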
agentlightning.litagent.decorator.FunctionalLitAgent
¶
Bases: LitAgent[T]
Adapter that turns plain rollout functions into LitAgent instances.
The helper inspects the wrapped function to determine which resources to inject, allowing both synchronous and asynchronous callables to participate in the training loop without writing a dedicated subclass.
__call__(*args, **kwargs)
¶
Make the agent instance callable, preserving the original function behavior.
__init__(rollout_func, *, strip_proxy=True)
¶
Initialize the wrapper around a rollout function.
Parameters:
- rollout_func (FunctionalLitAgentFunc[T]) – Callable that implements the rollout. It may be synchronous or asynchronous and can optionally receive a Rollout alongside resources such as llm or prompt_template.
- strip_proxy (bool, default: True) – When True, convert proxy resources into concrete LLM instances before calling the function.
rollout(task, resources, rollout)
¶
Execute a synchronous rollout using the wrapped function.
Parameters:
- task (T) – Task input data.
- resources (NamedResources) – Mapping of named resources available to the agent.
- rollout (Rollout) – Rollout metadata provided by the runtime.
Returns:
- RolloutRawResult – Result produced by the wrapped rollout function.
Raises:
- RuntimeError – If the wrapped function is asynchronous.
rollout_async(task, resources, rollout)
async
¶
Execute an asynchronous rollout using the wrapped function.
Parameters:
- task (T) – Task input data.
- resources (NamedResources) – Mapping of named resources available to the agent.
- rollout (Rollout) – Rollout metadata provided by the runtime.
Returns:
- RolloutRawResult – Result produced by the wrapped rollout coroutine.
Raises:
- RuntimeError – If the wrapped function is synchronous.
agentlightning.litagent.decorator.llm_rollout(func=None, *, strip_proxy=True)
¶
Create a FunctionalLitAgent for LLM-based rollouts.
Parameters:
- func (LlmRolloutFunc[T] | None, default: None) – Callable defining the agent's behaviour. Supported signatures include:
  - (task, llm) -> result
  - (task, llm, rollout) -> result
  - async (task, llm) -> result
  - async (task, llm, rollout) -> result
- strip_proxy (bool, default: True) – When True, convert proxy resources into concrete LLM instances before calling the function. Defaults to True.
Returns:
- FunctionalLitAgent[T] | Callable[[LlmRolloutFunc[T]], FunctionalLitAgent[T]] – A FunctionalLitAgent that wraps the supplied function.
Examples:
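A minimal sketch, assuming the injected llm resource exposes endpoint and model attributes (attribute names not confirmed on this page) and that an OpenAI-compatible server is reachable at that endpoint.

```python
from agentlightning.litagent.decorator import llm_rollout
from openai import OpenAI

@llm_rollout
def my_agent(task, llm):
    # `llm` is the resource injected by the runtime; `endpoint` and `model`
    # are assumed attribute names for illustration.
    client = OpenAI(base_url=llm.endpoint, api_key="dummy")
    response = client.chat.completions.create(
        model=llm.model,
        messages=[{"role": "user", "content": str(task)}],
    )
    answer = response.choices[0].message.content or ""
    # Return a scalar reward for the rollout (a hypothetical grading rule).
    return 1.0 if "42" in answer else 0.0
```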
agentlightning.litagent.decorator.prompt_rollout(func=None)
¶
Create a FunctionalLitAgent for prompt-based rollouts.
This decorator is designed for agents that work with tunable prompt templates. It enables a workflow where algorithms manage and optimize the prompt template, while agents consume the template to perform rollouts. This is particularly useful for prompt optimization scenarios.
Parameters:
- func (PromptRolloutFunc[T] | None, default: None) – Callable defining the agent's behavior. Supported signatures include:
  - (task, prompt_template) -> result
  - (task, prompt_template, rollout) -> result
  - async (task, prompt_template) -> result
  - async (task, prompt_template, rollout) -> result
Returns:
- FunctionalLitAgent[T] | Callable[[PromptRolloutFunc[T]], FunctionalLitAgent[T]] – A FunctionalLitAgent that wraps the supplied function.
Examples:
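A minimal sketch, treating the injected prompt_template as exposing a str.format-style template string (an assumption about its interface) and grading against a hypothetical reference answer.

```python
from agentlightning.litagent.decorator import prompt_rollout

def call_my_llm(prompt: str) -> str:
    # Placeholder for an actual LLM call.
    return "42"

@prompt_rollout
def my_prompt_agent(task, prompt_template):
    # The algorithm manages and optimizes `prompt_template`; the agent only
    # consumes it. `template` is an assumed attribute name here.
    prompt = prompt_template.template.format(question=task["question"])
    answer = call_my_llm(prompt)
    return float(answer == task["answer"])  # scalar reward for the rollout
```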
agentlightning.llm_proxy.ModelConfig
¶
Bases: TypedDict
LiteLLM model registration entry.
This mirrors the items in LiteLLM's model_list section.
Attributes:
- model_name (str) – Logical model name exposed by the proxy.
- litellm_params (Dict[str, Any]) – Parameters passed to LiteLLM for this model (e.g., backend model id, api_base, additional options).
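A minimal sketch of one entry, mirroring LiteLLM's model_list format; the backend model id, endpoint, and key are illustrative values.

```python
from agentlightning.llm_proxy import ModelConfig

model: ModelConfig = {
    "model_name": "policy-model",  # logical name exposed by the proxy
    "litellm_params": {
        # Passed through to LiteLLM; the values here are assumptions.
        "model": "hosted_vllm/Qwen/Qwen2.5-7B-Instruct",
        "api_base": "http://localhost:8001/v1",
        "api_key": "dummy",
    },
}
```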
agentlightning.llm_proxy.LightningSpanExporter
¶
Bases: SpanExporter
Buffered OTEL span exporter with subtree flushing and training-store sink.
Design:
- Spans are buffered until a root span's entire subtree is available.
- A private event loop on a daemon thread runs async flush logic.
- Rollout/attempt/sequence metadata is reconstructed by merging headers from any span within a subtree.
Thread-safety:
- Buffer access is protected by a re-entrant lock.
- Export is synchronous to the caller yet schedules an async flush on the internal loop, then waits for completion.
Parameters:
- store (Optional[LightningStore], default: None) – Optional explicit LightningStore. If None, uses get_global_store().
export(spans)
¶
Export spans via buffered subtree flush.
Appends spans to the internal buffer, then triggers an async flush on the private event loop. Blocks until that flush completes.
Parameters:
- spans (Sequence[ReadableSpan]) – Sequence of spans to export.
Returns:
- SpanExportResult – SUCCESS on flush success, else FAILURE.
shutdown()
¶
Shut down the exporter event loop.
Safe to call at process exit.
agentlightning.llm_proxy.LightningOpenTelemetry
¶
Bases: OpenTelemetry
OpenTelemetry integration that exports spans to the Lightning store.
Responsibilities:
- Ensures each request is annotated with a per-attempt sequence id so spans are ordered deterministically even with clock skew across nodes.
- Uses LightningSpanExporter to persist spans for analytics and training.
Parameters:
- store (LightningStore | None, default: None) – Optional explicit LightningStore for the exporter.
agentlightning.llm_proxy.AddReturnTokenIds
¶
Bases: CustomLogger
LiteLLM logger hook to request token ids from vLLM.
This mutates the outgoing request payload to include return_token_ids=True for backends that support token id return (e.g., vLLM). See https://github.com/vllm-project/vllm/pull/22587.
async_pre_call_hook(*args, **kwargs)
async
¶
Async pre-call hook to adjust request payload.
Parameters:
- args (Any, default: ()) – Positional args from LiteLLM.
- kwargs (Any, default: {}) – Keyword args from LiteLLM.
Returns:
- Optional[Union[Exception, str, Dict[str, Any]]] – Either an updated payload dict or an Exception to short-circuit.
agentlightning.store.base.UNSET = _UnsetType()
module-attribute
¶
agentlightning.store.utils.propagate_status(update_rollout_status, attempt, config)
async
¶
Propagate the status of an attempt to the rollout.
The caller should ensure the rollout is in a state where it can be marked as outdated; the rollout is requeued if it should be retried.
This operation is completely unlocked. The caller is responsible for locking the store.
agentlightning.tracer.agentops.LightningSpanProcessor
¶
Bases: SpanProcessor
Span processor that subclasses OpenTelemetry's SpanProcessor and adds support for dumping traces to a LightningStore.
Deprecated APIs¶
agentlightning.server.AgentLightningServer
¶
High-level controller for the legacy Agent Lightning FastAPI server.
The controller orchestrates server start-up, task queueing, resource updates, and retrieval of client rollouts. It is primarily used by existing systems that still rely on the HTTP-based workflow.
Deprecated
AgentLightningServer is part of the legacy client/server stack. Prefer the store-based runtime for new integrations.
__init__(host='127.0.0.1', port=8000, task_timeout_seconds=300.0)
¶
Initialize the controller.
Parameters:
- host (str, default: '127.0.0.1') – Hostname or IP address to bind the HTTP server to.
- port (int, default: 8000) – TCP port exposed by the server.
- task_timeout_seconds (float, default: 300.0) – Seconds before a claimed task is considered stale and re-queued.
get_completed_rollout(rollout_id)
async
¶
Retrieve a specific completed rollout by identifier.
poll_completed_rollout(rollout_id, timeout=None)
async
¶
Poll for a completed rollout until it becomes available or a timeout expires.
Parameters:
- rollout_id (str) – Identifier of the rollout to wait for.
- timeout (Optional[float], default: None) – Maximum number of seconds to wait. None waits indefinitely.
Returns:
- Optional[RolloutLegacy] – Retrieved rollout, or None when the timeout is reached without success.
queue_task(sample, mode=None, resources_id=None, metadata=None)
async
¶
Add a task to the queue for a client to process.
retrieve_completed_rollouts()
async
¶
Return every completed rollout and clear the internal buffer.
run_forever()
async
¶
Run the server indefinitely until stop() is invoked.
start()
async
¶
Start the FastAPI server in the background.
stop()
async
¶
Stop the FastAPI server and wait for a graceful shutdown.
update_resources(resources)
async
¶
Publish a new resource bundle and return its generated identifier.
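A minimal end-to-end sketch of the deprecated server workflow; the task payload and timeout are illustrative.

```python
import asyncio
from agentlightning.server import AgentLightningServer

async def main() -> None:
    server = AgentLightningServer(host="127.0.0.1", port=8000)
    await server.start()
    rollout_id = await server.queue_task({"question": "2 + 2?"}, mode="train")
    # Wait up to five minutes for a client to post the rollout back.
    rollout = await server.poll_completed_rollout(rollout_id, timeout=300)
    print(rollout)
    await server.stop()

asyncio.run(main())
```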
agentlightning.server.ServerDataStore
¶
Async-safe container for in-memory server state.
The store tracks queued tasks, claimed tasks, uploaded rollouts, and the currently published resources. All interactions are guarded by asyncio locks so that the FastAPI handlers can safely run in parallel.
Deprecated
ServerDataStore is part of the legacy client/server stack. Use LightningStore instead.
add_task(sample, mode=None, resources_id=None, metadata=None)
async
¶
Enqueue a new task and return the generated rollout identifier.
Parameters:
- sample (Any) – Payload that describes the task input.
- mode (Literal['train', 'val', 'test'] | None, default: None) – Phase in which the sample should be executed ("train", "val", or "test").
- resources_id (str | None, default: None) – Identifier of a resource bundle that the executor should load before running the task.
- metadata (Dict[str, Any] | None, default: None) – Optional metadata forwarded to the executor.
Returns:
- str – Unique rollout identifier assigned to the task.
get_latest_resources()
async
¶
Return the most recent resource bundle, if one exists.
get_next_task()
async
¶
get_processing_tasks()
¶
Return a copy of currently processing tasks for timeout checking.
get_resources_by_id(resources_id)
async
¶
Retrieve a specific resource bundle by identifier.
Parameters:
- resources_id (str) – Identifier that was previously published to the store.
Returns:
- Optional[ResourcesUpdate] – Matching ResourcesUpdate instance, or None when the identifier is unknown.
requeue_task(task)
async
¶
Requeue a task that timed out while being processed.
retrieve_completed_rollouts()
async
¶
Return all completed rollouts and clear the internal buffer.
retrieve_rollout(rollout_id)
async
¶
Retrieve and remove a stored rollout by identifier.
Parameters:
- rollout_id (str) – Identifier of the rollout to fetch.
Returns:
- Optional[RolloutLegacy] – Stored RolloutLegacy, or None when the identifier is unknown.
store_rollout(rollout)
async
¶
Persist a completed rollout for later inspection.
Parameters:
- rollout (RolloutLegacy) – Rollout returned by a client.
update_resources(update)
async
¶
Persist a new resource bundle and mark it as the latest version.
Parameters:
- update (ResourcesUpdate) – Resource payload received from a client.
agentlightning.client.AgentLightningClient
¶
Client wrapper for the legacy version-aware Agent Lightning server.
The client exposes synchronous and asynchronous helpers for polling tasks, retrieving resource bundles, and submitting rollouts. It also maintains a simple in-memory cache keyed by the server-provided resource identifier to avoid redundant network requests.
Deprecated
AgentLightningClient is part of the legacy client/server stack. New code should rely on the store-based APIs implemented in agentlightning.store.
Attributes:
- endpoint – Base URL of the Agent Lightning server.
- poll_interval – Delay in seconds between polling attempts when no task is available.
- timeout – Timeout in seconds applied to HTTP requests.
- task_count – Number of tasks claimed during the lifetime of this client.
__init__(endpoint, poll_interval=5.0, timeout=10.0)
¶
Initialize the client.
Parameters:
- endpoint (str) – Root URL of the Agent Lightning server.
- poll_interval (float, default: 5.0) – Seconds to wait between polling attempts.
- timeout (float, default: 10.0) – Seconds before a request to the server is considered timed out.
get_latest_resources()
¶
Fetch the most recent resource bundle advertised by the server.
Returns:
- Optional[ResourcesUpdate] – ResourcesUpdate for the newest version, or None when unavailable.
get_latest_resources_async()
async
¶
Fetch the most recent resource bundle advertised by the server.
Returns:
- Optional[ResourcesUpdate] – ResourcesUpdate for the newest version, or None when unavailable.
get_resources_by_id(resource_id)
¶
Fetch a specific resource bundle by identifier.
Parameters:
- resource_id (str) – Identifier sourced from the task metadata.
Returns:
- Optional[ResourcesUpdate] – Cached or freshly downloaded ResourcesUpdate, or None when the server returns an error.
get_resources_by_id_async(resource_id)
async
¶
Fetch a specific resource bundle by identifier.
Parameters:
- resource_id (str) – Identifier sourced from the task metadata.
Returns:
- Optional[ResourcesUpdate] – Cached or freshly downloaded ResourcesUpdate, or None when the server returns an error.
poll_next_task()
¶
poll_next_task_async()
async
¶
post_rollout(rollout)
¶
Submit a completed rollout back to the server.
Parameters:
- rollout (RolloutLegacy) – Legacy rollout payload produced by the executor.
Returns:
- Optional[Dict[str, Any]] – Parsed JSON response returned by the server, or None when the request fails.
post_rollout_async(rollout)
async
¶
Submit a completed rollout back to the server.
Parameters:
- rollout (RolloutLegacy) – Legacy rollout payload produced by the executor.
Returns:
- Optional[Dict[str, Any]] – Parsed JSON response returned by the server, or None when the request fails.
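A minimal sketch of the deprecated synchronous client loop; constructing the RolloutLegacy payload is elided because its fields are not documented on this page.

```python
from agentlightning.client import AgentLightningClient

client = AgentLightningClient(endpoint="http://127.0.0.1:8000", poll_interval=5.0)
task = client.poll_next_task()              # poll until a task is claimed
resources = client.get_latest_resources()   # fetch the current resource bundle
# ... run the agent on `task` with `resources`, build a RolloutLegacy ...
# client.post_rollout(rollout)
```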
agentlightning.client.DevTaskLoader
¶
Bases: AgentLightningClient
In-memory task loader used for development and integration tests.
The loader mimics the behavior of the legacy HTTP server by storing tasks and resources locally. Polling methods simply iterate over the provided collection, allowing rapid iteration without provisioning any external infrastructure.
Deprecated
DevTaskLoader is a compatibility shim. Prefer Trainer.dev for new code.
rollouts
property
¶
Return the rollouts posted back to the loader during development runs.
__init__(tasks, resources, **kwargs)
¶
Initialize the loader with predefined tasks and resources.
Parameters:
- tasks (Union[List[TaskInput], List[Task]]) – Sequence of task inputs or preconstructed tasks that will be served in order.
- resources (Union[NamedResources, ResourcesUpdate]) – Static resources returned for any resources_id query.
- **kwargs (Any, default: {}) – Additional keyword arguments forwarded to the parent client.
Raises:
- ValueError
agentlightning.Task
¶
Bases: BaseModel
Rollout request served to client agents.
Deprecated
The legacy HTTP client/server stack still uses this model. Prefer LightningStore APIs for new workflows.
agentlightning.TaskInput = Any
module-attribute
¶
Task input type. Accepts arbitrary payloads.
agentlightning.TaskIfAny
¶
Bases: BaseModel
A task or indication that no task is available.
Deprecated
Use LightningStore APIs for new workflows.
is_available
instance-attribute
¶
Indication that a task is available.
agentlightning.RolloutRawResultLegacy = Union[None, float, List[Triplet], List[Dict[str, Any]], List[ReadableSpan], RolloutLegacy]
module-attribute
¶
Legacy rollout result type.
Deprecated
Use RolloutRawResult
instead.