Runner-side References
Note
This reference covers APIs designed to be used on the runner side.
Runners
agentlightning.LitAgentRunner
Bases: Runner[T_task]
Execute LitAgent tasks with tracing support.
This runner manages the complete lifecycle of agent rollout execution, including task polling, resource management, tracing, and hooks. It supports both continuous iteration over tasks from the store and single-step execution.
Attributes:
- worker_id (Optional[int]) – Identifier for the active worker process, if any.
tracer (property)
__init__(tracer, max_rollouts=None, poll_interval=5.0)
get_agent()
get_store()
Get the store instance.
Returns:
- LightningStore – The LightningStore instance for this worker.
Raises:
- ValueError – If the store has not been initialized via init_worker.
get_worker_id()
Get the formatted worker ID string.
Returns:
- str – A formatted string like "Worker-0" if initialized, or "Worker-Unknown" if the worker ID has not been set.
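The worker-state accessors above can be illustrated with a small stand-in class. This is a hypothetical sketch, not agentlightning code: the class name WorkerStateSketch is invented, and it reproduces only the documented behavior of get_store() (raising ValueError before init_worker has run) and get_worker_id() (the "Worker-<id>" / "Worker-Unknown" format).

```python
from typing import Any, Optional


class WorkerStateSketch:
    """Hypothetical stand-in for the worker-local state of LitAgentRunner.

    Mirrors only the documented behavior of get_store() and get_worker_id().
    """

    def __init__(self) -> None:
        self._store: Optional[Any] = None
        self.worker_id: Optional[int] = None

    def init_worker(self, worker_id: int, store: Any) -> None:
        self.worker_id = worker_id
        self._store = store

    def get_store(self) -> Any:
        # Documented behavior: ValueError until init_worker() has been called.
        if self._store is None:
            raise ValueError("Store has not been initialized. Call init_worker() first.")
        return self._store

    def get_worker_id(self) -> str:
        # Documented format: "Worker-<id>", or "Worker-Unknown" when unset.
        return f"Worker-{self.worker_id}" if self.worker_id is not None else "Worker-Unknown"


state = WorkerStateSketch()
print(state.get_worker_id())  # Worker-Unknown
state.init_worker(0, store={"kind": "in-memory"})
print(state.get_worker_id())  # Worker-0
```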
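init(agent, *, hooks=None, **kwargs)
Initialize the runner with the agent.
This sets up the agent-runner relationship, registers hooks, and initializes the tracer.
Parameters:
- agent (LitAgent[T_task]) – Agent instance providing task-specific logic.
- hooks (Optional[Sequence[Hook]], default: None) – Optional sequence of hooks to register with the runner.
- **kwargs (Any, default: {}) – Additional runner-specific initialization arguments.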
init_worker(worker_id, store, **kwargs)
Initialize the runner for each worker with worker_id and store.
This method is called once per worker in a distributed setup to provide the worker with its ID and store connection.
Parameters:
- worker_id (int) – Unique identifier for this worker process.
- store (LightningStore) – LightningStore used for task coordination and persistence.
- **kwargs (Any, default: {}) – Additional worker-specific initialization arguments (currently unused).
iter(*, event=None) (async)
Run the runner, continuously iterating over tasks in the store.
This method polls the store for new rollouts and executes them until:
- The event is set (if provided)
- The max_rollouts limit is reached (if configured)
- No more tasks are available
All exceptions during rollout execution are caught and logged but not propagated, allowing the runner to continue processing subsequent tasks.
Parameters:
- event (Optional[ExecutionEvent], default: None) – Optional ExecutionEvent object to signal the runner to stop. The runner checks this event periodically and stops gracefully when it is set.
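The stop conditions of iter() can be modeled with a plain asyncio loop. This is a simplified sketch, not the library's implementation: iter_sketch is a hypothetical name, a deque stands in for the store's task queue, and asyncio.Event stands in for ExecutionEvent. Counting failed rollouts toward max_rollouts is an assumption of the sketch, and it exits as soon as the queue is empty, matching the third condition in the list above.

```python
import asyncio
import logging
from collections import deque
from typing import Any, Awaitable, Callable, Deque, List, Optional

logger = logging.getLogger("iter_sketch")


async def iter_sketch(
    queue: Deque[Any],
    execute: Callable[[Any], Awaitable[Any]],
    *,
    max_rollouts: Optional[int] = None,
    event: Optional[asyncio.Event] = None,
) -> List[Any]:
    """Hypothetical loop mirroring the stop conditions documented for iter()."""
    completed: List[Any] = []
    attempted = 0
    while True:
        if event is not None and event.is_set():
            break  # stop signal set
        if max_rollouts is not None and attempted >= max_rollouts:
            break  # limit reached (assumption: failed attempts count too)
        if not queue:
            break  # no more tasks available
        task = queue.popleft()
        attempted += 1
        try:
            completed.append(await execute(task))
        except Exception:
            # Log and swallow so later tasks still run, as described for iter().
            logger.exception("Rollout for task %r failed", task)
    return completed


async def run_demo() -> List[int]:
    async def execute(task: int) -> int:
        if task == 2:
            raise RuntimeError("simulated agent failure")
        return task * 10

    return await iter_sketch(deque([1, 2, 3, 4]), execute, max_rollouts=3)


print(asyncio.run(run_demo()))  # [10, 30]
```

Task 2 fails and is only logged, so the loop still reaches task 3 before the limit of three attempts stops it.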
step(input, *, resources=None, mode=None, event=None) (async)
Execute a single task directly, bypassing the task queue.
This method creates a new rollout for the given input and executes it immediately. Unlike iter(), exceptions are propagated to the caller.
Parameters:
- input (T_task) – The task input to be processed by the agent.
- resources (Optional[NamedResources], default: None) – Optional named resources to be used for this specific task. If provided, a new resources entry will be created in the store. If not provided, the latest resources from the store will be used.
- mode (Optional[RolloutMode], default: None) – Optional rollout mode ("train" or "validation"). If not provided, the agent's default mode will be used.
- event (Optional[ExecutionEvent], default: None) – Optional ExecutionEvent object to signal interruption (currently unused but included for interface consistency).
Returns:
- Rollout – The completed rollout.
Raises:
- Exception – Any exception that occurs during rollout execution will be re-raised to the caller.
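The resource and error-handling rules of step() can be sketched the same way. ToyStore, add_resources, latest_resources, step_sketch, and default_mode are hypothetical stand-ins invented for this example, not agentlightning APIs. The sketch only shows that explicit resources become a new store entry, that the latest stored resources are used otherwise, that a missing mode falls back to a default, and that exceptions reach the caller.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional

Resources = Dict[str, Any]


@dataclass
class ToyStore:
    """Hypothetical store that only keeps a history of resource entries."""

    resources: List[Resources] = field(default_factory=list)

    def add_resources(self, named: Resources) -> Resources:
        self.resources.append(named)
        return named

    def latest_resources(self) -> Optional[Resources]:
        return self.resources[-1] if self.resources else None


async def step_sketch(
    store: ToyStore,
    run_agent: Callable[[Any, Optional[Resources], str], Awaitable[Any]],
    task: Any,
    *,
    resources: Optional[Resources] = None,
    mode: Optional[str] = None,
    default_mode: str = "train",
) -> Any:
    """Hypothetical single-task execution following the documented step() rules."""
    if resources is not None:
        used = store.add_resources(resources)  # explicit resources: new store entry
    else:
        used = store.latest_resources()  # otherwise: latest resources in the store
    # No try/except: unlike iter(), exceptions propagate to the caller.
    return await run_agent(task, used, mode or default_mode)


async def demo() -> None:
    store = ToyStore()
    store.add_resources({"llm": "v1"})

    async def agent(task: Any, res: Optional[Resources], mode: str) -> Any:
        return (task, res["llm"] if res else None, mode)

    print(await step_sketch(store, agent, "q1"))  # ('q1', 'v1', 'train')
    print(await step_sketch(store, agent, "q2", resources={"llm": "v2"}))  # ('q2', 'v2', 'train')
    print(len(store.resources))  # 2


asyncio.run(demo())
```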
teardown(*args, **kwargs)
Teardown the runner and clean up all resources.
This method resets all internal state including the agent, store, hooks, and worker ID, and calls the tracer's teardown method.
Parameters:
- *args (Any, default: ()) – Additional teardown arguments (currently unused).
- **kwargs (Any, default: {}) – Additional teardown keyword arguments (currently unused).
teardown_worker(worker_id, *args, **kwargs)
Teardown the runner for a specific worker.
This method cleans up worker-specific resources and resets the worker ID.
Parameters:
- worker_id (int) – Unique identifier of the worker being torn down.
- *args (Any, default: ()) – Additional teardown arguments (currently unused).
- **kwargs (Any, default: {}) – Additional teardown keyword arguments (currently unused).
agentlightning.Runner
Bases: ParallelWorkerBase, Generic[T_task]
Abstract base class for long-running agent executors.
Runner implementations coordinate LitAgent instances, acquire work from a LightningStore, and emit Rollout objects. Subclasses decide how to schedule work (polling, streaming, etc.) while this base class provides a minimal lifecycle contract.
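A minimal illustration of the lifecycle contract follows. RunnerContractSketch and RecordingRunner are hypothetical classes that mirror the documented method names; they do not subclass the real Runner, and iter() and step() are omitted for brevity. The recorded call order shows init() running once for the whole setup while init_worker() and teardown_worker() run once per worker.

```python
from typing import Any, Dict, List, Optional


class RunnerContractSketch:
    """Hypothetical mirror of the Runner lifecycle; each hook raises
    NotImplementedError, as the documented base-class methods do."""

    def init(self, agent: Any, **kwargs: Any) -> None:
        raise NotImplementedError()

    def init_worker(self, worker_id: int, store: Any, **kwargs: Any) -> None:
        raise NotImplementedError()

    def teardown_worker(self, worker_id: int, *args: Any, **kwargs: Any) -> None:
        raise NotImplementedError()

    def teardown(self, *args: Any, **kwargs: Any) -> None:
        raise NotImplementedError()


class RecordingRunner(RunnerContractSketch):
    """Toy subclass that records the order in which lifecycle hooks run."""

    def __init__(self) -> None:
        self.calls: List[str] = []
        self.agent: Optional[Any] = None
        self.stores: Dict[int, Any] = {}

    def init(self, agent: Any, **kwargs: Any) -> None:
        self.agent = agent
        self.calls.append("init")

    def init_worker(self, worker_id: int, store: Any, **kwargs: Any) -> None:
        self.stores[worker_id] = store
        self.calls.append(f"init_worker:{worker_id}")

    def teardown_worker(self, worker_id: int, *args: Any, **kwargs: Any) -> None:
        self.stores.pop(worker_id, None)
        self.calls.append(f"teardown_worker:{worker_id}")

    def teardown(self, *args: Any, **kwargs: Any) -> None:
        self.agent = None
        self.calls.append("teardown")


# init() runs once for the whole setup; the *_worker hooks run per worker.
runner = RecordingRunner()
runner.init(agent="my-agent")
for wid in (0, 1):
    runner.init_worker(wid, store=f"store-{wid}")
for wid in (0, 1):
    runner.teardown_worker(wid)
runner.teardown()
print(runner.calls)
```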
init(agent, **kwargs)
Prepare the runner to execute tasks for agent.
This method is called only once during the setup for all workers, not for each worker.
Parameters:
- agent (LitAgent[T_task]) – Agent instance providing task-specific logic.
- **kwargs (Any, default: {}) – Optional runner-specific configuration.
Raises:
- NotImplementedError – Subclasses must supply the initialization routine.
init_worker(worker_id, store, **kwargs)
Configure worker-local state before processing tasks.
This method is called for each worker during the setup.
Parameters:
- worker_id (int) – Unique identifier for this worker process or thread.
- store (LightningStore) – Shared LightningStore backing task coordination.
- **kwargs (Any, default: {}) – Optional worker-specific configuration.
Raises:
- NotImplementedError – Subclasses must prepare per-worker resources.
iter(*, event=None) (async)
Run the runner, continuously iterating over tasks in the store.
This method runs in a loop, polling the store for new tasks and executing them until interrupted by the event or when no more tasks are available.
Parameters:
- event (Optional[ExecutionEvent], default: None) – Cooperative stop signal. When set, the runner should complete the current unit of work and exit the loop.
Raises:
- NotImplementedError – Subclasses provide the iteration behavior.
run(*args, **kwargs)
run_context(*, agent, store, hooks=None, worker_id=None)
Initialize and tear down a runner within a simple context manager.
The helper is primarily intended for debugging runner implementations outside of a full Trainer stack.
Parameters:
- agent (LitAgent[T_task]) – Agent executed by this runner.
- store (LightningStore) – Backing LightningStore. If you don't have one, you can create one with InMemoryLightningStore.
- hooks (Optional[Sequence[Hook]], default: None) – Optional sequence of hooks recognised by the runner. Not all runners support hooks.
- worker_id (Optional[int], default: None) – Override the worker identifier used during setup. Defaults to 0.
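The set-up and tear-down behavior of run_context can be approximated with contextlib.contextmanager. This is a sketch under assumptions: the real helper is a method on the runner, whereas run_context_sketch and ToyRunner here are hypothetical, and the text above does not say whether the real context manager is synchronous. It initializes a single worker (defaulting worker_id to 0, as documented); the try/finally that guarantees teardown even when the body raises is a design choice of the sketch.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List, Optional, Sequence


class ToyRunner:
    """Hypothetical runner that only logs lifecycle calls."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def init(self, agent: Any, *, hooks: Optional[Sequence[Any]] = None) -> None:
        self.log.append("init")

    def init_worker(self, worker_id: int, store: Any) -> None:
        self.log.append(f"init_worker:{worker_id}")

    def teardown_worker(self, worker_id: int) -> None:
        self.log.append(f"teardown_worker:{worker_id}")

    def teardown(self) -> None:
        self.log.append("teardown")


@contextmanager
def run_context_sketch(
    runner: ToyRunner,
    *,
    agent: Any,
    store: Any,
    hooks: Optional[Sequence[Any]] = None,
    worker_id: Optional[int] = None,
) -> Iterator[ToyRunner]:
    """Set up one worker, yield the runner, and always tear down."""
    wid = 0 if worker_id is None else worker_id  # documented default: 0
    runner.init(agent, hooks=hooks)
    runner.init_worker(wid, store)
    try:
        yield runner
    finally:
        # Teardown runs even if the body of the with block raises.
        runner.teardown_worker(wid)
        runner.teardown()


runner = ToyRunner()
with run_context_sketch(runner, agent="agent", store="store") as r:
    r.log.append("body")
print(runner.log)
```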
step(input, *, resources=None, mode=None, event=None) (async)
Execute a single task with the given input.
This method provides fine-grained control for executing individual tasks directly, bypassing the store's task queue.
Parameters:
- input (T_task) – Task payload consumed by the agent.
- resources (Optional[NamedResources], default: None) – Optional named resources scoped to this invocation.
- mode (Optional[RolloutMode], default: None) – Optional rollout mode such as "train" or "eval".
- event (Optional[ExecutionEvent], default: None) – Cooperative stop signal for long-running tasks.
Returns:
- Rollout – Completed rollout produced by the agent.
Raises:
- NotImplementedError – Subclasses provide the execution behavior.
teardown(*args, **kwargs)
Release resources acquired during init().
Raises:
- NotImplementedError – Subclasses must implement the shutdown routine.
teardown_worker(worker_id, *args, **kwargs)
Release per-worker resources allocated by init_worker().
Parameters:
- worker_id (int) – Identifier of the worker being torn down.
Raises:
- NotImplementedError – Subclasses must implement the shutdown routine.
Tracer
agentlightning.AgentOpsTracer
Bases: Tracer
Traces agent execution using AgentOps.
This tracer provides functionality to capture execution details using the AgentOps library. It manages the AgentOps client initialization, server setup, and integration with the OpenTelemetry tracing ecosystem.
Attributes:
- agentops_managed – Whether to automatically manage agentops. When True, the tracer calls agentops.init() automatically and launches a local AgentOps endpoint. Otherwise, you are responsible for initializing AgentOps yourself before using the tracer.
- instrument_managed – Whether to automatically manage instrumentation. When False, you manage the instrumentation yourself, and the tracer might not work as expected.
- daemon – Whether the AgentOps server runs as a daemon process. Only applicable if agentops_managed is True.
get_langchain_handler(tags=None)
Get the Langchain callback handler for integrating with Langchain.
Parameters:
- tags (List[str] | None, default: None) – Optional list of tags to apply to the Langchain callback handler.
Returns:
- LangchainCallbackHandler – An instance of the Langchain callback handler.
get_last_trace()
Retrieves the raw list of captured spans from the most recent trace.
Returns:
- List[ReadableSpan] – A list of OpenTelemetry ReadableSpan objects.
trace_context(name=None, *, store=None, rollout_id=None, attempt_id=None) (async)
Starts a new tracing context. This should be used as a context manager.
Parameters:
- name (Optional[str], default: None) – Optional name for the tracing context.
- store (Optional[LightningStore], default: None) – Optional store to add the spans to.
- rollout_id (Optional[str], default: None) – Optional rollout ID to add the spans to.
- attempt_id (Optional[str], default: None) – Optional attempt ID to add the spans to.
Yields:
- AsyncGenerator[LightningSpanProcessor, None] – The LightningSpanProcessor instance to collect spans.
agentlightning.OtelTracer
Bases: Tracer
Tracer that provides a basic OpenTelemetry tracer provider.
With this tracer you can collect agent-lightning signals such as rewards, but it does not add other function instrumentations, such as for openai.chat.completion.
get_last_trace()
Retrieves the raw list of captured spans from the most recent trace.
Returns:
- List[ReadableSpan] – A list of OpenTelemetry ReadableSpan objects.
trace_context(name=None, *, store=None, rollout_id=None, attempt_id=None) (async)
Starts a new tracing context. This should be used as a context manager.
Parameters:
- name (Optional[str], default: None) – Optional name for the tracing context.
- store (Optional[LightningStore], default: None) – Optional store to add the spans to.
- rollout_id (Optional[str], default: None) – Optional rollout ID to add the spans to.
- attempt_id (Optional[str], default: None) – Optional attempt ID to add the spans to.
Yields:
- AsyncGenerator[LightningSpanProcessor, None] – The LightningSpanProcessor instance to collect spans.
agentlightning.Tracer
Bases: ParallelWorkerBase
An abstract base class for tracers.
This class defines a standard interface for tracing code execution, capturing the resulting spans, and providing them for analysis. It is designed to be backend-agnostic, allowing for different implementations (e.g., for AgentOps, OpenTelemetry, Docker, etc.).
The primary interaction pattern is through the trace_context context manager, which ensures that traces are properly started and captured, even in the case of exceptions.
A typical workflow:
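```python
import asyncio

from opentelemetry.sdk.trace import ReadableSpan

from agentlightning import TracerTraceToTriplet  # assumed import location


async def main() -> None:
    tracer = YourTracerImplementation()  # placeholder: any concrete Tracer subclass
    try:
        async with tracer.trace_context(name="my_traced_task"):
            # ... code to be traced ...
            await run_my_agent_logic()  # placeholder for your agent code
    except Exception as e:
        print(f"An error occurred: {e}")

    # Retrieve the trace data after the context block
    spans: list[ReadableSpan] = tracer.get_last_trace()
    # Process the trace data
    if spans:
        rl_triplets = TracerTraceToTriplet().adapt(spans)
        # ... do something with the triplets


asyncio.run(main())
```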
get_langchain_handler()
Get a handler to install as a LangChain agent callback.
Agents are expected to register this handler to enable tracing.
get_last_trace()
Retrieves the raw list of captured spans from the most recent trace.
Returns:
- List[ReadableSpan] – A list of OpenTelemetry ReadableSpan objects.
trace_context(name=None, *, store=None, rollout_id=None, attempt_id=None)
Starts a new tracing context. This should be used as a context manager.
The implementation should handle the setup and teardown of the tracing for the enclosed code block. It must ensure that any spans generated within the with block are collected and made available via get_last_trace.
If a store is provided, the spans will be added to the store when tracing.
Parameters:
- name (Optional[str], default: None) – The name for the root span of this trace context.
- store (Optional[LightningStore], default: None) – The store to add the spans to.
- rollout_id (Optional[str], default: None) – The rollout ID to add the spans to.
- attempt_id (Optional[str], default: None) – The attempt ID to add the spans to.
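The trace_context contract (collect spans inside the block, keep them available through get_last_trace, optionally forward them to a store) can be shown with a toy tracer. ToyTracer and its record() method are hypothetical: they record span names as strings instead of OpenTelemetry ReadableSpan objects, and a dict keyed by (rollout_id, attempt_id) stands in for LightningStore. For simplicity the sketch copies spans to the store when the block exits, whereas the text above says real tracers add them while tracing.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, List, Optional, Tuple

ToyStoreDict = Dict[Tuple[str, str], List[str]]


class ToyTracer:
    """Hypothetical tracer that records span names instead of real spans."""

    def __init__(self) -> None:
        self._active: Optional[List[str]] = None
        self._last_trace: List[str] = []

    def record(self, span_name: str) -> None:
        # Spans are only collected while a trace context is open.
        if self._active is not None:
            self._active.append(span_name)

    @asynccontextmanager
    async def trace_context(
        self,
        name: Optional[str] = None,
        *,
        store: Optional[ToyStoreDict] = None,
        rollout_id: Optional[str] = None,
        attempt_id: Optional[str] = None,
    ) -> AsyncIterator[None]:
        self._active = [name or "trace"]  # root span
        try:
            yield
        finally:
            # Spans remain retrievable even if the block raised.
            self._last_trace = self._active
            self._active = None
            if store is not None and rollout_id is not None and attempt_id is not None:
                store.setdefault((rollout_id, attempt_id), []).extend(self._last_trace)

    def get_last_trace(self) -> List[str]:
        return list(self._last_trace)


async def demo() -> None:
    tracer = ToyTracer()
    store: ToyStoreDict = {}
    async with tracer.trace_context("rollout", store=store, rollout_id="ro-1", attempt_id="at-1"):
        tracer.record("llm_call")
        tracer.record("reward")
    print(tracer.get_last_trace())  # ['rollout', 'llm_call', 'reward']
    print(store[("ro-1", "at-1")])  # ['rollout', 'llm_call', 'reward']


asyncio.run(demo())
```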
trace_run(func, *args, **kwargs)
A convenience wrapper to trace the execution of a single synchronous function.
Deprecated in favor of customizing Runners.
Parameters:
- func (Callable[..., Any]) – The synchronous function to execute and trace.
- *args (Any, default: ()) – Positional arguments to pass to the function.
- **kwargs (Any, default: {}) – Keyword arguments to pass to the function.
Returns:
- Any – The return value of the function.
trace_run_async(func, *args, **kwargs) (async)
A convenience wrapper to trace the execution of a single asynchronous function.
Deprecated in favor of customizing Runners.
Parameters:
- func (Callable[..., Awaitable[Any]]) – The asynchronous function to execute and trace.
- *args (Any, default: ()) – Positional arguments to pass to the function.
- **kwargs (Any, default: {}) – Keyword arguments to pass to the function.
Returns:
- Any – The return value of the function.
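Both deprecated wrappers amount to running a function inside a tracing context and returning its result. The sketch below assumes a hypothetical ToyRunTracer with a synchronous trace_context and uses the function's __name__ as the context name; neither choice is taken from the library. New code should prefer customizing a Runner, as noted above.

```python
import asyncio
from contextlib import contextmanager
from typing import Any, Awaitable, Callable, Iterator, List


class ToyRunTracer:
    """Hypothetical tracer with a synchronous trace_context, for brevity."""

    def __init__(self) -> None:
        self.traces: List[str] = []

    @contextmanager
    def trace_context(self, name: str) -> Iterator[None]:
        self.traces.append(f"start:{name}")
        try:
            yield
        finally:
            self.traces.append(f"end:{name}")

    def trace_run(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        # Run a synchronous function inside a tracing context; return its value.
        with self.trace_context(name=func.__name__):
            return func(*args, **kwargs)

    async def trace_run_async(
        self, func: Callable[..., Awaitable[Any]], *args: Any, **kwargs: Any
    ) -> Any:
        # Same idea for coroutine functions: await inside the context.
        with self.trace_context(name=func.__name__):
            return await func(*args, **kwargs)


def add(a: int, b: int) -> int:
    return a + b


async def add_async(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


tracer = ToyRunTracer()
print(tracer.trace_run(add, 2, 3))  # 5
print(asyncio.run(tracer.trace_run_async(add_async, 4, b=6)))  # 10
print(tracer.traces)  # ['start:add', 'end:add', 'start:add_async', 'end:add_async']
```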