Store References¶
agentlightning.LightningStore
¶
Contract for the persistent control-plane that coordinates training rollouts.
A LightningStore mediates every interaction between algorithms and runners:
- Rollout lifecycle: accept new rollouts, queue them for execution, create attempts, and drive the rollout status machine ("queuing" → "preparing" → "running" → {"succeeded", "failed", "cancelled"}, or "requeuing" when a retry is justified).
- Attempt tracking: record each execution attempt, including progress heartbeats, retry sequencing, and terminal states such as "timeout" or "unresponsive".
- Span ingest: capture structured telemetry emitted by runners (either as native Span objects or as opentelemetry.sdk.trace.ReadableSpan instances) so that algorithms can reconstruct trajectories and rewards.
- Resource versioning: manage immutable snapshots of named resources (prompt templates, model checkpoints, proxy endpoints, …) and expose a single "latest" snapshot that runners can fetch just after claiming work.
Implementations must provide thread-safe/async-safe semantics: each coroutine should
appear atomic to callers even when multiple algorithms or runners call the API concurrently.
Unless stated otherwise, missing identifiers should result in a ValueError.
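The rollout status machine described above can be pictured as a small transition table. The sketch below is only an illustration: the table is inferred from this page's description (including the note under add_span() that a "requeuing" rollout may move to "running"), and the names ALLOWED, TERMINAL, and can_transition are hypothetical, not part of the library. A real store may permit additional transitions.

```python
from typing import Dict, Set

# Hypothetical transition table inferred from the prose description;
# a real LightningStore may allow more transitions than listed here.
TERMINAL: Set[str] = {"succeeded", "failed", "cancelled"}

ALLOWED: Dict[str, Set[str]] = {
    "queuing": {"preparing"},
    "requeuing": {"preparing", "running"},
    "preparing": {"running"},
    "running": TERMINAL | {"requeuing"},
}


def can_transition(current: str, target: str) -> bool:
    """Return True when `target` is reachable from `current` in one step."""
    # Terminal states have no entry in ALLOWED, so nothing is reachable from them.
    return target in ALLOWED.get(current, set())
```

Keeping the table as data rather than branching logic makes it easy to compare against the documented lifecycle when the contract changes.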
capabilities
property
¶
Return the capabilities of the store.
add_many_spans(spans)
async
¶
Persist a sequence of pre-constructed spans emitted during rollout execution.
Implementations can simply delegate to add_span() for each span.
However, if the store supports bulk insertion, it can implement this method to improve performance.
add_otel_span(rollout_id, attempt_id, readable_span, sequence_id=None)
async
¶
Convert and persist an OpenTelemetry span for a particular attempt.
Implementations must transform the readable_span into a Span
(typically via Span.from_opentelemetry()),
assign a strictly increasing sequence_id when one is not provided, and persist it
using the same semantics as add_span().
Parameters:
-
rollout_id(str) –Identifier of the rollout that produced the span.
-
attempt_id(str) –Attempt identifier the span belongs to.
-
readable_span(ReadableSpan) –OpenTelemetry span in SDK form.
-
sequence_id(int | None, default:None) –Optional explicit ordering hint. When omitted, implementations call get_next_span_sequence_id() automatically.
Returns:
-
Optional[Span]–The stored span record. Returns None if the span was not added because it is a duplicate.
Raises:
-
NotImplementedError–Subclasses must implement span persistence.
-
ValueError–Implementations must raise when the rollout or attempt is unknown.
add_resources(resources)
async
¶
Persist a new immutable snapshot of named resources and mark it as latest.
Implementations must assign a fresh resources_id and ensure subsequent calls to
get_latest_resources() return the
snapshot produced here.
Parameters:
-
resources(NamedResources) –Mapping of resource names to their serialized payloads.
Returns:
-
ResourcesUpdate–The stored ResourcesUpdate, including its generated id.
Raises:
-
NotImplementedError–Subclasses must implement resource persistence.
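As a rough illustration of the "fresh id, immutable snapshot, mark as latest" contract, the following toy class keeps snapshots in a dictionary. It is a synchronous sketch with hypothetical names (ToyResourceStore and the "rs-" id format are assumptions), and it returns plain dictionaries instead of ResourcesUpdate objects.

```python
import copy
import uuid
from typing import Any, Dict, Optional


class ToyResourceStore:
    """Hypothetical in-memory stand-in; not the library's implementation."""

    def __init__(self) -> None:
        self._snapshots: Dict[str, Dict[str, Any]] = {}
        self._latest_id: Optional[str] = None

    def add_resources(self, resources: Dict[str, Any]) -> str:
        # Assign a fresh identifier (the "rs-" prefix is made up for this sketch).
        resources_id = f"rs-{uuid.uuid4().hex[:12]}"
        # Deep-copy so later mutations by the caller cannot alter the snapshot.
        self._snapshots[resources_id] = copy.deepcopy(resources)
        self._latest_id = resources_id
        return resources_id

    def get_latest_resources(self) -> Optional[Dict[str, Any]]:
        if self._latest_id is None:
            return None
        # Return a copy so readers cannot mutate the stored snapshot either.
        return copy.deepcopy(self._snapshots[self._latest_id])
```

Copying on both write and read is one simple way to keep snapshots immutable; a real store might instead rely on serialization or frozen data structures.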
add_span(span)
async
¶
Persist a pre-constructed span emitted during rollout execution.
The provided Span must already contain the rollout_id,
attempt_id, and sequence_id. Implementations must:
- Verify that both rollout and attempt exist.
- Ensure span ordering remains strictly increasing per attempt (rejecting or keeping duplicates).
- Treat the span arrival as a heartbeat: update the attempt's last_heartbeat_time and transition both attempt and rollout to "running" if they were still "preparing" or "requeuing".
Parameters:
-
span(Span) –Fully populated span to persist.
Returns:
-
Optional[Span]–The stored span record (implementations may return a copy), or None if the span was not added because it is a duplicate.
Raises:
-
NotImplementedError–Subclasses must implement span persistence.
-
ValueError–Implementations must raise when the referenced rollout or attempt is missing.
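The three obligations listed for add_span() can be sketched with a toy, synchronous store. Everything here is hypothetical (ToySpan, ToyAttempt, ToySpanStore); the real method is a coroutine operating on Span objects, and this sketch picks the "reject duplicates" option by returning None.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Tuple


@dataclass
class ToySpan:  # hypothetical stand-in for Span
    rollout_id: str
    attempt_id: str
    sequence_id: int


@dataclass
class ToyAttempt:  # hypothetical stand-in for Attempt
    status: str = "preparing"
    last_heartbeat_time: Optional[float] = None
    seen: Set[int] = field(default_factory=set)


class ToySpanStore:
    def __init__(self) -> None:
        self.rollout_status: Dict[str, str] = {}
        self.attempts: Dict[Tuple[str, str], ToyAttempt] = {}

    def add_span(self, span: ToySpan) -> Optional[ToySpan]:
        key = (span.rollout_id, span.attempt_id)
        # 1. Both the rollout and the attempt must already exist.
        if span.rollout_id not in self.rollout_status or key not in self.attempts:
            raise ValueError(f"unknown rollout or attempt: {key}")
        attempt = self.attempts[key]
        # 2. This sketch rejects duplicate sequence ids by returning None.
        if span.sequence_id in attempt.seen:
            return None
        attempt.seen.add(span.sequence_id)
        # 3. Span arrival doubles as a heartbeat and may start the run.
        attempt.last_heartbeat_time = time.time()
        if attempt.status in ("preparing", "requeuing"):
            attempt.status = "running"
        if self.rollout_status[span.rollout_id] in ("preparing", "requeuing"):
            self.rollout_status[span.rollout_id] = "running"
        return span
```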
dequeue_rollout(worker_id=None)
async
¶
Claim the oldest queued rollout and transition it to preparing.
This function does not block.
Retrieval must be FIFO across rollouts that remain in queuing or requeuing
state. When a rollout is claimed, implementations must:
- Transition its status to "preparing".
- Create a new attempt with status="preparing" and sequence_id equal to the number of attempts already registered for the rollout plus one.
- Return an AttemptedRollout snapshot so the runner knows both rollout metadata and the attempt identifier.
- Optionally refresh the caller's Worker telemetry (e.g., last_dequeue_time) when worker_id is provided.
Returns:
-
Optional[AttemptedRollout]–The next attempt to execute, or None when no eligible rollouts are queued.
Raises:
-
NotImplementedError–Subclasses must implement queue retrieval.
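A minimal sketch of the claim step, assuming a plain in-process deque as the scheduling queue. ToyQueue and ToyAttemptedRollout are hypothetical stand-ins; the real method is asynchronous, returns an AttemptedRollout, and may also refresh worker telemetry.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, Dict, List, Optional


@dataclass
class ToyAttemptedRollout:  # hypothetical stand-in for AttemptedRollout
    rollout_id: str
    status: str
    attempt_sequence_id: int


class ToyQueue:
    def __init__(self) -> None:
        self.queue: Deque[str] = deque()
        self.status: Dict[str, str] = {}
        self.attempts: Dict[str, List[int]] = {}

    def enqueue(self, rollout_id: str) -> None:
        self.status[rollout_id] = "queuing"
        self.attempts.setdefault(rollout_id, [])
        self.queue.append(rollout_id)  # tail of the queue

    def dequeue(self) -> Optional[ToyAttemptedRollout]:
        # Non-blocking: return None immediately when nothing is queued.
        if not self.queue:
            return None
        rollout_id = self.queue.popleft()  # FIFO: oldest entry first
        self.status[rollout_id] = "preparing"
        # The new attempt's sequence_id is the existing attempt count plus one.
        seq = len(self.attempts[rollout_id]) + 1
        self.attempts[rollout_id].append(seq)
        return ToyAttemptedRollout(rollout_id, "preparing", seq)
```

In this sketch a retried rollout is re-appended to the same queue, so it waits behind work that was already queued and comes back with the next attempt number.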
enqueue_rollout(input, mode=None, resources_id=None, config=None, metadata=None)
async
¶
Persist a rollout in queuing state so runners can claim it later.
Note
Unlike start_rollout(), this method is for callers that only want to submit work for later scheduling.
Implementations must generate a unique rollout_id, stamp start_time with
the current time, default config to a fresh RolloutConfig,
and insert the rollout at the tail of the scheduling queue. No attempt is created yet.
Parameters:
-
input(TaskInput) –Arbitrary task payload supplied by an algorithm.
-
mode(Literal['train', 'val', 'test'] | None, default:None) –Optional semantic mode indicator ("train", "val", "test").
-
resources_id(str | None, default:None) –Resource snapshot used when a runner eventually executes the rollout.
-
config(RolloutConfig | None, default:None) –Fine-grained retry/timeout parameters to persist with the rollout.
-
metadata(Dict[str, Any] | None, default:None) –Free-form metadata stored verbatim with the rollout record.
Returns:
Raises:
-
NotImplementedError–Subclasses must persist the rollout.
-
ValueError–Implementations should raise when resources_id does not exist.
get_latest_attempt(rollout_id)
async
¶
Fetch the attempt with the highest sequence_id for rollout_id.
Parameters:
-
rollout_id(str) –Identifier to inspect.
Returns:
-
Optional[Attempt]–The most recent attempt, or None when no attempts exist yet.
Raises:
-
NotImplementedError–Subclasses must implement retrieval.
-
ValueError–Implementations must raise when the rollout does not exist.
get_latest_resources()
async
¶
Fetch the latest resource snapshot marked as the global default.
Returns:
-
Optional[ResourcesUpdate]–The current latest ResourcesUpdate, or None when no resources have been registered yet.
Raises:
-
NotImplementedError–Subclasses must implement retrieval.
get_many_span_sequence_ids(rollout_attempt_ids)
async
¶
Bulk allocate the next strictly increasing sequence number used to order spans.
Implementations may delegate to get_next_span_sequence_id()
for each rollout and attempt.
Parameters:
-
rollout_attempt_ids(Sequence[Tuple[str, str]]) –List of tuples of rollout and attempt identifiers.
Returns:
-
Sequence[int]–List of sequence numbers.
get_next_span_sequence_id(rollout_id, attempt_id)
async
¶
Allocate the next strictly increasing sequence number used to order spans.
Implementations must retain counters so repeated calls return 1, 2, ... without
gaps unless spans were explicitly inserted with a custom sequence_id. The
counter may be scoped per rollout or per attempt, but the sequence must be
strictly increasing for spans emitted by the specified attempt so traces remain
totally ordered.
See Distributed Tracing for detailed motivations.
Parameters:
-
rollout_id(str) –Identifier of the rollout emitting spans.
-
attempt_id(str) –Attempt identifier for the upcoming span.
Returns:
-
int–The next integer sequence identifier, unique within the attempt.
Raises:
-
NotImplementedError–Subclasses must provide the allocator.
-
ValueError–Implementations must raise when the rollout or attempt does not exist.
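One way to picture the allocator is a per-attempt counter that also learns from explicitly supplied sequence ids. This is a sketch under assumptions: ToySequenceAllocator and its observe() method are hypothetical, the counter is scoped per (rollout, attempt) pair, and existence checks are omitted.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, Tuple


class ToySequenceAllocator:
    """Hypothetical per-attempt counter; not the library's allocator."""

    def __init__(self) -> None:
        self._last: DefaultDict[Tuple[str, str], int] = defaultdict(int)

    async def next_id(self, rollout_id: str, attempt_id: str) -> int:
        key = (rollout_id, attempt_id)
        # No await between read and write, so this step is atomic
        # with respect to other coroutines on the same event loop.
        self._last[key] += 1
        return self._last[key]

    async def observe(self, rollout_id: str, attempt_id: str, sequence_id: int) -> None:
        """Record an explicit sequence_id so later allocations stay above it."""
        key = (rollout_id, attempt_id)
        self._last[key] = max(self._last[key], sequence_id)
```

Repeated calls yield 1, 2, ... for a given attempt; after a span is inserted with a custom sequence_id of 10, the next allocation is 11, which is the kind of gap the description allows.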
get_resources_by_id(resources_id)
async
¶
Return a specific named resource snapshot by identifier.
Parameters:
-
resources_id(str) –Identifier of the snapshot.
Returns:
-
Optional[ResourcesUpdate]–The stored ResourcesUpdate, or None when missing.
Raises:
-
NotImplementedError–Subclasses must implement retrieval.
get_rollout_by_id(rollout_id)
async
¶
Fetch a rollout by identifier without mutating its state.
Parameters:
-
rollout_id(str) –Identifier to retrieve.
Returns:
-
Optional[Rollout]–The rollout when found, otherwise None.
Raises:
-
NotImplementedError–Subclasses must implement retrieval.
get_worker_by_id(worker_id)
async
¶
Retrieve a single worker by identifier.
Parameters:
-
worker_id(str) –Identifier of the worker.
Returns:
-
Optional[Worker]–The worker record if it exists, otherwise None.
Raises:
-
NotImplementedError–Subclasses must implement lookup semantics.
otlp_traces_endpoint()
¶
Return the OTLP/HTTP traces endpoint of the store.
The traces can have rollout ID and attempt ID (and optionally sequence ID)
saved in the "resource" of the spans.
The store, if it supports OTLP, should be able to receive the traces and save them
via add_span or
add_otel_span.
The endpoint should be compatible with OTLP HTTP protocol. It's not necessarily compatible with OTLP gRPC protocol.
The returned endpoint usually ends with /v1/traces.
query_attempts(rollout_id, *, sort_by='sequence_id', sort_order='asc', limit=-1, offset=0)
async
¶
Return every attempt ever created for rollout_id in ascending sequence order.
The parameters allow callers to re-order or paginate the attempts so that large retry histories can be streamed lazily.
Parameters:
-
rollout_id(str) –Identifier of the rollout being inspected.
-
sort_by(Optional[str], default:'sequence_id') –Field to sort by. Must be a numeric or string field of Attempt. Defaults to sequence_id (oldest first).
-
sort_order(Literal['asc', 'desc'], default:'asc') –Order to sort by.
-
limit(int, default:-1) –Limit on the number of results. -1 for unlimited.
-
offset(int, default:0) –Offset into the results.
Returns:
-
Sequence[Attempt]–Sequence of Attempts. Returns an empty sequence when none exist.
-
Sequence[Attempt]–The return value is not guaranteed to be a list.
Raises:
-
NotImplementedError–Subclasses must implement the query.
-
ValueError–Implementations must raise when the rollout does not exist.
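The sort_by, sort_order, limit, and offset parameters recur across the query_* methods. The helper below is a hypothetical sketch of one plausible interpretation (sort first, then skip offset rows, then cap at limit, with a negative limit meaning "unlimited"); sort_and_paginate and ToyAttemptRow are illustrative names, not library APIs.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional


@dataclass
class ToyAttemptRow:  # hypothetical stand-in for Attempt
    sequence_id: int


def sort_and_paginate(
    items: Iterable[Any],
    sort_by: Optional[str] = None,
    sort_order: str = "asc",
    limit: int = -1,
    offset: int = 0,
) -> List[Any]:
    rows = list(items)
    if sort_by is not None:
        # Sort on the named attribute; "desc" simply reverses the order.
        rows.sort(key=lambda row: getattr(row, sort_by), reverse=(sort_order == "desc"))
    rows = rows[offset:]
    # A negative limit is treated as "no limit" in this sketch.
    return rows if limit < 0 else rows[:limit]
```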
query_resources(*, resources_id=None, resources_id_contains=None, sort_by=None, sort_order='asc', limit=-1, offset=0)
async
¶
List every stored resource snapshot in insertion order.
Supports lightweight filtering, sorting, and pagination for embedding in dashboards.
Parameters:
-
resources_id(Optional[str], default:None) –Optional identifier of the resources to include.
-
resources_id_contains(Optional[str], default:None) –Optional substring match for resources identifiers.
-
sort_by(Optional[str], default:None) –Optional field to sort by (must be numeric or string on ResourcesUpdate).
-
sort_order(Literal['asc', 'desc'], default:'asc') –Order to sort by.
-
limit(int, default:-1) –Limit on the number of results. -1 for unlimited.
-
offset(int, default:0) –Offset into the results.
Returns:
-
Sequence[ResourcesUpdate]–ResourcesUpdate objects.
-
Sequence[ResourcesUpdate]–By default, resources are sorted in a deterministic but undefined order.
-
Sequence[ResourcesUpdate]–The return value is not guaranteed to be a list.
Raises:
-
NotImplementedError–Subclasses must implement retrieval.
query_rollouts(*, status_in=None, rollout_id_in=None, rollout_id_contains=None, filter_logic='and', sort_by=None, sort_order='asc', limit=-1, offset=0, status=None, rollout_ids=None)
async
¶
Retrieve rollouts filtered by status and/or explicit identifiers.
This interface supports structured filtering, sorting, and pagination so
callers can build simple dashboards without copying data out of the
store. The legacy parameters status and rollout_ids remain valid and
are treated as aliases for status_in and rollout_id_in
respectively—when both the new and deprecated parameters are supplied
the new parameters take precedence.
Parameters:
-
status_in(Optional[Sequence[RolloutStatus]], default:None) –Optional whitelist of RolloutStatus values.
-
rollout_id_in(Optional[Sequence[str]], default:None) –Optional whitelist of rollout identifiers to include.
-
rollout_id_contains(Optional[str], default:None) –Optional substring match for rollout identifiers.
-
filter_logic(Literal['and', 'or'], default:'and') –Logical operator to combine filters.
-
sort_by(Optional[str], default:None) –Optional field to sort by. Must reference a numeric or string field on Rollout.
-
sort_order(Literal['asc', 'desc'], default:'asc') –Direction to sort when sort_by is provided.
-
limit(int, default:-1) –Maximum number of rows to return. Use -1 for "no limit".
-
offset(int, default:0) –Number of rows to skip before returning results.
-
status(Optional[Sequence[RolloutStatus]], default:None) –Deprecated field. Use status_in instead.
-
rollout_ids(Optional[Sequence[str]], default:None) –Deprecated field. Use rollout_id_in instead.
Returns:
-
Sequence[Rollout]–A sequence of matching rollouts (or AttemptedRollout when attempts exist). Ordering is deterministic when sort_by is set.
-
Sequence[Rollout]–The return value is not guaranteed to be a list.
Raises:
-
NotImplementedError–Subclasses must implement the query.
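To make the interplay of filter_logic and the deprecated aliases concrete, here is a hypothetical predicate over dictionary rows. It is a sketch, not the library's matching code: match_rollout is an invented name, and treating "no filters supplied" as "everything matches" is an assumption.

```python
from typing import Any, Dict, List, Optional, Sequence


def match_rollout(
    row: Dict[str, Any],
    *,
    status_in: Optional[Sequence[str]] = None,
    rollout_id_in: Optional[Sequence[str]] = None,
    rollout_id_contains: Optional[str] = None,
    filter_logic: str = "and",
    status: Optional[Sequence[str]] = None,  # deprecated alias of status_in
    rollout_ids: Optional[Sequence[str]] = None,  # deprecated alias of rollout_id_in
) -> bool:
    # New parameters take precedence over the deprecated aliases.
    status_in = status_in if status_in is not None else status
    rollout_id_in = rollout_id_in if rollout_id_in is not None else rollout_ids

    checks: List[bool] = []
    if status_in is not None:
        checks.append(row["status"] in status_in)
    if rollout_id_in is not None:
        checks.append(row["rollout_id"] in rollout_id_in)
    if rollout_id_contains is not None:
        checks.append(rollout_id_contains in row["rollout_id"])

    if not checks:
        return True  # assumption: with no filters, every row matches
    return all(checks) if filter_logic == "and" else any(checks)
```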
query_spans(rollout_id, attempt_id=None, *, trace_id=None, trace_id_contains=None, span_id=None, span_id_contains=None, parent_id=None, parent_id_contains=None, name=None, name_contains=None, filter_logic='and', limit=-1, offset=0, sort_by='sequence_id', sort_order='asc')
async
¶
Return the stored spans for a rollout, optionally scoped to one attempt.
Supports a handful of filters that cover the most common debugging
scenarios (matching trace_id/span_id/parent_id or substring
matches on the span name). attempt_id="latest" acts as a convenience
that resolves the most recent attempt before evaluating filters. When
attempt_id=None, spans across every attempt are eligible. By default
results are sorted by sequence_id (oldest first). Implementations may
raise a RuntimeError when spans were evicted or expired.
Parameters:
-
rollout_id(str) –Identifier of the rollout being inspected.
-
attempt_id(str | Literal['latest'] | None, default:None) –Attempt identifier to filter by. Pass "latest" to retrieve only the most recent attempt, or None to return all spans across attempts.
-
trace_id(Optional[str], default:None) –Optional trace ID to filter by.
-
trace_id_contains(Optional[str], default:None) –Optional substring match for trace IDs.
-
span_id(Optional[str], default:None) –Optional span ID to filter by.
-
span_id_contains(Optional[str], default:None) –Optional substring match for span IDs.
-
parent_id(Optional[str], default:None) –Optional parent span ID to filter by.
-
parent_id_contains(Optional[str], default:None) –Optional substring match for parent span IDs.
-
name(Optional[str], default:None) –Optional span name to filter by.
-
name_contains(Optional[str], default:None) –Optional substring match for span names.
-
filter_logic(Literal['and', 'or'], default:'and') –Logical operator to combine the optional filters above. The rollout_id argument is always applied with AND semantics.
-
limit(int, default:-1) –Limit on the number of results. -1 for unlimited.
-
offset(int, default:0) –Offset into the results.
-
sort_by(Optional[str], default:'sequence_id') –Field to sort by. Must be a numeric or string field of Span.
-
sort_order(Literal['asc', 'desc'], default:'asc') –Order to sort by.
Returns:
-
Sequence[Span]–An ordered list of spans (possibly empty).
-
Sequence[Span]–The return value is not guaranteed to be a list.
Raises:
-
NotImplementedError–Subclasses must implement the query.
-
ValueError–Implementations must raise when the rollout or attempt is unknown.
query_workers(*, status_in=None, worker_id_contains=None, filter_logic='and', sort_by=None, sort_order='asc', limit=-1, offset=0)
async
¶
Query all workers in the system.
Parameters:
-
status_in(Optional[Sequence[WorkerStatus]], default:None) –Optional whitelist of WorkerStatus values.
-
worker_id_contains(Optional[str], default:None) –Optional substring match for worker identifiers.
-
filter_logic(Literal['and', 'or'], default:'and') –Logical operator to combine the optional filters above.
-
sort_by(Optional[str], default:None) –Field to sort by. Must be a numeric or string field of Worker.
-
sort_order(Literal['asc', 'desc'], default:'asc') –Order to sort by.
-
limit(int, default:-1) –Limit on the number of results. -1 for unlimited.
-
offset(int, default:0) –Offset into the results.
Returns:
start_attempt(rollout_id)
async
¶
Create a manual retry attempt for an existing rollout.
This is typically invoked by runners that wish to retry outside of the
normal queue flow (for example in an online RL setup).
Implementations must validate that the rollout exists, allocate a fresh attempt_id,
increment the sequence_id monotonically, stamp the new attempt with status="preparing",
and return an up-to-date AttemptedRollout.
Parameters:
-
rollout_id(str) –Unique identifier of the rollout receiving a new attempt.
Returns:
-
AttemptedRollout–The rollout paired with its newly-created attempt.
Raises:
-
NotImplementedError–Subclasses must implement attempt creation.
-
ValueError–Implementations must raise when rollout_id is unknown.
start_rollout(input, mode=None, resources_id=None, config=None, metadata=None)
async
¶
Register a rollout and immediately create its first attempt.
Note
Use enqueue_rollout() when the
caller only wants to submit work for later scheduling.
The rollout must be persisted with status="preparing" and an initial attempt
with sequence_id == 1 so the caller can begin execution without visiting the
public queue. Implementations are expected to:
- Generate a unique rollout_id and attempt_id.
- Record start_time for both rollout and attempt based on the current clock.
- Copy config and metadata so later mutations do not leak shared references.
- Resolve resources_id to the latest resource snapshot when None is supplied.
Parameters:
-
input(TaskInput) –Arbitrary task payload supplied by an algorithm.
-
mode(Literal['train', 'val', 'test'] | None, default:None) –Optional semantic mode for downstream analytics ("train", "val", "test").
-
resources_id(str | None, default:None) –Concrete resource snapshot to execute against; defaults to the latest stored snapshot.
-
config(RolloutConfig | None, default:None) –Rollout retry/timeout policy. Should default to a fresh RolloutConfig.
-
metadata(Dict[str, Any] | None, default:None) –Free-form metadata persisted verbatim with the rollout.
Returns:
-
AttemptedRollout–The fully-populated AttemptedRollout including the just-created attempt.
Raises:
-
NotImplementedError–Subclasses must provide durable storage for the rollout.
-
ValueError–Implementations should raise when resources_id does not exist.
statistics()
async
¶
Return the statistics of the store.
update_attempt(rollout_id, attempt_id, status=UNSET, worker_id=UNSET, last_heartbeat_time=UNSET, metadata=UNSET)
async
¶
Update attempt bookkeeping such as status, worker ownership, and heartbeats.
When attempt_id is "latest" the update must target the attempt with the highest
sequence_id; otherwise it must target the specific attempt. Implementations should
propagate status changes to the rollout (for example via propagate_status())
once the latest attempt transitions to a terminal state.
Similar to update_rollout(),
parameters also default to the sentinel UNSET.
If worker_id is present, the worker status will be updated following the rules:
- If attempt status is "succeeded" or "failed", the corresponding worker status will be set to "idle".
- If attempt status is "unresponsive" or "timeout", the corresponding worker status will be set to "unknown".
- Otherwise, the worker status will be set to "busy".
Parameters:
-
rollout_id(str) –Identifier of the rollout whose attempt will be updated.
-
attempt_id(str | Literal['latest']) –Attempt identifier or "latest" as a convenience.
-
status(AttemptStatus | Unset, default:UNSET) –Replacement attempt status. Terminal statuses must set end_time.
-
worker_id(str | Unset, default:UNSET) –Identifier for the worker currently processing the attempt.
-
last_heartbeat_time(float | Unset, default:UNSET) –Wall-clock timestamp (seconds) of the latest heartbeat/span.
-
metadata(Optional[Dict[str, Any]] | Unset, default:UNSET) –Replacement metadata dictionary.
Returns:
-
Attempt–The updated attempt record.
Raises:
-
NotImplementedError–Subclasses must implement mutation logic.
-
ValueError–Implementations must raise when the rollout or attempt is unknown.
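The worker-status rules listed for update_attempt() amount to a small mapping from attempt status to worker status. The function below restates them as code; worker_status_after is a hypothetical name used only for illustration.

```python
def worker_status_after(attempt_status: str) -> str:
    """Map an attempt status to the worker status described in the rules above."""
    if attempt_status in ("succeeded", "failed"):
        return "idle"  # the worker finished and can take new work
    if attempt_status in ("unresponsive", "timeout"):
        return "unknown"  # the worker's health can no longer be assumed
    return "busy"  # any other status means the worker is still occupied
```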
update_resources(resources_id, resources)
async
¶
Overwrite or extend an existing resource snapshot and mark it as latest.
This API is typically used by algorithms that maintain mutable resources (e.g., model checkpoints) under a stable identifier.
Parameters:
-
resources_id(str) –Identifier of the snapshot to replace.
-
resources(NamedResources) –Updated mapping of resource names to payloads.
Returns:
-
ResourcesUpdate–The persisted
ResourcesUpdate.
Raises:
-
NotImplementedError–Subclasses must implement resource persistence.
-
ValueError–Implementations must raise when resources_id does not exist.
update_rollout(rollout_id, input=UNSET, mode=UNSET, resources_id=UNSET, status=UNSET, config=UNSET, metadata=UNSET)
async
¶
Update rollout metadata and, when provided, drive status transitions.
Parameters default to the sentinel UNSET to
distinguish omitted fields from explicit None assignments. Implementations must:
- Validate the rollout exists before mutating it.
- Replace each property when a concrete value (including None) is supplied.
- When the status switches into a terminal state, set end_time and signal any waiters.
- When the status re-enters a queueing state, ensure the rollout is enqueued exactly once.
Parameters:
-
rollout_id(str) –Identifier of the rollout to update.
-
input(TaskInput | Unset, default:UNSET) –Replacement task payload; pass None to explicitly clear the input.
-
mode(Optional[Literal['train', 'val', 'test']] | Unset, default:UNSET) –Replacement rollout mode.
-
resources_id(Optional[str] | Unset, default:UNSET) –Replacement resources snapshot reference.
-
status(RolloutStatus | Unset, default:UNSET) –Target rollout status.
-
config(RolloutConfig | Unset, default:UNSET) –Replacement retry/timeout configuration.
-
metadata(Optional[Dict[str, Any]] | Unset, default:UNSET) –Replacement metadata dictionary.
Returns:
-
Rollout–The updated rollout record.
Raises:
-
NotImplementedError–Subclasses must implement mutation logic.
-
ValueError–Implementations must raise when the rollout is unknown or the update is invalid.
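The sentinel pattern that separates "argument omitted" from "explicitly set to None" can be sketched as follows. ToyUnset, TOY_UNSET, and apply_update are hypothetical names; the library's own Unset type and UNSET value may be defined differently.

```python
from typing import Any, Dict


class ToyUnset:
    """Hypothetical sentinel type marking an omitted argument."""

    def __repr__(self) -> str:
        return "TOY_UNSET"


TOY_UNSET = ToyUnset()


def apply_update(
    record: Dict[str, Any],
    *,
    mode: Any = TOY_UNSET,
    metadata: Any = TOY_UNSET,
) -> Dict[str, Any]:
    """Return a copy of `record` with only the supplied fields replaced."""
    updated = dict(record)
    # Omitted fields keep their value; an explicit None overwrites it.
    if not isinstance(mode, ToyUnset):
        updated["mode"] = mode
    if not isinstance(metadata, ToyUnset):
        updated["metadata"] = metadata
    return updated
```

With this pattern, apply_update(record, metadata=None) clears the metadata while apply_update(record) leaves it untouched, which a plain None default could not express.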
update_worker(worker_id, heartbeat_stats=UNSET)
async
¶
Record a heartbeat for worker_id and refresh telemetry.
Implementations must treat this API as heartbeat-only: it should snapshot
the latest stats when provided, stamp last_heartbeat_time with the
current wall clock, and rely on other store mutations (dequeue_rollout,
update_attempt, etc.) to drive the worker's busy/idle status,
assignment, and activity timestamps.
Parameters:
-
worker_id(str) –Identifier of the worker to update.
-
heartbeat_stats(Dict[str, Any] | Unset, default:UNSET) –Replacement worker heartbeat statistics (non-null when provided).
wait_for_rollouts(*, rollout_ids, timeout=None)
async
¶
Block until the targeted rollouts reach a terminal status or the timeout expires.
Terminal statuses are "succeeded", "failed", and "cancelled". When the timeout
elapses, implementations should return the subset of rollouts that are already terminal
and omit the rest.
Warning
Calling this function with a long timeout is dangerous and may block the event loop. Prefer polling with short timeouts to check whether more rollouts have completed, and implement the sleep logic carefully to avoid busy-waiting.
Parameters:
-
rollout_ids(List[str]) –Identifiers of rollouts to watch.
-
timeout(Optional[float], default:None) –Maximum time in seconds to wait. None waits indefinitely.
Returns:
-
List[Rollout]–Rollouts that finished before the deadline, in arbitrary order.
Raises:
-
NotImplementedError–Subclasses must implement waiting semantics.
-
ValueError–Implementations must raise when a rollout identifier is unknown.
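The polling approach suggested in the warning might look like the sketch below: call wait_for_rollouts() repeatedly with a short timeout until everything is terminal or an overall deadline passes. ToyWaitStore is a hypothetical stand-in that returns rollout ids rather than Rollout objects, and poll_until_done is an invented helper, not a library function.

```python
import asyncio
import time
from typing import Dict, List, Optional, Set


class ToyWaitStore:
    """Hypothetical stand-in exposing only wait_for_rollouts()."""

    def __init__(self, finish_at: Dict[str, float]) -> None:
        # rollout_id -> monotonic time at which the rollout becomes terminal
        self._finish_at = finish_at

    async def wait_for_rollouts(
        self, *, rollout_ids: List[str], timeout: Optional[float] = None
    ) -> List[str]:
        # Yield to the event loop for the timeout instead of spinning.
        await asyncio.sleep(timeout or 0)
        now = time.monotonic()
        return [rid for rid in rollout_ids if self._finish_at[rid] <= now]


async def poll_until_done(
    store: ToyWaitStore,
    rollout_ids: List[str],
    total_timeout: float,
    poll_interval: float = 0.05,
) -> List[str]:
    deadline = time.monotonic() + total_timeout
    done: Set[str] = set()
    while len(done) < len(rollout_ids) and time.monotonic() < deadline:
        pending = [rid for rid in rollout_ids if rid not in done]
        # Each call waits at most poll_interval, so other tasks keep running.
        finished = await store.wait_for_rollouts(rollout_ids=pending, timeout=poll_interval)
        done.update(finished)
    return sorted(done)
```

Rollouts that are still running when the overall deadline passes are simply omitted from the result, mirroring the documented timeout behavior.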
agentlightning.LightningStoreCapabilities
¶
Bases: TypedDict
Capability of a LightningStore implementation.
All keys are optional and false by default.
async_safe
instance-attribute
¶
Whether the store is async-safe.
otlp_traces
instance-attribute
¶
Whether the store supports OTLP/HTTP traces.
thread_safe
instance-attribute
¶
Whether the store is thread-safe.
zero_copy
instance-attribute
¶
Whether the store has only one copy across all threads/processes.
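Because every key is optional and treated as false when absent, callers should read capabilities defensively. The sketch below mirrors the four documented keys in a hypothetical ToyCapabilities TypedDict (declared with total=False) and a made-up has_capability helper.

```python
from typing import TypedDict


class ToyCapabilities(TypedDict, total=False):
    """Hypothetical mirror of the documented keys; all keys are optional."""

    thread_safe: bool
    async_safe: bool
    zero_copy: bool
    otlp_traces: bool


def has_capability(caps: ToyCapabilities, name: str) -> bool:
    # A missing key is interpreted as False, matching "false by default".
    return bool(caps.get(name, False))
```

For example, has_capability({"thread_safe": True}, "otlp_traces") evaluates to False because the key is absent.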
Store Implementations¶
agentlightning.InMemoryLightningStore
¶
Bases: CollectionBasedLightningStore[InMemoryLightningCollections]
In-memory implementation of LightningStore using Python data structures. Thread-safe and async-compatible but data is not persistent.
Parameters:
-
eviction_memory_threshold(float | int | None, default:None) –The threshold for evicting spans in bytes. By default, it's 70% of the total VRAM available.
-
safe_memory_threshold(float | int | None, default:None) –The threshold for safe memory usage in bytes. By default, it's 80% of the eviction threshold.
-
span_size_estimator(Callable[[Span], int] | None, default:None) –A function to estimate the size of a span in bytes. By default, it's a simple size estimator that uses sys.getsizeof.
capabilities
property
¶
Return the capabilities of the store.
get_running_rollouts(collections)
async
¶
Accelerated version of get_running_rollouts for in-memory store. Used for healthcheck.
on_rollout_update(rollout)
async
¶
Update the running rollout ids set when the rollout updates.
statistics()
async
¶
Return the statistics of the store.
wait_for_rollout(rollout_id, timeout=None)
async
¶
Wait for a specific rollout to complete with a timeout.
agentlightning.CollectionBasedLightningStore
¶
Bases: LightningStore, Generic[T_collections]
The standard implementation of LightningStore, which uses collections to store data.
Store implementations that want the default behavior should inherit from this class and override methods as needed.
Bring your own collection implementation by passing a different collections argument.
The methods in this class should generally not call each other, especially those that are locked.
Parameters:
-
collections(T_collections) –The collections to use for storage.
capabilities
property
¶
Return the capabilities of the store.
This store supports no capability. The capability depends on the underlying collections.
add_many_spans(collections, spans)
async
¶
Persist a sequence of pre-converted spans.
See LightningStore.add_many_spans() for semantics.
add_otel_span(collections, rollout_id, attempt_id, readable_span, sequence_id=None)
async
¶
Add an opentelemetry span to the store.
See LightningStore.add_otel_span() for semantics.
add_resources(collections, resources)
async
¶
Stores a new version of named resources and sets it as the latest.
See LightningStore.add_resources() for semantics.
add_span(collections, span)
async
¶
Persist a pre-converted span.
See LightningStore.add_span() for semantics.
dequeue_rollout(collections, worker_id=None)
async
¶
Retrieves the next task from the queue without blocking.
Returns None if the queue is empty.
Will set the rollout status to preparing and create a new attempt.
See LightningStore.dequeue_rollout() for semantics.
enqueue_rollout(collections, input, mode=None, resources_id=None, config=None, metadata=None)
async
¶
Adds a new task to the queue with specific metadata and returns the rollout.
See LightningStore.enqueue_rollout() for semantics.
get_latest_attempt(collections, rollout_id)
async
¶
Retrieves the latest attempt for a given rollout ID.
See LightningStore.get_latest_attempt() for semantics.
get_latest_resources(collections)
async
¶
Retrieves the latest version of named resources.
See LightningStore.get_latest_resources() for semantics.
get_many_span_sequence_ids(collections, rollout_attempt_ids)
async
¶
Get the next span sequence IDs for a given list of rollout and attempt identifiers.
get_next_span_sequence_id(collections, rollout_id, attempt_id)
async
¶
Get the next span sequence ID for a given rollout and attempt. The number is strictly increasing for each rollout. The store will not issue the same sequence ID twice.
See LightningStore.get_next_span_sequence_id() for semantics.
get_resources_by_id(collections, resources_id)
async
¶
Retrieves a specific version of named resources by its ID.
See LightningStore.get_resources_by_id() for semantics.
get_rollout_by_id(collections, rollout_id)
async
¶
Retrieves a specific rollout by its ID.
See LightningStore.get_rollout_by_id() for semantics.
If the rollout has been attempted, the latest attempt will also be returned.
get_running_rollouts(collections)
async
¶
Get all running rollouts.
Because this is invoked very frequently (probably on every request), subclasses can implement shortcuts to make it more efficient. It should not acquire the lock itself; the caller is expected to hold it.
on_rollout_update(rollout)
async
¶
Callback for subclasses to implement specific logic when a rollout changes.
Subclass should not lock this method with collections.atomic() because the caller will already hold the lock.
query_attempts(collections, rollout_id, *, sort_by='sequence_id', sort_order='asc', limit=-1, offset=0)
async
¶
Retrieve attempts for a rollout with optional ordering/pagination.
query_resources(collections, *, resources_id=None, resources_id_contains=None, sort_by=None, sort_order='asc', limit=-1, offset=0)
async
¶
Return every stored resource snapshot in insertion order.
query_rollouts(collections, *, status_in=None, rollout_id_in=None, rollout_id_contains=None, filter_logic='and', sort_by=None, sort_order='asc', limit=-1, offset=0, status=None, rollout_ids=None)
async
¶
Retrieve rollouts with filtering and pagination.
See LightningStore.query_rollouts() for semantics.
query_spans(collections, rollout_id, attempt_id=None, *, trace_id=None, trace_id_contains=None, span_id=None, span_id_contains=None, parent_id=None, parent_id_contains=None, name=None, name_contains=None, filter_logic='and', limit=-1, offset=0, sort_by='sequence_id', sort_order='asc')
async
¶
Query and retrieve spans associated with a specific rollout ID. Returns an empty list if no spans are found.
See LightningStore.query_spans() for semantics.
query_workers(collections, *, status_in=None, worker_id_contains=None, filter_logic='and', sort_by=None, sort_order='asc', limit=-1, offset=0)
async
¶
Return the current snapshot of all workers.
start_attempt(collections, rollout_id)
async
¶
Creates a new attempt for a given rollout ID and returns the attempt details.
See LightningStore.start_attempt() for semantics.
start_rollout(collections, input, mode=None, resources_id=None, config=None, metadata=None)
async
¶
Notify the store that the caller is about to run a rollout.
See LightningStore.start_rollout() for semantics.
statistics()
async
¶
Return the statistics of the store.
update_attempt(collections, rollout_id, attempt_id, status=UNSET, worker_id=UNSET, last_heartbeat_time=UNSET, metadata=UNSET)
async
¶
Update a specific or latest attempt for a given rollout.
See LightningStore.update_attempt() for semantics.
update_resources(collections, resources_id, resources)
async
¶
Safely stores a new version of named resources and sets it as the latest.
See LightningStore.update_resources() for semantics.
update_rollout(collections, rollout_id, input=UNSET, mode=UNSET, resources_id=UNSET, status=UNSET, config=UNSET, metadata=UNSET)
async
¶
Update the rollout status and related metadata.
See LightningStore.update_rollout() for semantics.
update_worker(collections, worker_id, heartbeat_stats=UNSET)
async
¶
Create or update a worker entry.
wait_for_rollout(rollout_id, timeout=None)
async
¶
Wait for a specific rollout to complete with a timeout.
Subclasses may use advanced mechanisms such as events to accelerate this.
Returns the completed rollout, or None if timeout is reached.
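The waiting behavior described above (return on completion, or None once the timeout is reached) can be sketched with a plain polling loop. This is a minimal sketch under stated assumptions, not the store's implementation: `poll_until_complete` and `fetch_status` are hypothetical names, and the terminal statuses are taken from the rollout status machine in the LightningStore contract.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional

# Terminal rollout statuses as listed in the LightningStore contract.
TERMINAL = {"succeeded", "failed", "cancelled"}


async def poll_until_complete(
    fetch_status: Callable[[], Awaitable[str]],  # hypothetical status getter
    timeout: Optional[float] = None,
    interval: float = 0.01,
) -> Optional[str]:
    """Return the terminal status, or None if the timeout elapses first."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        status = await fetch_status()
        if status in TERMINAL:
            return status
        if deadline is not None and time.monotonic() >= deadline:
            return None
        await asyncio.sleep(interval)


async def _demo():
    states = iter(["queuing", "running", "succeeded"])

    async def finishing() -> str:
        return next(states)

    async def stuck() -> str:
        return "running"

    finished = await poll_until_complete(finishing, timeout=1.0)
    timed_out = await poll_until_complete(stuck, timeout=0.05)
    return finished, timed_out


finished, timed_out = asyncio.run(_demo())
```

An event-based subclass would replace the sleep with a wait on a per-rollout event, which avoids the polling latency.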
wait_for_rollouts(*, rollout_ids, timeout=None)
async
¶
Wait for the specified rollouts to complete, with an optional timeout. Returns the rollouts that completed; the list may be partial if the timeout is reached.
This method does not change the state of the store.
See LightningStore.wait_for_rollouts() for semantics.
Client-Server and Thread-safe Wrappers¶
agentlightning.LightningStoreServer
¶
Bases: LightningStore
Server wrapper that exposes a LightningStore via HTTP API. Delegates all operations to an underlying store implementation.
Health checks and the watchdog rely on the underlying store.
agl store is a convenient CLI to start a store server.
When the server is executed in a subprocess, the store detects that it is running under a different PID and automatically delegates to an HTTP client instead of using the local store. This ensures that a single copy of the store is shared across all processes.
This server exports OTLP-compatible traces via the /v1/traces endpoint.
Parameters:
-
store(LightningStore) –The underlying store to delegate operations to.
-
host(str | None, default:None) –The hostname or IP address to bind the server to.
-
port(int | None, default:None) –The TCP port to listen on.
-
cors_allow_origins(Sequence[str] | str | None, default:None) –A list of CORS origins to allow. Use '*' to allow all origins.
-
launch_mode(LaunchMode, default:'thread') –The launch mode to use for the server. Defaults to "thread", which runs the server in a separate thread.
-
launcher_args(PythonServerLauncherArgs | None, default:None) –The arguments to use for the server launcher. It's not allowed to set host, port, launch_mode together with launcher_args.
-
n_workers(int, default:1) –The number of workers to run in the server. Only applicable for mp launch mode.
-
prometheus(bool, default:False) –Whether to enable Prometheus metrics.
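The PID-based delegation described for this class can be illustrated with a small sketch. It assumes the check is a simple comparison against the PID recorded at construction time; `ProcessAwareHandle` and its `backend()` method are hypothetical names, not part of the library.

```python
import os
from typing import Optional


class ProcessAwareHandle:
    """Hypothetical stand-in that remembers which process created it."""

    def __init__(self) -> None:
        self._owner_pid = os.getpid()

    def backend(self, current_pid: Optional[int] = None) -> str:
        # In the owning process, use the local store directly.
        # In any other process, go through an HTTP client instead.
        pid = os.getpid() if current_pid is None else current_pid
        return "local-store" if pid == self._owner_pid else "http-client"


handle = ProcessAwareHandle()
same_process = handle.backend()
# Simulate a child process by passing a PID that differs from the owner's.
other_process = handle.backend(current_pid=os.getpid() + 1)
```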
capabilities
property
¶
Return the capabilities of the store.
endpoint
property
¶
Endpoint is the address that the client will use to connect to the server.
__getstate__()
¶
Control pickling to prevent server state from being sent to subprocesses.
When LightningStoreServer is pickled (e.g., passed to a subprocess), we only serialize the underlying store and connection details. The client instance and process-awareness state are excluded as they should not be transferred between processes.
The subprocess should create its own server instance if needed.
__setstate__(state)
¶
Restore from pickle by reconstructing only the essential attributes.
Note: This creates a new server instance without FastAPI/uvicorn initialized. Call init() pattern or create a new LightningStoreServer if you need a fully functional server in the subprocess. The unpickled server also has no app and store attributes; this ensures that there is only one copy of the server in the whole system.
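As a rough illustration of the pickling hooks described above, the sketch below keeps only connection details in the serialized state and rebuilds per-process state on restore. `ServerHandleSketch`, its attributes, and the host and port values are hypothetical; the round trip calls the hooks directly, which mirrors what `pickle` does when an instance is sent to a subprocess.

```python
import pickle
import threading


class ServerHandleSketch:
    """Hypothetical class; attribute names are illustrative only."""

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port
        self._lock = threading.Lock()  # process-local and not picklable

    def __getstate__(self) -> dict:
        # Serialize connection details only; drop process-local state.
        return {"host": self.host, "port": self.port}

    def __setstate__(self, state: dict) -> None:
        self.host = state["host"]
        self.port = state["port"]
        self._lock = threading.Lock()  # rebuilt fresh in the new process


original = ServerHandleSketch("127.0.0.1", 8000)
state = original.__getstate__()
payload = pickle.dumps(state)  # only plain data crosses the process boundary

# Recreate the object without running __init__, as unpickling would.
restored = ServerHandleSketch.__new__(ServerHandleSketch)
restored.__setstate__(pickle.loads(payload))
```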
otlp_traces_endpoint()
¶
Return the OTLP/HTTP traces endpoint of the store.
run_forever()
async
¶
Runs the FastAPI server indefinitely.
start()
async
¶
Starts the FastAPI server in the background.
You need to call this method in the same process in which the server was created.
stop()
async
¶
Gracefully stops the running FastAPI server.
You need to call this method in the same process in which the server was created.
agentlightning.LightningStoreClient
¶
Bases: LightningStore
HTTP client that talks to a remote LightningStoreServer.
Parameters:
-
server_address(str) –The address of the LightningStoreServer to connect to.
-
retry_delays(Sequence[float], default:(1.0, 2.0, 5.0)) –Backoff schedule (seconds) used when the initial request fails for a non-application reason. Each entry corresponds to one retry attempt. Set to an empty sequence to disable retries.
-
health_retry_delays(Sequence[float], default:(0.1, 0.2, 0.5)) –Delays between /health probes while waiting for the server to come back. Set to an empty sequence to disable health checks.
-
request_timeout(float, default:30.0) –Timeout (seconds) for each request.
-
connection_timeout(float, default:5.0) –Timeout (seconds) for establishing connection.
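The retry_delays schedule above can be read as "send once, then retry once per entry". The sketch below shows that reading under stated assumptions: `call_with_retries` is a hypothetical helper, `ConnectionError` stands in for a non-application failure, and the /health probing is not modeled. Unlike some client methods documented below, which log the error and return None, this sketch re-raises once the schedule is exhausted.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


async def call_with_retries(
    request: Callable[[], Awaitable[str]],
    retry_delays: Sequence[float] = (1.0, 2.0, 5.0),
) -> str:
    """Send once, then retry once per delay entry; re-raise when none remain."""
    remaining = list(retry_delays)
    while True:
        try:
            return await request()
        except ConnectionError:  # assumption: a non-application failure
            if not remaining:
                raise
            await asyncio.sleep(remaining.pop(0))


calls = {"count": 0}


async def flaky_request() -> str:
    # Hypothetical request that fails twice before succeeding.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("server unavailable")
    return "ok"


async def always_failing() -> str:
    raise ConnectionError("server unavailable")


async def _demo():
    result = await call_with_retries(flaky_request, retry_delays=(0.01, 0.02, 0.05))
    try:
        # An empty schedule disables retries, so the first failure propagates.
        await call_with_retries(always_failing, retry_delays=())
        raised = False
    except ConnectionError:
        raised = True
    return result, raised


result, retries_disabled_raised = asyncio.run(_demo())
```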
capabilities
property
¶
Return the capabilities of the store.
__getstate__()
¶
When LightningStoreClient is pickled (e.g., passed to a subprocess), we only serialize the server address and retry configurations. The ClientSessions are excluded as they should not be transferred between processes.
__setstate__(state)
¶
Restore from pickle by reconstructing only the essential attributes.
Replicates the __init__ logic to create another client instance in the subprocess.
close()
async
¶
Close the HTTP session.
dequeue_rollout(worker_id=None)
async
¶
Dequeue a rollout from the server queue.
Returns:
-
Optional[AttemptedRollout]–AttemptedRollout if a rollout is available, None if queue is empty.
Note
This method does NOT retry on failures. If any exception occurs (network error, server error, etc.), it logs the error and returns None immediately.
get_latest_attempt(rollout_id)
async
¶
Get the latest attempt for a rollout.
Parameters:
-
rollout_id(str) –ID of the rollout to query.
Returns:
-
Optional[Attempt]–Attempt if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
get_latest_resources()
async
¶
Get the latest resources.
Returns:
-
Optional[ResourcesUpdate]–ResourcesUpdate if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
get_resources_by_id(resources_id)
async
¶
Get resources by their ID.
Parameters:
-
resources_id(str) –ID of the resources to retrieve.
Returns:
-
Optional[ResourcesUpdate]–ResourcesUpdate if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
get_rollout_by_id(rollout_id)
async
¶
Get a rollout by its ID.
Parameters:
-
rollout_id(str) –ID of the rollout to retrieve.
Returns:
-
Optional[Rollout]–Rollout if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
otlp_traces_endpoint()
¶
Return the OTLP/HTTP traces endpoint of the store.
query_resources(*, resources_id=None, resources_id_contains=None, sort_by=None, sort_order='asc', limit=-1, offset=0)
async
¶
List all resource snapshots stored on the server.
wait_for_rollouts(*, rollout_ids, timeout=None)
async
¶
Wait for rollouts to complete.
Parameters:
-
rollout_ids(List[str]) –List of rollout IDs to wait for.
-
timeout(Optional[float], default:None) –Timeout in seconds. If not None, the method will raise a ValueError if the timeout is greater than 0.1 seconds.
Returns:
-
List[Rollout]–List of rollouts that are completed.
agentlightning.LightningStoreThreaded
¶
Bases: LightningStore
Facade that delegates all store operations to an underlying store instance.
The operations are guaranteed to be thread-safe. Make sure the threaded stores are instantiated before initializing the threads.
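The facade idea can be sketched with a single lock around every delegated call. This is an illustration only: `PlainCounter` and `LockedFacade` are hypothetical, the methods are synchronous for brevity (the store API itself is async), and the real class may use a different locking strategy.

```python
import threading


class PlainCounter:
    """Hypothetical non-thread-safe object standing in for an underlying store."""

    def __init__(self) -> None:
        self.value = 0

    def increment(self) -> None:
        current = self.value  # read, then write: racy without a lock
        self.value = current + 1


class LockedFacade:
    """Delegates every call to the wrapped object while holding one lock."""

    def __init__(self, inner: PlainCounter) -> None:
        self._inner = inner
        self._lock = threading.Lock()

    def increment(self) -> None:
        with self._lock:
            self._inner.increment()

    def value(self) -> int:
        with self._lock:
            return self._inner.value


def worker(facade: LockedFacade, times: int) -> None:
    for _ in range(times):
        facade.increment()


facade = LockedFacade(PlainCounter())  # instantiated before any thread starts
threads = [threading.Thread(target=worker, args=(facade, 1000)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
total = facade.value()
```

Creating the facade before the threads start means every thread shares the same lock, which is the point of the instantiation advice above.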
Collections and Collection Implementations¶
agentlightning.store.collection.Collection
¶
Bases: Generic[T]
Behaves like a list of items, supporting addition, updating, and deletion of items.
delete(items)
async
¶
Delete the given items from the collection.
Parameters:
-
items(Sequence[T]) –The items to delete from the collection.
Raises:
-
ValueError–If the items with the primary keys to be deleted do not exist.
get(filter=None, sort=None)
async
¶
Get the first item that matches the given filters.
Parameters:
-
filter(Optional[FilterOptions], default:None) –The filters to apply to the collection. See FilterOptions.
-
sort(Optional[SortOptions], default:None) –Sort options. See SortOptions.
Returns:
-
Optional[T]–The first item that matches the given filters, or None if no item matches.
insert(items)
async
¶
Add the given items to the collection.
Raises:
-
ValueError–If an item with the same primary key already exists.
item_type()
¶
Get the type of the items in the collection.
primary_keys()
¶
Get the primary keys of the collection.
query(filter=None, sort=None, limit=-1, offset=0)
async
¶
Query the collection with the given filters, sort order, and pagination.
Parameters:
-
filter(Optional[FilterOptions], default:None) –The filters to apply to the collection. See FilterOptions.
-
sort(Optional[SortOptions], default:None) –The options for sorting the collection. See SortOptions. The field must exist in the model. If the field might contain null values, the behavior is undefined (i.e., it depends on the implementation).
-
limit(int, default:-1) –Max number of items to return. Use -1 for "no limit".
-
offset(int, default:0) –Number of items to skip from the start of the matching items.
Returns:
-
PaginatedResult[T]–PaginatedResult with items, limit, offset, and total matched items.
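The limit and offset semantics above can be sketched over an already filtered and sorted sequence. `PageSketch` is a hypothetical stand-in for PaginatedResult, carrying only the four fields named in the description; the real type may differ.

```python
from dataclasses import dataclass
from typing import Generic, List, Sequence, TypeVar

T = TypeVar("T")


@dataclass
class PageSketch(Generic[T]):
    """Hypothetical stand-in for PaginatedResult."""

    items: List[T]
    limit: int
    offset: int
    total: int  # number of matched items before pagination


def paginate(matched: Sequence[T], limit: int = -1, offset: int = 0) -> PageSketch[T]:
    # limit == -1 means "no limit": slice from offset to the end.
    end = None if limit == -1 else offset + limit
    return PageSketch(list(matched[offset:end]), limit, offset, len(matched))


page = paginate(["r0", "r1", "r2", "r3", "r4"], limit=2, offset=1)
everything = paginate(["r0", "r1", "r2"])
```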
size()
async
¶
Get the number of items in the collection.
update(items)
async
¶
Update the given items in the collection.
Raises:
-
ValueError–If an item with the primary keys does not exist.
upsert(items)
async
¶
Upsert the given items into the collection.
If the items with the same primary keys already exist, they will be updated. Otherwise, they will be inserted.
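The difference between insert, update, and upsert comes down to how each treats an existing primary key. The sketch below shows that contract on a toy dict-backed collection; `TinyCollection` and the `(key, payload)` item shape are assumptions for illustration, not the library's classes.

```python
import asyncio
from typing import Dict, Sequence, Tuple

Row = Tuple[str, str]  # hypothetical item shape: (primary key, payload)


class TinyCollection:
    def __init__(self) -> None:
        self._rows: Dict[str, str] = {}

    async def insert(self, items: Sequence[Row]) -> None:
        for key, _ in items:
            if key in self._rows:
                raise ValueError(f"item {key!r} already exists")
        for key, payload in items:
            self._rows[key] = payload

    async def update(self, items: Sequence[Row]) -> None:
        for key, _ in items:
            if key not in self._rows:
                raise ValueError(f"item {key!r} does not exist")
        for key, payload in items:
            self._rows[key] = payload

    async def upsert(self, items: Sequence[Row]) -> None:
        # Never raises on key presence: update if present, insert otherwise.
        for key, payload in items:
            self._rows[key] = payload

    async def size(self) -> int:
        return len(self._rows)


async def _demo() -> dict:
    col = TinyCollection()
    outcomes = {}
    await col.insert([("r1", "queuing")])
    try:
        await col.insert([("r1", "running")])
    except ValueError:
        outcomes["duplicate_insert"] = "ValueError"
    try:
        await col.update([("r2", "running")])
    except ValueError:
        outcomes["missing_update"] = "ValueError"
    await col.upsert([("r1", "running"), ("r2", "queuing")])
    outcomes["size"] = await col.size()
    outcomes["r1"] = col._rows["r1"]
    return outcomes


outcomes = asyncio.run(_demo())
```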
agentlightning.store.collection.Queue
¶
Bases: Generic[T]
Behaves like a deque, supporting appending items to the end and popping items from the front.
dequeue(limit=1)
async
¶
Pop the given number of items from the front of the queue.
Parameters:
-
limit(int, default:1) –The number of items to pop from the front of the queue.
Returns:
-
Sequence[T]–The items that were popped from the front of the queue.
-
Sequence[T]–If there are fewer than limit items in the queue, all remaining items are returned.
enqueue(items)
async
¶
Append the given items to the end of the queue.
Parameters:
-
items(Sequence[T]) –The items to append to the end of the queue.
Returns:
-
Sequence[T]–The items that were appended to the end of the queue.
has(item)
async
¶
Check if the given item is in the queue.
item_type()
¶
Get the type of the items in the queue.
peek(limit=1)
async
¶
Peek the given number of items from the front of the queue.
Parameters:
-
limit(int, default:1) –The number of items to peek from the front of the queue.
Returns:
-
Sequence[T]–The items that were peeked from the front of the queue.
-
Sequence[T]–If there are fewer than limit items in the queue, all remaining items are returned.
size()
async
¶
Get the number of items in the queue.
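The queue contract above (append at the end, pop or peek from the front, return fewer items when the queue runs short) maps naturally onto `collections.deque`. The following is a minimal sketch with a hypothetical `TinyQueue` class, not the library's DequeBasedQueue.

```python
import asyncio
import itertools
from collections import deque
from typing import Deque, List, Sequence


class TinyQueue:
    def __init__(self) -> None:
        self._items: Deque[str] = deque()

    async def enqueue(self, items: Sequence[str]) -> List[str]:
        self._items.extend(items)
        return list(items)

    async def dequeue(self, limit: int = 1) -> List[str]:
        # Pop at most `limit` items; fewer if the queue runs short.
        count = min(limit, len(self._items))
        return [self._items.popleft() for _ in range(count)]

    async def peek(self, limit: int = 1) -> List[str]:
        # Read from the front without removing anything.
        return list(itertools.islice(self._items, limit))

    async def size(self) -> int:
        return len(self._items)


async def _demo():
    queue = TinyQueue()
    await queue.enqueue(["a", "b", "c"])
    peeked = await queue.peek(2)
    first = await queue.dequeue(2)
    rest = await queue.dequeue(5)  # only one item is left
    return peeked, first, rest, await queue.size()


peeked, first, rest, remaining = asyncio.run(_demo())
```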
agentlightning.store.collection.KeyValue
¶
Bases: Generic[K, V]
Behaves like a dictionary, supporting addition, updating, and deletion of items.
get(key, default=None)
async
¶
Get the value for the given key, or the default value if the key is not found.
has(key)
async
¶
Check if the given key is in the dictionary.
pop(key, default=None)
async
¶
Pop the value for the given key, or the default value if the key is not found.
set(key, value)
async
¶
Set the value for the given key.
size()
async
¶
Get the number of items in the dictionary.
agentlightning.store.collection.LightningCollections
¶
Collections of rollouts, attempts, spans, resources, and workers.
LightningStore implementations can use this as a storage base to implement the store API.
attempts
property
¶
Collections of attempts.
resources
property
¶
Collections of resources.
rollout_queue
property
¶
Queue of rollouts (tasks).
rollouts
property
¶
Collections of rollouts.
span_sequence_ids
property
¶
Dictionary (counter) of span sequence IDs.
spans
property
¶
Collections of spans.
workers
property
¶
Collections of workers.
atomic(*args, **kwargs)
¶
Perform an atomic operation on the collections.
Subclasses may use args and kwargs to support multiple levels of atomicity.
Parameters:
-
*args(Any, default:()) –Arguments to pass to the operation.
-
**kwargs(Any, default:{}) –Keyword arguments to pass to the operation.
execute(callback)
async
¶
Execute the given callback within an atomic operation.
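One way to picture `atomic` and `execute` is a single `asyncio.Lock` guarding all collections, with `execute` running a callback while the lock is held. This sketch assumes `atomic` behaves like an async context manager, which the reference above does not state; `TinyCollections` and `next_sequence_id` are hypothetical names.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, Awaitable, Callable, Dict


class TinyCollections:
    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.counters: Dict[str, int] = {}  # e.g. span sequence IDs per rollout

    @asynccontextmanager
    async def atomic(self):
        # Assumption: one global lock; real implementations may be finer-grained.
        async with self._lock:
            yield self

    async def execute(self, callback: Callable[["TinyCollections"], Awaitable[Any]]) -> Any:
        async with self.atomic() as collections:
            return await callback(collections)


async def next_sequence_id(collections: TinyCollections) -> int:
    current = collections.counters.get("rollout-1", 0)
    # Yield control: without the lock, another task could interleave here
    # and two callers would receive the same sequence ID.
    await asyncio.sleep(0)
    collections.counters["rollout-1"] = current + 1
    return current + 1


async def _demo():
    collections = TinyCollections()  # created inside the running event loop
    ids = await asyncio.gather(
        *(collections.execute(next_sequence_id) for _ in range(20))
    )
    return sorted(ids)


sequence_ids = asyncio.run(_demo())
```

Because every read-modify-write runs under the lock, the 20 concurrent callers receive 20 distinct, consecutive IDs.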
agentlightning.store.collection.ListBasedCollection
¶
Bases: Collection[T]
In-memory implementation of Collection using a nested dict for O(1) primary-key lookup.
The internal structure is:
{
    pk1_value: {
        pk2_value: {
            ...
            pkN_value: item
        }
    }
}
where the nesting depth equals the number of primary keys.
Sorting behavior:
- If no sort_by is provided, the items are returned in the order of insertion.
- If sort_by is provided, the items are sorted by the value of the sort_by field.
- If the sort_by field is a timestamp, the null values are treated as infinity.
- If the sort_by field is not a timestamp, the null values are treated as empty string if the field is str-like, 0 if the field is int-like, 0.0 if the field is float-like.
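The nested-dict layout and the null handling for timestamp sorts can be sketched as follows. The helper names (`nested_set`, `nested_get`) and the sample rows are hypothetical; only the structure (one dict level per primary key) and the "null timestamps sort as infinity" rule come from the description above.

```python
import math
from typing import Any, Dict, Optional, Sequence


def nested_set(root: Dict[Any, Any], keys: Sequence[str], item: Any) -> None:
    # Walk or create one dict level per primary key, then store the item.
    node = root
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = item


def nested_get(root: Dict[Any, Any], keys: Sequence[str]) -> Optional[Any]:
    # One O(1) dict lookup per primary key; None if any level is missing.
    node = root
    for key in keys:
        if key not in node:
            return None
        node = node[key]
    return node


index: Dict[Any, Any] = {}
nested_set(index, ("rollout-1", "attempt-1"), "first attempt")
nested_set(index, ("rollout-1", "attempt-2"), "second attempt")
found = nested_get(index, ("rollout-1", "attempt-2"))
missing = nested_get(index, ("rollout-9", "attempt-1"))

# Timestamp sort where null values are treated as infinity (sorted last).
rows = [
    {"id": "a", "end_time": None},
    {"id": "b", "end_time": 2.0},
    {"id": "c", "end_time": 1.0},
]
ordered_ids = [
    row["id"]
    for row in sorted(
        rows, key=lambda r: math.inf if r["end_time"] is None else r["end_time"]
    )
]
```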
delete(items)
async
¶
Delete the given items.
Raises:
-
ValueError–If any item with the given primary keys does not exist.
get(filter=None, sort=None)
async
¶
Return the first (or best-sorted) item that matches the given filters, or None.
insert(items)
async
¶
Insert the given items.
Raises:
-
ValueError–If any item with the same primary keys already exists.
item_type()
¶
Return the Pydantic model type of items stored in this collection.
primary_keys()
¶
Return the primary key field names for this collection.
query(filter=None, sort=None, limit=-1, offset=0)
async
¶
Query the collection with filters, sort order, and pagination.
Parameters:
-
filter(Optional[FilterOptions], default:None) –Mapping of field name to operator dict along with the optional _aggregate logic.
-
sort(Optional[SortOptions], default:None) –Options describing which field to sort by and in which order.
-
limit(int, default:-1) –Max number of items to return. Use -1 for "no limit".
-
offset(int, default:0) –Number of items to skip from the start of the matching items.
size()
async
¶
Return the number of items stored in the collection.
update(items)
async
¶
Update the given items.
Raises:
-
ValueError–If any item with the given primary keys does not exist.
upsert(items)
async
¶
Upsert the given items (insert if missing, otherwise update).
agentlightning.store.collection.DequeBasedQueue
¶
Bases: Queue[T]
Queue implementation backed by collections.deque.
Provides O(1) amortized enqueue (append) and dequeue (popleft).
agentlightning.store.collection.DictBasedKeyValue
¶
agentlightning.store.collection.InMemoryLightningCollections
¶
Bases: LightningCollections
In-memory implementation of LightningCollections using Python data structures.
Serves as the storage base for InMemoryLightningStore.