Store References¶
agentlightning.LightningStore
¶
Contract for the persistent control-plane that coordinates training rollouts.
A LightningStore
mediates every interaction between algorithms and runners:
- Rollout lifecycle: accept new rollouts, queue them for execution, create attempts,
and drive the rollout status machine (
"queuing"
→"preparing"
→"running"
→{"succeeded","failed","cancelled"}
or"requeuing"
when a retry is justified). - Attempt tracking: record each execution attempt, including progress heartbeats,
retry sequencing, and terminal states such as
"timeout"
or"unresponsive"
. - Span ingest: capture structured telemetry emitted by runners (either as native
Span
objects or asopentelemetry.sdk.trace.ReadableSpan
instances) so that algorithms can reconstruct trajectories and rewards. - Resource versioning: manage immutable snapshots of named resources (prompt templates, model checkpoints, proxy endpoints, …) and expose a single "latest" snapshot that runners can fetch just after claiming work.
Implementations must provide thread-safe/async-safe semantics: each coroutine should
appear atomic to callers even when multiple algorithms or runners call the API concurrently.
Unless stated otherwise, missing identifiers should result in a ValueError
.
add_otel_span(rollout_id, attempt_id, readable_span, sequence_id=None)
async
¶
Convert and persist an OpenTelemetry span for a particular attempt.
Implementations must transform the readable_span
into a Span
(typically via Span.from_opentelemetry()
),
assign a strictly increasing sequence_id
when one is not provided, and persist it
using the same semantics as add_span()
.
Parameters:
-
rollout_id
(str
) –Identifier of the rollout that produced the span.
-
attempt_id
(str
) –Attempt identifier the span belongs to.
-
readable_span
(ReadableSpan
) –OpenTelemetry span in SDK form.
-
sequence_id
(int | None
, default:None
) –Optional explicit ordering hint. When omitted, call
get_next_span_sequence_id()
automatically.
Returns:
-
Span
–The stored span record.
Raises:
-
NotImplementedError
–Subclasses must implement span persistence.
-
ValueError
–Implementations must raise when the rollout or attempt is unknown.
add_resources(resources)
async
¶
Persist a new immutable snapshot of named resources and mark it as latest.
Implementations must assign a fresh resources_id
and ensure subsequent calls to
get_latest_resources()
return the
snapshot produced here.
Parameters:
-
resources
(NamedResources
) –Mapping of resource names to their serialized payloads.
Returns:
-
ResourcesUpdate
–The stored
ResourcesUpdate
including its generated id.
Raises:
-
NotImplementedError
–Subclasses must implement resource persistence.
add_span(span)
async
¶
Persist a pre-constructed span emitted during rollout execution.
The provided Span
must already contain the rollout_id
,
attempt_id
, and sequence_id
. Implementations must:
- Verify that both rollout and attempt exist.
- Ensure span ordering remains strictly increasing per attempt (rejecting or keeping duplicates).
- Treat the span arrival as a heartbeat: update the attempt's
last_heartbeat_time
and transition both attempt and rollout to"running"
if they were still"preparing"
or"requeuing"
.
Parameters:
-
span
(Span
) –Fully populated span to persist.
Returns:
-
Span
–The stored span record (implementations may return a copy).
Raises:
-
NotImplementedError
–Subclasses must implement span persistence.
-
ValueError
–Implementations must raise when the referenced rollout or attempt is missing.
dequeue_rollout()
async
¶
Claim the oldest queued rollout and transition it to preparing
.
This function do not block.
Retrieval must be FIFO across rollouts that remain in queuing
or requeuing
state. When a rollout is claimed, implementations must:
- Transition its status to
"preparing"
. - Create a new attempt with
status="preparing"
andsequence_id
equal to the number of attempts already registered for the rollout plus one. - Return an
AttemptedRollout
snapshot so the runner knows both rollout metadata and the attempt identifier.
Returns:
-
Optional[AttemptedRollout]
–The next attempt to execute, or
None
when no eligible rollouts are queued.
Raises:
-
NotImplementedError
–Subclasses must implement queue retrieval.
enqueue_rollout(input, mode=None, resources_id=None, config=None, metadata=None)
async
¶
Persist a rollout in queuing
state so runners can claim it later.
Note
Different from start_rollout()
,
this method is called when the caller only wants to submit work for later scheduling.
Implementations must generate a unique rollout_id
, stamp start_time
with
the current time, default config
to a fresh RolloutConfig
,
and insert the rollout at the tail of the scheduling queue. No attempt is created yet.
Parameters:
-
input
(TaskInput
) –Arbitrary task payload supplied by an algorithm.
-
mode
(Literal['train', 'val', 'test'] | None
, default:None
) –Optional semantic mode indicator (
"train"
,"val"
,"test"
). -
resources_id
(str | None
, default:None
) –Resource snapshot used when a runner eventually executes the rollout.
-
config
(RolloutConfig | None
, default:None
) –Fine-grained retry/timeout parameters to persist with the rollout.
-
metadata
(Dict[str, Any] | None
, default:None
) –Free-form metadata stored verbatim with the rollout record.
Returns:
Raises:
-
NotImplementedError
–Subclasses must persist the rollout.
-
ValueError
–Implementations should raise when
resources_id
does not exist.
get_latest_attempt(rollout_id)
async
¶
Fetch the attempt with the highest sequence_id
for rollout_id
.
Parameters:
-
rollout_id
(str
) –Identifier to inspect.
Returns:
-
Optional[Attempt]
–The most recent attempt or
None
when no attempts exist yet.
Raises:
-
NotImplementedError
–Subclasses must implement retrieval.
-
ValueError
–Implementations must raise when the rollout does not exist.
get_latest_resources()
async
¶
Fetch the latest resource snapshot marked as the global default.
Returns:
-
Optional[ResourcesUpdate]
–The current latest
ResourcesUpdate
, orNone
when -
Optional[ResourcesUpdate]
–no resources have been registered yet.
Raises:
-
NotImplementedError
–Subclasses must implement retrieval.
get_next_span_sequence_id(rollout_id, attempt_id)
async
¶
Allocate the next strictly increasing sequence number used to order spans.
Implementations must retain counters so repeated calls return 1, 2, ...
without
gaps unless spans were explicitly inserted with a custom sequence_id
. The
counter may be scoped per rollout or per attempt, but the sequence must be
strictly increasing for spans emitted by the specified attempt so traces remain
totally ordered.
See Distributed Tracing for detailed motivations.
Parameters:
-
rollout_id
(str
) –Identifier of the rollout emitting spans.
-
attempt_id
(str
) –Attempt identifier for the upcoming span.
Returns:
-
int
–The next integer sequence identifier, unique within the attempt.
Raises:
-
NotImplementedError
–Subclasses must provide the allocator.
-
ValueError
–Implementations must raise when the rollout or attempt does not exist.
get_resources_by_id(resources_id)
async
¶
Return a specific named resource snapshot by identifier.
Parameters:
-
resources_id
(str
) –Identifier of the snapshot.
Returns:
-
Optional[ResourcesUpdate]
–The stored
ResourcesUpdate
, orNone
when missing.
Raises:
-
NotImplementedError
–Subclasses must implement retrieval.
get_rollout_by_id(rollout_id)
async
¶
Fetch a rollout by identifier without mutating its state.
Parameters:
-
rollout_id
(str
) –Identifier to retrieve.
Returns:
-
Optional[Rollout]
–The rollout when found, otherwise
None
.
Raises:
-
NotImplementedError
–Subclasses must implement retrieval.
query_attempts(rollout_id)
async
¶
Return every attempt ever created for rollout_id
in ascending sequence order.
Parameters:
-
rollout_id
(str
) –Identifier of the rollout being inspected.
Returns:
-
List[Attempt]
–Attempts sorted by
sequence_id
(oldest first). Returns an empty list when none exist.
Raises:
-
NotImplementedError
–Subclasses must implement the query.
-
ValueError
–Implementations must raise when the rollout does not exist.
query_rollouts(*, status=None, rollout_ids=None)
async
¶
Retrieve rollouts filtered by status and/or explicit identifiers.
Parameters:
-
status
(Optional[Sequence[RolloutStatus]]
, default:None
) –Optional whitelist of
RolloutStatus
values. -
rollout_ids
(Optional[Sequence[str]]
, default:None
) –Optional whitelist of rollout identifiers to include.
Returns:
-
List[Rollout]
–A list of matching rollouts. Ordering is backend-defined but must be deterministic.
Raises:
-
NotImplementedError
–Subclasses must implement the query.
query_spans(rollout_id, attempt_id=None)
async
¶
Return the stored spans for a rollout, optionally scoped to one attempt.
Spans must be returned in ascending sequence_id
order. Implementations may raise
a RuntimeError
when spans were evicted or expired.
Parameters:
-
rollout_id
(str
) –Identifier of the rollout being inspected.
-
attempt_id
(str | Literal['latest'] | None
, default:None
) –Attempt identifier to filter by. Pass
"latest"
to retrieve only the most recent attempt, orNone
to return all spans across attempts.
Returns:
-
List[Span]
–An ordered list of spans (possibly empty).
Raises:
-
NotImplementedError
–Subclasses must implement the query.
-
ValueError
–Implementations must raise when the rollout or attempt is unknown.
start_attempt(rollout_id)
async
¶
Create a manual retry attempt for an existing rollout.
This is typically invoked by runners that wish to retry outside of the
normal queue flow (for example in an online RL setup).
Implementations must validate that the rollout exists, allocate a fresh attempt_id
,
increment the sequence_id
monotonically, stamp the new attempt with status="preparing"
,
and return an up-to-date AttemptedRollout
.
Parameters:
-
rollout_id
(str
) –Unique identifier of the rollout receiving a new attempt.
Returns:
-
AttemptedRollout
–The rollout paired with its newly-created attempt.
Raises:
-
NotImplementedError
–Subclasses must implement attempt creation.
-
ValueError
–Implementations must raise when
rollout_id
is unknown.
start_rollout(input, mode=None, resources_id=None, config=None, metadata=None)
async
¶
Register a rollout and immediately create its first attempt.
Note
Use enqueue_rollout()
when the
caller only wants to submit work for later scheduling.
The rollout must be persisted with status="preparing"
and an initial attempt
with sequence_id == 1
so the caller can begin execution without visiting the
public queue. Implementations are expected to:
- Generate a unique
rollout_id
andattempt_id
. - Record
start_time
for both rollout and attempt based on the current clock. - Copy
config
andmetadata
so later mutations do not leak shared references. - Resolve
resources_id
to the latest resource snapshot whenNone
is supplied.
Parameters:
-
input
(TaskInput
) –Arbitrary task payload supplied by an algorithm.
-
mode
(Literal['train', 'val', 'test'] | None
, default:None
) –Optional semantic mode for downstream analytics (
"train"
,"val"
,"test"
). -
resources_id
(str | None
, default:None
) –Concrete resource snapshot to execute against; defaults to the latest stored snapshot.
-
config
(RolloutConfig | None
, default:None
) –Rollout retry/timeout policy. Should default to a fresh
RolloutConfig
. -
metadata
(Dict[str, Any] | None
, default:None
) –Free-form metadata persisted verbatim with the rollout.
Returns:
-
AttemptedRollout
–The fully-populated
AttemptedRollout
including -
AttemptedRollout
–the just-created attempt.
Raises:
-
NotImplementedError
–Subclasses must provide durable storage for the rollout.
-
ValueError
–Implementations should raise when
resources_id
does not exist.
update_attempt(rollout_id, attempt_id, status=UNSET, worker_id=UNSET, last_heartbeat_time=UNSET, metadata=UNSET)
async
¶
Update attempt bookkeeping such as status, worker ownership, and heartbeats.
When attempt_id
is "latest"
the update must target the attempt with the highest
sequence_id
; otherwise it must target the specific attempt. Implementations should
propagate status changes to the rollout (for example via propagate_status()
)
once the latest attempt transitions to a terminal state.
Similar to update_rollout()
,
parameters also default to the sentinel UNSET
.
Parameters:
-
rollout_id
(str
) –Identifier of the rollout whose attempt will be updated.
-
attempt_id
(str | Literal['latest']
) –Attempt identifier or
"latest"
as a convenience. -
status
(AttemptStatus | Unset
, default:UNSET
) –Replacement attempt status. Terminal statuses must set
end_time
. -
worker_id
(str | Unset
, default:UNSET
) –Identifier for the worker currently processing the attempt.
-
last_heartbeat_time
(float | Unset
, default:UNSET
) –Wall-clock timestamp (seconds) of the latest heartbeat/span.
-
metadata
(Optional[Dict[str, Any]] | Unset
, default:UNSET
) –Replacement metadata dictionary.
Returns:
-
Attempt
–The updated attempt record.
Raises:
-
NotImplementedError
–Subclasses must implement mutation logic.
-
ValueError
–Implementations must raise when the rollout or attempt is unknown.
update_resources(resources_id, resources)
async
¶
Overwrite or extend an existing resource snapshot and mark it as latest.
This API is typically used by algorithms that maintain mutable resources (e.g., model checkpoints) under a stable identifier.
Parameters:
-
resources_id
(str
) –Identifier of the snapshot to replace.
-
resources
(NamedResources
) –Updated mapping of resource names to payloads.
Returns:
-
ResourcesUpdate
–The persisted
ResourcesUpdate
.
Raises:
-
NotImplementedError
–Subclasses must implement resource persistence.
-
ValueError
–Implementations must raise when
resources_id
does not exist.
update_rollout(rollout_id, input=UNSET, mode=UNSET, resources_id=UNSET, status=UNSET, config=UNSET, metadata=UNSET)
async
¶
Update rollout metadata and, when provided, drive status transitions.
Parameters default to the sentinel UNSET
to
distinguish omitted fields from explicit None
assignments. Implementations must:
- Validate the rollout exists before mutating it.
- Replace each property when a concrete value (including
None
) is supplied. - When the status switches into a terminal state, set
end_time
and signal any waiters. - When the status re-enters a queueing state, ensure the rollout is enqueued exactly once.
Parameters:
-
rollout_id
(str
) –Identifier of the rollout to update.
-
input
(TaskInput | Unset
, default:UNSET
) –Replacement task payload; pass
None
to explicitly clear the input. -
mode
(Optional[Literal['train', 'val', 'test']] | Unset
, default:UNSET
) –Replacement rollout mode.
-
resources_id
(Optional[str] | Unset
, default:UNSET
) –Replacement resources snapshot reference.
-
status
(RolloutStatus | Unset
, default:UNSET
) –Target rollout status.
-
config
(RolloutConfig | Unset
, default:UNSET
) –Replacement retry/timeout configuration.
-
metadata
(Optional[Dict[str, Any]] | Unset
, default:UNSET
) –Replacement metadata dictionary.
Returns:
-
Rollout
–The updated rollout record.
Raises:
-
NotImplementedError
–Subclasses must implement mutation logic.
-
ValueError
–Implementations must raise when the rollout is unknown or the update is invalid.
wait_for_rollouts(*, rollout_ids, timeout=None)
async
¶
Block until the targeted rollouts reach a terminal status or the timeout expires.
Terminal statuses are "succeeded"
, "failed"
, and "cancelled"
. When the timeout
elapses, implementations should return the subset of rollouts that are already terminal
and omit the rest.
Warning
It's dangerous and might be event-loop blocking to call this function with a long timeout. It's a good idea to poll for the method to check if new completed rollouts can coming. Be careful in implementing the sleep logic to avoid busy-waiting.
Parameters:
-
rollout_ids
(List[str]
) –Identifiers of rollouts to watch.
-
timeout
(Optional[float]
, default:None
) –Maximum time in seconds to wait.
None
waits indefinitely.
Returns:
-
List[Rollout]
–Rollouts that finished before the deadline, in arbitrary order.
Raises:
-
NotImplementedError
–Subclasses must implement waiting semantics.
-
ValueError
–Implementations must raise when a rollout identifier is unknown.
Store Implementations¶
agentlightning.InMemoryLightningStore
¶
Bases: LightningStore
In-memory implementation of LightningStore using Python data structures. Thread-safe and async-compatible but data is not persistent.
The methods in this class should generally not call each other, especially those that are locked.
Parameters:
-
eviction_memory_threshold
(float | int | None
, default:None
) –The threshold for evicting spans in bytes. By default, it's 70% of the total VRAM available.
-
safe_memory_threshold
(float | int | None
, default:None
) –The threshold for safe memory usage in bytes. By default, it's 80% of the eviction threshold.
-
span_size_estimator
(Callable[[Span], int] | None
, default:None
) –A function to estimate the size of a span in bytes. By default, it's a simple size estimator that uses sys.getsizeof.
add_otel_span(rollout_id, attempt_id, readable_span, sequence_id=None)
async
¶
Add an opentelemetry span to the store.
See LightningStore.add_otel_span()
for semantics.
add_resources(resources)
async
¶
Stores a new version of named resources and sets it as the latest.
See LightningStore.add_resources()
for semantics.
add_span(span)
async
¶
Persist a pre-converted span.
See LightningStore.add_span()
for semantics.
dequeue_rollout()
async
¶
Retrieves the next task from the queue without blocking.
Returns None
if the queue is empty.
Will set the rollout status to preparing and create a new attempt.
See LightningStore.dequeue_rollout()
for semantics.
enqueue_rollout(input, mode=None, resources_id=None, config=None, metadata=None)
async
¶
Adds a new task to the queue with specific metadata and returns the rollout.
See LightningStore.enqueue_rollout()
for semantics.
get_latest_attempt(rollout_id)
async
¶
Retrieves the latest attempt for a given rollout ID.
See LightningStore.get_latest_attempt()
for semantics.
get_latest_resources()
async
¶
Retrieves the latest version of named resources.
See LightningStore.get_latest_resources()
for semantics.
get_next_span_sequence_id(rollout_id, attempt_id)
async
¶
Get the next span sequence ID for a given rollout and attempt. The number is strictly increasing for each rollout. The store will not issue the same sequence ID twice.
See LightningStore.get_next_span_sequence_id()
for semantics.
get_resources_by_id(resources_id)
async
¶
Retrieves a specific version of named resources by its ID.
See LightningStore.get_resources_by_id()
for semantics.
get_rollout_by_id(rollout_id)
async
¶
Retrieves a specific rollout by its ID.
See LightningStore.get_rollout_by_id()
for semantics.
query_attempts(rollout_id)
async
¶
Retrieves all attempts associated with a specific rollout ID. Returns an empty list if no attempts are found.
See LightningStore.query_attempts()
for semantics.
query_rollouts(*, status=None, rollout_ids=None)
async
¶
Retrieves rollouts filtered by their status and rollout ids. If no status is provided, returns all rollouts.
See LightningStore.query_rollouts()
for semantics.
query_spans(rollout_id, attempt_id=None)
async
¶
Query and retrieve all spans associated with a specific rollout ID. Returns an empty list if no spans are found.
See LightningStore.query_spans()
for semantics.
start_attempt(rollout_id)
async
¶
Creates a new attempt for a given rollout ID and return the attempt details.
See LightningStore.start_attempt()
for semantics.
start_rollout(input, mode=None, resources_id=None, config=None, metadata=None)
async
¶
Notify the store that I'm about to run a rollout.
See LightningStore.start_rollout()
for semantics.
update_attempt(rollout_id, attempt_id, status=UNSET, worker_id=UNSET, last_heartbeat_time=UNSET, metadata=UNSET)
async
¶
Update a specific or latest attempt for a given rollout.
See LightningStore.update_attempt()
for semantics.
update_resources(resources_id, resources)
async
¶
Safely stores a new version of named resources and sets it as the latest.
See LightningStore.update_resources()
for semantics.
update_rollout(rollout_id, input=UNSET, mode=UNSET, resources_id=UNSET, status=UNSET, config=UNSET, metadata=UNSET)
async
¶
Update the rollout status and related metadata.
See LightningStore.update_rollout()
for semantics.
wait_for_rollouts(*, rollout_ids, timeout=None)
async
¶
Wait for specified rollouts to complete with a timeout. Returns the completed rollouts, potentially incomplete if timeout is reached.
This method does not change the state of the store.
See LightningStore.wait_for_rollouts()
for semantics.
Client-Server and Thread-safe Wrappers¶
agentlightning.LightningStoreServer
¶
Bases: LightningStore
Server wrapper that exposes a LightningStore via HTTP API. Delegates all operations to an underlying store implementation.
Healthcheck and watchdog relies on the underlying store.
agl store
is a convenient CLI to start a store server.
__getstate__()
¶
Control pickling to prevent server state from being sent to subprocesses.
When LightningStoreServer is pickled (e.g., passed to a subprocess), we only serialize the underlying store and connection details. The FastAPI app and uvicorn server are excluded as they should not be transferred between processes.
The subprocess should create its own server instance if needed.
__setstate__(state)
¶
Restore from pickle by reconstructing only the essential attributes.
Note: This creates a new server instance without FastAPI/uvicorn initialized. Call init() pattern or create a new LightningStoreServer if you need a fully functional server in the subprocess.
run_forever()
async
¶
Runs the FastAPI server indefinitely.
You need to call this method in the same process as the server was created in.
start()
async
¶
Starts the FastAPI server in the background.
You need to call this method in the same process as the server was created in.
stop()
async
¶
Gracefully stops the running FastAPI server.
You need to call this method in the same process as the server was created in.
agentlightning.LightningStoreClient
¶
Bases: LightningStore
HTTP client that talks to a remote LightningStoreServer.
Parameters:
-
server_address
(str
) –The address of the LightningStoreServer to connect to.
-
retry_delays
(Sequence[float]
, default:(1.0, 2.0, 5.0)
) –Backoff schedule (seconds) used when the initial request fails for a non-application reason. Each entry is a retry attempt.
-
health_retry_delays
(Sequence[float]
, default:(0.1, 0.2, 0.5)
) –Delays between /health probes while waiting for the server to come back.
__getstate__()
¶
When LightningStoreClient is pickled (e.g., passed to a subprocess), we only serialize the server address and retry configurations. The ClientSessions are excluded as they should not be transferred between processes.
__setstate__(state)
¶
Restore from pickle by reconstructing only the essential attributes.
Replicating __init__
logic to create another client instance in the subprocess.
close()
async
¶
Close the HTTP session.
dequeue_rollout()
async
¶
Dequeue a rollout from the server queue.
Returns:
-
Optional[AttemptedRollout]
–AttemptedRollout if a rollout is available, None if queue is empty.
Note
This method does NOT retry on failures. If any exception occurs (network error, server error, etc.), it logs the error and returns None immediately.
get_latest_attempt(rollout_id)
async
¶
Get the latest attempt for a rollout.
Parameters:
-
rollout_id
(str
) –ID of the rollout to query.
Returns:
-
Optional[Attempt]
–Attempt if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
get_latest_resources()
async
¶
Get the latest resources.
Returns:
-
Optional[ResourcesUpdate]
–ResourcesUpdate if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
get_resources_by_id(resources_id)
async
¶
Get resources by their ID.
Parameters:
-
resources_id
(str
) –ID of the resources to retrieve.
Returns:
-
Optional[ResourcesUpdate]
–ResourcesUpdate if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
get_rollout_by_id(rollout_id)
async
¶
Get a rollout by its ID.
Parameters:
-
rollout_id
(str
) –ID of the rollout to retrieve.
Returns:
-
Optional[Rollout]
–Rollout if found, None if not found or if all retries are exhausted.
Note
This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.
wait_for_rollouts(*, rollout_ids, timeout=None)
async
¶
Wait for rollouts to complete.
Parameters:
-
rollout_ids
(List[str]
) –List of rollout IDs to wait for.
-
timeout
(Optional[float]
, default:None
) –Timeout in seconds. If not None, the method will raise a ValueError if the timeout is greater than 0.1 seconds.
Returns:
-
List[Rollout]
–List of rollouts that are completed.
agentlightning.LightningStoreThreaded
¶
Bases: LightningStore
Facade that delegates all store operations to a underlying store instance.
The operations are guaranteed to be thread-safe. Make sure the threaded stores are instantiated before initializing the threads.