Using the Emitter¶
While returning a single float for the final reward is sufficient for many algorithm-agent combinations, some advanced scenarios require richer feedback. For instance, an algorithm might learn more effectively if it receives intermediate rewards throughout a multi-step task, or if the agent needs to emit additional spans for debugging or analysis.
Agent-lightning provides an emitter module for recording custom spans inside your agent logic. Just as Tracer automatically instruments common operations (for example, LLM calls), each emitter helper sends a Span that captures Agent-lightning-specific work so downstream algorithms can query it later. See Working with Traces for more details.
For multi-step routines such as function calls, tools, or adapters, wrap code with operation — either as a decorator or a context manager — to capture inputs, outputs, and metadata on a dedicated operation span. This makes it easier to correlate downstream annotations (like rewards or messages) with the higher-level work that produced them.
You can find the emitter functions in agentlightning.emitter.
Emitting Rewards, Messages, and More¶
Here are the primary emitter functions:
- emit_reward(value: float): Records an intermediate/final reward; a convenient wrapper of emit_annotation.
- emit_annotation(attributes: Dict[str, Any]): Records arbitrary metadata as a span.
- emit_message(message: str): Records a simple log message as a span.
- emit_exception(exception: BaseException): Records a Python exception, including its type, message, and stack trace.
- emit_object(obj: Any): Records any JSON-serializable object, perfect for structured data.
Let's first see an example of an agent using these emitters to provide detailed feedback.
import agentlightning as agl

@agl.rollout
def multi_step_agent(task: dict, prompt_template: agl.PromptTemplate) -> float:
    try:
        # Step 1: Initial planning
        agl.emit_message("Starting planning phase.")
        plan = generate_plan(task, prompt_template)
        agl.emit_object({"plan_steps": len(plan), "first_step": plan[0]})

        # Award a small reward for a valid plan
        plan_reward = grade_plan(plan)
        agl.emit_reward(plan_reward)

        # Step 2: Execute the plan
        agl.emit_message(f"Executing {len(plan)}-step plan.")
        execution_result = execute_plan(plan)

        # Step 3: Final evaluation
        final_reward = custom_grade_final_result(execution_result, task["expected_output"])

        # The return value is treated as the final reward for the rollout
        return final_reward
    except ValueError as e:
        # Record the specific error and return a failure reward
        agl.emit_exception(e)
        return 0.0
Each helper accepts nested attributes (or keyword arguments for operation) and automatically flattens and sanitizes them into dotted OpenTelemetry keys. This means you can pass ordinary dictionaries and lists without pre-processing and still get consistent attribute names such as meta.any_attribute across all emitter operations. Agent-lightning does not restrict the attributes you supply, but it is best to consult OpenTelemetry's semantic conventions for recommended names. Agent-lightning also defines its own semantic conventions (agentlightning.semconv) for library-specific attributes. The pattern looks like this:
from opentelemetry.semconv.attributes import server_attributes

from agentlightning import emit_object

emit_object(
    {
        "name": "John Doe",
        "age": 30,
        "email": "john.doe@example.com",
    },
    attributes={
        server_attributes.SERVER_ADDRESS: "127.0.0.1",
        server_attributes.SERVER_PORT: 8080,
    },
)
Running the above code sends the following span to the backend if you have a tracer active:
Span(
    name='agentlightning.object',
    attributes={
        'agentlightning.object.type': 'dict',
        'agentlightning.object.json': '{"name": "John Doe", "age": 30, "email": "john.doe@example.com"}',
        'server.address': '127.0.0.1',
        'server.port': 8080
    }
)
Tip
If you don't have a tracer active, the above code will raise an error. By default, emitter helpers delegate to the active tracer to create and export spans (specifically via Tracer.create_span). If you want to emit spans without an active tracer, set propagate=False to keep the span local, which is useful for offline tests. The default, propagate=True, streams spans through the active tracer and its exporters.
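A minimal sketch, assuming no tracer has been started in the process:
from agentlightning import emit_reward

# With no active tracer, propagate=False keeps the span local
# instead of raising an error.
emit_reward(0.5, propagate=False)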
When working with agentlightning.semconv, you typically use utilities such as make_tag_attributes and make_link_attributes to build the attributes dictionary. For example:
from agentlightning import emit_annotation
from agentlightning.utils.otel import make_tag_attributes

emit_annotation(make_tag_attributes(["tool", "calculator", "fast", "good"]))
The above code will send a span with the following attributes to the backend:
{
    "agentlightning.tag.0": "tool",
    "agentlightning.tag.1": "calculator",
    "agentlightning.tag.2": "fast",
    "agentlightning.tag.3": "good"
}
A counterpart utility, extract_tags_from_attributes, is also available to recover the tag list from an attributes dictionary:
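A minimal sketch of the round trip, assuming the extraction preserves tag order:
from agentlightning.utils.otel import extract_tags_from_attributes, make_tag_attributes

# Flatten a tag list into agentlightning.tag.* attributes, then recover it.
attributes = make_tag_attributes(["tool", "calculator", "fast", "good"])
tags = extract_tags_from_attributes(attributes)
assert tags == ["tool", "calculator", "fast", "good"]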
Operations¶
The operation helper tracks logical units of work within your agent, capturing inputs, outputs, timing, and success/failure status. Unlike point-in-time emitters, operations create a span representing a time interval. Use operations for tool calls, multi-step workflows, debugging, and performance monitoring. operation works as either a decorator or a context manager.
The decorator automatically captures function arguments as inputs and the return value as output:
import agentlightning as agl

@agl.operation
def search_documents(query: str, max_results: int = 10) -> list[dict]:
    results = perform_search(query, max_results)
    return results

@agl.operation(category="tool", priority="high")
def execute_calculation(expression: str) -> float:
    return eval_safely(expression)
The example above emits a span with {"category": "tool", "priority": "high"} attributes. It also records the function input and output via OPERATION_INPUT and OPERATION_OUTPUT. It works with async functions too:
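Illustratively, calling execute_calculation("2 + 2") might produce a span along these lines; the exact OPERATION_INPUT and OPERATION_OUTPUT attribute keys below are an assumption, so treat this as a sketch rather than exact output:
Span(
    name='agentlightning.operation',
    attributes={
        'category': 'tool',
        'priority': 'high',
        # Assumed key names for illustration only:
        'agentlightning.operation.input.expression': '2 + 2',
        'agentlightning.operation.output': 4.0,
    }
)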
@agl.operation
async def async_api_call(endpoint: str, payload: dict) -> dict:
    response = await http_client.post(endpoint, json=payload)
    return response.json()
Override the operation name if needed. A minimal sketch, assuming the decorator accepts a name keyword:
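# Assumption: a name keyword overrides the span name derived from
# the function; check the operation signature in your version.
@agl.operation(name="document_search")
def search_documents(query: str) -> list[dict]:
    return perform_search(query, max_results=10)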
For more control, operation can also be used as a context manager to explicitly record inputs and outputs:
with agl.operation(tool_name="web_search") as op:
    op.set_input(query="latest AI research", filters={"date": "2024"})
    results = search_web("latest AI research", {"date": "2024"})
    op.set_output({"result_count": len(results), "top_result": results[0]})
The propagate=False flag also applies to operation when you want to keep operations local without requiring an active tracer:
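A minimal sketch (the attribute, input, and output values here are illustrative):
# Keep the operation span local when no tracer is running.
with agl.operation(tool_name="offline_test", propagate=False) as op:
    op.set_input(query="test query")
    op.set_output({"result_count": 0})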
Linking to Other Spans¶
Sometimes a span should explicitly point back to another span that produced the input it is working on (for example, linking a reward annotation to the agentlightning.operation span that generated a response). Agent-lightning encodes these relationships through flattened link attributes. The helper make_link_attributes converts a dictionary of keys such as trace_id, span_id, or any custom attribute into the "agentlightning.link.*" (LightningSpanAttributes.LINK) fields expected by the backend. Later, query_linked_spans can recover the original span(s) from those link descriptors.
from agentlightning import emit_annotation, operation
from agentlightning.utils.otel import make_link_attributes, make_tag_attributes

with operation(conversation_id="chat-42") as op:
    ...  # perform the work here

# Link the annotation back to the operation via the shared attribute.
link_attrs = make_link_attributes({
    "conversation_id": "chat-42",
})
emit_annotation(
    {
        **link_attrs,
        **make_tag_attributes(["reward", "good"]),
    }
)
When analyzing in adapters, pass the extracted link models to query_linked_spans to retrieve the matching span(s):
from agentlightning.utils.otel import extract_links_from_attributes, query_linked_spans
annotation_span = ... # Span from your trace store
operation_spans = [...] # list of spans you want to search
link_models = extract_links_from_attributes(annotation_span.attributes)
matches = query_linked_spans(operation_spans, link_models)
assert matches # Contains the original operation span
Correlating Rewards with LLM Requests
Tracer instruments each LLM request/response as its own span. You can link to it through the gen_ai.response.id attribute, which is populated from the LLM response ID.
from agentlightning import emit_reward
from agentlightning.utils.otel import make_link_attributes
result = call_llm(prompt)
reward_links = make_link_attributes({"gen_ai.response.id": result.id})
emit_reward(0.9, attributes=reward_links)
Later, extract that gen_ai.response.id link from the reward span and pass it to query_linked_spans to recover the specific LLM request span the reward refers to.
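A minimal sketch of that lookup (the span collections are placeholders):
from agentlightning.utils.otel import extract_links_from_attributes, query_linked_spans

reward_span = ...  # The annotation span created by emit_reward
llm_spans = [...]  # Spans produced by the tracer for LLM calls

# The link descriptor carries gen_ai.response.id; use it to find the
# LLM request span the reward was attached to.
links = extract_links_from_attributes(reward_span.attributes)
matches = query_linked_spans(llm_spans, links)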