
Understanding Store

The LightningStore is the central coordination point for Agent-lightning. It holds the task queue, rollouts, attempts, spans, and versioned resources, and exposes a small API that both Runners and Algorithms use to communicate. This document explains what's in the store, how statuses transition, how spans are recorded, and the concurrency model (threads and processes).

What's in the Store?

Store Architecture

At a high level:

  • Task Queue – enqueue_rollout adds work; workers poll with dequeue_rollout. When a rollout is dequeued, a new attempt is automatically created for it.
  • Rollouts – A rollout is one unit of work. It has input, metadata, links to resources, and a lifecycle (queuing → preparing → running → ...). Valid RolloutStatus values are queuing, preparing, running, succeeded, failed, requeuing, and cancelled. Algorithms and runners can treat a rollout as a whole, without worrying about its internal attempts.
  • Attempts – Each rollout can have multiple executions (retries). Attempts track status, start_time, end_time, and last_heartbeat_time, and link to spans. Valid AttemptStatus values are preparing, running, succeeded, failed, timeout, and unresponsive, matching the transition diagram below.
  • Spans – Structured trace events produced by the Tracer during an attempt. Spans are ordered by a monotonic sequence id per (rollout_id, attempt_id).
  • Resources – Versioned, named bundles (e.g., prompt templates) referenced by rollouts.
  • Workers – Metadata about runner instances: heartbeat timestamps, current assignment, and status.

Rollout and Task share the same surface in practice: Rollout.input is the task input. The queue stores rollouts that are not yet running; Runners dequeue them and update the same rollout's status as work progresses.

Before we look at status transitions, it helps to keep in mind that rollouts are the "outside view," while attempts are the "inside view." Attempts are what actually run; rollouts summarize the latest attempt plus a small set of control actions like queueing and cancellation.

Attempt Status Transitions

The status model is intentionally small and operationally clear.

stateDiagram-v2
direction LR

[*] --> preparing: <b>Runner calls</b> dequeue_rollout()<br>or start_rollout()<br>or start_attempt()
preparing --> running: <b>Runner calls</b><br>add_[otel_]span()<br>for the first time

state c_runner <<choice>>
state c_watch  <<choice>>

preparing --> c_runner: <b>Runner calls</b><br>update_attempt(...)
running --> c_runner: <b>Runner calls</b><br>update_attempt(...)
running --> c_watch: <b>Watchdog checks</b>
preparing --> c_watch: <b>Watchdog checks</b>

state "Client-set outcome" as Client {
  direction TB
  succeeded
  failed
}
state "Watchdog / policy" as Watch {
  direction TB
  timeout
  unresponsive
}

c_runner --> succeeded: update_attempt(status=succeeded)
c_runner --> failed: update_attempt(status=failed)

c_watch --> timeout: now - start_time > timeout_seconds
c_watch --> unresponsive: now - last_heartbeat > unresponsive_seconds

unresponsive --> running: <b>Runner calls</b><br>add_[otel_]span()

Each attempt begins in preparing, created either when a rollout is dequeued or explicitly started. It transitions to running the first time a span is recorded. From there, a few clear rules govern how it can change:

  • When the runner explicitly marks completion, the attempt becomes succeeded or failed (e.g., failed when the runner catches an exception thrown by the agent).
  • When the watchdog detects that the total elapsed time since start exceeds the configured limit, it marks the attempt as timeout.
  • If heartbeats stop arriving for too long, the watchdog marks it unresponsive.
  • A new span from the runner can immediately revive an unresponsive attempt back to running.
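
To make these rules concrete, here is a minimal sketch of one attempt from a runner's point of view. It assumes an initialized LightningStore named store; the return shape of dequeue_rollout and its accessors are illustrative assumptions, not the exact API:

async def run_one(worker_id: str) -> None:
    # Dequeuing creates a new attempt in `preparing`.
    attempted = await store.dequeue_rollout(worker_id=worker_id)
    if attempted is None:
        return  # queue is empty
    try:
        # ... run the agent; each add_[otel_]span() call flips the attempt
        # to `running` and refreshes last_heartbeat_time ...
        await store.update_attempt(
            rollout_id=attempted.rollout_id,   # accessor assumed
            attempt_id=attempted.attempt_id,   # accessor assumed
            status="succeeded",
            worker_id=worker_id,
        )
    except Exception:
        # The runner caught an exception thrown by the agent.
        await store.update_attempt(
            rollout_id=attempted.rollout_id,
            attempt_id=attempted.attempt_id,
            status="failed",
            worker_id=worker_id,
        )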

What's a Watchdog?

The watchdog enforces timing and liveness rules defined by each rollout’s RolloutConfig. It’s not a separate thread or service, but a function periodically invoked (e.g., before store mutations) to keep attempts healthy and consistent.

This simple model allows the system to distinguish between normal termination, abnormal stalling, and recoverable interruption without additional state flags.

Worker Telemetry

Workers track runner-level activity timestamps (last_heartbeat_time, last_dequeue_time, last_busy_time, last_idle_time) plus their current rollout assignment. Those fields are now derived automatically:

  • dequeue_rollout(worker_id=...) records which worker polled the queue and refreshes last_dequeue_time.
  • update_attempt(..., worker_id=...) drives the worker status machine. Assigning an attempt marks the worker busy and stamps last_busy_time; finishing with status in {"succeeded","failed"} switches the worker to idle, while watchdog transitions such as timeout/unresponsive set it to unknown and clear current_rollout_id / current_attempt_id.
  • update_worker(...) is reserved for heartbeats. It snapshots optional heartbeat_stats and always updates last_heartbeat_time.

Because every transition flows through these APIs, worker status is derived automatically from rollout execution and heartbeats. Note, however, that calling update_worker with a new worker_id will create a new worker record with status "unknown" if one does not exist. Thus, while manual status changes are not allowed, new worker records can be created externally via heartbeats.
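
As a sketch, a runner-side heartbeat loop could look like this; the worker_id and heartbeat_stats keywords come from this document, while the stats payload and interval are illustrative:

import asyncio

async def heartbeat(store, worker_id: str, interval: float = 30.0) -> None:
    # update_worker() is reserved for heartbeats: it snapshots the optional
    # heartbeat_stats and always refreshes last_heartbeat_time.
    while True:
        await store.update_worker(worker_id, heartbeat_stats={"load": 0.3})
        await asyncio.sleep(interval)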

Rollout Transition Map

Rollout status is an aggregated view of its latest attempt’s status, with additional transitions for queueing and explicit cancellation.

A rollout’s retry behavior is controlled by Rollout.config (a RolloutConfig). The key fields are:

  • timeout_seconds – maximum wall-clock time for an attempt before it is marked timeout.
  • unresponsive_seconds – maximum silence between heartbeats before an attempt is marked unresponsive.
  • max_attempts – total number of attempts allowed for the rollout (including the first).
  • retry_condition – which attempt terminal statuses should trigger a retry (e.g., ["failed", "timeout", "unresponsive"]).

How it plays out: The runner works on attempt k. If the attempt ends in a status that is listed in retry_condition, and k < max_attempts, the rollout moves to requeuing and the store creates attempt k+1. Otherwise, the rollout becomes failed (or succeeded if the runner marked it so). timeout_seconds and unresponsive_seconds are enforced by the watchdog and feed into the same decision flow.

A minimal example of how to use RolloutConfig:

from agentlightning import RolloutConfig

# Retry on explicit failures or timeouts, up to 3 attempts in total.
cfg = RolloutConfig(
    timeout_seconds=600,
    unresponsive_seconds=120,
    max_attempts=3,
    retry_condition=["failed", "timeout"]
)

# When creating/enqueuing a rollout, attach this config.
# The store will propagate attempt outcomes according to cfg.
rollout = await store.enqueue_rollout(input, config=cfg)

| Latest attempt status | Rollout transition | Notes / guards |
|---|---|---|
| N/A | → queuing | Created by enqueue_rollout(). |
| preparing | queuing/requeuing → preparing | Typically dequeue_rollout() or start_rollout()/start_attempt() creates a new attempt. |
| running | preparing/queuing/requeuing → running | The first add_[otel_]span() flips the attempt to running; the rollout follows via rollout_status_from_attempt. |
| succeeded | * → succeeded | Terminal. Rollout end_time set. |
| failed / timeout / unresponsive | * → requeuing | Only if status ∈ retry_condition ∧ sequence_id < max_attempts. |
| failed / timeout / unresponsive | * → failed | Otherwise (no retries left or retries disabled). |
| * | * → cancelled | Explicitly set by update_rollout(status=cancelled). |

Why aggregation?

In code, rollout_status_from_attempt() updates the rollout based on its latest attempt. Reading the table above is usually easier than reverse-engineering the propagation logic in the code: think of the rollout's transitions as callbacks on attempt state changes, plus the queue/cancel paths.

Spans

Every traceable operation in a rollout is stored as a Span. Spans not only capture fine-grained instrumentation but also act as periodic heartbeats that demonstrate liveness. The first span marks activation; each subsequent one both extends the trace and refreshes the attempt’s last_heartbeat_time. If no span arrives within the configured unresponsive_seconds, the watchdog downgrades the attempt to unresponsive until activity resumes.

Spans are indexed by (rollout_id, attempt_id, sequence_id), where the sequence is monotonic. Trace analysis tools like Adapter usually rely on time order to reconstruct the trace. However, in a distributed system, the recorded start and end times of spans are not necessarily in the right order once they are aggregated into a central store. Therefore, every span creation must retrieve a monotonically increasing sequence_id from the store before the span is added.

Note

In practice, one sequence_id can be shared by multiple spans. In that case, the order among those spans is determined by their start_time and end_time.

OpenTelemetry conversion

Runners often produce OpenTelemetry ReadableSpan objects directly. The store normalizes them into Span as follows:

  1. The runner first requests a sequence id: sequence_id = await store.get_next_span_sequence_id(rollout_id, attempt_id). This guarantees ordering within the attempt regardless of clock skew.
  2. trace_id, span_id, parent_id, name, status, timestamps, attributes, events, links, and resource are copied from the OTEL span. Timestamps are auto-normalized to seconds (nanoseconds are converted).
  3. OTEL SpanContext and parent context are preserved so downstream tools can correlate traces across systems.
  4. Any additional serializable fields present on the ReadableSpan are retained in the stored span (after safe JSON serialization), which keeps the representation forward-compatible.

Programmatically this is encapsulated by Span.from_opentelemetry(readable_span, rollout_id, attempt_id, sequence_id); store.add_otel_span(...) simply wraps the fetch-then-add flow. The end result is a store span that is stable to sort, merge, and query, while still preserving the richness of the original OTEL payload.
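
Spelled out, the fetch-then-add flow looks like the sketch below. It assumes an OpenTelemetry ReadableSpan named otel_span, and the argument order of add_otel_span is inferred from the wrapper description above:

# Manual flow: reserve an ordering slot, convert, then persist.
sequence_id = await store.get_next_span_sequence_id(rollout_id, attempt_id)
span = Span.from_opentelemetry(otel_span, rollout_id, attempt_id, sequence_id)
await store.add_span(span)

# Or the one-call wrapper described in this document.
await store.add_otel_span(rollout_id, attempt_id, otel_span)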

Tip

Both add_span and add_otel_span append a span and act as a heartbeat that can revive an attempt from unresponsive back to running.

OTLP Compatibility

Some LightningStore implementations support exporting traces via the OTLP/HTTP specification. For example, LightningStoreServer exposes a /v1/traces endpoint that implements the binary Protobuf variant defined by the spec, including the required Content-Type: application/x-protobuf, the optional Content-Encoding: gzip, and status responses encoded as google.rpc.Status. Agent-lightning handles parsing ExportTraceServiceRequest messages, validating identifiers, normalizing resource metadata, and allocating sequence numbers, so store implementations only need to persist Span objects in order.

Because the interface speaks standard OTLP, any OpenTelemetry-compatible SDK or collector can emit spans directly to a LightningStore OTLP endpoint without custom shims. The server responds according to the OTLP contract (status code, encoding, and error payloads), which keeps Agent-lightning interoperable with existing observability tooling. This compatibility serves as a strong complement to the OpenTelemetry conversion discussed above.

Check whether the store supports OTLP traces via the capabilities["otlp_traces"] property.
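
For instance, a vanilla OpenTelemetry SDK can export straight to the endpoint. This sketch uses the standard opentelemetry-sdk and opentelemetry-exporter-otlp-proto-http packages; the address and port are illustrative:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# Point a standard OTLP/HTTP exporter at the store's /v1/traces endpoint.
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4747/v1/traces"))
)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("my-agent")
with tracer.start_as_current_span("tool-call"):
    pass  # exported to the store over OTLP/HTTP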

Implementation Overview

The agentlightning.store module is organized into two distinct layers plus optional wrappers:

classDiagram
    direction TB

    class LightningStore {
        <<abstract>>
        +enqueue_rollout()
        +dequeue_rollout()
        +update_attempt()
        +add_span()
        +query_rollouts()
        ...
    }

    class LightningCollections {
        <<abstract>>
        +rollouts: Collection
        +attempts: Collection
        +spans: Collection
        +resources: Collection
        +workers: Collection
        +rollout_queue: Queue
        +span_sequence_ids: KeyValue
        +atomic()
    }

    class CollectionBasedLightningStore~T~ {
        +collections: T
        -healthcheck_before()
        -tracked()
    }

    class InMemoryLightningStore
    class MongoLightningStore
    class InMemoryLightningCollections
    class MongoLightningCollections

    class LightningStoreServer {
        +store: LightningStore
        +start()
        +stop()
    }
    class LightningStoreClient {
        +server_address: str
    }
    class LightningStoreThreaded {
        +store: LightningStore
    }

    LightningStore <|-- CollectionBasedLightningStore
    LightningStore <|-- LightningStoreServer
    LightningStore <|-- LightningStoreClient
    LightningStore <|-- LightningStoreThreaded

    CollectionBasedLightningStore <|-- InMemoryLightningStore
    CollectionBasedLightningStore <|-- MongoLightningStore

    LightningCollections <|-- InMemoryLightningCollections
    LightningCollections <|-- MongoLightningCollections

    InMemoryLightningStore ..> InMemoryLightningCollections : uses
    MongoLightningStore ..> MongoLightningCollections : uses

    LightningStoreServer o-- LightningStore : wraps
    LightningStoreThreaded o-- LightningStore : wraps

  1. Collections Layer – Low-level storage primitives (LightningCollections) providing CRUD operations via Collection, Queue, and KeyValue interfaces. Each backend (in-memory, MongoDB) implements these primitives.

  2. Store Layer – All LightningStore implementations must inherit from LightningStore and override the methods to implement the storage logic. CollectionBasedLightningStore builds on collections to implement the full LightningStore API, including business logic like status transitions, watchdog health checks, and retry policies.

  3. Wrappers – Cross-cutting concerns live in thin wrappers: LightningStoreThreaded adds mutex-based thread safety around any inner store, while LightningStoreServer and LightningStoreClient expose and consume the store API over HTTP for cross-process access.

Collections

The collections layer provides storage primitives that CollectionBasedLightningStore builds upon. This separation keeps business logic (status transitions, watchdog, retries) in the store layer while allowing different backends to focus purely on persistence.

The off-the-shelf implementations are InMemoryLightningCollections and MongoLightningCollections, which are the underlying collections for InMemoryLightningStore and MongoLightningStore, respectively.

Collection Primitives

LightningCollections bundles three primitive types:

| Primitive | Purpose | Methods |
|---|---|---|
| Collection[T] | Indexed storage with primary keys | query(), get(), insert(), update(), upsert(), delete() |
| Queue[T] | FIFO queue for task scheduling | enqueue(), dequeue(), peek(), size() |
| KeyValue[K, V] | Simple key-value store | get(), set(), inc(), chmax(), pop() |

Every LightningCollections instance exposes the named collections from the class diagram above: rollouts, attempts, spans, resources, and workers (each a Collection), plus rollout_queue (a Queue of rollout ids) and span_sequence_ids (a KeyValue of counters).
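
As a quick sketch of the primitives in isolation (assuming an initialized LightningCollections named collections; the counter key format is illustrative):

# Queue primitive: FIFO scheduling of rollout ids.
await collections.rollout_queue.enqueue(rollout_id)
next_id = await collections.rollout_queue.dequeue()

# KeyValue primitive: monotonic counters such as span sequence ids.
seq = await collections.span_sequence_ids.inc(f"{rollout_id}:{attempt_id}")  # key format assumed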

Atomic Operations

Collections support atomic operations through the atomic() context manager:

async with collections.atomic(mode="rw", labels=["rollouts", "attempts"]) as ctx:
    rollout = await ctx.rollouts.get(filter={"rollout_id": {"exact": rollout_id}})
    # modify and update within the same transaction
    await ctx.rollouts.update([updated_rollout])

The arguments accepted by atomic() are intentionally loose; each implementation interprets them in its own way. For InMemoryLightningCollections, the mode parameter controls locking behavior ("r" for read-only, "rw" for read-write), while labels specifies which collections to lock. Locks are acquired in sorted order to prevent deadlocks when multiple operations run concurrently.
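
A read-only variant under the same interpretation (the filter shape mirrors the example above):

# Snapshot-style read: lock only the rollouts collection, with no writes.
async with collections.atomic(mode="r", labels=["rollouts"]) as ctx:
    running = await ctx.rollouts.query(filter={"status": {"exact": "running"}})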

Implementing a Custom Backend

To add a new storage backend, implement LightningCollections:

from agentlightning import Rollout
from agentlightning.store.collection import LightningCollections, Collection, Queue, KeyValue

class MyLightningCollections(LightningCollections):
    @property
    def rollouts(self) -> Collection[Rollout]:
        return self._rollouts  # your implementation

    @property
    def rollout_queue(self) -> Queue[str]:
        return self._queue  # your implementation

    # ... implement remaining properties

    async def atomic(self, *, mode, snapshot=False, labels=None, **kwargs):
        # provide transaction / locking semantics
        ...

Then instantiate your store:

from agentlightning.store.collection_based import CollectionBasedLightningStore

store = CollectionBasedLightningStore(collections=MyLightningCollections())

The store layer handles all business logic; your collections just need to provide correct CRUD semantics.

Collection-based Store Implementations

Agent-lightning ships with two collection-based store implementations:

InMemoryLightningStore

InMemoryLightningStore uses InMemoryLightningCollections backed by plain Python data structures. It starts quickly with zero external dependencies, making it ideal for local development, CI, and unit tests. It provides two lock modes, configurable between "asyncio" (single thread, multiple coroutines) and "thread" (multi-threaded via aiologic). InMemoryLightningCollections uses nested dictionaries for O(1) primary-key lookup and a deque for the task queue.

MongoLightningStore

MongoLightningStore uses MongoLightningCollections backed by MongoDB. It provides persistent storage suitable for production deployments and is multi-process safe via database-level atomicity. It also supports partitioning via partition_id for running multiple trainers against the same database.

from agentlightning.store.mongo import MongoLightningStore

store = MongoLightningStore(
    mongo_uri="mongodb://localhost:27017/?replicaSet=rs0",
    database_name="agentlightning",
    partition_id="trainer-1",  # optional: isolate data per trainer
)

Note

MongoLightningStore requires the mongo optional dependency. Install with pip install agentlightning[mongo].

Capabilities

Different stores have different capabilities. Check the capabilities property to understand what a store supports:

| Capability | Description |
|---|---|
| thread_safe | Safe for concurrent access from multiple threads (configurable for InMemoryLightningStore) |
| async_safe | Safe for concurrent access from multiple coroutines |
| zero_copy | Can be shared across processes without serialization |
| otlp_traces | Exposes an OTLP-compatible /v1/traces endpoint |

Support varies across InMemoryLightningStore, MongoLightningStore, LightningStoreServer, and LightningStoreClient, so check capabilities on the instance you are using rather than assuming a particular backend's behavior.
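
For example, a hedged feature check (capabilities indexing follows the earlier capabilities["otlp_traces"] reference):

# Feature-detect OTLP support before wiring an exporter to the store.
if store.capabilities["otlp_traces"]:
    print("OTLP/HTTP trace export available at /v1/traces")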

Thread Safety

Thread safety can be achieved at different layers:

At the collections layer: InMemoryLightningCollections accepts a lock_type parameter:

  • "asyncio" – Uses per-event-loop asyncio.Lock for single-threaded, multi-coroutine scenarios.
  • "thread" – Uses aiologic.Lock for true multi-threaded access.

At the store layer: LightningStoreThreaded wraps any LightningStore to add mutex-based thread safety. A minimal sketch (the constructor signature is an assumption based on the class diagram above):
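
import agentlightning as agl

inner = agl.InMemoryLightningStore()
store = agl.LightningStoreThreaded(store=inner)  # keyword assumed from the class diagram
# Every LightningStore call now acquires a mutex before delegating to `inner`.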

Database-based stores like MongoLightningStore are inherently thread-safe through database atomicity guarantees.

Process Safety and Client-server Store

LightningStoreServer wraps another underlying store and runs a FastAPI app to expose the store API over HTTP. LightningStoreClient is a small LightningStore implementation that talks to the HTTP API.

Warning

The server HTTP API is not yet considered stable. Use LightningStoreClient as the stable interface for communicating with the server.

The server tracks the creator PID. In the owner process it delegates directly to the wrapped store; in other processes it lazily constructs a LightningStoreClient to talk to the HTTP API. This prevents accidental cross-process mutation of the wrong memory image. When the server is pickled (e.g., via multiprocessing), only the minimal fields are serialized (not the FastAPI/uvicorn objects), so subprocesses won't accidentally carry live server state. Forked subprocesses should likewise use LightningStoreClient to communicate with the server in the main process.

On the client side, network and 5xx failures are retried with a small backoff, and /v1/agl/health is probed between attempts. Application exceptions inside the server are wrapped as HTTP 400 with a traceback; these are not retried. The client also maintains a per-event-loop aiohttp.ClientSession map so that tracer callbacks (often on separate loops/threads) don't hang by reusing a session from another loop.

Minimal lifecycle:

import agentlightning as agl

# Server (owner process)
in_memory_store = agl.InMemoryLightningStore()
server = agl.LightningStoreServer(store=in_memory_store, host="0.0.0.0", port=4747)
await server.start()      # starts uvicorn in a daemon thread and waits for /health
# or keep your own event loop and stop via await server.stop()
# await server.run_forever()

# Client (same or different process)
client = agl.LightningStoreClient("http://localhost:4747")

print(await client.query_rollouts(status_in=["queuing"]))

await client.close()
await server.stop()

Another approach is to use the dedicated command line to start a long-running server process that can be shared across multiple processes. Any process, including the main one, can then use LightningStoreClient to communicate with it.

agl store --port 4747

Note

LightningStoreClient.wait_for_rollouts intentionally enforces a tiny timeout (≤ 0.1s) to avoid blocking event loops. Poll with short timeouts or compose with asyncio.wait_for at a higher layer.
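
For example, a hedged way to compose it (the rollout_ids and timeout keyword names are illustrative assumptions):

import asyncio

async def wait_until_done(client, rollout_id: str, deadline: float = 300.0):
    # Poll with the client's tiny per-call timeout, but bound the total wait.
    async def poll():
        while True:
            done = await client.wait_for_rollouts(rollout_ids=[rollout_id], timeout=0.1)
            if done:
                return done
            await asyncio.sleep(0.5)  # brief pause between polls
    return await asyncio.wait_for(poll(), timeout=deadline)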