Store References

agentlightning.LightningStore

Contract for the persistent control-plane that coordinates training rollouts.

A LightningStore mediates every interaction between algorithms and runners:

  • Rollout lifecycle: accept new rollouts, queue them for execution, create attempts, and drive the rollout status machine ("queuing" → "preparing" → "running" → {"succeeded", "failed", "cancelled"}, or "requeuing" when a retry is justified).
  • Attempt tracking: record each execution attempt, including progress heartbeats, retry sequencing, and terminal states such as "timeout" or "unresponsive".
  • Span ingest: capture structured telemetry emitted by runners (either as native Span objects or as opentelemetry.sdk.trace.ReadableSpan instances) so that algorithms can reconstruct trajectories and rewards.
  • Resource versioning: manage immutable snapshots of named resources (prompt templates, model checkpoints, proxy endpoints, …) and expose a single "latest" snapshot that runners can fetch just after claiming work.
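
The rollout status machine from the first bullet can be pictured as a transition table. This is a minimal sketch rather than the library's implementation: the ALLOWED table and the can_transition helper are hypothetical names, and the exact set of legal edges is an assumption inferred from the lifecycle described on this page.

```python
# Hypothetical transition table inferred from the lifecycle description.
# It is not taken from the agentlightning source.
TERMINAL = {"succeeded", "failed", "cancelled"}

ALLOWED = {
    "queuing": {"preparing"},               # claimed by a runner
    "requeuing": {"preparing", "running"},  # reclaimed, or a span arrives
    "preparing": {"running"},               # first span / heartbeat
    "running": TERMINAL | {"requeuing"},    # finish, or retry when justified
}


def can_transition(current: str, target: str) -> bool:
    """Return True when the sketch's table permits current -> target."""
    return target in ALLOWED.get(current, set())
```

Terminal statuses have no outgoing edges in this table, so any transition out of them is rejected.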

Implementations must provide thread-safe/async-safe semantics: each coroutine should appear atomic to callers even when multiple algorithms or runners call the API concurrently. Unless stated otherwise, missing identifiers should result in a ValueError.

add_otel_span(rollout_id, attempt_id, readable_span, sequence_id=None) async

Convert and persist an OpenTelemetry span for a particular attempt.

Implementations must transform the readable_span into a Span (typically via Span.from_opentelemetry()), assign a strictly increasing sequence_id when one is not provided, and persist it using the same semantics as add_span().

Parameters:

  • rollout_id (str) –

    Identifier of the rollout that produced the span.

  • attempt_id (str) –

    Attempt identifier the span belongs to.

  • readable_span (ReadableSpan) –

    OpenTelemetry span in SDK form.

  • sequence_id (int | None, default: None ) –

    Optional explicit ordering hint. When omitted, the store calls get_next_span_sequence_id() automatically.

Returns:

  • Span

    The stored span record.

Raises:

  • NotImplementedError

    Subclasses must implement span persistence.

  • ValueError

    Implementations must raise when the rollout or attempt is unknown.

add_resources(resources) async

Persist a new immutable snapshot of named resources and mark it as latest.

Implementations must assign a fresh resources_id and ensure subsequent calls to get_latest_resources() return the snapshot produced here.
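
The snapshot-and-latest behaviour can be sketched with a small in-memory class. ToyResourceStore, its counter-based identifiers, and the tuple return shape are all hypothetical; a real store may return a richer record.

```python
import copy
from typing import Any, Dict, Optional, Tuple


class ToyResourceStore:
    """Illustrative stand-in; not the agentlightning implementation."""

    def __init__(self) -> None:
        self.snapshots: Dict[str, Dict[str, Any]] = {}
        self._latest_id: Optional[str] = None
        self._counter = 0

    def add_resources(self, resources: Dict[str, Any]) -> str:
        # Assign a fresh identifier for every snapshot.
        self._counter += 1
        resources_id = f"resources-{self._counter}"
        # Deep-copy so later caller-side mutations cannot alter the snapshot.
        self.snapshots[resources_id] = copy.deepcopy(resources)
        # The newest snapshot becomes the one get_latest_resources() returns.
        self._latest_id = resources_id
        return resources_id

    def get_latest_resources(self) -> Optional[Tuple[str, Dict[str, Any]]]:
        if self._latest_id is None:
            return None
        return self._latest_id, self.snapshots[self._latest_id]
```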

Parameters:

  • resources (NamedResources) –

    Mapping of resource names to their serialized payloads.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement resource persistence.

add_span(span) async

Persist a pre-constructed span emitted during rollout execution.

The provided Span must already contain the rollout_id, attempt_id, and sequence_id. Implementations must:

  • Verify that both rollout and attempt exist.
  • Ensure span ordering remains strictly increasing per attempt (rejecting or keeping duplicates).
  • Treat the span arrival as a heartbeat: update the attempt's last_heartbeat_time and transition both attempt and rollout to "running" if they were still "preparing" or "requeuing".
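
The three obligations above can be sketched against a toy store. ToyAttempt and ToySpanStore are hypothetical stand-ins, and rejecting an out-of-order sequence_id with a ValueError is an assumption; the contract also allows keeping duplicates.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class ToyAttempt:
    status: str = "preparing"
    last_heartbeat_time: float = 0.0
    sequence_ids: List[int] = field(default_factory=list)


@dataclass
class ToySpanStore:
    rollout_status: Dict[str, str] = field(default_factory=dict)
    attempts: Dict[Tuple[str, str], ToyAttempt] = field(default_factory=dict)

    def add_span(self, rollout_id: str, attempt_id: str, sequence_id: int) -> None:
        # 1. Both the rollout and the attempt must exist.
        if rollout_id not in self.rollout_status:
            raise ValueError(f"unknown rollout {rollout_id!r}")
        attempt = self.attempts.get((rollout_id, attempt_id))
        if attempt is None:
            raise ValueError(f"unknown attempt {attempt_id!r}")
        # 2. Keep ordering strictly increasing (this sketch rejects violations).
        if attempt.sequence_ids and sequence_id <= attempt.sequence_ids[-1]:
            raise ValueError("sequence_id must be strictly increasing")
        attempt.sequence_ids.append(sequence_id)
        # 3. Span arrival is a heartbeat and may promote both statuses.
        attempt.last_heartbeat_time = time.time()
        if attempt.status in ("preparing", "requeuing"):
            attempt.status = "running"
        if self.rollout_status[rollout_id] in ("preparing", "requeuing"):
            self.rollout_status[rollout_id] = "running"
```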

Parameters:

  • span (Span) –

    Fully populated span to persist.

Returns:

  • Span

    The stored span record (implementations may return a copy).

Raises:

  • NotImplementedError

    Subclasses must implement span persistence.

  • ValueError

    Implementations must raise when the referenced rollout or attempt is missing.

dequeue_rollout() async

Claim the oldest queued rollout and transition it to preparing.

This function does not block.

Retrieval must be FIFO across rollouts that remain in queuing or requeuing state. When a rollout is claimed, implementations must:

  • Transition its status to "preparing".
  • Create a new attempt with status="preparing" and sequence_id equal to the number of attempts already registered for the rollout plus one.
  • Return an AttemptedRollout snapshot so the runner knows both rollout metadata and the attempt identifier.
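
A minimal sketch of the claim step, assuming a plain FIFO deque. ToyQueueStore and its tuple return value are hypothetical; a real store returns an AttemptedRollout.

```python
import uuid
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple


class ToyQueueStore:
    """Illustrative stand-in; not the agentlightning implementation."""

    def __init__(self) -> None:
        self.queue: Deque[str] = deque()
        self.status: Dict[str, str] = {}
        self.attempts: Dict[str, List[Tuple[str, int]]] = {}

    def enqueue(self, rollout_id: str) -> None:
        self.status[rollout_id] = "queuing"
        self.attempts.setdefault(rollout_id, [])
        self.queue.append(rollout_id)  # tail of the queue

    def dequeue(self) -> Optional[Tuple[str, str, int]]:
        if not self.queue:
            return None  # never blocks; the caller decides when to retry
        rollout_id = self.queue.popleft()  # oldest first (FIFO)
        self.status[rollout_id] = "preparing"
        # sequence_id = number of attempts already registered, plus one.
        sequence_id = len(self.attempts[rollout_id]) + 1
        attempt_id = f"attempt-{uuid.uuid4().hex}"
        self.attempts[rollout_id].append((attempt_id, sequence_id))
        return rollout_id, attempt_id, sequence_id
```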

Returns:

  • Optional[AttemptedRollout]

    The next attempt to execute, or None when no eligible rollouts are queued.

Raises:

  • NotImplementedError

    Subclasses must implement queue retrieval.

enqueue_rollout(input, mode=None, resources_id=None, config=None, metadata=None) async

Persist a rollout in queuing state so runners can claim it later.

Note

Unlike start_rollout(), this method is for callers that only want to submit work for later scheduling.

Implementations must generate a unique rollout_id, stamp start_time with the current time, default config to a fresh RolloutConfig, and insert the rollout at the tail of the scheduling queue. No attempt is created yet.

Parameters:

  • input (TaskInput) –

    Arbitrary task payload supplied by an algorithm.

  • mode (Literal['train', 'val', 'test'] | None, default: None ) –

    Optional semantic mode indicator ("train", "val", "test").

  • resources_id (str | None, default: None ) –

    Resource snapshot used when a runner eventually executes the rollout.

  • config (RolloutConfig | None, default: None ) –

    Fine-grained retry/timeout parameters to persist with the rollout.

  • metadata (Dict[str, Any] | None, default: None ) –

    Free-form metadata stored verbatim with the rollout record.

Returns:

Raises:

  • NotImplementedError

    Subclasses must persist the rollout.

  • ValueError

    Implementations should raise when resources_id does not exist.

get_latest_attempt(rollout_id) async

Fetch the attempt with the highest sequence_id for rollout_id.

Parameters:

  • rollout_id (str) –

    Identifier to inspect.

Returns:

  • Optional[Attempt]

    The most recent attempt or None when no attempts exist yet.

Raises:

  • NotImplementedError

    Subclasses must implement retrieval.

  • ValueError

    Implementations must raise when the rollout does not exist.

get_latest_resources() async

Fetch the latest resource snapshot marked as the global default.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement retrieval.

get_next_span_sequence_id(rollout_id, attempt_id) async

Allocate the next strictly increasing sequence number used to order spans.

Implementations must retain counters so repeated calls return 1, 2, ... without gaps unless spans were explicitly inserted with a custom sequence_id. The counter may be scoped per rollout or per attempt, but the sequence must be strictly increasing for spans emitted by the specified attempt so traces remain totally ordered.

See Distributed Tracing for detailed motivations.
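
One way to meet the allocator contract is a counter per rollout, which the contract permits as an alternative to per-attempt scoping. ToySequenceAllocator and its observe method are hypothetical names used only for illustration.

```python
from collections import defaultdict
from typing import DefaultDict


class ToySequenceAllocator:
    """Per-rollout counter; a sketch, not the agentlightning implementation."""

    def __init__(self) -> None:
        self._last: DefaultDict[str, int] = defaultdict(int)

    def next_id(self, rollout_id: str) -> int:
        # Repeated calls return 1, 2, 3, ... for the same rollout.
        self._last[rollout_id] += 1
        return self._last[rollout_id]

    def observe(self, rollout_id: str, sequence_id: int) -> None:
        """Record an explicit sequence_id so later ids stay strictly above it."""
        self._last[rollout_id] = max(self._last[rollout_id], sequence_id)
```

Calling observe when a span arrives with a custom sequence_id is what introduces the gaps the contract allows.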

Parameters:

  • rollout_id (str) –

    Identifier of the rollout emitting spans.

  • attempt_id (str) –

    Attempt identifier for the upcoming span.

Returns:

  • int

    The next integer sequence identifier, unique within the attempt.

Raises:

  • NotImplementedError

    Subclasses must provide the allocator.

  • ValueError

    Implementations must raise when the rollout or attempt does not exist.

get_resources_by_id(resources_id) async

Return a specific named resource snapshot by identifier.

Parameters:

  • resources_id (str) –

    Identifier of the snapshot.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement retrieval.

get_rollout_by_id(rollout_id) async

Fetch a rollout by identifier without mutating its state.

Parameters:

  • rollout_id (str) –

    Identifier to retrieve.

Returns:

  • Optional[Rollout]

    The rollout when found, otherwise None.

Raises:

  • NotImplementedError

    Subclasses must implement retrieval.

query_attempts(rollout_id) async

Return every attempt ever created for rollout_id in ascending sequence order.

Parameters:

  • rollout_id (str) –

    Identifier of the rollout being inspected.

Returns:

  • List[Attempt]

    Attempts sorted by sequence_id (oldest first). Returns an empty list when none exist.

Raises:

  • NotImplementedError

    Subclasses must implement the query.

  • ValueError

    Implementations must raise when the rollout does not exist.

query_rollouts(*, status=None, rollout_ids=None) async

Retrieve rollouts filtered by status and/or explicit identifiers.

Parameters:

  • status (Optional[Sequence[RolloutStatus]], default: None ) –

    Optional whitelist of RolloutStatus values.

  • rollout_ids (Optional[Sequence[str]], default: None ) –

    Optional whitelist of rollout identifiers to include.

Returns:

  • List[Rollout]

    A list of matching rollouts. Ordering is backend-defined but must be deterministic.

Raises:

  • NotImplementedError

    Subclasses must implement the query.

query_spans(rollout_id, attempt_id=None) async

Return the stored spans for a rollout, optionally scoped to one attempt.

Spans must be returned in ascending sequence_id order. Implementations may raise a RuntimeError when spans were evicted or expired.

Parameters:

  • rollout_id (str) –

    Identifier of the rollout being inspected.

  • attempt_id (str | Literal['latest'] | None, default: None ) –

    Attempt identifier to filter by. Pass "latest" to retrieve only the most recent attempt, or None to return all spans across attempts.

Returns:

  • List[Span]

    An ordered list of spans (possibly empty).

Raises:

  • NotImplementedError

    Subclasses must implement the query.

  • ValueError

    Implementations must raise when the rollout or attempt is unknown.

start_attempt(rollout_id) async

Create a manual retry attempt for an existing rollout.

This is typically invoked by runners that wish to retry outside of the normal queue flow (for example in an online RL setup). Implementations must validate that the rollout exists, allocate a fresh attempt_id, increment the sequence_id monotonically, stamp the new attempt with status="preparing", and return an up-to-date AttemptedRollout.

Parameters:

  • rollout_id (str) –

    Unique identifier of the rollout receiving a new attempt.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement attempt creation.

  • ValueError

    Implementations must raise when rollout_id is unknown.

start_rollout(input, mode=None, resources_id=None, config=None, metadata=None) async

Register a rollout and immediately create its first attempt.

Note

Use enqueue_rollout() when the caller only wants to submit work for later scheduling.

The rollout must be persisted with status="preparing" and an initial attempt with sequence_id == 1 so the caller can begin execution without visiting the public queue. Implementations are expected to:

  1. Generate a unique rollout_id and attempt_id.
  2. Record start_time for both rollout and attempt based on the current clock.
  3. Copy config and metadata so later mutations do not leak shared references.
  4. Resolve resources_id to the latest resource snapshot when None is supplied.
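
The four steps can be sketched with plain dictionaries. start_rollout_sketch and its latest_resources_id parameter are hypothetical; config handling is omitted, and a real store returns typed records rather than dicts.

```python
import copy
import time
import uuid
from typing import Any, Dict, Optional, Tuple


def start_rollout_sketch(
    input: Any,
    resources_id: Optional[str],
    metadata: Optional[Dict[str, Any]],
    latest_resources_id: Optional[str],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    # 1. Generate unique identifiers for the rollout and its first attempt.
    rollout_id = f"rollout-{uuid.uuid4().hex}"
    attempt_id = f"attempt-{uuid.uuid4().hex}"
    # 2. Stamp both records from the same clock reading.
    now = time.time()
    rollout = {
        "rollout_id": rollout_id,
        "input": input,
        "status": "preparing",
        "start_time": now,
        # 4. Fall back to the latest snapshot when no id is supplied.
        "resources_id": resources_id if resources_id is not None else latest_resources_id,
        # 3. Copy metadata so caller-side mutations do not leak in.
        "metadata": copy.deepcopy(metadata),
    }
    attempt = {
        "rollout_id": rollout_id,
        "attempt_id": attempt_id,
        "sequence_id": 1,
        "status": "preparing",
        "start_time": now,
    }
    return rollout, attempt
```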

Parameters:

  • input (TaskInput) –

    Arbitrary task payload supplied by an algorithm.

  • mode (Literal['train', 'val', 'test'] | None, default: None ) –

    Optional semantic mode for downstream analytics ("train", "val", "test").

  • resources_id (str | None, default: None ) –

    Concrete resource snapshot to execute against; defaults to the latest stored snapshot.

  • config (RolloutConfig | None, default: None ) –

    Rollout retry/timeout policy. Should default to a fresh RolloutConfig.

  • metadata (Dict[str, Any] | None, default: None ) –

    Free-form metadata persisted verbatim with the rollout.

Returns:

Raises:

  • NotImplementedError

    Subclasses must provide durable storage for the rollout.

  • ValueError

    Implementations should raise when resources_id does not exist.

update_attempt(rollout_id, attempt_id, status=UNSET, worker_id=UNSET, last_heartbeat_time=UNSET, metadata=UNSET) async

Update attempt bookkeeping such as status, worker ownership, and heartbeats.

When attempt_id is "latest" the update must target the attempt with the highest sequence_id; otherwise it must target the specific attempt. Implementations should propagate status changes to the rollout (for example via propagate_status()) once the latest attempt transitions to a terminal state.

Similar to update_rollout(), parameters also default to the sentinel UNSET.
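
Resolving "latest" amounts to picking the attempt with the highest sequence_id. The helper below is a hypothetical sketch over plain dictionaries, not the library's lookup code.

```python
from typing import Any, Dict, List


def resolve_attempt(attempts: List[Dict[str, Any]], attempt_id: str) -> Dict[str, Any]:
    """Return the targeted attempt, treating "latest" as highest sequence_id."""
    if attempt_id == "latest":
        if not attempts:
            raise ValueError("rollout has no attempts")
        return max(attempts, key=lambda attempt: attempt["sequence_id"])
    for attempt in attempts:
        if attempt["attempt_id"] == attempt_id:
            return attempt
    # Unknown identifiers surface as ValueError, matching the contract.
    raise ValueError(f"unknown attempt {attempt_id!r}")
```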

Parameters:

  • rollout_id (str) –

    Identifier of the rollout whose attempt will be updated.

  • attempt_id (str | Literal['latest']) –

    Attempt identifier or "latest" as a convenience.

  • status (AttemptStatus | Unset, default: UNSET ) –

    Replacement attempt status. Terminal statuses must set end_time.

  • worker_id (str | Unset, default: UNSET ) –

    Identifier for the worker currently processing the attempt.

  • last_heartbeat_time (float | Unset, default: UNSET ) –

    Wall-clock timestamp (seconds) of the latest heartbeat/span.

  • metadata (Optional[Dict[str, Any]] | Unset, default: UNSET ) –

    Replacement metadata dictionary.

Returns:

  • Attempt

    The updated attempt record.

Raises:

  • NotImplementedError

    Subclasses must implement mutation logic.

  • ValueError

    Implementations must raise when the rollout or attempt is unknown.

update_resources(resources_id, resources) async

Overwrite or extend an existing resource snapshot and mark it as latest.

This API is typically used by algorithms that maintain mutable resources (e.g., model checkpoints) under a stable identifier.

Parameters:

  • resources_id (str) –

    Identifier of the snapshot to replace.

  • resources (NamedResources) –

    Updated mapping of resource names to payloads.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement resource persistence.

  • ValueError

    Implementations must raise when resources_id does not exist.

update_rollout(rollout_id, input=UNSET, mode=UNSET, resources_id=UNSET, status=UNSET, config=UNSET, metadata=UNSET) async

Update rollout metadata and, when provided, drive status transitions.

Parameters default to the sentinel UNSET to distinguish omitted fields from explicit None assignments. Implementations must:

  • Validate the rollout exists before mutating it.
  • Replace each property when a concrete value (including None) is supplied.
  • When the status switches into a terminal state, set end_time and signal any waiters.
  • When the status re-enters a queueing state, ensure the rollout is enqueued exactly once.
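
The sentinel pattern can be sketched as follows. The _Unset class and the UNSET object here are local stand-ins for the library's sentinel, and apply_update is a hypothetical helper covering only two fields.

```python
from typing import Any, Dict


class _Unset:
    """Local stand-in for the library's sentinel type."""

    def __repr__(self) -> str:
        return "UNSET"


UNSET = _Unset()


def apply_update(record: Dict[str, Any], *, mode: Any = UNSET, metadata: Any = UNSET) -> Dict[str, Any]:
    updated = dict(record)  # leave the caller's record untouched
    # An identity check separates "omitted" from an explicit None.
    if mode is not UNSET:
        updated["mode"] = mode
    if metadata is not UNSET:
        updated["metadata"] = metadata
    return updated
```

With this shape, apply_update(record, mode=None) clears the mode, while apply_update(record) leaves it unchanged.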

Parameters:

  • rollout_id (str) –

    Identifier of the rollout to update.

  • input (TaskInput | Unset, default: UNSET ) –

    Replacement task payload; pass None to explicitly clear the input.

  • mode (Optional[Literal['train', 'val', 'test']] | Unset, default: UNSET ) –

    Replacement rollout mode.

  • resources_id (Optional[str] | Unset, default: UNSET ) –

    Replacement resources snapshot reference.

  • status (RolloutStatus | Unset, default: UNSET ) –

    Target rollout status.

  • config (RolloutConfig | Unset, default: UNSET ) –

    Replacement retry/timeout configuration.

  • metadata (Optional[Dict[str, Any]] | Unset, default: UNSET ) –

    Replacement metadata dictionary.

Returns:

  • Rollout

    The updated rollout record.

Raises:

  • NotImplementedError

    Subclasses must implement mutation logic.

  • ValueError

    Implementations must raise when the rollout is unknown or the update is invalid.

wait_for_rollouts(*, rollout_ids, timeout=None) async

Block until the targeted rollouts reach a terminal status or the timeout expires.

Terminal statuses are "succeeded", "failed", and "cancelled". When the timeout elapses, implementations should return the subset of rollouts that are already terminal and omit the rest.

Warning

Calling this function with a long timeout is risky because it may block the event loop. Prefer polling with a short timeout to check whether more rollouts have completed, and implement the sleep between polls carefully to avoid busy-waiting.
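
A polling loop along those lines might look like the sketch below. poll_rollouts and FakeStore are hypothetical; the only store call used is wait_for_rollouts(rollout_ids=..., timeout=...) as documented here, and the 0.1 second poll timeout and 0.05 second sleep are arbitrary illustrative values.

```python
import asyncio
import time
from typing import Any, List


async def poll_rollouts(
    store: Any,
    rollout_ids: List[str],
    *,
    total_timeout: float,
    poll_timeout: float = 0.1,
    sleep_interval: float = 0.05,
) -> List[Any]:
    """Poll with short timeouts, sleeping between polls to avoid busy-waiting."""
    deadline = time.monotonic() + total_timeout
    while True:
        done = await store.wait_for_rollouts(rollout_ids=rollout_ids, timeout=poll_timeout)
        if len(done) == len(rollout_ids) or time.monotonic() >= deadline:
            return done
        # Yield to the event loop so other coroutines can make progress.
        await asyncio.sleep(sleep_interval)


class FakeStore:
    """Test double that reports one more finished rollout on each call."""

    def __init__(self) -> None:
        self.calls = 0

    async def wait_for_rollouts(self, *, rollout_ids: List[str], timeout: Any = None) -> List[str]:
        self.calls += 1
        return rollout_ids[: self.calls]
```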

Parameters:

  • rollout_ids (List[str]) –

    Identifiers of rollouts to watch.

  • timeout (Optional[float], default: None ) –

    Maximum time in seconds to wait. None waits indefinitely.

Returns:

  • List[Rollout]

    Rollouts that finished before the deadline, in arbitrary order.

Raises:

  • NotImplementedError

    Subclasses must implement waiting semantics.

  • ValueError

    Implementations must raise when a rollout identifier is unknown.

Store Implementations

agentlightning.InMemoryLightningStore

Bases: LightningStore

In-memory implementation of LightningStore using Python data structures. Thread-safe and async-compatible but data is not persistent.

The methods in this class should generally not call each other, especially those that are locked.

Parameters:

  • eviction_memory_threshold (float | int | None, default: None ) –

    The threshold for evicting spans in bytes. By default, it's 70% of the total VRAM available.

  • safe_memory_threshold (float | int | None, default: None ) –

    The threshold for safe memory usage in bytes. By default, it's 80% of the eviction threshold.

  • span_size_estimator (Callable[[Span], int] | None, default: None ) –

    A function to estimate the size of a span in bytes. By default, it's a simple size estimator that uses sys.getsizeof.
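
For illustration, the default threshold arithmetic and a custom estimator could be sketched as below. Both helpers are hypothetical: default_thresholds only reproduces the 70% and 80% ratios stated above for a caller-supplied memory total, and estimate_size is one possible recursive estimator built on sys.getsizeof, not the store's built-in one.

```python
import sys
from typing import Any, Tuple


def default_thresholds(total_bytes: int) -> Tuple[int, int]:
    """Return (eviction, safe) thresholds using the documented ratios."""
    eviction = total_bytes * 70 // 100  # 70% of the total
    safe = eviction * 80 // 100         # 80% of the eviction threshold
    return eviction, safe


def estimate_size(obj: Any) -> int:
    """Recursively sum sys.getsizeof over common containers."""
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(estimate_size(k) + estimate_size(v) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(estimate_size(item) for item in obj)
    return size
```

A function with the Callable[[Span], int] shape could apply an estimator like this to whichever span fields dominate memory use.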

add_otel_span(rollout_id, attempt_id, readable_span, sequence_id=None) async

Add an opentelemetry span to the store.

See LightningStore.add_otel_span() for semantics.

add_resources(resources) async

Stores a new version of named resources and sets it as the latest.

See LightningStore.add_resources() for semantics.

add_span(span) async

Persist a pre-converted span.

See LightningStore.add_span() for semantics.

dequeue_rollout() async

Retrieves the next task from the queue without blocking. Returns None if the queue is empty.

Will set the rollout status to preparing and create a new attempt.

See LightningStore.dequeue_rollout() for semantics.

enqueue_rollout(input, mode=None, resources_id=None, config=None, metadata=None) async

Adds a new task to the queue with specific metadata and returns the rollout.

See LightningStore.enqueue_rollout() for semantics.

get_latest_attempt(rollout_id) async

Retrieves the latest attempt for a given rollout ID.

See LightningStore.get_latest_attempt() for semantics.

get_latest_resources() async

Retrieves the latest version of named resources.

See LightningStore.get_latest_resources() for semantics.

get_next_span_sequence_id(rollout_id, attempt_id) async

Get the next span sequence ID for a given rollout and attempt. The number is strictly increasing for each rollout. The store will not issue the same sequence ID twice.

See LightningStore.get_next_span_sequence_id() for semantics.

get_resources_by_id(resources_id) async

Retrieves a specific version of named resources by its ID.

See LightningStore.get_resources_by_id() for semantics.

get_rollout_by_id(rollout_id) async

Retrieves a specific rollout by its ID.

See LightningStore.get_rollout_by_id() for semantics.

query_attempts(rollout_id) async

Retrieves all attempts associated with a specific rollout ID. Returns an empty list if no attempts are found.

See LightningStore.query_attempts() for semantics.

query_rollouts(*, status=None, rollout_ids=None) async

Retrieves rollouts filtered by their status and rollout ids. If no status is provided, returns all rollouts.

See LightningStore.query_rollouts() for semantics.

query_spans(rollout_id, attempt_id=None) async

Query and retrieve all spans associated with a specific rollout ID. Returns an empty list if no spans are found.

See LightningStore.query_spans() for semantics.

start_attempt(rollout_id) async

Creates a new attempt for a given rollout ID and returns the attempt details.

See LightningStore.start_attempt() for semantics.

start_rollout(input, mode=None, resources_id=None, config=None, metadata=None) async

Notifies the store that the caller is about to run a rollout.

See LightningStore.start_rollout() for semantics.

update_attempt(rollout_id, attempt_id, status=UNSET, worker_id=UNSET, last_heartbeat_time=UNSET, metadata=UNSET) async

Update a specific or latest attempt for a given rollout.

See LightningStore.update_attempt() for semantics.

update_resources(resources_id, resources) async

Safely stores a new version of named resources and sets it as the latest.

See LightningStore.update_resources() for semantics.

update_rollout(rollout_id, input=UNSET, mode=UNSET, resources_id=UNSET, status=UNSET, config=UNSET, metadata=UNSET) async

Update the rollout status and related metadata.

See LightningStore.update_rollout() for semantics.

wait_for_rollouts(*, rollout_ids, timeout=None) async

Wait for the specified rollouts to complete, with an optional timeout. Returns the rollouts that completed; the list may be partial if the timeout is reached.

This method does not change the state of the store.

See LightningStore.wait_for_rollouts() for semantics.

Client-Server and Thread-safe Wrappers

agentlightning.LightningStoreServer

Bases: LightningStore

Server wrapper that exposes a LightningStore via HTTP API. Delegates all operations to an underlying store implementation.

Healthcheck and watchdog rely on the underlying store.

agl store is a convenient CLI to start a store server.

__getstate__()

Control pickling to prevent server state from being sent to subprocesses.

When LightningStoreServer is pickled (e.g., passed to a subprocess), we only serialize the underlying store and connection details. The FastAPI app and uvicorn server are excluded as they should not be transferred between processes.

The subprocess should create its own server instance if needed.

__setstate__(state)

Restore from pickle by reconstructing only the essential attributes.

Note: This creates a new server instance without FastAPI/uvicorn initialized. Call init() pattern or create a new LightningStoreServer if you need a fully functional server in the subprocess.
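
The pickling behaviour described for __getstate__ and __setstate__ follows a common Python pattern, sketched here with a toy class. ToyServer and its attribute names are hypothetical and do not mirror the real class's internals.

```python
import pickle
from typing import Any, Dict


class ToyServer:
    """Illustrative stand-in showing which state survives pickling."""

    def __init__(self, store: Any, host: str, port: int) -> None:
        self.store = store
        self.host = host
        self.port = port
        # Stand-ins for the web app and server; a lambda cannot be pickled,
        # so default pickling of this object would fail.
        self._app = lambda: None
        self._server = lambda: None

    def __getstate__(self) -> Dict[str, Any]:
        # Serialize only the store and the connection details.
        return {"store": self.store, "host": self.host, "port": self.port}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        self.store = state["store"]
        self.host = state["host"]
        self.port = state["port"]
        # The restored copy has no running server attached.
        self._app = None
        self._server = None


clone = pickle.loads(pickle.dumps(ToyServer({"rollouts": []}, "127.0.0.1", 4747)))
```

The restored clone keeps the store and address but must create its own server before it can serve requests.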

run_forever() async

Runs the FastAPI server indefinitely.

You need to call this method in the same process as the server was created in.

start() async

Starts the FastAPI server in the background.

You need to call this method in the same process as the server was created in.

stop() async

Gracefully stops the running FastAPI server.

You need to call this method in the same process as the server was created in.

agentlightning.LightningStoreClient

Bases: LightningStore

HTTP client that talks to a remote LightningStoreServer.

Parameters:

  • server_address (str) –

    The address of the LightningStoreServer to connect to.

  • retry_delays (Sequence[float], default: (1.0, 2.0, 5.0) ) –

    Backoff schedule (seconds) used when the initial request fails for a non-application reason. Each entry is a retry attempt.

  • health_retry_delays (Sequence[float], default: (0.1, 0.2, 0.5) ) –

    Delays between /health probes while waiting for the server to come back.
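
A retry schedule of this kind can be sketched as a loop over the delay sequence. call_with_retries is a hypothetical helper; treating ConnectionError as the "non-application" failure is an assumption, and the /health probing between retries is left out.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, TypeVar

T = TypeVar("T")


async def call_with_retries(
    request: Callable[[], Awaitable[T]],
    retry_delays: Sequence[float] = (1.0, 2.0, 5.0),
) -> T:
    """Try once, then retry once per entry in retry_delays."""
    delays = list(retry_delays)
    while True:
        try:
            return await request()
        except ConnectionError:
            if not delays:
                raise  # schedule exhausted; surface the last failure
            await asyncio.sleep(delays.pop(0))
```

With the default schedule, a request is attempted at most four times: the initial call plus three retries.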

__getstate__()

When LightningStoreClient is pickled (e.g., passed to a subprocess), we only serialize the server address and retry configurations. The ClientSessions are excluded as they should not be transferred between processes.

__setstate__(state)

Restore from pickle by reconstructing only the essential attributes.

Replicates the __init__ logic to create another client instance in the subprocess.

close() async

Close the HTTP session.

dequeue_rollout() async

Dequeue a rollout from the server queue.

Returns:

  • Optional[AttemptedRollout]

    AttemptedRollout if a rollout is available, None if queue is empty.

Note

This method does NOT retry on failures. If any exception occurs (network error, server error, etc.), it logs the error and returns None immediately.

get_latest_attempt(rollout_id) async

Get the latest attempt for a rollout.

Parameters:

  • rollout_id (str) –

    ID of the rollout to query.

Returns:

  • Optional[Attempt]

    Attempt if found, None if not found or if all retries are exhausted.

Note

This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.

get_latest_resources() async

Get the latest resources.

Returns:

  • Optional[ResourcesUpdate]

    ResourcesUpdate if found, None if not found or if all retries are exhausted.

Note

This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.

get_resources_by_id(resources_id) async

Get resources by their ID.

Parameters:

  • resources_id (str) –

    ID of the resources to retrieve.

Returns:

  • Optional[ResourcesUpdate]

    ResourcesUpdate if found, None if not found or if all retries are exhausted.

Note

This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.

get_rollout_by_id(rollout_id) async

Get a rollout by its ID.

Parameters:

  • rollout_id (str) –

    ID of the rollout to retrieve.

Returns:

  • Optional[Rollout]

    Rollout if found, None if not found or if all retries are exhausted.

Note

This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.

wait_for_rollouts(*, rollout_ids, timeout=None) async

Wait for rollouts to complete.

Parameters:

  • rollout_ids (List[str]) –

    List of rollout IDs to wait for.

  • timeout (Optional[float], default: None ) –

    Timeout in seconds. When set, it must not exceed 0.1 seconds; a larger value raises a ValueError.

Returns:

  • List[Rollout]

    List of rollouts that are completed.

agentlightning.LightningStoreThreaded

Bases: LightningStore

Facade that delegates all store operations to an underlying store instance.

The operations are guaranteed to be thread-safe. Make sure the threaded stores are instantiated before initializing the threads.