Store References
agentlightning.LightningStore
Contract for the persistent control-plane that coordinates training rollouts.
A LightningStore mediates every interaction between algorithms and runners:
- Rollout lifecycle: accept new rollouts, queue them for execution, create attempts, and drive the rollout status machine ("queuing" → "preparing" → "running" → {"succeeded", "failed", "cancelled"}, or "requeuing" when a retry is justified).
- Attempt tracking: record each execution attempt, including progress heartbeats, retry sequencing, and terminal states such as "timeout" or "unresponsive".
- Span ingest: capture structured telemetry emitted by runners (either as native Span objects or as opentelemetry.sdk.trace.ReadableSpan instances) so that algorithms can reconstruct trajectories and rewards.
- Resource versioning: manage immutable snapshots of named resources (prompt templates, model checkpoints, proxy endpoints, …) and expose a single "latest" snapshot that runners can fetch just after claiming work.
Implementations must provide thread-safe/async-safe semantics: each coroutine should
appear atomic to callers even when multiple algorithms or runners call the API concurrently.
Unless stated otherwise, missing identifiers should result in a ValueError.
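The rollout status machine summarized above can be pictured as a transition table. The following is a minimal sketch rather than the library's implementation: `ALLOWED` and `can_transition` are hypothetical names, and the edges are assumptions read off the lifecycle summary and the method descriptions on this page.

```python
# Hypothetical transition table; the edges are assumed from the lifecycle
# summary, not taken from the library's source.
TERMINAL = {"succeeded", "failed", "cancelled"}

ALLOWED = {
    "queuing": {"preparing"},
    "preparing": {"running"},
    "running": TERMINAL | {"requeuing"},
    # A requeued rollout can be claimed again or resume when a span arrives.
    "requeuing": {"preparing", "running"},
}


def can_transition(current: str, target: str) -> bool:
    """Return True when the toy table permits moving from current to target."""
    return target in ALLOWED.get(current, set())
```

Terminal states have no outgoing edges in this sketch, so `can_transition("succeeded", "running")` is False.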
add_otel_span(rollout_id, attempt_id, readable_span, sequence_id=None)
async
Convert and persist an OpenTelemetry span for a particular attempt.
Implementations must transform the readable_span into a Span
(typically via Span.from_opentelemetry()),
assign a strictly increasing sequence_id when one is not provided, and persist it
using the same semantics as add_span().
Parameters:
-
rollout_id(str) –Identifier of the rollout that produced the span.
-
attempt_id(str) –Attempt identifier the span belongs to.
-
readable_span(ReadableSpan) –OpenTelemetry span in SDK form.
-
sequence_id(int | None, default:None) –Optional explicit ordering hint. When omitted, implementations call get_next_span_sequence_id() automatically.
Returns:
-
Span–The stored span record.
Raises:
-
NotImplementedError–Subclasses must implement span persistence.
-
ValueError–Implementations must raise when the rollout or attempt is unknown.
add_resources(resources)
async
Persist a new immutable snapshot of named resources and mark it as latest.
Implementations must assign a fresh resources_id and ensure subsequent calls to
get_latest_resources() return the
snapshot produced here.
Parameters:
-
resources(NamedResources) –Mapping of resource names to their serialized payloads.
Returns:
-
ResourcesUpdate–The stored ResourcesUpdate including its generated id.
Raises:
-
NotImplementedError–Subclasses must implement resource persistence.
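The "immutable snapshot plus latest pointer" contract can be illustrated with a toy registry. This is a sketch under stated assumptions: `ToyResourceRegistry`, its methods, and the `rs-` id format are hypothetical and do not reflect how any real store generates identifiers.

```python
import uuid
from types import MappingProxyType
from typing import Any, Dict, Mapping, Optional, Tuple


class ToyResourceRegistry:
    """Hypothetical in-memory registry of immutable resource snapshots."""

    def __init__(self) -> None:
        self._snapshots: Dict[str, Mapping[str, Any]] = {}
        self._latest_id: Optional[str] = None

    def add(self, resources: Mapping[str, Any]) -> str:
        # Assumed id format; a real store may use a different scheme.
        resources_id = f"rs-{uuid.uuid4().hex[:12]}"
        # Copy, then wrap in a read-only view so the snapshot cannot be mutated.
        self._snapshots[resources_id] = MappingProxyType(dict(resources))
        self._latest_id = resources_id
        return resources_id

    def latest(self) -> Optional[Tuple[str, Mapping[str, Any]]]:
        if self._latest_id is None:
            return None
        return self._latest_id, self._snapshots[self._latest_id]
```

Each call to `add` yields a fresh identifier, and `latest` always reflects the most recent call.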
add_span(span)
async
Persist a pre-constructed span emitted during rollout execution.
The provided Span must already contain the rollout_id,
attempt_id, and sequence_id. Implementations must:
- Verify that both rollout and attempt exist.
- Ensure span ordering remains strictly increasing per attempt (rejecting or keeping duplicates).
- Treat the span arrival as a heartbeat: update the attempt's last_heartbeat_time and transition both attempt and rollout to "running" if they were still "preparing" or "requeuing".
Parameters:
-
span(Span) –Fully populated span to persist.
Returns:
-
Span–The stored span record (implementations may return a copy).
Raises:
-
NotImplementedError–Subclasses must implement span persistence.
-
ValueError–Implementations must raise when the referenced rollout or attempt is missing.
capabilities()
Return the capabilities of the store.
dequeue_rollout()
async
Claim the oldest queued rollout and transition it to preparing.
This function does not block.
Retrieval must be FIFO across rollouts that remain in queuing or requeuing
state. When a rollout is claimed, implementations must:
- Transition its status to "preparing".
- Create a new attempt with status="preparing" and sequence_id equal to the number of attempts already registered for the rollout plus one.
- Return an AttemptedRollout snapshot so the runner knows both rollout metadata and the attempt identifier.
Returns:
-
Optional[AttemptedRollout]–The next attempt to execute, or None when no eligible rollouts are queued.
Raises:
-
NotImplementedError–Subclasses must implement queue retrieval.
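The claiming steps above can be sketched with a plain FIFO queue. This is an illustration only: `ToyRollout` and `ToyQueue` are hypothetical stand-ins, and attempts are modelled as plain dicts instead of the library's Attempt and AttemptedRollout types.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List, Optional, Tuple


@dataclass
class ToyRollout:
    rollout_id: str
    status: str = "queuing"
    attempts: List[dict] = field(default_factory=list)


class ToyQueue:
    """Hypothetical FIFO queue mimicking the non-blocking claim semantics."""

    def __init__(self) -> None:
        self._queue: Deque[ToyRollout] = deque()

    def enqueue(self, rollout: ToyRollout) -> None:
        self._queue.append(rollout)  # tail of the scheduling queue

    def dequeue(self) -> Optional[Tuple[ToyRollout, dict]]:
        if not self._queue:
            return None  # never blocks: an empty queue yields None
        rollout = self._queue.popleft()  # oldest first
        rollout.status = "preparing"
        attempt = {
            "sequence_id": len(rollout.attempts) + 1,
            "status": "preparing",
        }
        rollout.attempts.append(attempt)
        return rollout, attempt
```

A runner would typically call the claim operation in a loop and sleep briefly whenever it receives None.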
enqueue_rollout(input, mode=None, resources_id=None, config=None, metadata=None)
async
Persist a rollout in queuing state so runners can claim it later.
Note
Unlike start_rollout(),
this method is for callers that only want to submit work for later scheduling.
Implementations must generate a unique rollout_id, stamp start_time with
the current time, default config to a fresh RolloutConfig,
and insert the rollout at the tail of the scheduling queue. No attempt is created yet.
Parameters:
-
input(TaskInput) –Arbitrary task payload supplied by an algorithm.
-
mode(Literal['train', 'val', 'test'] | None, default:None) –Optional semantic mode indicator ("train", "val", "test").
-
resources_id(str | None, default:None) –Resource snapshot used when a runner eventually executes the rollout.
-
config(RolloutConfig | None, default:None) –Fine-grained retry/timeout parameters to persist with the rollout.
-
metadata(Dict[str, Any] | None, default:None) –Free-form metadata stored verbatim with the rollout record.
Returns:
-
Rollout–The stored rollout, in queuing state.
Raises:
-
NotImplementedError–Subclasses must persist the rollout.
-
ValueError–Implementations should raise when resources_id does not exist.
get_latest_attempt(rollout_id)
async
Fetch the attempt with the highest sequence_id for rollout_id.
Parameters:
-
rollout_id(str) –Identifier to inspect.
Returns:
-
Optional[Attempt]–The most recent attempt or None when no attempts exist yet.
Raises:
-
NotImplementedError–Subclasses must implement retrieval.
-
ValueError–Implementations must raise when the rollout does not exist.
get_latest_resources()
async
Fetch the latest resource snapshot marked as the global default.
Returns:
-
Optional[ResourcesUpdate]–The current latest ResourcesUpdate, or None when no resources have been registered yet.
Raises:
-
NotImplementedError–Subclasses must implement retrieval.
get_next_span_sequence_id(rollout_id, attempt_id)
async
Allocate the next strictly increasing sequence number used to order spans.
Implementations must retain counters so repeated calls return 1, 2, ... without
gaps unless spans were explicitly inserted with a custom sequence_id. The
counter may be scoped per rollout or per attempt, but the sequence must be
strictly increasing for spans emitted by the specified attempt so traces remain
totally ordered.
See Distributed Tracing for detailed motivations.
Parameters:
-
rollout_id(str) –Identifier of the rollout emitting spans.
-
attempt_id(str) –Attempt identifier for the upcoming span.
Returns:
-
int–The next integer sequence identifier, unique within the attempt.
Raises:
-
NotImplementedError–Subclasses must provide the allocator.
-
ValueError–Implementations must raise when the rollout or attempt does not exist.
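The allocator contract can be sketched as a counter that hands out 1, 2, ... and skips ahead when a span arrives with an explicit sequence_id. This is a toy under stated assumptions: `SequenceAllocator`, `next_id`, and `observe` are hypothetical names, and the counter is scoped per rollout, which is one of the two scopes the contract permits.

```python
from collections import defaultdict
from typing import Dict


class SequenceAllocator:
    """Hypothetical per-rollout span sequence counter."""

    def __init__(self) -> None:
        # Highest sequence id seen or issued so far, per rollout.
        self._last: Dict[str, int] = defaultdict(int)

    def next_id(self, rollout_id: str) -> int:
        self._last[rollout_id] += 1
        return self._last[rollout_id]

    def observe(self, rollout_id: str, sequence_id: int) -> None:
        # A span inserted with a custom sequence_id pushes the counter forward
        # so later allocations stay strictly increasing.
        self._last[rollout_id] = max(self._last[rollout_id], sequence_id)
```

A real store would also need to guard the counter against concurrent callers, which this sketch leaves out.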
get_resources_by_id(resources_id)
async
Return a specific named resource snapshot by identifier.
Parameters:
-
resources_id(str) –Identifier of the snapshot.
Returns:
-
Optional[ResourcesUpdate]–The stored ResourcesUpdate, or None when missing.
Raises:
-
NotImplementedError–Subclasses must implement retrieval.
get_rollout_by_id(rollout_id)
async
Fetch a rollout by identifier without mutating its state.
Parameters:
-
rollout_id(str) –Identifier to retrieve.
Returns:
-
Optional[Rollout]–The rollout when found, otherwise None.
Raises:
-
NotImplementedError–Subclasses must implement retrieval.
query_attempts(rollout_id)
async
Return every attempt ever created for rollout_id in ascending sequence order.
Parameters:
-
rollout_id(str) –Identifier of the rollout being inspected.
Returns:
-
List[Attempt]–Attempts sorted by sequence_id (oldest first). Returns an empty list when none exist.
Raises:
-
NotImplementedError–Subclasses must implement the query.
-
ValueError–Implementations must raise when the rollout does not exist.
query_resources()
async
List every stored resource snapshot in insertion order.
Returns:
-
List[ResourcesUpdate]–A chronological list of ResourcesUpdate objects.
Raises:
-
NotImplementedError–Subclasses must implement retrieval.
query_rollouts(*, status=None, rollout_ids=None)
async
Retrieve rollouts filtered by status and/or explicit identifiers.
Parameters:
-
status(Optional[Sequence[RolloutStatus]], default:None) –Optional whitelist of RolloutStatus values.
-
rollout_ids(Optional[Sequence[str]], default:None) –Optional whitelist of rollout identifiers to include.
Returns:
-
List[Rollout]–A list of matching rollouts. Ordering is backend-defined but must be deterministic.
Raises:
-
NotImplementedError–Subclasses must implement the query.
query_spans(rollout_id, attempt_id=None)
async
Return the stored spans for a rollout, optionally scoped to one attempt.
Spans must be returned in ascending sequence_id order. Implementations may raise
a RuntimeError when spans were evicted or expired.
Parameters:
-
rollout_id(str) –Identifier of the rollout being inspected.
-
attempt_id(str | Literal['latest'] | None, default:None) –Attempt identifier to filter by. Pass "latest" to retrieve only the most recent attempt, or None to return all spans across attempts.
Returns:
-
List[Span]–An ordered list of spans (possibly empty).
Raises:
-
NotImplementedError–Subclasses must implement the query.
-
ValueError–Implementations must raise when the rollout or attempt is unknown.
start_attempt(rollout_id)
async
Create a manual retry attempt for an existing rollout.
This is typically invoked by runners that wish to retry outside of the
normal queue flow (for example in an online RL setup).
Implementations must validate that the rollout exists, allocate a fresh attempt_id,
increment the sequence_id monotonically, stamp the new attempt with status="preparing",
and return an up-to-date AttemptedRollout.
Parameters:
-
rollout_id(str) –Unique identifier of the rollout receiving a new attempt.
Returns:
-
AttemptedRollout–The rollout paired with its newly-created attempt.
Raises:
-
NotImplementedError–Subclasses must implement attempt creation.
-
ValueError–Implementations must raise when rollout_id is unknown.
start_rollout(input, mode=None, resources_id=None, config=None, metadata=None)
async
Register a rollout and immediately create its first attempt.
Note
Use enqueue_rollout() when the
caller only wants to submit work for later scheduling.
The rollout must be persisted with status="preparing" and an initial attempt
with sequence_id == 1 so the caller can begin execution without visiting the
public queue. Implementations are expected to:
- Generate a unique rollout_id and attempt_id.
- Record start_time for both rollout and attempt based on the current clock.
- Copy config and metadata so later mutations do not leak shared references.
- Resolve resources_id to the latest resource snapshot when None is supplied.
Parameters:
-
input(TaskInput) –Arbitrary task payload supplied by an algorithm.
-
mode(Literal['train', 'val', 'test'] | None, default:None) –Optional semantic mode for downstream analytics ("train", "val", "test").
-
resources_id(str | None, default:None) –Concrete resource snapshot to execute against; defaults to the latest stored snapshot.
-
config(RolloutConfig | None, default:None) –Rollout retry/timeout policy. Should default to a fresh RolloutConfig.
-
metadata(Dict[str, Any] | None, default:None) –Free-form metadata persisted verbatim with the rollout.
Returns:
-
AttemptedRollout–The fully-populated AttemptedRollout including the just-created attempt.
Raises:
-
NotImplementedError–Subclasses must provide durable storage for the rollout.
-
ValueError–Implementations should raise when resources_id does not exist.
update_attempt(rollout_id, attempt_id, status=UNSET, worker_id=UNSET, last_heartbeat_time=UNSET, metadata=UNSET)
async
Update attempt bookkeeping such as status, worker ownership, and heartbeats.
When attempt_id is "latest" the update must target the attempt with the highest
sequence_id; otherwise it must target the specific attempt. Implementations should
propagate status changes to the rollout (for example via propagate_status())
once the latest attempt transitions to a terminal state.
Similar to update_rollout(),
parameters also default to the sentinel UNSET.
Parameters:
-
rollout_id(str) –Identifier of the rollout whose attempt will be updated.
-
attempt_id(str | Literal['latest']) –Attempt identifier or "latest" as a convenience.
-
status(AttemptStatus | Unset, default:UNSET) –Replacement attempt status. Terminal statuses must set end_time.
-
worker_id(str | Unset, default:UNSET) –Identifier for the worker currently processing the attempt.
-
last_heartbeat_time(float | Unset, default:UNSET) –Wall-clock timestamp (seconds) of the latest heartbeat/span.
-
metadata(Optional[Dict[str, Any]] | Unset, default:UNSET) –Replacement metadata dictionary.
Returns:
-
Attempt–The updated attempt record.
Raises:
-
NotImplementedError–Subclasses must implement mutation logic.
-
ValueError–Implementations must raise when the rollout or attempt is unknown.
update_resources(resources_id, resources)
async
Overwrite or extend an existing resource snapshot and mark it as latest.
This API is typically used by algorithms that maintain mutable resources (e.g., model checkpoints) under a stable identifier.
Parameters:
-
resources_id(str) –Identifier of the snapshot to replace.
-
resources(NamedResources) –Updated mapping of resource names to payloads.
Returns:
-
ResourcesUpdate–The persisted
ResourcesUpdate.
Raises:
-
NotImplementedError–Subclasses must implement resource persistence.
-
ValueError–Implementations must raise when resources_id does not exist.
update_rollout(rollout_id, input=UNSET, mode=UNSET, resources_id=UNSET, status=UNSET, config=UNSET, metadata=UNSET)
async
Update rollout metadata and, when provided, drive status transitions.
Parameters default to the sentinel UNSET to
distinguish omitted fields from explicit None assignments. Implementations must:
- Validate the rollout exists before mutating it.
- Replace each property when a concrete value (including None) is supplied.
- When the status switches into a terminal state, set end_time and signal any waiters.
- When the status re-enters a queueing state, ensure the rollout is enqueued exactly once.
Parameters:
-
rollout_id(str) –Identifier of the rollout to update.
-
input(TaskInput | Unset, default:UNSET) –Replacement task payload; pass None to explicitly clear the input.
-
mode(Optional[Literal['train', 'val', 'test']] | Unset, default:UNSET) –Replacement rollout mode.
-
resources_id(Optional[str] | Unset, default:UNSET) –Replacement resources snapshot reference.
-
status(RolloutStatus | Unset, default:UNSET) –Target rollout status.
-
config(RolloutConfig | Unset, default:UNSET) –Replacement retry/timeout configuration.
-
metadata(Optional[Dict[str, Any]] | Unset, default:UNSET) –Replacement metadata dictionary.
Returns:
-
Rollout–The updated rollout record.
Raises:
-
NotImplementedError–Subclasses must implement mutation logic.
-
ValueError–Implementations must raise when the rollout is unknown or the update is invalid.
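The sentinel pattern that separates "argument omitted" from "explicitly set to None" can be sketched as follows. The `_Unset` class, the `UNSET` instance, and `apply_update` defined here are hypothetical stand-ins written for illustration; they are not the library's own sentinel or update logic.

```python
from typing import Any, Dict, Optional, Union


class _Unset:
    """Hypothetical sentinel type; a stand-in for the library's Unset."""

    def __repr__(self) -> str:
        return "UNSET"


UNSET = _Unset()


def apply_update(
    record: Dict[str, Any],
    *,
    mode: Union[Optional[str], _Unset] = UNSET,
    metadata: Union[Optional[Dict[str, Any]], _Unset] = UNSET,
) -> Dict[str, Any]:
    """Return a copy of record with only the supplied fields replaced."""
    updated = dict(record)
    if not isinstance(mode, _Unset):
        updated["mode"] = mode  # None is a legitimate replacement value
    if not isinstance(metadata, _Unset):
        updated["metadata"] = metadata
    return updated
```

With this shape, `apply_update(record)` leaves every field alone, while `apply_update(record, mode=None)` clears the mode and keeps the metadata.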
wait_for_rollouts(*, rollout_ids, timeout=None)
async
Block until the targeted rollouts reach a terminal status or the timeout expires.
Terminal statuses are "succeeded", "failed", and "cancelled". When the timeout
elapses, implementations should return the subset of rollouts that are already terminal
and omit the rest.
Warning
Calling this function with a long timeout is dangerous and may block the event loop. Prefer polling with a short timeout to check whether newly completed rollouts have arrived, and implement the sleep logic carefully to avoid busy-waiting.
Parameters:
-
rollout_ids(List[str]) –Identifiers of rollouts to watch.
-
timeout(Optional[float], default:None) –Maximum time in seconds to wait. None waits indefinitely.
Returns:
-
List[Rollout]–Rollouts that finished before the deadline, in arbitrary order.
Raises:
-
NotImplementedError–Subclasses must implement waiting semantics.
-
ValueError–Implementations must raise when a rollout identifier is unknown.
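A polling loop with short timeouts, as the warning above recommends, might look like the sketch below. `FakeStore` and `poll_until_done` are hypothetical: the fake exposes only a wait_for_rollouts-like coroutine and returns rollout ids instead of Rollout objects, and the timing values are arbitrary.

```python
import asyncio
from typing import Dict, List

TERMINAL = {"succeeded", "failed", "cancelled"}


class FakeStore:
    """Hypothetical stand-in that reports which rollout ids are terminal."""

    def __init__(self, statuses: Dict[str, str]) -> None:
        self.statuses = statuses

    async def wait_for_rollouts(self, *, rollout_ids: List[str], timeout: float) -> List[str]:
        await asyncio.sleep(0)  # yield control; a real store may wait up to timeout
        return [r for r in rollout_ids if self.statuses[r] in TERMINAL]


async def poll_until_done(
    store: FakeStore,
    rollout_ids: List[str],
    *,
    poll_timeout: float = 0.05,
    sleep: float = 0.01,
    max_rounds: int = 100,
) -> List[str]:
    """Poll with short timeouts, sleeping between rounds to avoid busy-waiting."""
    pending = set(rollout_ids)
    done: List[str] = []
    for _ in range(max_rounds):
        if not pending:
            break
        finished = await store.wait_for_rollouts(
            rollout_ids=sorted(pending), timeout=poll_timeout
        )
        done.extend(finished)
        pending.difference_update(finished)
        if pending:
            await asyncio.sleep(sleep)
    return done
```

Bounding the loop with `max_rounds` keeps the caller from waiting forever on a rollout that never finishes.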
Store Implementations
agentlightning.InMemoryLightningStore
Bases: LightningStore
In-memory implementation of LightningStore using Python data structures. Thread-safe and async-compatible but data is not persistent.
The methods in this class should generally not call each other, especially those that are locked.
Parameters:
-
eviction_memory_threshold(float | int | None, default:None) –The threshold for evicting spans in bytes. By default, it's 70% of the total VRAM available.
-
safe_memory_threshold(float | int | None, default:None) –The threshold for safe memory usage in bytes. By default, it's 80% of the eviction threshold.
-
span_size_estimator(Callable[[Span], int] | None, default:None) –A function to estimate the size of a span in bytes. By default, it's a simple size estimator that uses sys.getsizeof.
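The default relationship between the two thresholds, and what a custom size estimator might look like, can be sketched as below. Both helpers are hypothetical: `default_thresholds` only reproduces the 70% and 80% arithmetic stated above, and `estimate_size` is a generic recursive estimator over built-in containers, written on the assumption that a shallow sys.getsizeof would undercount nested payloads.

```python
import sys
from typing import Any, Tuple


def default_thresholds(total_bytes: int) -> Tuple[int, int]:
    """Return (eviction, safe) thresholds using the documented default ratios."""
    eviction = total_bytes * 7 // 10  # 70% of the total
    safe = eviction * 8 // 10  # 80% of the eviction threshold
    return eviction, safe


def estimate_size(obj: Any) -> int:
    """Recursively sum sys.getsizeof over common built-in containers."""
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(estimate_size(k) + estimate_size(v) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(estimate_size(item) for item in obj)
    return size
```

An estimator passed to the store would need to accept a Span; how well a generic function like this measures one depends on how Span stores its fields.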
add_otel_span(rollout_id, attempt_id, readable_span, sequence_id=None)
async
Add an opentelemetry span to the store.
See LightningStore.add_otel_span() for semantics.
add_resources(resources)
async
Stores a new version of named resources and sets it as the latest.
See LightningStore.add_resources() for semantics.
add_span(span)
async
Persist a pre-converted span.
See LightningStore.add_span() for semantics.
capabilities()
Return the capabilities of the store.
dequeue_rollout()
async
Retrieves the next task from the queue without blocking.
Returns None if the queue is empty.
Will set the rollout status to preparing and create a new attempt.
See LightningStore.dequeue_rollout() for semantics.
enqueue_rollout(input, mode=None, resources_id=None, config=None, metadata=None)
async
Adds a new task to the queue with specific metadata and returns the rollout.
See LightningStore.enqueue_rollout() for semantics.
get_latest_attempt(rollout_id)
async
Retrieves the latest attempt for a given rollout ID.
See LightningStore.get_latest_attempt() for semantics.
get_latest_resources()
async
Retrieves the latest version of named resources.
See LightningStore.get_latest_resources() for semantics.
get_next_span_sequence_id(rollout_id, attempt_id)
async
Get the next span sequence ID for a given rollout and attempt. The number is strictly increasing for each rollout. The store will not issue the same sequence ID twice.
See LightningStore.get_next_span_sequence_id() for semantics.
get_resources_by_id(resources_id)
async
Retrieves a specific version of named resources by its ID.
See LightningStore.get_resources_by_id() for semantics.
get_rollout_by_id(rollout_id)
async
Retrieves a specific rollout by its ID.
See LightningStore.get_rollout_by_id() for semantics.
If the rollout has been attempted, the latest attempt will also be returned.
query_attempts(rollout_id)
async
Retrieves all attempts associated with a specific rollout ID. Returns an empty list if no attempts are found.
See LightningStore.query_attempts() for semantics.
query_resources()
async
Return every stored resource snapshot in insertion order.
query_rollouts(*, status=None, rollout_ids=None)
async
Retrieves rollouts filtered by their status and rollout ids. If no status is provided, returns all rollouts.
See LightningStore.query_rollouts() for semantics.
query_spans(rollout_id, attempt_id=None)
async
Query and retrieve all spans associated with a specific rollout ID. Returns an empty list if no spans are found.
See LightningStore.query_spans() for semantics.
start_attempt(rollout_id)
async
Creates a new attempt for a given rollout ID and returns the attempt details.
See LightningStore.start_attempt() for semantics.
start_rollout(input, mode=None, resources_id=None, config=None, metadata=None)
async
Notify the store that the caller is about to run a rollout.
See LightningStore.start_rollout() for semantics.
update_attempt(rollout_id, attempt_id, status=UNSET, worker_id=UNSET, last_heartbeat_time=UNSET, metadata=UNSET)
async
Update a specific or latest attempt for a given rollout.
See LightningStore.update_attempt() for semantics.
update_resources(resources_id, resources)
async
Safely stores a new version of named resources and sets it as the latest.
See LightningStore.update_resources() for semantics.
update_rollout(rollout_id, input=UNSET, mode=UNSET, resources_id=UNSET, status=UNSET, config=UNSET, metadata=UNSET)
async
Update the rollout status and related metadata.
See LightningStore.update_rollout() for semantics.
wait_for_rollouts(*, rollout_ids, timeout=None)
async
Wait for the specified rollouts to complete, with a timeout. Returns the completed rollouts; the list may be partial if the timeout is reached.
This method does not change the state of the store.
See LightningStore.wait_for_rollouts() for semantics.
Client-Server and Thread-safe Wrappers
agentlightning.LightningStoreServer
Bases: LightningStore
Server wrapper that exposes a LightningStore via HTTP API. Delegates all operations to an underlying store implementation.
Healthcheck and watchdog rely on the underlying store.
agl store is a convenient CLI to start a store server.
Parameters:
-
store(LightningStore) –The underlying store to delegate operations to.
-
host(str) –The hostname or IP address to bind the server to.
-
port(int) –The TCP port to listen on.
-
cors_allow_origins(Sequence[str] | str | None, default:None) –A list of CORS origins to allow. Use '*' to allow all origins.
__getstate__()
Control pickling to prevent server state from being sent to subprocesses.
When LightningStoreServer is pickled (e.g., passed to a subprocess), we only serialize the underlying store and connection details. The FastAPI app and uvicorn server are excluded as they should not be transferred between processes.
The subprocess should create its own server instance if needed.
__setstate__(state)
Restore from pickle by reconstructing only the essential attributes.
Note: This creates a new server instance without FastAPI/uvicorn initialized. Follow the __init__() pattern or create a new LightningStoreServer if you need a fully functional server in the subprocess.
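The pickling behaviour described for __getstate__() and __setstate__() follows a common Python pattern: serialize only the plain configuration and leave runtime objects behind. The sketch below uses a hypothetical `ToyServer` whose `_runtime` lock stands in for unpicklable server state; it is not the LightningStoreServer implementation.

```python
import threading
from typing import Any, Dict


class ToyServer:
    """Hypothetical wrapper holding picklable config plus unpicklable runtime."""

    def __init__(self, store: Any, host: str, port: int) -> None:
        self.store = store
        self.host = host
        self.port = port
        # Stand-in for server runtime objects that cannot cross processes.
        self._runtime = threading.Lock()

    def __getstate__(self) -> Dict[str, Any]:
        # Serialize only the underlying store and the connection details.
        return {"store": self.store, "host": self.host, "port": self.port}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        self.store = state["store"]
        self.host = state["host"]
        self.port = state["port"]
        # The restored instance has no live runtime until one is created.
        self._runtime = None
```

Because the lock is excluded from the state, passing such an object through pickle succeeds, and the copy arrives with `_runtime` set to None.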
capabilities()
Return the capabilities of the store.
run_forever()
async
Runs the FastAPI server indefinitely.
You need to call this method in the same process as the server was created in.
start()
async
Starts the FastAPI server in the background.
You need to call this method in the same process as the server was created in.
stop()
async
Gracefully stops the running FastAPI server.
You need to call this method in the same process as the server was created in.
agentlightning.LightningStoreClient
Bases: LightningStore
HTTP client that talks to a remote LightningStoreServer.
Parameters:
-
server_address(str) –The address of the LightningStoreServer to connect to.
-
retry_delays(Sequence[float], default:(1.0, 2.0, 5.0)) –Backoff schedule (seconds) used when the initial request fails for a non-application reason. Each entry is a retry attempt.
-
health_retry_delays(Sequence[float], default:(0.1, 0.2, 0.5)) –Delays between /health probes while waiting for the server to come back.
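A backoff schedule like retry_delays can be read as "one initial try, then one retry per entry". The sketch below illustrates that reading only: `call_with_retries` is a hypothetical helper, ConnectionError is an assumed stand-in for a non-application failure, and the /health probing between retries is left out.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Sequence, TypeVar

T = TypeVar("T")


async def call_with_retries(
    request: Callable[[], Awaitable[T]],
    retry_delays: Sequence[float] = (1.0, 2.0, 5.0),
) -> T:
    """Try once, then retry once per entry in retry_delays."""
    last_error: Optional[Exception] = None
    for delay in (0.0, *retry_delays):
        if delay:
            await asyncio.sleep(delay)  # back off before the next retry
        try:
            return await request()
        except ConnectionError as exc:  # assumed transient, non-application error
            last_error = exc
    assert last_error is not None
    raise last_error
```

With the default schedule this makes at most four calls, and the last failure is re-raised once the schedule is exhausted.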
__getstate__()
When LightningStoreClient is pickled (e.g., passed to a subprocess), we only serialize the server address and retry configurations. The ClientSessions are excluded as they should not be transferred between processes.
__setstate__(state)
Restore from pickle by reconstructing only the essential attributes.
Replicating __init__ logic to create another client instance in the subprocess.
capabilities()
Return the capabilities of the store.
close()
async
Close the HTTP session.
dequeue_rollout()
async
Dequeue a rollout from the server queue.
Returns:
-
Optional[AttemptedRollout]–AttemptedRollout if a rollout is available, None if queue is empty.
Note
This method does NOT retry on failures. If any exception occurs (network error, server error, etc.), it logs the error and returns None immediately.
get_latest_attempt(rollout_id)
async
Get the latest attempt for a rollout.
Parameters:
-
rollout_id(str) –ID of the rollout to query.
Returns:
-
Optional[Attempt]–Attempt if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
get_latest_resources()
async
Get the latest resources.
Returns:
-
Optional[ResourcesUpdate]–ResourcesUpdate if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
get_resources_by_id(resources_id)
async
Get resources by their ID.
Parameters:
-
resources_id(str) –ID of the resources to retrieve.
Returns:
-
Optional[ResourcesUpdate]–ResourcesUpdate if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
get_rollout_by_id(rollout_id)
async
Get a rollout by its ID.
Parameters:
-
rollout_id(str) –ID of the rollout to retrieve.
Returns:
-
Optional[Rollout]–Rollout if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
query_resources()
async
List all resource snapshots stored on the server.
wait_for_rollouts(*, rollout_ids, timeout=None)
async
Wait for rollouts to complete.
Parameters:
-
rollout_ids(List[str]) –List of rollout IDs to wait for.
-
timeout(Optional[float], default:None) –Timeout in seconds. When not None, the method raises a ValueError if the timeout exceeds 0.1 seconds.
Returns:
-
List[Rollout]–List of rollouts that are completed.
agentlightning.LightningStoreThreaded
Bases: LightningStore
Facade that delegates all store operations to an underlying store instance.
The operations are guaranteed to be thread-safe. Make sure the threaded stores are instantiated before initializing the threads.
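The facade idea can be illustrated with a lock-guarded delegate. This is a simplified sketch, not the LightningStoreThreaded implementation: `ToyStore` and `LockedFacade` are hypothetical, and the methods are synchronous here whereas the store API is made of coroutines.

```python
import threading


class ToyStore:
    """Hypothetical store with a non-atomic read-modify-write operation."""

    def __init__(self) -> None:
        self.counter = 0

    def increment(self) -> int:
        value = self.counter
        self.counter = value + 1
        return self.counter


class LockedFacade:
    """Delegate every call to the inner store while holding a single lock."""

    def __init__(self, inner: ToyStore) -> None:
        self._inner = inner
        self._lock = threading.Lock()

    def increment(self) -> int:
        with self._lock:
            return self._inner.increment()
```

Creating the facade before the worker threads start, as advised above, ensures that every thread shares the same lock.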
capabilities()
Return the capabilities of the store.