Store References

agentlightning.LightningStore

Contract for the persistent control-plane that coordinates training rollouts.

A LightningStore mediates every interaction between algorithms and runners:

  • Rollout lifecycle: accept new rollouts, queue them for execution, create attempts, and drive the rollout status machine ("queuing" → "preparing" → "running" → {"succeeded", "failed", "cancelled"}, or "requeuing" when a retry is justified).
  • Attempt tracking: record each execution attempt, including progress heartbeats, retry sequencing, and terminal states such as "timeout" or "unresponsive".
  • Span ingest: capture structured telemetry emitted by runners (either as native Span objects or as opentelemetry.sdk.trace.ReadableSpan instances) so that algorithms can reconstruct trajectories and rewards.
  • Resource versioning: manage immutable snapshots of named resources (prompt templates, model checkpoints, proxy endpoints, …) and expose a single "latest" snapshot that runners can fetch just after claiming work.

Implementations must provide thread-safe/async-safe semantics: each coroutine should appear atomic to callers even when multiple algorithms or runners call the API concurrently. Unless stated otherwise, missing identifiers should result in a ValueError.
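
The rollout status machine described above can be sketched as a small transition table. This is an illustrative reading of the prose, not the library's implementation: the names TERMINAL, ALLOWED, and can_transition are hypothetical, and a real store may permit additional transitions (for example, cancelling a rollout that is still queued).

```python
# Hypothetical transition table inferred from the overview text.
# Assumption: a real store may allow more transitions than listed here.
TERMINAL = {"succeeded", "failed", "cancelled"}

ALLOWED = {
    "queuing": {"preparing"},
    # A requeued rollout is claimed again, or flips to running on a late span.
    "requeuing": {"preparing", "running"},
    "preparing": {"running"},
    "running": TERMINAL | {"requeuing"},
}


def can_transition(current: str, target: str) -> bool:
    """Return True when `target` may directly follow `current` in this sketch."""
    return target in ALLOWED.get(current, set())
```

Terminal statuses have no entry in the table, so every transition out of them is rejected.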

capabilities property

Return the capabilities of the store.

add_many_spans(spans) async

Persist a sequence of pre-constructed spans emitted during rollout execution.

Implementations can simply delegate to add_span() for each span. However, if the store supports bulk insertion, it can implement this method to improve performance.
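
A minimal sketch of the delegation described above, using a hypothetical ToyStore rather than the real class. The return type of add_many_spans is not stated in this reference, so returning the list of stored spans is an assumption made for illustration.

```python
import asyncio
from typing import Any, List, Optional, Sequence


class ToyStore:
    """Hypothetical stand-in for a store; not the library class."""

    def __init__(self) -> None:
        self.saved: List[Any] = []

    async def add_span(self, span: Any) -> Optional[Any]:
        # Return None for duplicates, mirroring the add_span() contract.
        if span in self.saved:
            return None
        self.saved.append(span)
        return span

    async def add_many_spans(self, spans: Sequence[Any]) -> List[Any]:
        # Naive fallback: one add_span() call per span, keeping only the
        # spans that were actually stored (assumed return shape).
        stored: List[Any] = []
        for span in spans:
            result = await self.add_span(span)
            if result is not None:
                stored.append(result)
        return stored
```

A store backed by a database would typically replace the loop with a single bulk insert.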

add_otel_span(rollout_id, attempt_id, readable_span, sequence_id=None) async

Convert and persist an OpenTelemetry span for a particular attempt.

Implementations must transform the readable_span into a Span (typically via Span.from_opentelemetry()), assign a strictly increasing sequence_id when one is not provided, and persist it using the same semantics as add_span().

Parameters:

  • rollout_id (str) –

    Identifier of the rollout that produced the span.

  • attempt_id (str) –

    Attempt identifier the span belongs to.

  • readable_span (ReadableSpan) –

    OpenTelemetry span in SDK form.

  • sequence_id (int | None, default: None ) –

    Optional explicit ordering hint. When omitted, the implementation obtains one from get_next_span_sequence_id() automatically.

Returns:

  • Optional[Span]

    The stored span record. Return None if the span was not added due to a duplicate.

Raises:

  • NotImplementedError

    Subclasses must implement span persistence.

  • ValueError

    Implementations must raise when the rollout or attempt is unknown.

add_resources(resources) async

Persist a new immutable snapshot of named resources and mark it as latest.

Implementations must assign a fresh resources_id and ensure subsequent calls to get_latest_resources() return the snapshot produced here.

Parameters:

  • resources (NamedResources) –

    Mapping of resource names to their serialized payloads.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement resource persistence.

add_span(span) async

Persist a pre-constructed span emitted during rollout execution.

The provided Span must already contain the rollout_id, attempt_id, and sequence_id. Implementations must:

  • Verify that both rollout and attempt exist.
  • Ensure span ordering remains strictly increasing per attempt (rejecting or keeping duplicates).
  • Treat the span arrival as a heartbeat: update the attempt's last_heartbeat_time and transition both attempt and rollout to "running" if they were still "preparing" or "requeuing".

Parameters:

  • span (Span) –

    Fully populated span to persist.

Returns:

  • Optional[Span]

    The stored span record (implementations may return a copy). Returns None if the span was not added due to a duplicate.

Raises:

  • NotImplementedError

    Subclasses must implement span persistence.

  • ValueError

    Implementations must raise when the referenced rollout or attempt is missing.
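
The three obligations of add_span() can be sketched with toy records. Everything here (ToySpan, ToyAttempt, ToySpanStore) is hypothetical, the method is synchronous for brevity whereas the real API is async, and this sketch chooses to reject duplicates rather than keep them.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class ToySpan:
    rollout_id: str
    attempt_id: str
    sequence_id: int


@dataclass
class ToyAttempt:
    status: str = "preparing"
    last_heartbeat_time: Optional[float] = None
    spans: List[ToySpan] = field(default_factory=list)


class ToySpanStore:
    """Hypothetical store keeping rollout statuses and attempts in dicts."""

    def __init__(self) -> None:
        self.rollout_status: Dict[str, str] = {}
        self.attempts: Dict[Tuple[str, str], ToyAttempt] = {}

    def add_span(self, span: ToySpan) -> Optional[ToySpan]:
        key = (span.rollout_id, span.attempt_id)
        # 1. Both the rollout and the attempt must exist.
        if span.rollout_id not in self.rollout_status or key not in self.attempts:
            raise ValueError("unknown rollout or attempt")
        attempt = self.attempts[key]
        # 2. Reject a span whose sequence_id was already stored.
        if any(s.sequence_id == span.sequence_id for s in attempt.spans):
            return None
        attempt.spans.append(span)
        # 3. Treat the arrival as a heartbeat and promote to "running".
        attempt.last_heartbeat_time = time.time()
        if attempt.status in ("preparing", "requeuing"):
            attempt.status = "running"
        if self.rollout_status[span.rollout_id] in ("preparing", "requeuing"):
            self.rollout_status[span.rollout_id] = "running"
        return span
```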

dequeue_rollout(worker_id=None) async

Claim the oldest queued rollout and transition it to preparing.

This function does not block.

Retrieval must be FIFO across rollouts that remain in queuing or requeuing state. When a rollout is claimed, implementations must:

  • Transition its status to "preparing".
  • Create a new attempt with status="preparing" and sequence_id equal to the number of attempts already registered for the rollout plus one.
  • Return an AttemptedRollout snapshot so the runner knows both rollout metadata and the attempt identifier.
  • Optionally refresh the caller's Worker telemetry (e.g., last_dequeue_time) when worker_id is provided.

Returns:

  • Optional[AttemptedRollout]

    The next attempt to execute, or None when no eligible rollouts are queued.

Raises:

  • NotImplementedError

    Subclasses must implement queue retrieval.
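
The claim procedure can be illustrated with a toy FIFO queue. ToyQueueStore and its attempt dictionaries are hypothetical and synchronous; the attempt_id format is invented for the example, and worker telemetry is omitted.

```python
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple


class ToyQueueStore:
    """Hypothetical FIFO queue of rollout ids with per-rollout attempts."""

    def __init__(self) -> None:
        self.queue: Deque[str] = deque()
        self.status: Dict[str, str] = {}
        self.attempts: Dict[str, List[dict]] = {}

    def enqueue(self, rollout_id: str) -> None:
        self.status[rollout_id] = "queuing"
        self.attempts.setdefault(rollout_id, [])
        self.queue.append(rollout_id)

    def dequeue(self) -> Optional[Tuple[str, dict]]:
        # Non-blocking: return None as soon as the queue is exhausted.
        while self.queue:
            rollout_id = self.queue.popleft()
            # Skip entries that are no longer eligible (e.g. cancelled).
            if self.status.get(rollout_id) not in ("queuing", "requeuing"):
                continue
            self.status[rollout_id] = "preparing"
            sequence_id = len(self.attempts[rollout_id]) + 1
            attempt = {
                "attempt_id": f"{rollout_id}-attempt-{sequence_id}",  # invented format
                "sequence_id": sequence_id,
                "status": "preparing",
            }
            self.attempts[rollout_id].append(attempt)
            return rollout_id, attempt
        return None
```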

enqueue_rollout(input, mode=None, resources_id=None, config=None, metadata=None) async

Persist a rollout in queuing state so runners can claim it later.

Note

Unlike start_rollout(), this method is intended for callers that only want to submit work for later scheduling.

Implementations must generate a unique rollout_id, stamp start_time with the current time, default config to a fresh RolloutConfig, and insert the rollout at the tail of the scheduling queue. No attempt is created yet.

Parameters:

  • input (TaskInput) –

    Arbitrary task payload supplied by an algorithm.

  • mode (Literal['train', 'val', 'test'] | None, default: None ) –

    Optional semantic mode indicator ("train", "val", "test").

  • resources_id (str | None, default: None ) –

    Resource snapshot used when a runner eventually executes the rollout.

  • config (RolloutConfig | None, default: None ) –

    Fine-grained retry/timeout parameters to persist with the rollout.

  • metadata (Dict[str, Any] | None, default: None ) –

    Free-form metadata stored verbatim with the rollout record.

Returns:

Raises:

  • NotImplementedError

    Subclasses must persist the rollout.

  • ValueError

    Implementations should raise when resources_id does not exist.

get_latest_attempt(rollout_id) async

Fetch the attempt with the highest sequence_id for rollout_id.

Parameters:

  • rollout_id (str) –

    Identifier to inspect.

Returns:

  • Optional[Attempt]

    The most recent attempt or None when no attempts exist yet.

Raises:

  • NotImplementedError

    Subclasses must implement retrieval.

  • ValueError

    Implementations must raise when the rollout does not exist.

get_latest_resources() async

Fetch the latest resource snapshot marked as the global default.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement retrieval.

get_many_span_sequence_ids(rollout_attempt_ids) async

Bulk-allocate the next strictly increasing sequence numbers used to order spans, one per (rollout, attempt) pair.

Implementations may delegate to get_next_span_sequence_id() for each rollout and attempt.

Parameters:

  • rollout_attempt_ids (Sequence[Tuple[str, str]]) –

    List of tuples of rollout and attempt identifiers.

Returns:

  • Sequence[int]

    List of sequence numbers.

get_next_span_sequence_id(rollout_id, attempt_id) async

Allocate the next strictly increasing sequence number used to order spans.

Implementations must retain counters so repeated calls return 1, 2, ... without gaps unless spans were explicitly inserted with a custom sequence_id. The counter may be scoped per rollout or per attempt, but the sequence must be strictly increasing for spans emitted by the specified attempt so traces remain totally ordered.

See Distributed Tracing for detailed motivations.

Parameters:

  • rollout_id (str) –

    Identifier of the rollout emitting spans.

  • attempt_id (str) –

    Attempt identifier for the upcoming span.

Returns:

  • int

    The next integer sequence identifier, unique within the attempt.

Raises:

  • NotImplementedError

    Subclasses must provide the allocator.

  • ValueError

    Implementations must raise when the rollout or attempt does not exist.
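
A minimal sketch of such an allocator, assuming a counter scoped per attempt (the contract also allows per-rollout scoping). ToySequenceAllocator and its observe() helper are hypothetical names; existence checks and the async interface are left out.

```python
from collections import defaultdict
from typing import DefaultDict, Tuple


class ToySequenceAllocator:
    """Hypothetical per-attempt counter; not the library's allocator."""

    def __init__(self) -> None:
        # Last sequence id handed out or observed for each (rollout, attempt).
        self._last: DefaultDict[Tuple[str, str], int] = defaultdict(int)

    def next_id(self, rollout_id: str, attempt_id: str) -> int:
        key = (rollout_id, attempt_id)
        self._last[key] += 1
        return self._last[key]

    def observe(self, rollout_id: str, attempt_id: str, sequence_id: int) -> None:
        # Called when a span arrives with a custom sequence_id, so later
        # allocations stay strictly above it (this is where gaps can appear).
        key = (rollout_id, attempt_id)
        self._last[key] = max(self._last[key], sequence_id)
```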

get_resources_by_id(resources_id) async

Return a specific named resource snapshot by identifier.

Parameters:

  • resources_id (str) –

    Identifier of the snapshot.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement retrieval.

get_rollout_by_id(rollout_id) async

Fetch a rollout by identifier without mutating its state.

Parameters:

  • rollout_id (str) –

    Identifier to retrieve.

Returns:

  • Optional[Rollout]

    The rollout when found, otherwise None.

Raises:

  • NotImplementedError

    Subclasses must implement retrieval.

get_worker_by_id(worker_id) async

Retrieve a single worker by identifier.

Parameters:

  • worker_id (str) –

    Identifier of the worker.

Returns:

  • Optional[Worker]

    The worker record if it exists, otherwise None.

Raises:

  • NotImplementedError

    Subclasses must implement lookup semantics.

otlp_traces_endpoint()

Return the OTLP/HTTP traces endpoint of the store.

The traces can carry the rollout ID and attempt ID (and optionally the sequence ID) in the "resource" of the spans. A store that supports OTLP should be able to receive the traces and save them via add_span or add_otel_span.

The endpoint should be compatible with the OTLP/HTTP protocol. It is not necessarily compatible with the OTLP/gRPC protocol.

The returned endpoint usually ends with /v1/traces.

query_attempts(rollout_id, *, sort_by='sequence_id', sort_order='asc', limit=-1, offset=0) async

Return every attempt ever created for rollout_id in ascending sequence order.

The parameters allow callers to re-order or paginate the attempts so that large retry histories can be streamed lazily.

Parameters:

  • rollout_id (str) –

    Identifier of the rollout being inspected.

  • sort_by (Optional[str], default: 'sequence_id' ) –

    Field to sort by. Must be a numeric or string field of Attempt. Defaults to sequence_id (oldest first).

  • sort_order (Literal['asc', 'desc'], default: 'asc' ) –

    Order to sort by.

  • limit (int, default: -1 ) –

    Limit on the number of results. -1 for unlimited.

  • offset (int, default: 0 ) –

    Offset into the results.

Returns:

  • Sequence[Attempt]

    Sequence of Attempts. Returns an empty sequence when none exist. The return value is not guaranteed to be a list.

Raises:

  • NotImplementedError

    Subclasses must implement the query.

  • ValueError

    Implementations must raise when the rollout does not exist.
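
The sort_by, sort_order, limit, and offset parameters recur across the query_* methods. The helper below is a hypothetical sketch of how they might combine over plain dictionaries; treating limit=0 as "return nothing" is an assumption, since the reference only defines -1 as unlimited.

```python
from typing import Any, Dict, List, Optional, Sequence


def sort_and_paginate(
    rows: Sequence[Dict[str, Any]],
    sort_by: Optional[str] = None,
    sort_order: str = "asc",
    limit: int = -1,
    offset: int = 0,
) -> List[Dict[str, Any]]:
    """Sort rows by one field, then skip `offset` rows and keep `limit` rows."""
    result = list(rows)
    if sort_by is not None:
        result.sort(key=lambda row: row[sort_by], reverse=(sort_order == "desc"))
    result = result[offset:]
    if limit >= 0:  # -1 means "no limit"
        result = result[:limit]
    return result
```

For example, limit=20 with offset=40 would return the third page of twenty attempts.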

query_resources(*, resources_id=None, resources_id_contains=None, sort_by=None, sort_order='asc', limit=-1, offset=0) async

List every stored resource snapshot in insertion order.

Supports lightweight filtering, sorting, and pagination for embedding in dashboards.

Parameters:

  • resources_id (Optional[str], default: None ) –

    Optional identifier of the resources to include.

  • resources_id_contains (Optional[str], default: None ) –

    Optional substring match for resources identifiers.

  • sort_by (Optional[str], default: None ) –

    Optional field to sort by (must be numeric or string on ResourcesUpdate).

  • sort_order (Literal['asc', 'desc'], default: 'asc' ) –

    Order to sort by.

  • limit (int, default: -1 ) –

    Limit on the number of results. -1 for unlimited.

  • offset (int, default: 0 ) –

    Offset into the results.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement retrieval.

query_rollouts(*, status_in=None, rollout_id_in=None, rollout_id_contains=None, filter_logic='and', sort_by=None, sort_order='asc', limit=-1, offset=0, status=None, rollout_ids=None) async

Retrieve rollouts filtered by status and/or explicit identifiers.

This interface supports structured filtering, sorting, and pagination so callers can build simple dashboards without copying data out of the store. The legacy parameters status and rollout_ids remain valid and are treated as aliases for status_in and rollout_id_in respectively. When both the new and the deprecated parameters are supplied, the new parameters take precedence.

Parameters:

  • status_in (Optional[Sequence[RolloutStatus]], default: None ) –

    Optional whitelist of RolloutStatus values.

  • rollout_id_in (Optional[Sequence[str]], default: None ) –

    Optional whitelist of rollout identifiers to include.

  • rollout_id_contains (Optional[str], default: None ) –

    Optional substring match for rollout identifiers.

  • filter_logic (Literal['and', 'or'], default: 'and' ) –

    Logical operator to combine filters.

  • sort_by (Optional[str], default: None ) –

    Optional field to sort by. Must reference a numeric or string field on Rollout.

  • sort_order (Literal['asc', 'desc'], default: 'asc' ) –

    Direction to sort when sort_by is provided.

  • limit (int, default: -1 ) –

    Maximum number of rows to return. Use -1 for "no limit".

  • offset (int, default: 0 ) –

    Number of rows to skip before returning results.

  • status (Optional[Sequence[RolloutStatus]], default: None ) –

    Deprecated field. Use status_in instead.

  • rollout_ids (Optional[Sequence[str]], default: None ) –

    Deprecated field. Use rollout_id_in instead.

Returns:

  • Sequence[Rollout]

    A sequence of matching rollouts (or AttemptedRollout when attempts exist). Ordering is deterministic when sort_by is set. The return value is not guaranteed to be a list.

Raises:

  • NotImplementedError

    Subclasses must implement the query.
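
The precedence rule for the deprecated aliases can be sketched as follows. resolve_rollout_filters is a hypothetical helper written for this illustration, not a function exposed by the library.

```python
from typing import Optional, Sequence, Tuple


def resolve_rollout_filters(
    status_in: Optional[Sequence[str]] = None,
    rollout_id_in: Optional[Sequence[str]] = None,
    status: Optional[Sequence[str]] = None,  # deprecated alias
    rollout_ids: Optional[Sequence[str]] = None,  # deprecated alias
) -> Tuple[Optional[Sequence[str]], Optional[Sequence[str]]]:
    """Pick the new parameter when given, else fall back to the legacy one."""
    effective_status = status_in if status_in is not None else status
    effective_ids = rollout_id_in if rollout_id_in is not None else rollout_ids
    return effective_status, effective_ids
```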

query_spans(rollout_id, attempt_id=None, *, trace_id=None, trace_id_contains=None, span_id=None, span_id_contains=None, parent_id=None, parent_id_contains=None, name=None, name_contains=None, filter_logic='and', limit=-1, offset=0, sort_by='sequence_id', sort_order='asc') async

Return the stored spans for a rollout, optionally scoped to one attempt.

Supports a handful of filters that cover the most common debugging scenarios (matching trace_id/span_id/parent_id or substring matches on the span name). attempt_id="latest" acts as a convenience that resolves the most recent attempt before evaluating filters. When attempt_id=None, spans across every attempt are eligible. By default results are sorted by sequence_id (oldest first). Implementations may raise a RuntimeError when spans were evicted or expired.

Parameters:

  • rollout_id (str) –

    Identifier of the rollout being inspected.

  • attempt_id (str | Literal['latest'] | None, default: None ) –

    Attempt identifier to filter by. Pass "latest" to retrieve only the most recent attempt, or None to return all spans across attempts.

  • trace_id (Optional[str], default: None ) –

    Optional trace ID to filter by.

  • trace_id_contains (Optional[str], default: None ) –

    Optional substring match for trace IDs.

  • span_id (Optional[str], default: None ) –

    Optional span ID to filter by.

  • span_id_contains (Optional[str], default: None ) –

    Optional substring match for span IDs.

  • parent_id (Optional[str], default: None ) –

    Optional parent span ID to filter by.

  • parent_id_contains (Optional[str], default: None ) –

    Optional substring match for parent span IDs.

  • name (Optional[str], default: None ) –

    Optional span name to filter by.

  • name_contains (Optional[str], default: None ) –

    Optional substring match for span names.

  • filter_logic (Literal['and', 'or'], default: 'and' ) –

    Logical operator to combine the optional filters above. The rollout_id argument is always applied with AND semantics.

  • limit (int, default: -1 ) –

    Limit on the number of results. -1 for unlimited.

  • offset (int, default: 0 ) –

    Offset into the results.

  • sort_by (Optional[str], default: 'sequence_id' ) –

    Field to sort by. Must be a numeric or string field of Span.

  • sort_order (Literal['asc', 'desc'], default: 'asc' ) –

    Order to sort by.

Returns:

  • Sequence[Span]

    An ordered sequence of spans (possibly empty). The return value is not guaranteed to be a list.

Raises:

  • NotImplementedError

    Subclasses must implement the query.

  • ValueError

    Implementations must raise when the rollout or attempt is unknown.
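
One way to read filter_logic is as a choice between all() and any() over the optional filters that were actually supplied. The matches helper below is hypothetical and works on plain dictionaries; matching everything when no filter is given is an assumption.

```python
from typing import Any, Dict, Optional


def matches(
    span: Dict[str, Any],
    exact: Dict[str, Optional[str]],
    contains: Dict[str, Optional[str]],
    filter_logic: str = "and",
) -> bool:
    """Combine exact and substring filters with AND or OR semantics."""
    # Filters left as None are ignored, like the optional query parameters.
    checks = [span.get(k) == v for k, v in exact.items() if v is not None]
    checks += [v in str(span.get(k, "")) for k, v in contains.items() if v is not None]
    if not checks:
        return True  # assumption: no filters means every span matches
    return all(checks) if filter_logic == "and" else any(checks)
```

The rollout_id (and attempt_id, when given) would be applied before this step, since the reference states that rollout_id always uses AND semantics.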

query_workers(*, status_in=None, worker_id_contains=None, filter_logic='and', sort_by=None, sort_order='asc', limit=-1, offset=0) async

Query all workers in the system.

Parameters:

  • status_in (Optional[Sequence[WorkerStatus]], default: None ) –

    Optional whitelist of WorkerStatus values.

  • worker_id_contains (Optional[str], default: None ) –

    Optional substring match for worker identifiers.

  • filter_logic (Literal['and', 'or'], default: 'and' ) –

    Logical operator to combine the optional filters above.

  • sort_by (Optional[str], default: None ) –

    Field to sort by. Must be a numeric or string field of Worker.

  • sort_order (Literal['asc', 'desc'], default: 'asc' ) –

    Order to sort by.

  • limit (int, default: -1 ) –

    Limit on the number of results. -1 for unlimited.

  • offset (int, default: 0 ) –

    Offset into the results.

Returns:

  • Sequence[Worker]

    Sequence of Workers. Returns an empty sequence when none exist. The return value is not guaranteed to be a list.

start_attempt(rollout_id) async

Create a manual retry attempt for an existing rollout.

This is typically invoked by runners that wish to retry outside of the normal queue flow (for example in an online RL setup). Implementations must validate that the rollout exists, allocate a fresh attempt_id, increment the sequence_id monotonically, stamp the new attempt with status="preparing", and return an up-to-date AttemptedRollout.

Parameters:

  • rollout_id (str) –

    Unique identifier of the rollout receiving a new attempt.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement attempt creation.

  • ValueError

    Implementations must raise when rollout_id is unknown.

start_rollout(input, mode=None, resources_id=None, config=None, metadata=None) async

Register a rollout and immediately create its first attempt.

Note

Use enqueue_rollout() when the caller only wants to submit work for later scheduling.

The rollout must be persisted with status="preparing" and an initial attempt with sequence_id == 1 so the caller can begin execution without visiting the public queue. Implementations are expected to:

  1. Generate a unique rollout_id and attempt_id.
  2. Record start_time for both rollout and attempt based on the current clock.
  3. Copy config and metadata so later mutations do not leak shared references.
  4. Resolve resources_id to the latest resource snapshot when None is supplied.

Parameters:

  • input (TaskInput) –

    Arbitrary task payload supplied by an algorithm.

  • mode (Literal['train', 'val', 'test'] | None, default: None ) –

    Optional semantic mode for downstream analytics ("train", "val", "test").

  • resources_id (str | None, default: None ) –

    Concrete resource snapshot to execute against; defaults to the latest stored snapshot.

  • config (RolloutConfig | None, default: None ) –

    Rollout retry/timeout policy. Should default to a fresh RolloutConfig.

  • metadata (Dict[str, Any] | None, default: None ) –

    Free-form metadata persisted verbatim with the rollout.

Returns:

Raises:

  • NotImplementedError

    Subclasses must provide durable storage for the rollout.

  • ValueError

    Implementations should raise when resources_id does not exist.

statistics() async

Return the statistics of the store.

update_attempt(rollout_id, attempt_id, status=UNSET, worker_id=UNSET, last_heartbeat_time=UNSET, metadata=UNSET) async

Update attempt bookkeeping such as status, worker ownership, and heartbeats.

When attempt_id is "latest" the update must target the attempt with the highest sequence_id; otherwise it must target the specific attempt. Implementations should propagate status changes to the rollout (for example via propagate_status()) once the latest attempt transitions to a terminal state.

As with update_rollout(), parameters default to the sentinel UNSET.

If worker_id is present, the worker status is updated according to these rules:

  1. If attempt status is "succeeded" or "failed", the corresponding worker status will be set to "idle".
  2. If attempt status is "unresponsive" or "timeout", the corresponding worker status will be set to "unknown".
  3. Otherwise, the worker status will be set to "busy".
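
The three rules above amount to a small mapping from attempt status to worker status. worker_status_for is a hypothetical helper name used only for this sketch.

```python
def worker_status_for(attempt_status: str) -> str:
    """Map an attempt status to the worker status given by the rules above."""
    if attempt_status in ("succeeded", "failed"):
        return "idle"
    if attempt_status in ("unresponsive", "timeout"):
        return "unknown"
    return "busy"
```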

Parameters:

  • rollout_id (str) –

    Identifier of the rollout whose attempt will be updated.

  • attempt_id (str | Literal['latest']) –

    Attempt identifier or "latest" as a convenience.

  • status (AttemptStatus | Unset, default: UNSET ) –

    Replacement attempt status. Terminal statuses must set end_time.

  • worker_id (str | Unset, default: UNSET ) –

    Identifier for the worker currently processing the attempt.

  • last_heartbeat_time (float | Unset, default: UNSET ) –

    Wall-clock timestamp (seconds) of the latest heartbeat/span.

  • metadata (Optional[Dict[str, Any]] | Unset, default: UNSET ) –

    Replacement metadata dictionary.

Returns:

  • Attempt

    The updated attempt record.

Raises:

  • NotImplementedError

    Subclasses must implement mutation logic.

  • ValueError

    Implementations must raise when the rollout or attempt is unknown.

update_resources(resources_id, resources) async

Overwrite or extend an existing resource snapshot and mark it as latest.

This API is typically used by algorithms that maintain mutable resources (e.g., model checkpoints) under a stable identifier.

Parameters:

  • resources_id (str) –

    Identifier of the snapshot to replace.

  • resources (NamedResources) –

    Updated mapping of resource names to payloads.

Returns:

Raises:

  • NotImplementedError

    Subclasses must implement resource persistence.

  • ValueError

    Implementations must raise when resources_id does not exist.

update_rollout(rollout_id, input=UNSET, mode=UNSET, resources_id=UNSET, status=UNSET, config=UNSET, metadata=UNSET) async

Update rollout metadata and, when provided, drive status transitions.

Parameters default to the sentinel UNSET to distinguish omitted fields from explicit None assignments. Implementations must:

  • Validate the rollout exists before mutating it.
  • Replace each property when a concrete value (including None) is supplied.
  • When the status switches into a terminal state, set end_time and signal any waiters.
  • When the status re-enters a queueing state, ensure the rollout is enqueued exactly once.

Parameters:

  • rollout_id (str) –

    Identifier of the rollout to update.

  • input (TaskInput | Unset, default: UNSET ) –

    Replacement task payload; pass None to explicitly clear the input.

  • mode (Optional[Literal['train', 'val', 'test']] | Unset, default: UNSET ) –

    Replacement rollout mode.

  • resources_id (Optional[str] | Unset, default: UNSET ) –

    Replacement resources snapshot reference.

  • status (RolloutStatus | Unset, default: UNSET ) –

    Target rollout status.

  • config (RolloutConfig | Unset, default: UNSET ) –

    Replacement retry/timeout configuration.

  • metadata (Optional[Dict[str, Any]] | Unset, default: UNSET ) –

    Replacement metadata dictionary.

Returns:

  • Rollout

    The updated rollout record.

Raises:

  • NotImplementedError

    Subclasses must implement mutation logic.

  • ValueError

    Implementations must raise when the rollout is unknown or the update is invalid.
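
The role of the UNSET sentinel can be shown with a toy update function. The _Unset class and apply_update helper below are hypothetical stand-ins defined for this example; they are not the library's own UNSET object.

```python
from typing import Any, Dict


class _Unset:
    """Hypothetical sentinel type meaning 'field was not supplied'."""

    def __repr__(self) -> str:
        return "UNSET"


UNSET = _Unset()


def apply_update(record: Dict[str, Any], **fields: Any) -> Dict[str, Any]:
    """Return a copy of `record` with every supplied field replaced."""
    updated = dict(record)
    for name, value in fields.items():
        # None is a real value (explicit clear); only UNSET means "skip".
        if value is not UNSET:
            updated[name] = value
    return updated
```

With this pattern, passing metadata=None clears the metadata, while omitting metadata leaves it unchanged.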

update_worker(worker_id, heartbeat_stats=UNSET) async

Record a heartbeat for worker_id and refresh telemetry.

Implementations must treat this API as heartbeat-only: it should snapshot the latest stats when provided, stamp last_heartbeat_time with the current wall clock, and rely on other store mutations (dequeue_rollout, update_attempt, etc.) to drive the worker's busy/idle status, assignment, and activity timestamps.

Parameters:

  • worker_id (str) –

    Identifier of the worker to update.

  • heartbeat_stats (Dict[str, Any] | Unset, default: UNSET ) –

    Replacement worker heartbeat statistics (non-null when provided).

wait_for_rollouts(*, rollout_ids, timeout=None) async

Block until the targeted rollouts reach a terminal status or the timeout expires.

Terminal statuses are "succeeded", "failed", and "cancelled". When the timeout elapses, implementations should return the subset of rollouts that are already terminal and omit the rest.

Warning

Calling this function with a long timeout is dangerous and may block the event loop. Prefer calling it repeatedly with a short timeout to check whether more rollouts have completed. Implement the sleep logic carefully to avoid busy-waiting.

Parameters:

  • rollout_ids (List[str]) –

    Identifiers of rollouts to watch.

  • timeout (Optional[float], default: None ) –

    Maximum time in seconds to wait. None waits indefinitely.

Returns:

  • List[Rollout]

    Rollouts that finished before the deadline, in arbitrary order.

Raises:

  • NotImplementedError

    Subclasses must implement waiting semantics.

  • ValueError

    Implementations must raise when a rollout identifier is unknown.
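
A polling-based sketch of the waiting semantics, assuming a caller-supplied async status lookup. wait_for_terminal, get_status, and poll_interval are hypothetical names; the sketch returns rollout ids rather than Rollout records and yields to the event loop between polls instead of busy-waiting.

```python
import asyncio
import time
from typing import Awaitable, Callable, List, Optional

TERMINAL = {"succeeded", "failed", "cancelled"}


async def wait_for_terminal(
    get_status: Callable[[str], Awaitable[str]],
    rollout_ids: List[str],
    timeout: Optional[float] = None,
    poll_interval: float = 0.01,
) -> List[str]:
    """Return the ids that are terminal once all finish or the timeout expires."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        done = [rid for rid in rollout_ids if await get_status(rid) in TERMINAL]
        if len(done) == len(rollout_ids):
            return done
        if deadline is not None and time.monotonic() >= deadline:
            return done  # partial result: unfinished rollouts are omitted
        # Sleep so other coroutines can run between polls.
        await asyncio.sleep(poll_interval)
```

An event-based implementation could replace the sleep with a wait on a per-rollout completion event.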

agentlightning.LightningStoreCapabilities

Bases: TypedDict

Capabilities of a LightningStore implementation.

All keys are optional and false by default.

async_safe instance-attribute

Whether the store is async-safe.

otlp_traces instance-attribute

Whether the store supports OTLP/HTTP traces.

thread_safe instance-attribute

Whether the store is thread-safe.

zero_copy instance-attribute

Whether the store has only one copy across all threads/processes.
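
Because every key is optional and false by default, callers should read capabilities with a fallback. The sketch below defines a hypothetical ToyCapabilities TypedDict with the four keys listed above and a supports helper; neither is the library's own definition.

```python
from typing import TypedDict


class ToyCapabilities(TypedDict, total=False):
    """Hypothetical mirror of the capability keys; total=False makes all optional."""

    thread_safe: bool
    async_safe: bool
    zero_copy: bool
    otlp_traces: bool


def supports(capabilities: ToyCapabilities, name: str) -> bool:
    """Treat a missing key as False, matching the documented default."""
    return bool(capabilities.get(name, False))
```

A runner could, for instance, check supports(caps, "otlp_traces") before configuring an OTLP exporter against the store.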

Store Implementations

agentlightning.InMemoryLightningStore

Bases: CollectionBasedLightningStore[InMemoryLightningCollections]

In-memory implementation of LightningStore using Python data structures. Thread-safe and async-compatible but data is not persistent.

Parameters:

  • eviction_memory_threshold (float | int | None, default: None ) –

    The threshold for evicting spans in bytes. By default, it's 70% of the total VRAM available.

  • safe_memory_threshold (float | int | None, default: None ) –

    The threshold for safe memory usage in bytes. By default, it's 80% of the eviction threshold.

  • span_size_estimator (Callable[[Span], int] | None, default: None ) –

    A function to estimate the size of a span in bytes. By default, it's a simple size estimator that uses sys.getsizeof.

capabilities property

Return the capabilities of the store.

get_running_rollouts(collections) async

Accelerated version of get_running_rollouts for in-memory store. Used for healthcheck.

on_rollout_update(rollout) async

Update the running rollout ids set when the rollout updates.

statistics() async

Return the statistics of the store.

wait_for_rollout(rollout_id, timeout=None) async

Wait for a specific rollout to complete with a timeout.

agentlightning.CollectionBasedLightningStore

Bases: LightningStore, Generic[T_collections]

The standard implementation of LightningStore, which uses collections to store data.

Store implementations that want the default behavior are encouraged to inherit from this class and override methods only where needed. To bring your own storage backend, pass a different collections argument.

The methods in this class should generally not call each other, especially those that are locked.

Parameters:

  • collections (T_collections) –

    The collections to use for storage.

capabilities property

Return the capabilities of the store.

This base store declares no capabilities of its own; the actual capabilities depend on the underlying collections.

add_many_spans(collections, spans) async

Persist a sequence of pre-converted spans.

See LightningStore.add_many_spans() for semantics.

add_otel_span(collections, rollout_id, attempt_id, readable_span, sequence_id=None) async

Add an opentelemetry span to the store.

See LightningStore.add_otel_span() for semantics.

add_resources(collections, resources) async

Stores a new version of named resources and sets it as the latest.

See LightningStore.add_resources() for semantics.

add_span(collections, span) async

Persist a pre-converted span.

See LightningStore.add_span() for semantics.

dequeue_rollout(collections, worker_id=None) async

Retrieves the next task from the queue without blocking. Returns None if the queue is empty.

Will set the rollout status to preparing and create a new attempt.

See LightningStore.dequeue_rollout() for semantics.

enqueue_rollout(collections, input, mode=None, resources_id=None, config=None, metadata=None) async

Adds a new task to the queue with specific metadata and returns the rollout.

See LightningStore.enqueue_rollout() for semantics.

get_latest_attempt(collections, rollout_id) async

Retrieves the latest attempt for a given rollout ID.

See LightningStore.get_latest_attempt() for semantics.

get_latest_resources(collections) async

Retrieves the latest version of named resources.

See LightningStore.get_latest_resources() for semantics.

get_many_span_sequence_ids(collections, rollout_attempt_ids) async

Get the next span sequence IDs for a given list of rollout and attempt identifiers.

get_next_span_sequence_id(collections, rollout_id, attempt_id) async

Get the next span sequence ID for a given rollout and attempt. The number is strictly increasing for each rollout. The store will not issue the same sequence ID twice.

See LightningStore.get_next_span_sequence_id() for semantics.

get_resources_by_id(collections, resources_id) async

Retrieves a specific version of named resources by its ID.

See LightningStore.get_resources_by_id() for semantics.

get_rollout_by_id(collections, rollout_id) async

Retrieves a specific rollout by its ID.

See LightningStore.get_rollout_by_id() for semantics.

If the rollout has been attempted, the latest attempt will also be returned.

get_running_rollouts(collections) async

Get all running rollouts.

Because this is invoked very frequently (probably on every request), subclasses can implement optimizations to make it more efficient. The method should not acquire the lock itself; the caller is expected to hold it.

on_rollout_update(rollout) async

Callback for subclasses to implement specific logic when a rollout changes.

Subclasses should not lock this method with collections.atomic() because the caller will already hold the lock.

query_attempts(collections, rollout_id, *, sort_by='sequence_id', sort_order='asc', limit=-1, offset=0) async

Retrieve attempts for a rollout with optional ordering/pagination.

query_resources(collections, *, resources_id=None, resources_id_contains=None, sort_by=None, sort_order='asc', limit=-1, offset=0) async

Return every stored resource snapshot in insertion order.

query_rollouts(collections, *, status_in=None, rollout_id_in=None, rollout_id_contains=None, filter_logic='and', sort_by=None, sort_order='asc', limit=-1, offset=0, status=None, rollout_ids=None) async

Retrieve rollouts with filtering and pagination.

See LightningStore.query_rollouts() for semantics.

query_spans(collections, rollout_id, attempt_id=None, *, trace_id=None, trace_id_contains=None, span_id=None, span_id_contains=None, parent_id=None, parent_id_contains=None, name=None, name_contains=None, filter_logic='and', limit=-1, offset=0, sort_by='sequence_id', sort_order='asc') async

Query and retrieve spans associated with a specific rollout ID. Returns an empty list if no spans are found.

See LightningStore.query_spans() for semantics.

query_workers(collections, *, status_in=None, worker_id_contains=None, filter_logic='and', sort_by=None, sort_order='asc', limit=-1, offset=0) async

Return the current snapshot of all workers.

start_attempt(collections, rollout_id) async

Creates a new attempt for a given rollout ID and returns the attempt details.

See LightningStore.start_attempt() for semantics.

start_rollout(collections, input, mode=None, resources_id=None, config=None, metadata=None) async

Notify the store that the caller is about to run a rollout.

See LightningStore.start_rollout() for semantics.

statistics() async

Return the statistics of the store.

update_attempt(collections, rollout_id, attempt_id, status=UNSET, worker_id=UNSET, last_heartbeat_time=UNSET, metadata=UNSET) async

Update a specific or latest attempt for a given rollout.

See LightningStore.update_attempt() for semantics.

update_resources(collections, resources_id, resources) async

Safely stores a new version of named resources and sets it as the latest.

See LightningStore.update_resources() for semantics.

update_rollout(collections, rollout_id, input=UNSET, mode=UNSET, resources_id=UNSET, status=UNSET, config=UNSET, metadata=UNSET) async

Update the rollout status and related metadata.

See LightningStore.update_rollout() for semantics.

update_worker(collections, worker_id, heartbeat_stats=UNSET) async

Create or update a worker entry.

wait_for_rollout(rollout_id, timeout=None) async

Wait for a specific rollout to complete with a timeout.

Subclasses may use advanced mechanisms like events to accelerate this.

Returns the completed rollout, or None if timeout is reached.
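As a rough illustration of the event-based approach mentioned above, the sketch below waits on an asyncio.Event with a timeout and returns None when the timeout expires. RolloutWaiter, mark_completed, and the dict-shaped rollout are hypothetical stand-ins, not part of agentlightning.

```python
import asyncio
from typing import Dict, Optional


class RolloutWaiter:
    """Hypothetical helper that keeps one asyncio.Event per rollout ID."""

    def __init__(self) -> None:
        self._events: Dict[str, asyncio.Event] = {}
        self._results: Dict[str, dict] = {}

    def _event(self, rollout_id: str) -> asyncio.Event:
        return self._events.setdefault(rollout_id, asyncio.Event())

    def mark_completed(self, rollout_id: str, rollout: dict) -> None:
        # Called when a rollout reaches a terminal status; wakes all waiters.
        self._results[rollout_id] = rollout
        self._event(rollout_id).set()

    async def wait_for_rollout(
        self, rollout_id: str, timeout: Optional[float] = None
    ) -> Optional[dict]:
        try:
            # timeout=None waits indefinitely.
            await asyncio.wait_for(self._event(rollout_id).wait(), timeout)
        except asyncio.TimeoutError:
            return None
        return self._results[rollout_id]


async def demo() -> tuple:
    waiter = RolloutWaiter()
    before = await waiter.wait_for_rollout("r1", timeout=0.01)
    waiter.mark_completed("r1", {"rollout_id": "r1", "status": "succeeded"})
    after = await waiter.wait_for_rollout("r1", timeout=0.01)
    return before, after
```

Compared with polling, waiters are woken as soon as the event is set, so the latency does not depend on a polling interval.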

wait_for_rollouts(*, rollout_ids, timeout=None) async

Wait for specified rollouts to complete with a timeout. Returns the completed rollouts, potentially incomplete if timeout is reached.

This method does not change the state of the store.

See LightningStore.wait_for_rollouts() for semantics.

Client-Server and Thread-safe Wrappers

agentlightning.LightningStoreServer

Bases: LightningStore

Server wrapper that exposes a LightningStore via HTTP API. Delegates all operations to an underlying store implementation.

Health checks and the watchdog rely on the underlying store.

agl store is a convenient CLI to start a store server.

When the server is executed in a subprocess, the store detects that it is running under a different PID and automatically delegates to an HTTP client instead of using the local store. This ensures that a single copy of the store is shared across all processes.
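The PID check can be pictured with the minimal sketch below. It is an assumption about the general technique, not the actual implementation: PidAwareStore, backend, and make_client are hypothetical names, and plain strings stand in for the local store and the HTTP client.

```python
import os
from typing import Any, Callable, Optional


class PidAwareStore:
    """Hypothetical sketch: use the local store only in the owning process."""

    def __init__(self, local_store: Any, make_client: Callable[[], Any]) -> None:
        self._owner_pid = os.getpid()    # process that created the server
        self._local_store = local_store
        self._make_client = make_client  # factory for an HTTP client stand-in
        self._client: Optional[Any] = None

    def backend(self) -> Any:
        if os.getpid() == self._owner_pid:
            # Same process: talk to the store directly.
            return self._local_store
        if self._client is None:
            # Different process: create the client lazily and reuse it.
            self._client = self._make_client()
        return self._client


store = PidAwareStore("local-store", lambda: "http-client")
```

In the creating process, store.backend() returns the local store; after a fork or spawn, the PID no longer matches and the client stand-in is returned instead.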

This server also exposes an OTLP-compatible traces endpoint at /v1/traces.

Parameters:

  • store (LightningStore) –

    The underlying store to delegate operations to.

  • host (str | None, default: None ) –

    The hostname or IP address to bind the server to.

  • port (int | None, default: None ) –

    The TCP port to listen on.

  • cors_allow_origins (Sequence[str] | str | None, default: None ) –

    A list of CORS origins to allow. Use '*' to allow all origins.

  • launch_mode (LaunchMode, default: 'thread' ) –

    The launch mode to use for the server. Defaults to "thread", which runs the server in a separate thread.

  • launcher_args (PythonServerLauncherArgs | None, default: None ) –

    The arguments to use for the server launcher. host, port, and launch_mode must not be set together with launcher_args.

  • n_workers (int, default: 1 ) –

    The number of workers to run in the server. Only applicable for mp launch mode.

  • prometheus (bool, default: False ) –

    Whether to enable Prometheus metrics.

capabilities property

Return the capabilities of the store.

endpoint property

Endpoint is the address that the client will use to connect to the server.

__getstate__()

Control pickling to prevent server state from being sent to subprocesses.

When LightningStoreServer is pickled (e.g., passed to a subprocess), we only serialize the underlying store and connection details. The client instance and process-awareness state are excluded as they should not be transferred between processes.

The subprocess should create its own server instance if needed.

__setstate__(state)

Restore from pickle by reconstructing only the essential attributes.

Note: This creates a new server instance without FastAPI/uvicorn initialized. Follow the __init__() pattern or create a new LightningStoreServer if you need a fully functional server in the subprocess. The unpickled server also has no app and store attributes; this ensures that only one copy of the server exists in the whole system.
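The pickling behavior described above follows the standard __getstate__/__setstate__ protocol. The sketch below shows the general pattern only; ServerHandle and its attributes are hypothetical stand-ins, not the real LightningStoreServer, and the port number is arbitrary.

```python
import pickle
import threading


class ServerHandle:
    """Hypothetical stand-in used to illustrate selective pickling."""

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port
        self._lock = threading.Lock()  # not picklable; stands in for runtime state
        self._app = object()           # stands in for the web application object

    def __getstate__(self) -> dict:
        # Serialize connection details only; runtime state stays behind.
        return {"host": self.host, "port": self.port}

    def __setstate__(self, state: dict) -> None:
        # Restore the essentials; the app is deliberately left unset.
        self.host = state["host"]
        self.port = state["port"]
        self._lock = threading.Lock()
        self._app = None


clone = pickle.loads(pickle.dumps(ServerHandle("127.0.0.1", 8000)))
```

Without __getstate__, pickling would fail on the lock; with it, the copy carries only the address and must build any runtime state itself.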

otlp_traces_endpoint()

Return the OTLP/HTTP traces endpoint of the store.

run_forever() async

Runs the FastAPI server indefinitely.

start() async

Starts the FastAPI server in the background.

You need to call this method in the same process as the server was created in.

stop() async

Gracefully stops the running FastAPI server.

You need to call this method in the same process as the server was created in.

agentlightning.LightningStoreClient

Bases: LightningStore

HTTP client that talks to a remote LightningStoreServer.

Parameters:

  • server_address (str) –

    The address of the LightningStoreServer to connect to.

  • retry_delays (Sequence[float], default: (1.0, 2.0, 5.0) ) –

    Backoff schedule (seconds) used when the initial request fails for a non-application reason. Each entry is one retry attempt. Set to an empty sequence to disable retries.

  • health_retry_delays (Sequence[float], default: (0.1, 0.2, 0.5) ) –

    Delays between /health probes while waiting for the server to come back. Set to an empty sequence to disable health checks.

  • request_timeout (float, default: 30.0 ) –

    Timeout (seconds) for each request.

  • connection_timeout (float, default: 5.0 ) –

    Timeout (seconds) for establishing connection.
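To make the retry_delays semantics concrete, here is a minimal sketch of a delay-schedule retry loop: one initial call, then one more call per entry in the schedule. It is not the client's actual code. call_with_retries is a hypothetical helper, ConnectionError is assumed to represent a non-application failure, and the /health probing is omitted.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, TypeVar

T = TypeVar("T")


async def call_with_retries(
    request: Callable[[], Awaitable[T]],
    retry_delays: Sequence[float] = (1.0, 2.0, 5.0),
) -> T:
    """Call request() once, then at most once more per entry in retry_delays."""
    delays = list(retry_delays)
    attempt = 0
    while True:
        try:
            return await request()
        except ConnectionError:
            if attempt >= len(delays):
                # Schedule exhausted, or empty (retries disabled): give up.
                raise
            await asyncio.sleep(delays[attempt])
            attempt += 1
```

With the default schedule, a request is tried at most four times; with an empty sequence, the first failure propagates immediately.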

capabilities property

Return the capabilities of the store.

__getstate__()

When LightningStoreClient is pickled (e.g., passed to a subprocess), we only serialize the server address and retry configurations. The ClientSessions are excluded as they should not be transferred between processes.

__setstate__(state)

Restore from pickle by reconstructing only the essential attributes.

Replicates the __init__ logic to create another client instance in the subprocess.

close() async

Close the HTTP session.

dequeue_rollout(worker_id=None) async

Dequeue a rollout from the server queue.

Returns:

  • Optional[AttemptedRollout]

    AttemptedRollout if a rollout is available, None if queue is empty.

Note

This method does NOT retry on failures. If any exception occurs (network error, server error, etc.), it logs the error and returns None immediately.

get_latest_attempt(rollout_id) async

Get the latest attempt for a rollout.

Parameters:

  • rollout_id (str) –

    ID of the rollout to query.

Returns:

  • Optional[Attempt]

    Attempt if found, None if not found or if all retries are exhausted.

Note

This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.

get_latest_resources() async

Get the latest resources.

Returns:

  • Optional[ResourcesUpdate]

    ResourcesUpdate if found, None if not found or if all retries are exhausted.

Note

This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.

get_resources_by_id(resources_id) async

Get resources by their ID.

Parameters:

  • resources_id (str) –

    ID of the resources to retrieve.

Returns:

  • Optional[ResourcesUpdate]

    ResourcesUpdate if found, None if not found or if all retries are exhausted.

Note

This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.

get_rollout_by_id(rollout_id) async

Get a rollout by its ID.

Parameters:

  • rollout_id (str) –

    ID of the rollout to retrieve.

Returns:

  • Optional[Rollout]

    Rollout if found, None if not found or if all retries are exhausted.

Note

This method retries on transient failures (network errors, 5xx status codes). If all retries fail, it logs the error and returns None instead of raising an exception.

otlp_traces_endpoint()

Return the OTLP/HTTP traces endpoint of the store.

query_resources(*, resources_id=None, resources_id_contains=None, sort_by=None, sort_order='asc', limit=-1, offset=0) async

List all resource snapshots stored on the server.

wait_for_rollouts(*, rollout_ids, timeout=None) async

Wait for rollouts to complete.

Parameters:

  • rollout_ids (List[str]) –

    List of rollout IDs to wait for.

  • timeout (Optional[float], default: None ) –

    Timeout in seconds. If not None, the method will raise a ValueError if the timeout is greater than 0.1 seconds.

Returns:

  • List[Rollout]

    List of rollouts that are completed.

agentlightning.LightningStoreThreaded

Bases: LightningStore

Facade that delegates all store operations to an underlying store instance.

The operations are guaranteed to be thread-safe. Make sure the threaded stores are instantiated before initializing the threads.

capabilities property

Return the capabilities of the store.

statistics() async

Return the statistics of the store.

Collections and Collection Implementations

agentlightning.store.collection.Collection

Bases: Generic[T]

Behaves like a list of items, supporting addition, updating, and deletion of items.

delete(items) async

Delete the given items from the collection.

Parameters:

  • items (Sequence[T]) –

    The items to delete from the collection.

Raises:

  • ValueError

    If the items with the primary keys to be deleted do not exist.

get(filter=None, sort=None) async

Get the first item that matches the given filters.

Parameters:

Returns:

  • Optional[T]

    The first item that matches the given filters, or None if no item matches.

insert(items) async

Add the given items to the collection.

Raises:

  • ValueError

    If an item with the same primary key already exists.

item_type()

Get the type of the items in the collection.

primary_keys()

Get the primary keys of the collection.

query(filter=None, sort=None, limit=-1, offset=0) async

Query the collection with the given filters, sort order, and pagination.

Parameters:

  • filter (Optional[FilterOptions], default: None ) –

    The filters to apply to the collection. See FilterOptions.

  • sort (Optional[SortOptions], default: None ) –

    The options for sorting the collection. See SortOptions. The field must exist in the model. If the field might contain null values, the behavior is undefined (i.e., it depends on the implementation).

  • limit (int, default: -1 ) –

    Max number of items to return. Use -1 for "no limit".

  • offset (int, default: 0 ) –

    Number of items to skip from the start of the matching items.

Returns:

  • PaginatedResult[T]

    PaginatedResult with items, limit, offset, and total matched items.
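The limit and offset semantics can be sketched as follows. Page is a hypothetical stand-in for PaginatedResult, with field names assumed from the description above, and paginate operates on an already filtered and sorted sequence.

```python
from dataclasses import dataclass
from typing import Generic, List, Sequence, TypeVar

T = TypeVar("T")


@dataclass
class Page(Generic[T]):
    """Hypothetical stand-in for PaginatedResult."""

    items: List[T]
    limit: int
    offset: int
    total: int


def paginate(matched: Sequence[T], limit: int = -1, offset: int = 0) -> Page[T]:
    total = len(matched)  # counted before slicing, so it ignores limit/offset
    end = total if limit == -1 else offset + limit
    return Page(items=list(matched[offset:end]), limit=limit, offset=offset, total=total)


page = paginate(list(range(10)), limit=3, offset=4)
```

Here page.items holds the three items starting at position 4, while page.total still reports all ten matches, which lets a caller compute how many pages remain.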

size() async

Get the number of items in the collection.

update(items) async

Update the given items in the collection.

Raises:

  • ValueError

    If an item with the given primary keys does not exist.

upsert(items) async

Upsert the given items into the collection.

If the items with the same primary keys already exist, they will be updated. Otherwise, they will be inserted.
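The difference between insert, update, and upsert can be summarized with a small sketch. TinyCollection is a hypothetical, synchronous, dict-backed stand-in with a single primary key named "id"; the real interface is async and takes sequences of items.

```python
from typing import Dict


class TinyCollection:
    """Hypothetical dict-backed stand-in keyed by a single primary key."""

    def __init__(self) -> None:
        self._items: Dict[str, dict] = {}

    def insert(self, item: dict) -> None:
        if item["id"] in self._items:
            raise ValueError(f"primary key already exists: {item['id']}")
        self._items[item["id"]] = item

    def update(self, item: dict) -> None:
        if item["id"] not in self._items:
            raise ValueError(f"primary key does not exist: {item['id']}")
        self._items[item["id"]] = item

    def upsert(self, item: dict) -> None:
        # Update if present, insert otherwise; never raises on presence.
        self._items[item["id"]] = item

    def get(self, key: str) -> dict:
        return self._items[key]
```

In this sketch, upsert is the only one of the three that succeeds regardless of whether the key is already stored.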

agentlightning.store.collection.Queue

Bases: Generic[T]

Behaves like a deque, supporting appending items to the end and popping items from the front.

dequeue(limit=1) async

Pop the given number of items from the front of the queue.

Parameters:

  • limit (int, default: 1 ) –

    The number of items to pop from the front of the queue.

Returns:

  • Sequence[T]

    The items that were popped from the front of the queue. If there are fewer than limit items in the queue, all remaining items are returned.

enqueue(items) async

Append the given items to the end of the queue.

Parameters:

  • items (Sequence[T]) –

    The items to append to the end of the queue.

Returns:

  • Sequence[T]

    The items that were appended to the end of the queue.

has(item) async

Check if the given item is in the queue.

item_type()

Get the type of the items in the queue.

peek(limit=1) async

Peek the given number of items from the front of the queue.

Parameters:

  • limit (int, default: 1 ) –

    The number of items to peek from the front of the queue.

Returns:

  • Sequence[T]

    The items that were peeked from the front of the queue. If there are fewer than limit items in the queue, all remaining items are returned.

size() async

Get the number of items in the queue.

agentlightning.store.collection.KeyValue

Bases: Generic[K, V]

Behaves like a dictionary, supporting addition, updating, and deletion of items.

get(key, default=None) async

Get the value for the given key, or the default value if the key is not found.

has(key) async

Check if the given key is in the dictionary.

pop(key, default=None) async

Pop the value for the given key, or the default value if the key is not found.

set(key, value) async

Set the value for the given key.

size() async

Get the number of items in the dictionary.

agentlightning.store.collection.LightningCollections

Collections of rollouts, attempts, spans, resources, and workers.

LightningStore implementations can use this as a storage base to implement the store API.

attempts property

Collections of attempts.

resources property

Collections of resources.

rollout_queue property

Queue of rollouts (tasks).

rollouts property

Collections of rollouts.

span_sequence_ids property

Dictionary (counter) of span sequence IDs.
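One way to picture such a counter is the sketch below, which hands out strictly increasing sequence IDs per key. SequenceCounter is hypothetical, and keying the counter by rollout ID alone is an assumption; the actual key layout is not specified here.

```python
from collections import defaultdict
from typing import DefaultDict


class SequenceCounter:
    """Hypothetical per-key counter that hands out increasing integers."""

    def __init__(self) -> None:
        self._last: DefaultDict[str, int] = defaultdict(int)

    def next_id(self, key: str) -> int:
        # Each call returns a value one greater than the previous one for key.
        self._last[key] += 1
        return self._last[key]


counter = SequenceCounter()
first = counter.next_id("rollout-1")
second = counter.next_id("rollout-1")
other = counter.next_id("rollout-2")
```

Each key has an independent sequence, so IDs issued for one rollout do not affect another.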

spans property

Collections of spans.

workers property

Collections of workers.

atomic(*args, **kwargs)

Perform an atomic operation on the collections.

Subclasses may use args and kwargs to support multiple levels of atomicity.

Parameters:

  • *args (Any, default: () ) –

    Arguments to pass to the operation.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the operation.

execute(callback) async

Execute the given callback within an atomic operation.
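A minimal sketch of how atomic() and execute() could relate is shown below, using a single asyncio.Lock. LockedCollections is a hypothetical stand-in, and the assumption that the callback receives the collections object is illustrative only.

```python
import asyncio
from contextlib import asynccontextmanager


class LockedCollections:
    """Hypothetical stand-in; one asyncio.Lock guards all state."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.counter = 0

    @asynccontextmanager
    async def atomic(self):
        async with self._lock:
            yield self

    async def execute(self, callback):
        # Run the callback while holding the lock and return its result.
        async with self.atomic() as collections:
            return await callback(collections)


async def increment(collections: LockedCollections) -> int:
    current = collections.counter
    await asyncio.sleep(0)  # yields control; a lost update would occur without the lock
    collections.counter = current + 1
    return collections.counter


async def demo() -> int:
    collections = LockedCollections()
    await asyncio.gather(*(collections.execute(increment) for _ in range(50)))
    return collections.counter
```

Because asyncio.Lock is not reentrant, a callback in this sketch must not call execute() or atomic() again; doing so would wait forever on the lock it already holds.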

agentlightning.store.collection.ListBasedCollection

Bases: Collection[T]

In-memory implementation of Collection using a nested dict for O(1) primary-key lookup.

The internal structure is:

{
    pk1_value: {
        pk2_value: {
            ...
                pkN_value: item
        }
    }
}

where the nesting depth equals the number of primary keys.
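The nested layout above can be sketched with two small helpers. nested_insert and nested_get are hypothetical names, and the two-part key (rollout ID, attempt ID) in the example is only an illustration of a composite primary key.

```python
from typing import Any, Dict, Optional, Sequence


def nested_insert(root: Dict[Any, Any], keys: Sequence[Any], item: Any) -> None:
    """Store item at root[k1][k2]...[kN]; raise if that slot is taken."""
    node = root
    for key in keys[:-1]:
        node = node.setdefault(key, {})  # create intermediate levels on demand
    if keys[-1] in node:
        raise ValueError(f"primary key already exists: {tuple(keys)}")
    node[keys[-1]] = item


def nested_get(root: Dict[Any, Any], keys: Sequence[Any]) -> Optional[Any]:
    """Follow one dict lookup per primary key; return None if any level is missing."""
    node: Any = root
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


root: Dict[Any, Any] = {}
nested_insert(root, ("r1", "a1"), {"status": "running"})
nested_insert(root, ("r1", "a2"), {"status": "queuing"})
```

A lookup costs one dictionary access per primary key, independent of how many items are stored.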

Sorting behavior:

  1. If no sort_by is provided, the items are returned in the order of insertion.
  2. If sort_by is provided, the items are sorted by the value of the sort_by field.
  3. If the sort_by field is a timestamp, the null values are treated as infinity.
  4. If the sort_by field is not a timestamp, the null values are treated as empty string if the field is str-like, 0 if the field is int-like, 0.0 if the field is float-like.
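The null-handling rules in the list above can be expressed as a sort key. This is a sketch only: sort_key and sort_items are hypothetical, and the kind argument is an assumed way of telling the function what type the field has.

```python
import math
from typing import Any, List, Optional


def sort_key(value: Optional[Any], kind: str) -> Any:
    """Replace None with the placeholder described in the rules above."""
    if value is not None:
        return value
    if kind == "timestamp":
        return math.inf  # null timestamps sort as infinity
    return {"str": "", "int": 0, "float": 0.0}[kind]


def sort_items(items: List[dict], sort_by: str, kind: str, descending: bool = False) -> List[dict]:
    return sorted(items, key=lambda item: sort_key(item.get(sort_by), kind), reverse=descending)


rows = [
    {"id": "a", "end_time": 3.0},
    {"id": "b", "end_time": None},
    {"id": "c", "end_time": 1.0},
]
ordered = [row["id"] for row in sort_items(rows, "end_time", "timestamp")]
```

In ascending order, the row with a null end_time comes last because it is treated as infinity, whereas a null string field would sort first as an empty string.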

delete(items) async

Delete the given items.

Raises:

  • ValueError

    If any item with the given primary keys does not exist.

get(filter=None, sort=None) async

Return the first (or best-sorted) item that matches the given filters, or None.

insert(items) async

Insert the given items.

Raises:

  • ValueError

    If any item with the same primary keys already exists.

item_type()

Return the Pydantic model type of items stored in this collection.

primary_keys()

Return the primary key field names for this collection.

query(filter=None, sort=None, limit=-1, offset=0) async

Query the collection with filters, sort order, and pagination.

Parameters:

  • filter (Optional[FilterOptions], default: None ) –

    Mapping of field name to operator dict along with the optional _aggregate logic.

  • sort (Optional[SortOptions], default: None ) –

    Options describing which field to sort by and in which order.

  • limit (int, default: -1 ) –

    Max number of items to return. Use -1 for "no limit".

  • offset (int, default: 0 ) –

    Number of items to skip from the start of the matching items.

size() async

Return the number of items stored in the collection.

update(items) async

Update the given items.

Raises:

  • ValueError

    If any item with the given primary keys does not exist.

upsert(items) async

Upsert the given items (insert if missing, otherwise update).

agentlightning.store.collection.DequeBasedQueue

Bases: Queue[T]

Queue implementation backed by collections.deque.

Provides O(1) amortized enqueue (append) and dequeue (popleft).
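A deque-backed queue with the enqueue, dequeue, and peek semantics described for Queue could look roughly like this. TinyQueue is a hypothetical, synchronous stand-in, not the DequeBasedQueue source.

```python
import itertools
from collections import deque
from typing import Deque, Generic, List, Sequence, TypeVar

T = TypeVar("T")


class TinyQueue(Generic[T]):
    """Hypothetical FIFO queue backed by collections.deque."""

    def __init__(self) -> None:
        self._items: Deque[T] = deque()

    def enqueue(self, items: Sequence[T]) -> Sequence[T]:
        self._items.extend(items)  # append to the end
        return items

    def dequeue(self, limit: int = 1) -> List[T]:
        # Pop up to limit items; fewer if the queue runs out.
        count = min(limit, len(self._items))
        return [self._items.popleft() for _ in range(count)]

    def peek(self, limit: int = 1) -> List[T]:
        # Look at up to limit items from the front without removing them.
        return list(itertools.islice(self._items, limit))

    def size(self) -> int:
        return len(self._items)
```

Both append and popleft run in constant time on a deque, which is why it suits a FIFO queue better than a list, where popping from the front is linear.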

agentlightning.store.collection.DictBasedKeyValue

Bases: KeyValue[K, V]

KeyValue implementation backed by a plain dictionary.

agentlightning.store.collection.InMemoryLightningCollections

Bases: LightningCollections

In-memory implementation of LightningCollections using Python data structures.

Serves as the storage base for InMemoryLightningStore.

atomic(*args, **kwargs) async

In-memory collections rely on a lock applied outside this method, so it does not need to manipulate the collections itself.

evict_spans_for_rollout(rollout_id) async

Evict all spans for a given rollout ID.

Uses private API for efficiency.