Agent Lightning Core

Client Side

agentlightning.litagent

LitAgent

Base class for the training and validation logic of an agent.

Developers should subclass this class and implement the rollout methods to define the agent's behavior for a single task. The agent's logic is completely decoupled from the server communication and training infrastructure.
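
For orientation, a minimal sketch of such a subclass is shown below. The resource key "system_prompt" and the toy scoring logic are assumptions made only for this illustration; real agents use whatever named resources their server provides.

from agentlightning.litagent import LitAgent

class EchoAgent(LitAgent):
    """Toy agent used only to illustrate the subclassing pattern."""

    def training_rollout(self, task, rollout_id, resources):
        # 'resources' maps resource names to server-provided objects;
        # "system_prompt" is an assumed key used only in this sketch.
        prompt = resources.get("system_prompt", "")
        answer = f"{prompt} {task}"  # stand-in for a real LLM call
        # Returning a float reports it as the final reward for this rollout.
        return 1.0 if answer.strip() else 0.0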

Source code in agentlightning/litagent.py
class LitAgent:
    """Base class for the training and validation logic of an agent.

    Developers should subclass this class and implement the rollout methods
    to define the agent's behavior for a single task. The agent's logic
    is completely decoupled from the server communication and training
    infrastructure.
    """

    def __init__(self, *, trained_agents: Optional[str] = None) -> None:  # FIXME: str | None won't work for cli
        """
        Initialize the LitAgent.

        Args:
            trained_agents: Optional string representing the trained agents.
                            This can be used to track which agents have been trained by this instance.
        """
        self.trained_agents = trained_agents
        self._trainer_ref: weakref.ReferenceType[Trainer] | None = None
        self._runner_ref: weakref.ReferenceType[AgentRunner] | None = None

    def set_trainer(self, trainer: Trainer) -> None:
        """
        Set the trainer for this agent.

        Args:
            trainer: The Trainer instance that will handle training and validation.
        """
        self._trainer_ref = weakref.ref(trainer)

    @property
    def trainer(self) -> Trainer:
        """
        Get the trainer for this agent.

        Returns:
            The Trainer instance associated with this agent.
        """
        if self._trainer_ref is None:
            raise ValueError("Trainer has not been set for this agent.")
        trainer = self._trainer_ref()
        if trainer is None:
            raise ValueError("Trainer reference is no longer valid (object has been garbage collected).")
        return trainer

    @property
    def tracer(self) -> BaseTracer:
        """
        Get the tracer for this agent.

        Returns:
            The BaseTracer instance associated with this agent.
        """
        return self.trainer.tracer

    def set_runner(self, runner: AgentRunner) -> None:
        """
        Set the runner for this agent.

        Args:
            runner: The AgentRunner instance that will handle the execution of rollouts.
        """
        self._runner_ref = weakref.ref(runner)

    @property
    def runner(self) -> AgentRunner:
        """
        Get the runner for this agent.

        Returns:
            The AgentRunner instance associated with this agent.
        """
        if self._runner_ref is None:
            raise ValueError("Runner has not been set for this agent.")
        runner = self._runner_ref()
        if runner is None:
            raise ValueError("Runner reference is no longer valid (object has been garbage collected).")
        return runner

    def on_rollout_start(self, task: Task, runner: AgentRunner, tracer: BaseTracer) -> None:
        """Hook called immediately before a rollout begins.

        Args:
            task: The :class:`Task` object that will be processed.
            runner: The :class:`AgentRunner` managing the rollout.
            tracer: The tracer instance associated with the runner.

        Subclasses can override this method to implement custom logic such as
        logging, metric collection, or resource setup. By default, this is a
        no-op.
        """

    def on_rollout_end(self, task: Task, rollout: Rollout, runner: AgentRunner, tracer: BaseTracer) -> None:
        """Hook called after a rollout completes.

        Args:
            task: The :class:`Task` object that was processed.
            rollout: The resulting :class:`Rollout` object.
            runner: The :class:`AgentRunner` managing the rollout.
            tracer: The tracer instance associated with the runner.

        Subclasses can override this method for cleanup or additional
        logging. By default, this is a no-op.
        """

    def training_rollout(self, task: TaskInput, rollout_id: str, resources: NamedResources) -> RolloutRawResult:
        """Defines the agent's behavior for a single training task.

        This method should contain the logic for how the agent processes an
        input, uses the provided resources (like LLMs or prompts), and
        produces a result.

        Args:
            task: The task object received from the server, containing the
                  input data and metadata.
            rollout_id: A unique identifier for the rollout, used for tracking
                        and reporting purposes.
            resources: A dictionary of named resources (e.g., LLMs, prompt
                       templates) for the agent to use.

        Returns:
            The result of the rollout, which can be one of:
            - None. The tracing should be handled by the agent runner.
            - A float representing the final reward.
            - A list of `Triplet` objects for detailed, step-by-step feedback.
            - A list of `ReadableSpan` objects for OpenTelemetry tracing.
            - A list of dictionaries for any trace spans.
            - A complete `Rollout` object for full control over reporting.
        """
        raise NotImplementedError("Subclasses must implement the `training_rollout` method.")

    def validation_rollout(self, task: TaskInput, rollout_id: str, resources: NamedResources) -> RolloutRawResult:
        """Defines the agent's behavior for a single validation task.

        By default, this method redirects to `training_rollout`. Override it
        if the agent should behave differently during validation.

        Args:
            task: The task object received from the server, containing the
                  input data and metadata.
            rollout_id: A unique identifier for the validation rollout,
                        used for tracking and reporting purposes.
            resources: A dictionary of named resources for the agent to use.

        Returns:
            The result of the validation rollout. See `training_rollout` for
            possible return types.
        """
        return self.training_rollout(task, rollout_id, resources)

    async def training_rollout_async(
        self, task: TaskInput, rollout_id: str, resources: NamedResources
    ) -> RolloutRawResult:
        """Asynchronous version of `training_rollout`.

        This method should be implemented by agents that perform asynchronous
        operations (e.g., non-blocking I/O, concurrent API calls).

        Args:
            task: The task object received from the server.
            rollout_id: A unique identifier for the training rollout,
                        used for tracking and reporting purposes.
            resources: A dictionary of named resources for the agent to use.

        Returns:
            The result of the asynchronous training rollout.
        """
        raise NotImplementedError("Async agents must implement the `training_rollout_async` method.")

    async def validation_rollout_async(
        self, task: TaskInput, rollout_id: str, resources: NamedResources
    ) -> RolloutRawResult:
        """Asynchronous version of `validation_rollout`.

        By default, this method redirects to `training_rollout_async`.
        Override it for different asynchronous validation behavior.

        Args:
            task: The task object received from the server.
            rollout_id: A unique identifier for the validation rollout,
                        used for tracking and reporting purposes.
            resources: A dictionary of named resources for the agent to use.

        Returns:
            The result of the asynchronous validation rollout.
        """
        return await self.training_rollout_async(task, rollout_id, resources)

runner property

Get the runner for this agent.

Returns:

    AgentRunner: The AgentRunner instance associated with this agent.

tracer property

Get the tracer for this agent.

Returns:

    BaseTracer: The BaseTracer instance associated with this agent.

trainer property

Get the trainer for this agent.

Returns:

    Trainer: The Trainer instance associated with this agent.

__init__(*, trained_agents=None)

Initialize the LitAgent.

Parameters:

    trained_agents (Optional[str], default: None): Optional string representing the trained agents. This can be used to track which agents have been trained by this instance.
Source code in agentlightning/litagent.py
def __init__(self, *, trained_agents: Optional[str] = None) -> None:  # FIXME: str | None won't work for cli
    """
    Initialize the LitAgent.

    Args:
        trained_agents: Optional string representing the trained agents.
                        This can be used to track which agents have been trained by this instance.
    """
    self.trained_agents = trained_agents
    self._trainer_ref: weakref.ReferenceType[Trainer] | None = None
    self._runner_ref: weakref.ReferenceType[AgentRunner] | None = None

on_rollout_end(task, rollout, runner, tracer)

Hook called after a rollout completes.

Parameters:

    task (Task, required): The Task object that was processed.
    rollout (Rollout, required): The resulting Rollout object.
    runner (AgentRunner, required): The AgentRunner managing the rollout.
    tracer (BaseTracer, required): The tracer instance associated with the runner.

Subclasses can override this method for cleanup or additional logging. By default, this is a no-op.
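
For example, a minimal override that only logs the outcome (a sketch; the logger name is arbitrary):

import logging

class LoggingAgent(LitAgent):
    def on_rollout_end(self, task, rollout, runner, tracer):
        # rollout.final_reward may be None if no reward was reported.
        logging.getLogger("my_agent").info(
            "rollout %s finished with reward %s",
            rollout.rollout_id,
            rollout.final_reward,
        )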

Source code in agentlightning/litagent.py
def on_rollout_end(self, task: Task, rollout: Rollout, runner: AgentRunner, tracer: BaseTracer) -> None:
    """Hook called after a rollout completes.

    Args:
        task: The :class:`Task` object that was processed.
        rollout: The resulting :class:`Rollout` object.
        runner: The :class:`AgentRunner` managing the rollout.
        tracer: The tracer instance associated with the runner.

    Subclasses can override this method for cleanup or additional
    logging. By default, this is a no-op.
    """

on_rollout_start(task, runner, tracer)

Hook called immediately before a rollout begins.

Parameters:

    task (Task, required): The Task object that will be processed.
    runner (AgentRunner, required): The AgentRunner managing the rollout.
    tracer (BaseTracer, required): The tracer instance associated with the runner.

Subclasses can override this method to implement custom logic such as logging, metric collection, or resource setup. By default, this is a no-op.

Source code in agentlightning/litagent.py
def on_rollout_start(self, task: Task, runner: AgentRunner, tracer: BaseTracer) -> None:
    """Hook called immediately before a rollout begins.

    Args:
        task: The :class:`Task` object that will be processed.
        runner: The :class:`AgentRunner` managing the rollout.
        tracer: The tracer instance associated with the runner.

    Subclasses can override this method to implement custom logic such as
    logging, metric collection, or resource setup. By default, this is a
    no-op.
    """

set_runner(runner)

Set the runner for this agent.

Parameters:

    runner (AgentRunner, required): The AgentRunner instance that will handle the execution of rollouts.
Source code in agentlightning/litagent.py
def set_runner(self, runner: AgentRunner) -> None:
    """
    Set the runner for this agent.

    Args:
        runner: The AgentRunner instance that will handle the execution of rollouts.
    """
    self._runner_ref = weakref.ref(runner)

set_trainer(trainer)

Set the trainer for this agent.

Parameters:

    trainer (Trainer, required): The Trainer instance that will handle training and validation.
Source code in agentlightning/litagent.py
def set_trainer(self, trainer: Trainer) -> None:
    """
    Set the trainer for this agent.

    Args:
        trainer: The Trainer instance that will handle training and validation.
    """
    self._trainer_ref = weakref.ref(trainer)

training_rollout(task, rollout_id, resources)

Defines the agent's behavior for a single training task.

This method should contain the logic for how the agent processes an input, uses the provided resources (like LLMs or prompts), and produces a result.

Parameters:

    task (TaskInput, required): The task object received from the server, containing the input data and metadata.
    rollout_id (str, required): A unique identifier for the rollout, used for tracking and reporting purposes.
    resources (NamedResources, required): A dictionary of named resources (e.g., LLMs, prompt templates) for the agent to use.

Returns:

    RolloutRawResult: The result of the rollout, which can be one of:

        - None. The tracing should be handled by the agent runner.
        - A float representing the final reward.
        - A list of Triplet objects for detailed, step-by-step feedback.
        - A list of ReadableSpan objects for OpenTelemetry tracing.
        - A list of dictionaries for any trace spans.
        - A complete Rollout object for full control over reporting.
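
For the last option, the rollout method can build the Rollout object itself. A minimal sketch follows; the import path for Rollout is an assumption, and only fields documented in this reference are set.

from agentlightning.types import Rollout  # import path assumed

class FullControlAgent(LitAgent):
    def training_rollout(self, task, rollout_id, resources):
        # ... run the agent logic here ...
        return Rollout(rollout_id=rollout_id, final_reward=0.5)
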
Source code in agentlightning/litagent.py
def training_rollout(self, task: TaskInput, rollout_id: str, resources: NamedResources) -> RolloutRawResult:
    """Defines the agent's behavior for a single training task.

    This method should contain the logic for how the agent processes an
    input, uses the provided resources (like LLMs or prompts), and
    produces a result.

    Args:
        task: The task object received from the server, containing the
              input data and metadata.
        rollout_id: A unique identifier for the rollout, used for tracking
                    and reporting purposes.
        resources: A dictionary of named resources (e.g., LLMs, prompt
                   templates) for the agent to use.

    Returns:
        The result of the rollout, which can be one of:
        - None. The tracing should be handled by the agent runner.
        - A float representing the final reward.
        - A list of `Triplet` objects for detailed, step-by-step feedback.
        - A list of `ReadableSpan` objects for OpenTelemetry tracing.
        - A list of dictionaries for any trace spans.
        - A complete `Rollout` object for full control over reporting.
    """
    raise NotImplementedError("Subclasses must implement the `training_rollout` method.")

training_rollout_async(task, rollout_id, resources) async

Asynchronous version of training_rollout.

This method should be implemented by agents that perform asynchronous operations (e.g., non-blocking I/O, concurrent API calls).

Parameters:

    task (TaskInput, required): The task object received from the server.
    rollout_id (str, required): A unique identifier for the training rollout, used for tracking and reporting purposes.
    resources (NamedResources, required): A dictionary of named resources for the agent to use.

Returns:

    RolloutRawResult: The result of the asynchronous training rollout.
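
A minimal async sketch; the awaited sleep is a placeholder for real non-blocking work such as concurrent API calls.

import asyncio

class AsyncAgent(LitAgent):
    async def training_rollout_async(self, task, rollout_id, resources):
        # Real agents would await LLM or tool calls here.
        await asyncio.sleep(0)
        return 0.0  # a float return value is treated as the final reward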

Source code in agentlightning/litagent.py
async def training_rollout_async(
    self, task: TaskInput, rollout_id: str, resources: NamedResources
) -> RolloutRawResult:
    """Asynchronous version of `training_rollout`.

    This method should be implemented by agents that perform asynchronous
    operations (e.g., non-blocking I/O, concurrent API calls).

    Args:
        task: The task object received from the server.
        rollout_id: A unique identifier for the training rollout,
                    used for tracking and reporting purposes.
        resources: A dictionary of named resources for the agent to use.

    Returns:
        The result of the asynchronous training rollout.
    """
    raise NotImplementedError("Async agents must implement the `training_rollout_async` method.")

validation_rollout(task, rollout_id, resources)

Defines the agent's behavior for a single validation task.

By default, this method redirects to training_rollout. Override it if the agent should behave differently during validation.

Parameters:

    task (TaskInput, required): The task object received from the server, containing the input data and metadata.
    rollout_id (str, required): A unique identifier for the validation rollout, used for tracking and reporting purposes.
    resources (NamedResources, required): A dictionary of named resources for the agent to use.

Returns:

    RolloutRawResult: The result of the validation rollout. See training_rollout for possible return types.

Source code in agentlightning/litagent.py
def validation_rollout(self, task: TaskInput, rollout_id: str, resources: NamedResources) -> RolloutRawResult:
    """Defines the agent's behavior for a single validation task.

    By default, this method redirects to `training_rollout`. Override it
    if the agent should behave differently during validation.

    Args:
        task: The task object received from the server, containing the
              input data and metadata.
        rollout_id: A unique identifier for the validation rollout,
                    used for tracking and reporting purposes.
        resources: A dictionary of named resources for the agent to use.

    Returns:
        The result of the validation rollout. See `training_rollout` for
        possible return types.
    """
    return self.training_rollout(task, rollout_id, resources)

validation_rollout_async(task, rollout_id, resources) async

Asynchronous version of validation_rollout.

By default, this method redirects to training_rollout_async. Override it for different asynchronous validation behavior.

Parameters:

    task (TaskInput, required): The task object received from the server.
    rollout_id (str, required): A unique identifier for the validation rollout, used for tracking and reporting purposes.
    resources (NamedResources, required): A dictionary of named resources for the agent to use.

Returns:

    RolloutRawResult: The result of the asynchronous validation rollout.

Source code in agentlightning/litagent.py
async def validation_rollout_async(
    self, task: TaskInput, rollout_id: str, resources: NamedResources
) -> RolloutRawResult:
    """Asynchronous version of `validation_rollout`.

    By default, this method redirects to `training_rollout_async`.
    Override it for different asynchronous validation behavior.

    Args:
        task: The task object received from the server.
        rollout_id: A unique identifier for the validation rollout,
                    used for tracking and reporting purposes.
        resources: A dictionary of named resources for the agent to use.

    Returns:
        The result of the asynchronous validation rollout.
    """
    return await self.training_rollout_async(task, rollout_id, resources)

agentlightning.client

AgentLightningClient

Client for interacting with a version-aware Agent Lightning Server.

This client handles polling for tasks, fetching specific versions of resources (like model configurations), and posting completed rollouts back to the server. It provides both synchronous and asynchronous methods for these operations and includes a cache for resources.
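
A typical synchronous round trip looks roughly like this (a sketch; the endpoint URL and the Rollout import path are assumptions):

from agentlightning.client import AgentLightningClient
from agentlightning.types import Rollout  # import path assumed

client = AgentLightningClient(endpoint="http://localhost:8000", poll_interval=2.0)
task = client.poll_next_task()  # blocks until the server has a task
resources_update = (
    client.get_resources_by_id(task.resources_id)
    if task.resources_id
    else client.get_latest_resources()
)
# ... run the agent against task.input with resources_update.resources ...
client.post_rollout(Rollout(rollout_id=task.rollout_id, final_reward=1.0))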

Source code in agentlightning/client.py
class AgentLightningClient:
    """
    Client for interacting with a version-aware Agent Lightning Server.

    This client handles polling for tasks, fetching specific versions of resources
    (like model configurations), and posting completed rollouts back to the server.
    It provides both synchronous and asynchronous methods for these operations and
    includes a cache for resources.
    """

    _next_task_uri = "/task"
    _resources_uri = "/resources"
    _latest_resources_uri = "/resources/latest"
    _report_rollout_uri = "/rollout"

    def __init__(self, endpoint: str, poll_interval: float = 5.0, timeout: float = 10.0):
        """Initializes the AgentLightningClient.

        Args:
            endpoint: The root URL of the Agent Lightning server.
            poll_interval: The interval in seconds to wait between polling for new tasks.
            timeout: The timeout in seconds for HTTP requests.
        """
        self.endpoint = endpoint
        self.task_count = 0
        self.poll_interval = poll_interval
        self.timeout = timeout
        self._resource_cache: Dict[str, ResourcesUpdate] = {}  # TODO: mechanism to evict cache
        self._default_headers = {"X-AgentLightning-Client": "true"}

    async def _request_json_async(self, url: str) -> Optional[Dict[str, Any]]:
        """Makes an async GET request to the specified URL and returns the JSON response.

        Args:
            url: The URL to request.

        Returns:
            The JSON response as a dictionary or None if the request fails.
        """
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                async with session.get(url, headers=self._default_headers) as resp:
                    resp.raise_for_status()
                    return await resp.json()
            except Exception as e:
                logger.debug(f"Async GET request failed for {url}: {e}")
                return None

    async def _post_json_async(self, url: str, payload: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Makes an async POST request with a JSON payload.

        Args:
            url: The URL to post to.
            payload: The dictionary data to send as JSON.

        Returns:
            The JSON response as a dictionary or None if the request fails.
        """
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                async with session.post(url, json=payload, headers=self._default_headers) as resp:
                    resp.raise_for_status()
                    return await resp.json()
            except Exception as e:
                logger.debug(f"Async POST request failed for {url}: {e}")
                return None

    async def poll_next_task_async(self) -> Task:
        """Polls the server asynchronously for the next task until one is available.

        Returns:
            A Task object containing the task details.
        """
        url = urllib.parse.urljoin(self.endpoint, self._next_task_uri)
        while True:
            response = await self._request_json_async(url)
            if response:
                task_if_any = TaskIfAny.model_validate(response)
                if task_if_any.is_available and task_if_any.task:
                    self.task_count += 1
                    logger.info(f"[Task {self.task_count} Received] ID: {task_if_any.task.rollout_id}")
                    return task_if_any.task
            logger.debug(f"No task available yet. Retrying in {self.poll_interval} seconds...")
            await asyncio.sleep(self.poll_interval)

    async def get_resources_by_id_async(self, resource_id: str) -> Optional[ResourcesUpdate]:
        """Fetches a specific version of resources by its ID, using a cache.

        Args:
            resource_id: The ID of the resources to fetch, usually from a Task's metadata.

        Returns:
            A ResourcesUpdate object containing the versioned resources, or None if not found.
        """
        if resource_id in self._resource_cache:
            logger.debug(f"Found resources '{resource_id}' in cache.")
            return self._resource_cache[resource_id]

        url = urllib.parse.urljoin(self.endpoint, f"{self._resources_uri}/{resource_id}")
        response = await self._request_json_async(url)
        if response:
            resources_update = ResourcesUpdate.model_validate(response)
            self._resource_cache[resource_id] = resources_update
            logger.info(f"Fetched and cached resources for ID: {resource_id}")
            return resources_update
        return None

    async def get_latest_resources_async(self) -> Optional[ResourcesUpdate]:
        """Fetches the latest available resources from the server.

        Returns:
            A ResourcesUpdate object containing the latest resources.
        """
        url = urllib.parse.urljoin(self.endpoint, self._latest_resources_uri)
        response = await self._request_json_async(url)
        if response:
            resources_update = ResourcesUpdate.model_validate(response)
            # Cache this result as well
            self._resource_cache[resources_update.resources_id] = resources_update
            return resources_update
        return None

    async def post_rollout_async(self, rollout: Rollout) -> Optional[Dict[str, Any]]:
        """Posts a completed rollout to the server asynchronously.

        Args:
            rollout: A Rollout object containing the results of a task.

        Returns:
            The server's JSON response as a dictionary.
        """
        url = urllib.parse.urljoin(self.endpoint, self._report_rollout_uri)
        payload = rollout.model_dump(mode="json")
        return await self._post_json_async(url, payload)

    def _request_json(self, url: str) -> Optional[Dict[str, Any]]:
        """Makes a sync GET request to the specified URL and returns the JSON response.

        Args:
            url: The URL to request.

        Returns:
            The JSON response as a dictionary or None if the request fails.
        """
        try:
            response = requests.get(url, timeout=self.timeout, headers=self._default_headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.debug(f"Sync GET request failed for {url}: {e}")
            return None

    def _post_json(self, url: str, payload: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Makes a sync POST request with a JSON payload.

        Args:
            url: The URL to post to.
            payload: The dictionary data to send as JSON.

        Returns:
            The JSON response as a dictionary or None if the request fails.
        """
        try:
            response = requests.post(url, json=payload, timeout=self.timeout, headers=self._default_headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.debug(f"Sync POST request failed for {url}: {e}")
            return None

    def poll_next_task(self) -> Task:
        """Polls the server synchronously for the next task until one is available.

        Returns:
            A Task object containing the task details, including the required `resources_id`.
        """
        url = urllib.parse.urljoin(self.endpoint, self._next_task_uri)
        while True:
            response = self._request_json(url)
            if response:
                task_if_any = TaskIfAny.model_validate(response)
                if task_if_any.is_available and task_if_any.task:
                    self.task_count += 1
                    logger.info(f"[Task {self.task_count} Received] ID: {task_if_any.task.rollout_id}")
                    return task_if_any.task
            logger.debug(f"No task available yet. Retrying in {self.poll_interval} seconds...")
            time.sleep(self.poll_interval)

    def get_resources_by_id(self, resource_id: str) -> Optional[ResourcesUpdate]:
        """Fetches a specific version of resources by its ID synchronously, using a cache.

        Args:
            resource_id: The ID of the resources to fetch, usually from a Task's metadata.

        Returns:
            A ResourcesUpdate object containing the versioned resources, or None if not found.
        """
        if resource_id in self._resource_cache:
            logger.debug(f"Found resources '{resource_id}' in cache.")
            return self._resource_cache[resource_id]

        url = urllib.parse.urljoin(self.endpoint, f"{self._resources_uri}/{resource_id}")
        response = self._request_json(url)
        if response:
            resources_update = ResourcesUpdate.model_validate(response)
            self._resource_cache[resource_id] = resources_update
            logger.info(f"Fetched and cached resources for ID: {resource_id}")
            return resources_update
        return None

    def get_latest_resources(self) -> Optional[ResourcesUpdate]:
        """Fetches the latest available resources from the server synchronously.

        Returns:
            A ResourcesUpdate object containing the latest resources.
        """
        url = urllib.parse.urljoin(self.endpoint, self._latest_resources_uri)
        response = self._request_json(url)
        if response:
            resources_update = ResourcesUpdate.model_validate(response)
            self._resource_cache[resources_update.resources_id] = resources_update
            return resources_update
        return None

    def post_rollout(self, rollout: Rollout) -> Optional[Dict[str, Any]]:
        """Posts a completed rollout to the server synchronously.

        Args:
            rollout: A Rollout object containing the results of a task.

        Returns:
            The server's JSON response as a dictionary.
        """
        url = urllib.parse.urljoin(self.endpoint, self._report_rollout_uri)
        payload = rollout.model_dump(mode="json")
        return self._post_json(url, payload)

__init__(endpoint, poll_interval=5.0, timeout=10.0)

Initializes the AgentLightningClient.

Parameters:

    endpoint (str, required): The root URL of the Agent Lightning server.
    poll_interval (float, default: 5.0): The interval in seconds to wait between polling for new tasks.
    timeout (float, default: 10.0): The timeout in seconds for HTTP requests.
Source code in agentlightning/client.py
def __init__(self, endpoint: str, poll_interval: float = 5.0, timeout: float = 10.0):
    """Initializes the AgentLightningClient.

    Args:
        endpoint: The root URL of the Agent Lightning server.
        poll_interval: The interval in seconds to wait between polling for new tasks.
        timeout: The timeout in seconds for HTTP requests.
    """
    self.endpoint = endpoint
    self.task_count = 0
    self.poll_interval = poll_interval
    self.timeout = timeout
    self._resource_cache: Dict[str, ResourcesUpdate] = {}  # TODO: mechanism to evict cache
    self._default_headers = {"X-AgentLightning-Client": "true"}

get_latest_resources()

Fetches the latest available resources from the server synchronously.

Returns:

    Optional[ResourcesUpdate]: A ResourcesUpdate object containing the latest resources.

Source code in agentlightning/client.py
def get_latest_resources(self) -> Optional[ResourcesUpdate]:
    """Fetches the latest available resources from the server synchronously.

    Returns:
        A ResourcesUpdate object containing the latest resources.
    """
    url = urllib.parse.urljoin(self.endpoint, self._latest_resources_uri)
    response = self._request_json(url)
    if response:
        resources_update = ResourcesUpdate.model_validate(response)
        self._resource_cache[resources_update.resources_id] = resources_update
        return resources_update
    return None

get_latest_resources_async() async

Fetches the latest available resources from the server.

Returns:

    Optional[ResourcesUpdate]: A ResourcesUpdate object containing the latest resources.

Source code in agentlightning/client.py
async def get_latest_resources_async(self) -> Optional[ResourcesUpdate]:
    """Fetches the latest available resources from the server.

    Returns:
        A ResourcesUpdate object containing the latest resources.
    """
    url = urllib.parse.urljoin(self.endpoint, self._latest_resources_uri)
    response = await self._request_json_async(url)
    if response:
        resources_update = ResourcesUpdate.model_validate(response)
        # Cache this result as well
        self._resource_cache[resources_update.resources_id] = resources_update
        return resources_update
    return None

get_resources_by_id(resource_id)

Fetches a specific version of resources by its ID synchronously, using a cache.

Parameters:

    resource_id (str, required): The ID of the resources to fetch, usually from a Task's metadata.

Returns:

    Optional[ResourcesUpdate]: A ResourcesUpdate object containing the versioned resources, or None if not found.

Source code in agentlightning/client.py
def get_resources_by_id(self, resource_id: str) -> Optional[ResourcesUpdate]:
    """Fetches a specific version of resources by its ID synchronously, using a cache.

    Args:
        resource_id: The ID of the resources to fetch, usually from a Task's metadata.

    Returns:
        A ResourcesUpdate object containing the versioned resources, or None if not found.
    """
    if resource_id in self._resource_cache:
        logger.debug(f"Found resources '{resource_id}' in cache.")
        return self._resource_cache[resource_id]

    url = urllib.parse.urljoin(self.endpoint, f"{self._resources_uri}/{resource_id}")
    response = self._request_json(url)
    if response:
        resources_update = ResourcesUpdate.model_validate(response)
        self._resource_cache[resource_id] = resources_update
        logger.info(f"Fetched and cached resources for ID: {resource_id}")
        return resources_update
    return None

get_resources_by_id_async(resource_id) async

Fetches a specific version of resources by its ID, using a cache.

Parameters:

    resource_id (str, required): The ID of the resources to fetch, usually from a Task's metadata.

Returns:

    Optional[ResourcesUpdate]: A ResourcesUpdate object containing the versioned resources, or None if not found.

Source code in agentlightning/client.py
async def get_resources_by_id_async(self, resource_id: str) -> Optional[ResourcesUpdate]:
    """Fetches a specific version of resources by its ID, using a cache.

    Args:
        resource_id: The ID of the resources to fetch, usually from a Task's metadata.

    Returns:
        A ResourcesUpdate object containing the versioned resources, or None if not found.
    """
    if resource_id in self._resource_cache:
        logger.debug(f"Found resources '{resource_id}' in cache.")
        return self._resource_cache[resource_id]

    url = urllib.parse.urljoin(self.endpoint, f"{self._resources_uri}/{resource_id}")
    response = await self._request_json_async(url)
    if response:
        resources_update = ResourcesUpdate.model_validate(response)
        self._resource_cache[resource_id] = resources_update
        logger.info(f"Fetched and cached resources for ID: {resource_id}")
        return resources_update
    return None

poll_next_task()

Polls the server synchronously for the next task until one is available.

Returns:

    Task: A Task object containing the task details, including the required resources_id.

Source code in agentlightning/client.py
def poll_next_task(self) -> Task:
    """Polls the server synchronously for the next task until one is available.

    Returns:
        A Task object containing the task details, including the required `resources_id`.
    """
    url = urllib.parse.urljoin(self.endpoint, self._next_task_uri)
    while True:
        response = self._request_json(url)
        if response:
            task_if_any = TaskIfAny.model_validate(response)
            if task_if_any.is_available and task_if_any.task:
                self.task_count += 1
                logger.info(f"[Task {self.task_count} Received] ID: {task_if_any.task.rollout_id}")
                return task_if_any.task
        logger.debug(f"No task available yet. Retrying in {self.poll_interval} seconds...")
        time.sleep(self.poll_interval)

poll_next_task_async() async

Polls the server asynchronously for the next task until one is available.

Returns:

    Task: A Task object containing the task details.

Source code in agentlightning/client.py
async def poll_next_task_async(self) -> Task:
    """Polls the server asynchronously for the next task until one is available.

    Returns:
        A Task object containing the task details.
    """
    url = urllib.parse.urljoin(self.endpoint, self._next_task_uri)
    while True:
        response = await self._request_json_async(url)
        if response:
            task_if_any = TaskIfAny.model_validate(response)
            if task_if_any.is_available and task_if_any.task:
                self.task_count += 1
                logger.info(f"[Task {self.task_count} Received] ID: {task_if_any.task.rollout_id}")
                return task_if_any.task
        logger.debug(f"No task available yet. Retrying in {self.poll_interval} seconds...")
        await asyncio.sleep(self.poll_interval)

post_rollout(rollout)

Posts a completed rollout to the server synchronously.

Parameters:

    rollout (Rollout, required): A Rollout object containing the results of a task.

Returns:

    Optional[Dict[str, Any]]: The server's JSON response as a dictionary.

Source code in agentlightning/client.py
def post_rollout(self, rollout: Rollout) -> Optional[Dict[str, Any]]:
    """Posts a completed rollout to the server synchronously.

    Args:
        rollout: A Rollout object containing the results of a task.

    Returns:
        The server's JSON response as a dictionary.
    """
    url = urllib.parse.urljoin(self.endpoint, self._report_rollout_uri)
    payload = rollout.model_dump(mode="json")
    return self._post_json(url, payload)

post_rollout_async(rollout) async

Posts a completed rollout to the server asynchronously.

Parameters:

    rollout (Rollout, required): A Rollout object containing the results of a task.

Returns:

    Optional[Dict[str, Any]]: The server's JSON response as a dictionary.

Source code in agentlightning/client.py
async def post_rollout_async(self, rollout: Rollout) -> Optional[Dict[str, Any]]:
    """Posts a completed rollout to the server asynchronously.

    Args:
        rollout: A Rollout object containing the results of a task.

    Returns:
        The server's JSON response as a dictionary.
    """
    url = urllib.parse.urljoin(self.endpoint, self._report_rollout_uri)
    payload = rollout.model_dump(mode="json")
    return await self._post_json_async(url, payload)
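
The async variants compose the same way inside an event loop (a sketch; the endpoint is an assumption):

import asyncio

async def main() -> None:
    client = AgentLightningClient(endpoint="http://localhost:8000")
    task = await client.poll_next_task_async()
    update = await client.get_latest_resources_async()
    # ... run the agent, build a Rollout, then report it back, e.g.:
    # await client.post_rollout_async(rollout)

asyncio.run(main())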

DevTaskLoader

Bases: AgentLightningClient

A local task manager for development that provides sample tasks and resources.

This client mocks the server APIs by maintaining a local queue of tasks and resources within the same process. It's designed for development, testing, and scenarios where a full Agent Lightning server is not needed.

The DevTaskLoader overrides the polling and resource fetching methods to return data from local collections instead of making HTTP requests to a remote server.
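
A small local setup might look like this. It is only a sketch: the task strings are arbitrary, and an empty resources mapping is passed purely to keep the example short; in practice you supply real named resources (LLM endpoints, prompt templates, and so on).

from agentlightning.client import DevTaskLoader

loader = DevTaskLoader(
    tasks=["What is 2 + 2?", "Name a prime number."],  # plain TaskInput values
    resources={},  # supply real named resources in practice
)
task = loader.poll_next_task()  # assembles a Task with a local rollout_id
# ... run the agent and post the rollout back ...
print(loader.rollouts)  # rollouts posted back during the local run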

Source code in agentlightning/client.py
class DevTaskLoader(AgentLightningClient):
    """A local task manager for development that provides sample tasks and resources.

    This client mocks the server APIs by maintaining a local queue of tasks and resources
    within the same process. It's designed for development, testing, and scenarios where
    a full Agent Lightning server is not needed.

    The DevTaskLoader overrides the polling and resource fetching methods to return data
    from local collections instead of making HTTP requests to a remote server.
    """

    def __init__(
        self,
        tasks: Union[List[TaskInput], List[Task]],
        resources: Union[NamedResources, ResourcesUpdate],
        **kwargs: Any,
    ):
        """Initializes the DevTaskLoader with pre-defined tasks and resources.

        Args:
            tasks: Either a List of TaskInput objects or a List of Task objects.
            resources: Either NamedResources or ResourcesUpdate object.
            **kwargs: Additional arguments passed to the parent AgentLightningClient.
        """
        super().__init__(endpoint="local://", **kwargs)
        self._tasks = tasks.copy()
        if len(self._tasks) == 0:
            raise ValueError("DevTaskLoader requires at least one task to be provided.")

        # Check if tasks are mixture of TaskInput and Task
        if any(isinstance(task, Task) for task in self._tasks):
            if not all(isinstance(task, Task) for task in self._tasks):
                raise ValueError("All tasks must be either Task or TaskInput objects.")

        self._task_index = 0

        if isinstance(resources, ResourcesUpdate):
            self._resources_update = resources
        else:
            self._resources_update = ResourcesUpdate(resources_id="local", resources=resources)

        # Store rollouts posted back to the loader for easy debugging of local runs
        self._rollouts: List[Rollout] = []

    @property
    def rollouts(self) -> List[Rollout]:
        """Return rollouts that have been posted back to the loader."""
        return self._rollouts

    def poll_next_task(self) -> Task:
        """Returns the next task from the local queue.

        If tasks are TaskInput objects, assembles them into Task objects.
        If tasks are already Task objects, returns them directly.

        Returns:
            The next Task object from the local task list.
        """
        if self._task_index >= len(self._tasks):
            self._task_index = 0

        task_or_input = self._tasks[self._task_index]

        if isinstance(task_or_input, Task):
            task = task_or_input
        else:
            rollout_id = f"local_task_{self._task_index + 1:03d}"
            task = Task(
                rollout_id=rollout_id,
                input=task_or_input,
                resources_id=self._resources_update.resources_id,
                create_time=time.time(),
            )

        self._task_index += 1
        self.task_count += 1
        logger.info(f"[Task {self.task_count} Received] Task ID: {task.rollout_id}")
        return task

    def get_resources_by_id(self, resource_id: str) -> Optional[ResourcesUpdate]:
        logger.debug(f"DevTaskLoader checking resources for ID: {resource_id}")
        if resource_id != self._resources_update.resources_id:
            raise ValueError(
                f"Resource ID '{resource_id}' not found. Only '{self._resources_update.resources_id}' is available."
            )
        return self._resources_update

    def get_latest_resources(self) -> Optional[ResourcesUpdate]:
        logger.debug("DevTaskLoader returning latest resources.")
        return self._resources_update

    def post_rollout(self, rollout: Rollout) -> Optional[Dict[str, Any]]:
        logger.debug(f"DevTaskLoader received rollout for task: {rollout.rollout_id}")
        self._rollouts.append(rollout)
        return {"status": "received", "rollout_id": rollout.rollout_id}

    async def poll_next_task_async(self) -> Task:
        return self.poll_next_task()

    async def get_resources_by_id_async(self, resource_id: str) -> Optional[ResourcesUpdate]:
        return self.get_resources_by_id(resource_id)

    async def get_latest_resources_async(self) -> Optional[ResourcesUpdate]:
        return self.get_latest_resources()

    async def post_rollout_async(self, rollout: Rollout) -> Optional[Dict[str, Any]]:
        return self.post_rollout(rollout)

    def __repr__(self):
        return f"DevTaskLoader(num_tasks={len(self._tasks)}, resources={self._resources_update.resources})"

rollouts property

Return rollouts that have been posted back to the loader.

__init__(tasks, resources, **kwargs)

Initializes the DevTaskLoader with pre-defined tasks and resources.

Parameters:

    tasks (Union[List[TaskInput], List[Task]], required): Either a List of TaskInput objects or a List of Task objects.
    resources (Union[NamedResources, ResourcesUpdate], required): Either a NamedResources or a ResourcesUpdate object.
    **kwargs (Any, default: {}): Additional arguments passed to the parent AgentLightningClient.
Source code in agentlightning/client.py
def __init__(
    self,
    tasks: Union[List[TaskInput], List[Task]],
    resources: Union[NamedResources, ResourcesUpdate],
    **kwargs: Any,
):
    """Initializes the DevTaskLoader with pre-defined tasks and resources.

    Args:
        tasks: Either a List of TaskInput objects or a List of Task objects.
        resources: Either NamedResources or ResourcesUpdate object.
        **kwargs: Additional arguments passed to the parent AgentLightningClient.
    """
    super().__init__(endpoint="local://", **kwargs)
    self._tasks = tasks.copy()
    if len(self._tasks) == 0:
        raise ValueError("DevTaskLoader requires at least one task to be provided.")

    # Check if tasks are mixture of TaskInput and Task
    if any(isinstance(task, Task) for task in self._tasks):
        if not all(isinstance(task, Task) for task in self._tasks):
            raise ValueError("All tasks must be either Task or TaskInput objects.")

    self._task_index = 0

    if isinstance(resources, ResourcesUpdate):
        self._resources_update = resources
    else:
        self._resources_update = ResourcesUpdate(resources_id="local", resources=resources)

    # Store rollouts posted back to the loader for easy debugging of local runs
    self._rollouts: List[Rollout] = []

poll_next_task()

Returns the next task from the local queue.

If tasks are TaskInput objects, assembles them into Task objects. If tasks are already Task objects, returns them directly.

Returns:

    Task: The next Task object from the local task list.

Source code in agentlightning/client.py
def poll_next_task(self) -> Task:
    """Returns the next task from the local queue.

    If tasks are TaskInput objects, assembles them into Task objects.
    If tasks are already Task objects, returns them directly.

    Returns:
        The next Task object from the local task list.
    """
    if self._task_index >= len(self._tasks):
        self._task_index = 0

    task_or_input = self._tasks[self._task_index]

    if isinstance(task_or_input, Task):
        task = task_or_input
    else:
        rollout_id = f"local_task_{self._task_index + 1:03d}"
        task = Task(
            rollout_id=rollout_id,
            input=task_or_input,
            resources_id=self._resources_update.resources_id,
            create_time=time.time(),
        )

    self._task_index += 1
    self.task_count += 1
    logger.info(f"[Task {self.task_count} Received] Task ID: {task.rollout_id}")
    return task

agentlightning.runner

AgentRunner

Bases: ParallelWorkerBase

Manages the agent's execution loop and integrates with AgentOps.

This class orchestrates the interaction between the agent (LitAgent) and the server (AgentLightningClient). It handles polling for tasks, executing the agent's logic, and reporting results back to the server. If enabled, it will also automatically trace each rollout using AgentOps.

Attributes:

    agent: The LitAgent instance containing the agent's logic.
    client: The AgentLightningClient for server communication.
    tracer: The tracer instance for this runner/worker.
    worker_id: An optional identifier for the worker process.
    max_tasks: The maximum number of tasks to process before stopping.

Source code in agentlightning/runner.py
class AgentRunner(ParallelWorkerBase):
    """Manages the agent's execution loop and integrates with AgentOps.

    This class orchestrates the interaction between the agent (`LitAgent`) and
    the server (`AgentLightningClient`). It handles polling for tasks, executing
    the agent's logic, and reporting results back to the server. If enabled,
    it will also automatically trace each rollout using AgentOps.

    Attributes:
        agent: The `LitAgent` instance containing the agent's logic.
        client: The `AgentLightningClient` for server communication.
        tracer: The tracer instance for this runner/worker.
        worker_id: An optional identifier for the worker process.
        max_tasks: The maximum number of tasks to process before stopping.
    """

    def __init__(
        self,
        agent: LitAgent,
        client: AgentLightningClient,
        tracer: BaseTracer,
        triplet_exporter: TripletExporter,
        worker_id: Optional[int] = None,
        max_tasks: Optional[int] = None,
    ):
        super().__init__()
        self.agent = agent
        self.client = client
        self.tracer = tracer
        self.triplet_exporter = triplet_exporter

        # Worker-specific attributes
        self.worker_id = worker_id
        self.max_tasks = max_tasks

    def _log_prefix(self, rollout_id: Optional[str] = None) -> str:
        """Generates a standardized log prefix for the current worker."""
        if self.worker_id is not None:
            if rollout_id:
                return f"[Worker {self.worker_id} | Rollout {rollout_id}]"
            else:
                return f"[Worker {self.worker_id}]"
        if rollout_id:
            return f"[Rollout {rollout_id}]"
        return "[Default Worker]"

    def _to_rollout_object(
        self,
        result: RolloutRawResult,
        rollout_id: str,
    ) -> Rollout:
        """Standardizes the agent's return value into a Rollout object.

        Args:
            result: The output from the agent's rollout method.
            rollout_id: The unique identifier for the current task.

        Returns:
            A standardized `Rollout` object for reporting to the server.
        """
        trace: Any = None
        final_reward: Optional[float] = None
        triplets: Optional[List[Triplet]] = None
        trace_spans: Optional[List[ReadableSpan]] = None

        # Handle different types of results from the agent
        # Case 1: result is a float (final reward)
        if isinstance(result, float):
            final_reward = result
        # Case 2: result is a list of Triplets
        if isinstance(result, list) and all(isinstance(t, Triplet) for t in result):
            triplets = result  # type: ignore
        # Case 3: result is a list of ReadableSpan (OpenTelemetry spans)
        if isinstance(result, list) and all(isinstance(t, ReadableSpan) for t in result):
            trace_spans = result  # type: ignore
            trace = [json.loads(readable_span.to_json()) for readable_span in trace_spans]  # type: ignore
        # Case 4: result is a list of dict (trace JSON)
        if isinstance(result, list) and all(isinstance(t, dict) for t in result):
            trace = result
        # Case 5: result is a Rollout object
        if isinstance(result, Rollout):
            final_reward = result.final_reward
            triplets = result.triplets
            trace = result.trace

        # If the agent has tracing enabled, use the tracer's last trace if not already set
        if self.tracer and (trace is None or trace_spans is None):
            spans = self.tracer.get_last_trace()
            if spans:
                trace = [json.loads(readable_span.to_json()) for readable_span in spans]
                trace_spans = spans

        # Always extract triplets from the trace using TripletExporter
        if trace_spans:
            triplets = self.triplet_exporter.export(trace_spans)

        # If the agent has triplets, use the last one for final reward if not set
        if triplets and triplets[-1].reward is not None and final_reward is None:
            final_reward = triplets[-1].reward

        # Create the Rollout object with standardized fields
        result_dict: Dict[str, Any] = {
            "rollout_id": rollout_id,
        }
        if final_reward is not None:
            result_dict["final_reward"] = final_reward
        if triplets is not None:
            result_dict["triplets"] = triplets
        if trace is not None:
            result_dict["trace"] = trace

        if isinstance(result, Rollout):
            return result.model_copy(update=result_dict)
        return Rollout(**result_dict)

    def run(self) -> bool:
        """Poll the task and rollout once synchronously."""
        self.agent.set_runner(self)  # Ensure the agent has a reference to this runner

        task = self.client.poll_next_task()
        if task is None:
            logger.info(f"{self._log_prefix()} Poll returned no task. Exiting.")
            return False
        rollout_id = task.rollout_id

        resources_id = task.resources_id
        resources_update = None
        if resources_id:
            resources_update = self.client.get_resources_by_id(resources_id)
        else:
            logger.debug(f"{self._log_prefix(rollout_id)} No 'resources_id'. Fetching latest resources.")
            resources_update = self.client.get_latest_resources()
        if not resources_update:
            logger.error(f"{self._log_prefix(rollout_id)} Failed to fetch resources. Skipping.")
            return False

        rollout_obj = Rollout(rollout_id=task.rollout_id)  # Default empty rollout

        try:
            try:
                self.agent.on_rollout_start(task, self, self.tracer)
            except Exception:
                logger.exception(f"{self._log_prefix(rollout_id)} Exception during on_rollout_start hook.")

            with self.tracer.trace_context(name=f"rollout_{rollout_id}"):
                start_time = time.time()
                rollout_method = self.agent.training_rollout if task.mode == "train" else self.agent.validation_rollout
                # Pass the task input, not the whole task object
                result = rollout_method(task.input, task.rollout_id, resources_update.resources)
                rollout_obj = self._to_rollout_object(result, task.rollout_id)
                end_time = time.time()
                logger.info(
                    f"{self._log_prefix(rollout_id)} Completed in "
                    f"{end_time - start_time:.2f}s. Triplet length: "
                    f"{len(rollout_obj.triplets) if rollout_obj.triplets is not None else 'N/A'}. "
                    f"Reward: {rollout_obj.final_reward}"
                )

        except Exception:
            logger.exception(f"{self._log_prefix(rollout_id)} Exception during rollout.")
        finally:
            try:
                self.agent.on_rollout_end(task, rollout_obj, self, self.tracer)
            except Exception:
                logger.exception(f"{self._log_prefix(rollout_id)} Exception during on_rollout_end hook.")
            self.client.post_rollout(rollout_obj)

        return True

    def iter(self) -> int:
        """Executes the synchronous polling and rollout loop."""
        num_tasks_processed = 0
        logger.info(f"{self._log_prefix()} Started sync rollouts (max: {self.max_tasks or 'unlimited'}).")

        while self.max_tasks is None or num_tasks_processed < self.max_tasks:
            if self.run():
                num_tasks_processed += 1

            if num_tasks_processed % 10 == 0 or num_tasks_processed == 1:
                logger.info(f"{self._log_prefix()} Progress: {num_tasks_processed}/{self.max_tasks or 'unlimited'}")

        logger.info(f"{self._log_prefix()} Finished sync rollouts. Processed {num_tasks_processed} tasks.")
        return num_tasks_processed

    async def run_async(self) -> bool:
        """Poll the task and rollout once."""
        self.agent.set_runner(self)  # Ensure the agent has a reference to this runner

        task = await self.client.poll_next_task_async()
        if task is None:
            logger.info(f"{self._log_prefix()} Poll returned no task. Exiting.")
            return False
        rollout_id = task.rollout_id

        resources_id = task.resources_id
        resources_update = None
        if resources_id:
            resources_update = await self.client.get_resources_by_id_async(resources_id)
        else:
            logger.debug(f"{self._log_prefix(rollout_id)} No 'resources_id'. Fetching latest resources.")
            resources_update = await self.client.get_latest_resources_async()
        if not resources_update:
            logger.error(f"{self._log_prefix(rollout_id)} Failed to fetch resources. Skipping.")
            return False

        rollout_obj = Rollout(rollout_id=task.rollout_id)  # Default empty rollout

        try:
            try:
                self.agent.on_rollout_start(task, self, self.tracer)
            except Exception:
                logger.exception(f"{self._log_prefix(rollout_id)} Exception during on_rollout_start hook.")

            with self.tracer.trace_context(name=f"rollout_{rollout_id}"):
                start_time = time.time()
                rollout_method = (
                    self.agent.training_rollout_async if task.mode == "train" else self.agent.validation_rollout_async
                )
                # Pass the task input, not the whole task object
                result = await rollout_method(task.input, task.rollout_id, resources_update.resources)
                rollout_obj = self._to_rollout_object(result, task.rollout_id)
                end_time = time.time()
                logger.info(
                    f"{self._log_prefix(rollout_id)} Completed in "
                    f"{end_time - start_time:.2f}s. Reward: {rollout_obj.final_reward}"
                )
        except Exception:
            logger.exception(f"{self._log_prefix(rollout_id)} Exception during rollout.")
        finally:
            try:
                self.agent.on_rollout_end(task, rollout_obj, self, self.tracer)
            except Exception:
                logger.exception(f"{self._log_prefix(rollout_id)} Exception during on_rollout_end hook.")
            await self.client.post_rollout_async(rollout_obj)

        return True

    async def iter_async(self) -> int:
        """Executes the asynchronous polling and rollout loop."""
        num_tasks_processed = 0
        logger.info(f"{self._log_prefix()} Started async rollouts (max: {self.max_tasks or 'unlimited'}).")

        while self.max_tasks is None or num_tasks_processed < self.max_tasks:
            if await self.run_async():
                num_tasks_processed += 1

            if num_tasks_processed % 10 == 0 or num_tasks_processed == 1:
                logger.info(f"{self._log_prefix()} Progress: {num_tasks_processed}/{self.max_tasks or 'unlimited'}")
        logger.info(f"{self._log_prefix()} Finished async rollouts. Processed {num_tasks_processed} tasks.")
        return num_tasks_processed

iter()

Executes the synchronous polling and rollout loop.

Source code in agentlightning/runner.py
def iter(self) -> int:
    """Executes the synchronous polling and rollout loop."""
    num_tasks_processed = 0
    logger.info(f"{self._log_prefix()} Started sync rollouts (max: {self.max_tasks or 'unlimited'}).")

    while self.max_tasks is None or num_tasks_processed < self.max_tasks:
        if self.run():
            num_tasks_processed += 1

        if num_tasks_processed % 10 == 0 or num_tasks_processed == 1:
            logger.info(f"{self._log_prefix()} Progress: {num_tasks_processed}/{self.max_tasks or 'unlimited'}")

    logger.info(f"{self._log_prefix()} Finished sync rollouts. Processed {num_tasks_processed} tasks.")
    return num_tasks_processed

iter_async() async

Executes the asynchronous polling and rollout loop.

Source code in agentlightning/runner.py
async def iter_async(self) -> int:
    """Executes the asynchronous polling and rollout loop."""
    num_tasks_processed = 0
    logger.info(f"{self._log_prefix()} Started async rollouts (max: {self.max_tasks or 'unlimited'}).")

    while self.max_tasks is None or num_tasks_processed < self.max_tasks:
        if await self.run_async():
            num_tasks_processed += 1

        if num_tasks_processed % 10 == 0 or num_tasks_processed == 1:
            logger.info(f"{self._log_prefix()} Progress: {num_tasks_processed}/{self.max_tasks or 'unlimited'}")
    logger.info(f"{self._log_prefix()} Finished async rollouts. Processed {num_tasks_processed} tasks.")
    return num_tasks_processed

run()

Poll the task and rollout once synchronously.

Source code in agentlightning/runner.py
def run(self) -> bool:
    """Poll the task and rollout once synchronously."""
    self.agent.set_runner(self)  # Ensure the agent has a reference to this runner

    task = self.client.poll_next_task()
    if task is None:
        logger.info(f"{self._log_prefix()} Poll returned no task. Exiting.")
        return False
    rollout_id = task.rollout_id

    resources_id = task.resources_id
    resources_update = None
    if resources_id:
        resources_update = self.client.get_resources_by_id(resources_id)
    else:
        logger.debug(f"{self._log_prefix(rollout_id)} No 'resources_id'. Fetching latest resources.")
        resources_update = self.client.get_latest_resources()
    if not resources_update:
        logger.error(f"{self._log_prefix(rollout_id)} Failed to fetch resources. Skipping.")
        return False

    rollout_obj = Rollout(rollout_id=task.rollout_id)  # Default empty rollout

    try:
        try:
            self.agent.on_rollout_start(task, self, self.tracer)
        except Exception:
            logger.exception(f"{self._log_prefix(rollout_id)} Exception during on_rollout_start hook.")

        with self.tracer.trace_context(name=f"rollout_{rollout_id}"):
            start_time = time.time()
            rollout_method = self.agent.training_rollout if task.mode == "train" else self.agent.validation_rollout
            # Pass the task input, not the whole task object
            result = rollout_method(task.input, task.rollout_id, resources_update.resources)
            rollout_obj = self._to_rollout_object(result, task.rollout_id)
            end_time = time.time()
            logger.info(
                f"{self._log_prefix(rollout_id)} Completed in "
                f"{end_time - start_time:.2f}s. Triplet length: "
                f"{len(rollout_obj.triplets) if rollout_obj.triplets is not None else 'N/A'}. "
                f"Reward: {rollout_obj.final_reward}"
            )

    except Exception:
        logger.exception(f"{self._log_prefix(rollout_id)} Exception during rollout.")
    finally:
        try:
            self.agent.on_rollout_end(task, rollout_obj, self, self.tracer)
        except Exception:
            logger.exception(f"{self._log_prefix(rollout_id)} Exception during on_rollout_end hook.")
        self.client.post_rollout(rollout_obj)

    return True

run_async() async

Poll the task and rollout once.

Source code in agentlightning/runner.py
async def run_async(self) -> bool:
    """Poll the task and rollout once."""
    self.agent.set_runner(self)  # Ensure the agent has a reference to this runner

    task = await self.client.poll_next_task_async()
    if task is None:
        logger.info(f"{self._log_prefix()} Poll returned no task. Exiting.")
        return False
    rollout_id = task.rollout_id

    resources_id = task.resources_id
    resources_update = None
    if resources_id:
        resources_update = await self.client.get_resources_by_id_async(resources_id)
    else:
        logger.debug(f"{self._log_prefix(rollout_id)} No 'resources_id'. Fetching latest resources.")
        resources_update = await self.client.get_latest_resources_async()
    if not resources_update:
        logger.error(f"{self._log_prefix(rollout_id)} Failed to fetch resources. Skipping.")
        return False

    rollout_obj = Rollout(rollout_id=task.rollout_id)  # Default empty rollout

    try:
        try:
            self.agent.on_rollout_start(task, self, self.tracer)
        except Exception:
            logger.exception(f"{self._log_prefix(rollout_id)} Exception during on_rollout_start hook.")

        with self.tracer.trace_context(name=f"rollout_{rollout_id}"):
            start_time = time.time()
            rollout_method = (
                self.agent.training_rollout_async if task.mode == "train" else self.agent.validation_rollout_async
            )
            # Pass the task input, not the whole task object
            result = await rollout_method(task.input, task.rollout_id, resources_update.resources)
            rollout_obj = self._to_rollout_object(result, task.rollout_id)
            end_time = time.time()
            logger.info(
                f"{self._log_prefix(rollout_id)} Completed in "
                f"{end_time - start_time:.2f}s. Reward: {rollout_obj.final_reward}"
            )
    except Exception:
        logger.exception(f"{self._log_prefix(rollout_id)} Exception during rollout.")
    finally:
        try:
            self.agent.on_rollout_end(task, rollout_obj, self, self.tracer)
        except Exception:
            logger.exception(f"{self._log_prefix(rollout_id)} Exception during on_rollout_end hook.")
        await self.client.post_rollout_async(rollout_obj)

    return True

agentlightning.trainer

Trainer

Bases: ParallelWorkerBase

Orchestrates the distributed execution of agent rollouts.

The Trainer is responsible for launching one or more worker processes that run the agent's execution loop. It manages multiprocessing, handles graceful shutdown, and serves as the main entry point for running a client-side agent fleet.
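
A minimal usage sketch (MyAgent and the endpoint URL are placeholders; returning a bare float reward from training_rollout is an assumption about what the runner's result conversion accepts):

from agentlightning.litagent import LitAgent
from agentlightning.trainer import Trainer


class MyAgent(LitAgent):  # placeholder agent, for illustration only
    def training_rollout(self, task, rollout_id, resources):
        # `task` is the task input payload; `resources` typically carries an
        # LLM endpoint or prompt template. Run the agent here and report the
        # outcome (a bare reward is assumed to be accepted by the runner).
        return 0.0


trainer = Trainer(n_workers=2, max_tasks=100)
trainer.fit(MyAgent(), backend="http://localhost:8000")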

Attributes:

- dev: If True, rollouts are run against the dev endpoint provided in fit.
- n_workers: Number of agent workers (processes) to run in parallel.
- max_tasks: Maximum number of tasks to process per worker. If None, workers run until no more tasks are available.
- daemon: Whether worker processes should be daemons. Daemon processes are terminated automatically when the main process exits.
- tracer: A tracer instance, the full class name as a string, or a dictionary with a 'type' key giving the full class name plus any other initialization parameters. If None, a default AgentOpsTracer is created with the current settings.
- triplet_exporter: A TripletExporter instance used to export triplets from traces, or a dictionary with the initialization parameters for the exporter.

Source code in agentlightning/trainer.py
class Trainer(ParallelWorkerBase):
    """Orchestrates the distributed execution of agent rollouts.

    The Trainer is responsible for launching one or more worker processes
    that run the agent's execution loop. It manages multiprocessing,
    handles graceful shutdown, and serves as the main entry point for
    running a client-side agent fleet.

    Attributes:
        dev: If True, rollouts are run against the dev endpoint provided in `fit`.
        n_workers: Number of agent workers (processes) to run in parallel.
        max_tasks: Maximum number of tasks to process per worker. If None,
                   workers run until no more tasks are available.
        daemon: Whether worker processes should be daemons. Daemon processes
                are terminated automatically when the main process exits.
        tracer: A tracer instance, or a string pointing to the class full name or a dictionary with a 'type' key
                that specifies the class full name and other initialization parameters.
                If None, a default `AgentOpsTracer` will be created with the current settings.
        triplet_exporter: An instance of `TripletExporter` to export triplets from traces,
                          or a dictionary with the initialization parameters for the exporter.
    """

    def __init__(
        self,
        *,
        dev: bool = False,
        n_workers: int = 1,
        max_tasks: Optional[int] = None,
        daemon: bool = True,
        tracer: Union[BaseTracer, str, dict, None] = None,
        triplet_exporter: Union[TripletExporter, dict, None] = None,
    ):
        super().__init__()
        self.n_workers = n_workers
        self.max_tasks = max_tasks
        self.daemon = daemon
        self.dev = dev
        self._client: AgentLightningClient | None = None  # Will be initialized in fit method

        self.tracer = self._make_tracer(tracer)
        if isinstance(triplet_exporter, TripletExporter):
            self.triplet_exporter = triplet_exporter
        elif isinstance(triplet_exporter, dict):
            self.triplet_exporter = TripletExporter(**triplet_exporter)
        elif triplet_exporter is None:
            self.triplet_exporter = TripletExporter()
        else:
            raise ValueError(
                f"Invalid triplet_exporter type: {type(triplet_exporter)}. Expected TripletExporter, dict, or None."
            )

        if not self.daemon:
            logger.warning(
                "daemon=False. Worker processes are non-daemonic. "
                "The worker processes will NOT be terminated when the main process exits. "
                "The cleanup must be handled manually."
            )

    def _make_tracer(self, tracer: Union[BaseTracer, str, dict, None]) -> BaseTracer:
        """Creates a tracer instance based on the provided configuration."""
        if isinstance(tracer, BaseTracer):
            return tracer
        if isinstance(tracer, str):
            module_name, class_name = tracer.rsplit(".", 1)
            module = importlib.import_module(module_name)
            tracer_cls = getattr(module, class_name)
            return tracer_cls()
        if isinstance(tracer, dict):
            tracer_type = tracer.get("type")
            if tracer_type is None:
                raise ValueError("tracer dict must have a 'type' key with the class full name")
            module_name, class_name = tracer_type.rsplit(".", 1)
            module = importlib.import_module(module_name)
            tracer_cls = getattr(module, class_name)
            # Remove 'type' key and pass remaining keys as kwargs
            tracer_kwargs = {k: v for k, v in tracer.items() if k != "type"}
            return tracer_cls(**tracer_kwargs)
        if tracer is None:
            return AgentOpsTracer(agentops_managed=True, instrument_managed=True, daemon=self.daemon)
        raise ValueError(f"Invalid tracer type: {type(tracer)}. Expected BaseTracer, str, dict, or None.")

    def init(self, backend: Union[str, AgentLightningClient]) -> None:
        logger.info(f"Initializing Trainer...")

        self._init_client(backend)

        self.tracer.init()

        logger.info(f"Trainer main initialization complete.")

    def teardown(self) -> None:
        logger.info(f"Cleaning up Trainer...")
        self.tracer.teardown()

        self._client = None
        logger.info(f"Trainer main cleanup complete.")

    def client(self) -> AgentLightningClient:
        """Returns the AgentLightningClient instance."""
        if self._client is None:
            raise RuntimeError("AgentLightningClient has not been initialized. Call `init` first.")
        return self._client

    def _init_client(self, backend: Union[str, AgentLightningClient]) -> AgentLightningClient:
        if self._client is None:
            if isinstance(backend, AgentLightningClient):
                logger.info("Using provided AgentLightningClient instance.")
                self._client = backend
            else:
                logger.info(f"Initializing AgentLightningClient with endpoint: {backend}")
                if not isinstance(backend, str):
                    raise ValueError("backend must be a string URL or an AgentLightningClient instance.")
                if not backend.startswith("http://") and not backend.startswith("https://"):
                    raise ValueError("backend must be a valid URL starting with http:// or https://")
                # Initialize the client with the provided backend URL
                self._client = AgentLightningClient(endpoint=backend)
        else:
            logger.warning("AgentLightningClient already initialized. Returning existing instance.")
        return self._client

    def _worker_main_loop(self, agent: LitAgent, worker_id: int, is_async: bool):
        """The main function for each worker process.

        This function initializes the client and the loop, then starts the
        execution. It also configures process-specific settings like the
        process title and signal handling.

        Args:
            agent: The `LitAgent` instance to run.
            worker_id: The unique ID for this worker.
            is_async: A boolean indicating if the async loop should be run.
        """
        if self.n_workers > 1:
            import setproctitle

            # Ignore Ctrl+C in worker processes; the main process handles it
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            setproctitle.setproctitle(multiprocessing.current_process().name)

        # Now we are in child processes, so we can safely set up the environment.
        agent.set_trainer(self)
        # TODO: this should be set elsewhere
        if agent.trained_agents:
            self.triplet_exporter.agent_match = agent.trained_agents
        self._initialize_worker_env(worker_id)

        mode = "Async" if is_async else "Sync"
        logger.info(f"[Worker {worker_id}] {mode} worker process started.")

        num_processed = 0

        try:
            client = self.client()
            loop = AgentRunner(
                agent=agent,
                client=client,
                tracer=self.tracer,
                triplet_exporter=self.triplet_exporter,
                max_tasks=self.max_tasks,
                worker_id=worker_id,
            )
            loop.init_worker(worker_id)
            if is_async:
                num_processed = asyncio.run(loop.iter_async())
            else:
                num_processed = loop.iter()
        except Exception:
            logger.exception(f"[Worker {worker_id}] Unhandled exception in worker loop.")
        finally:
            self._teardown_worker_env(worker_id)

        return num_processed

    def _initialize_worker_env(self, worker_id: int):
        logger.info(f"[Worker {worker_id}] Setting up trainer environment...")  # worker_id included in process name
        self.tracer.init_worker(worker_id)

    def _teardown_worker_env(self, worker_id: int):
        logger.info(f"[Worker {worker_id}] Cleaning up trainer environment...")
        self.tracer.teardown_worker(worker_id)
        logger.info(f"[Worker {worker_id}] Environment cleanup complete.")

    @staticmethod
    def kill_orphaned_processes() -> None:
        """
        Kill any orphaned processes that may have been left behind by previous runs.
        This is useful for cleaning up after crashes or unexpected exits.
        """
        import psutil

        for proc in psutil.process_iter():
            # check whether the process name matches
            if proc.name().startswith("AgentLightning-"):
                proc.kill()

    def fit(
        self,
        agent: LitAgent,
        backend: Union[str, AgentLightningClient],
        dev_backend: Union[str, AgentLightningClient, None] = None,
    ):
        if self.dev:
            if dev_backend is None:
                raise ValueError("dev_backend must be provided when dev=True.")
            logger.warning(f"Running in dev mode. Using dev backend: {dev_backend}")
            self.init(dev_backend)
        else:
            logger.debug(f"Running in non-dev mode. Using backend: {backend}")
            self.init(backend)

        processes: List[multiprocessing.Process] = []

        # Determine if the agent is asynchronous.
        is_async = (
            hasattr(agent, "training_rollout_async")
            and agent.__class__.training_rollout_async is not LitAgent.training_rollout_async
        )

        mode = "asynchronous" if is_async else "synchronous"

        try:
            if self.n_workers == 1:
                logger.info(f"Running with n_workers=1 ({mode} in main process).")
                num_tasks = self._worker_main_loop(agent, 0, is_async)
                logger.info(f"Single worker mode finished. Tasks processed: {num_tasks}")
            else:
                logger.info(f"Running with n_workers={self.n_workers} ({mode} multiprocessing).")
                for i in range(self.n_workers):
                    process_name = f"AgentLightning-Worker-{i}"
                    p = multiprocessing.Process(
                        target=self._worker_main_loop,
                        args=(agent, i, is_async),
                        daemon=self.daemon,
                        name=process_name,
                    )
                    processes.append(p)
                    logger.info(f"Starting worker process {i} (name: {process_name})...")
                    p.start()

                if self.daemon:
                    for i, p in enumerate(processes):
                        p.join()  # Wait for the process to complete
                        logger.info(
                            f"Worker process {i} (name: {p.name}, PID: {p.pid}) joined with exit code {p.exitcode}."
                        )
                        if p.exitcode != 0:
                            logger.warning(
                                f"Worker process {i} (name: {p.name}, PID: {p.pid}) exited with non-zero code: {p.exitcode}."
                            )

                    logger.info(f"All {self.n_workers} worker processes have completed.")
                else:
                    logger.info("All worker processes started. Main process will not wait.")

                    # A hack to stop the main process from waiting for child processes to finish.
                    time.sleep(1)  # Give workers time to start
                    import multiprocessing.process as multiprocessing_process

                    multiprocessing_process._children.clear()  # type: ignore

        except KeyboardInterrupt:
            if self.n_workers > 1 and len(processes) > 0:
                logger.info(f"KeyboardInterrupt received. Terminating workers...")
                for i, p in enumerate(processes):
                    if p.is_alive():
                        logger.info(f"Terminating worker {i} (name: {p.name}, PID: {p.pid})...")
                        p.terminate()
                    else:
                        logger.info(
                            f"Worker {i} (name: {p.name}, PID: {p.pid}) is not alive or has already terminated."
                        )
                for i, p in enumerate(processes):
                    if p.is_alive():
                        p.join(timeout=10)  # Give some time to terminate
                    if p.is_alive():  # If still alive, kill
                        logger.warning(
                            f"Worker {i} (name: {p.name}, PID: {p.pid}) did not terminate gracefully, killing..."
                        )
                        p.kill()
                        p.join(timeout=10)  # Ensure it's reaped
            logger.info(f"Workers terminated or single worker interrupted.")
        except Exception as e:
            logger.exception(f"Unhandled exception in fit method.")
        finally:
            if self.daemon:
                self.teardown()
            else:
                logger.info("Main process exiting. Please use Trainer.kill_orphaned_processes() for cleanup.")

client()

Returns the AgentLightningClient instance.

Source code in agentlightning/trainer.py
def client(self) -> AgentLightningClient:
    """Returns the AgentLightningClient instance."""
    if self._client is None:
        raise RuntimeError("AgentLightningClient has not been initialized. Call `init` first.")
    return self._client

kill_orphaned_processes() staticmethod

Kill any orphaned processes that may have been left behind by previous runs. This is useful for cleaning up after crashes or unexpected exits.

Source code in agentlightning/trainer.py
@staticmethod
def kill_orphaned_processes() -> None:
    """
    Kill any orphaned processes that may have been left behind by previous runs.
    This is useful for cleaning up after crashes or unexpected exits.
    """
    import psutil

    for proc in psutil.process_iter():
        # check whether the process name matches
        if proc.name().startswith("AgentLightning-"):
            proc.kill()

agentlightning.tracer

agentlightning.reward

reward(fn)

A decorator to wrap a function that computes rewards. It will automatically handle the input and output of the function.
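
For example, an agent can report an intermediate reward by calling a decorated function inside a traced rollout. A minimal sketch (the function name and arguments are placeholders):

from agentlightning.reward import reward


@reward
def exact_match(prediction: str, ground_truth: str) -> float:
    # The returned float is recorded as a reward span; the value itself is
    # passed back to the caller unchanged.
    return 1.0 if prediction.strip() == ground_truth.strip() else 0.0


score = exact_match("4", "4")  # score == 1.0; a reward of 1.0 is traced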

Source code in agentlightning/reward.py
def reward(fn: callable) -> callable:
    """
    A decorator to wrap a function that computes rewards.
    It will automatically handle the input and output of the function.
    """

    def wrap_result(result: Optional[float]) -> RewardSpanData:
        """
        Wrap the result of the function in a dict.
        """
        if result is None:
            return {"type": "reward", "value": None}
        if not isinstance(result, (float, int)):
            warnings.warn(f"Reward is ignored because it is not a number: {result}")
            return {"type": "reward", "value": None}
        return {"type": "reward", "value": float(result)}

    # Check if the function is async
    is_async = asyncio.iscoroutinefunction(fn) or inspect.iscoroutinefunction(fn)

    if is_async:

        async def wrapper_async(*args, **kwargs):
            result: Optional[float] = None

            @operation
            async def agentops_reward_operation() -> RewardSpanData:
                # The reward function we are interested in tracing
                # It takes zero inputs and return a formatted dict
                nonlocal result
                result = await fn(*args, **kwargs)
                return wrap_result(result)

            await agentops_reward_operation()
            return result

        return wrapper_async

    else:

        def wrapper(*args, **kwargs):
            result: Optional[float] = None

            @operation
            def agentops_reward_operation() -> RewardSpanData:
                nonlocal result
                result = fn(*args, **kwargs)
                return wrap_result(result)

            agentops_reward_operation()
            return result

        return wrapper

Server Side

agentlightning.server

AgentLightningServer

The main SDK class for developers to control the Agent Lightning Server.

This class manages the server lifecycle, task queueing, resources updates, and retrieval of results, providing a simple interface for the optimization logic.
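
A minimal sketch of the server-side loop, assuming a client fleet (for example, a Trainer running a LitAgent) is polling the same endpoint; the sample payload, model name, and URLs are placeholders:

import asyncio

from agentlightning.server import AgentLightningServer
from agentlightning.types import LLM


async def main():
    server = AgentLightningServer(host="127.0.0.1", port=8000)
    await server.start()

    # Publish an initial set of named resources for clients to use.
    await server.update_resources({
        "main_llm": LLM(
            endpoint="http://localhost:8080",
            model="llama3",
            sampling_parameters={"temperature": 0.7},
        ),
    })

    # Queue one training task and wait for its rollout to come back.
    rollout_id = await server.queue_task(sample={"question": "2 + 2 = ?"}, mode="train")
    rollout = await server.poll_completed_rollout(rollout_id, timeout=300)
    if rollout is not None:
        print(rollout.final_reward)

    await server.stop()


asyncio.run(main())

If start() and stop() do not fit your event loop, run_forever() (documented below) runs the server in the foreground instead.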

Source code in agentlightning/server.py
class AgentLightningServer:
    """
    The main SDK class for developers to control the Agent Lightning Server.

    This class manages the server lifecycle, task queueing, resources updates,
    and retrieval of results, providing a simple interface for the optimization logic.
    """

    def __init__(self, host: str = "127.0.0.1", port: int = 8000, task_timeout_seconds: float = 300.0):
        """
        Initializes the server controller.

        Args:
            host: The host to bind the server to.
            port: The port to bind the server to.
            task_timeout_seconds: Time in seconds after which a claimed task is considered stale and requeued.
        """
        self.host = host
        self.port = port
        self.endpoint = f"http://{host}:{port}"
        self._task_timeout_seconds = task_timeout_seconds

        # Defer initialization and use event for cross-thread communication
        self._store: Optional[ServerDataStore] = None
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.startup_event = threading.Event()

        # Create FastAPI app instance with a lifespan manager
        self._app = FastAPI(lifespan=self._lifespan)
        self._setup_routes()

        self._uvicorn_config = uvicorn.Config(self._app, host=self.host, port=self.port, log_level="info")
        self._uvicorn_server = uvicorn.Server(self._uvicorn_config)

    # --- ADDED: Lifespan context manager ---
    @asynccontextmanager
    async def _lifespan(self, app: FastAPI):
        """
        Manages server startup and shutdown. This runs inside the server's event loop.
        """
        logger.info("Server is starting up...")
        self.loop = asyncio.get_running_loop()
        self._store = ServerDataStore()  # Initialize data store here
        self.startup_event.set()  # Signal that the server is ready

        yield

        logger.info("Server is shutting down.")
        self._store = None
        self.startup_event.clear()  # Clear the startup event
        self.loop = None

    async def _check_and_requeue_stale_tasks(self):
        """
        Check for stale tasks and requeue them. Called reactively during get_next_task.
        """
        current_time = time.time()
        # Ensure store is initialized before checking
        if not self._store:
            return
        processing_tasks = self._store.get_processing_tasks()

        for rollout_id, task in processing_tasks.items():
            if task.last_claim_time and current_time - task.last_claim_time > self._task_timeout_seconds:
                await self._store.requeue_task(task)
                logger.warning(
                    f"Task {task.rollout_id} timed out after {self._task_timeout_seconds}s, requeued (attempt {task.num_claims})"
                )

    def _setup_routes(self):
        """Setup FastAPI routes."""

        @self._app.get("/task", response_model=TaskIfAny)
        async def next_task() -> TaskIfAny:
            """Endpoint for clients to poll for the next available task."""
            await self._check_and_requeue_stale_tasks()

            if not self._store:
                return TaskIfAny(is_available=False)

            task = await self._store.get_next_task()
            if task:
                logger.debug(f"Serving task {task.rollout_id} to a client.")
                return TaskIfAny(is_available=True, task=task)
            else:
                logger.debug("No task available for client.")
                return TaskIfAny(is_available=False)

        @self._app.get("/resources/latest", response_model=ResourcesUpdate)
        async def fetch_latest_resources() -> ResourcesUpdate:
            """Endpoint for clients to poll for the latest available resources."""
            if not self._store:
                raise HTTPException(status_code=503, detail="Server not fully initialized.")
            resources_update = await self._store.get_latest_resources()
            if not resources_update:
                raise HTTPException(status_code=404, detail="No resources have been set on the server.")
            logger.debug(f"Serving latest resources '{resources_update.resources_id}' to a client.")
            return resources_update

        @self._app.get("/resources/{resource_id}", response_model=ResourcesUpdate)
        async def fetch_resources_by_id(
            resource_id: str = Path(..., description="The unique identifier for the resource version.")
        ) -> ResourcesUpdate:
            """Endpoint for clients to fetch a specific version of resources."""
            if not self._store:
                raise HTTPException(status_code=503, detail="Server not fully initialized.")
            resources_update = await self._store.get_resources_by_id(resource_id)
            if not resources_update:
                raise HTTPException(status_code=404, detail=f"Resource ID '{resource_id}' not found.")
            logger.debug(f"Serving resources for ID '{resource_id}' to a client.")
            return resources_update

        @self._app.post("/rollout", response_model=GenericResponse)
        async def post_rollout(payload: Rollout) -> GenericResponse:
            """Endpoint for clients to report a completed rollout."""
            if not self._store:
                raise HTTPException(status_code=503, detail="Server not fully initialized.")
            await self._store.store_rollout(payload)
            return GenericResponse(
                status="ok",
                message=f"Rollout {payload.rollout_id} received and stored.",
            )

    async def start(self):
        """Starts the FastAPI server in the background."""
        logger.info(f"Starting server at {self.endpoint}")
        asyncio.create_task(self._uvicorn_server.serve())
        await asyncio.sleep(1)  # Allow time for server to start up.

    async def stop(self):
        """Gracefully stops the running FastAPI server."""
        if self._uvicorn_server.started:
            logger.info("Stopping server...")
            self._uvicorn_server.should_exit = True
            await asyncio.sleep(1)  # Allow time for graceful shutdown.
            logger.info("Server stopped.")

    async def run_forever(self):
        """
        Runs the server indefinitely until stopped.
        This is useful when async start and stop methods do not work.
        """
        await self._uvicorn_server.serve()

    async def queue_task(
        self,
        sample: Any,
        mode: Literal["train", "val", "test"] | None = None,
        resources_id: str | None = None,
        metadata: Dict[str, Any] | None = None,
    ) -> str:
        """
        Adds a task to the queue for a client to process.
        """
        if not self._store:
            raise RuntimeError("Store not initialized. The server may not be running.")
        return await self._store.add_task(sample, mode=mode, resources_id=resources_id, metadata=metadata)

    async def update_resources(self, resources: NamedResources) -> str:
        """
        Updates the resources, creating a new version and setting it as the latest.
        """
        if not self._store:
            raise RuntimeError("Store not initialized. The server may not be running.")
        resources_id = f"res-{uuid.uuid4()}"
        update = ResourcesUpdate(resources_id=resources_id, resources=resources)
        await self._store.update_resources(update)
        return resources_id

    async def get_completed_rollout(self, rollout_id: str) -> Optional[Rollout]:
        """
        Retrieves a specific completed rollout by its ID.
        """
        if not self._store:
            raise RuntimeError("Store not initialized. The server may not be running.")
        return await self._store.retrieve_rollout(rollout_id)

    async def poll_completed_rollout(self, rollout_id: str, timeout: Optional[float] = None) -> Optional[Rollout]:
        """
        Polls for a completed rollout by its ID, waiting up to `timeout` seconds.
        """
        start_time = time.time()
        while True:
            rollout = await self.get_completed_rollout(rollout_id)
            if rollout:
                return rollout
            if timeout and (time.time() - start_time) >= timeout:
                return None
            await asyncio.sleep(1)

    async def retrieve_completed_rollouts(self) -> List[Rollout]:
        """
        Retrieves all available completed trajectories and clears the internal store.
        """
        if not self._store:
            raise RuntimeError("Store not initialized. The server may not be running.")
        return await self._store.retrieve_completed_rollouts()

__init__(host='127.0.0.1', port=8000, task_timeout_seconds=300.0)

Initializes the server controller.

Parameters:

- host (str, default '127.0.0.1'): The host to bind the server to.
- port (int, default 8000): The port to bind the server to.
- task_timeout_seconds (float, default 300.0): Time in seconds after which a claimed task is considered stale and requeued.

Source code in agentlightning/server.py
def __init__(self, host: str = "127.0.0.1", port: int = 8000, task_timeout_seconds: float = 300.0):
    """
    Initializes the server controller.

    Args:
        host: The host to bind the server to.
        port: The port to bind the server to.
        task_timeout_seconds: Time in seconds after which a claimed task is considered stale and requeued.
    """
    self.host = host
    self.port = port
    self.endpoint = f"http://{host}:{port}"
    self._task_timeout_seconds = task_timeout_seconds

    # Defer initialization and use event for cross-thread communication
    self._store: Optional[ServerDataStore] = None
    self.loop: Optional[asyncio.AbstractEventLoop] = None
    self.startup_event = threading.Event()

    # Create FastAPI app instance with a lifespan manager
    self._app = FastAPI(lifespan=self._lifespan)
    self._setup_routes()

    self._uvicorn_config = uvicorn.Config(self._app, host=self.host, port=self.port, log_level="info")
    self._uvicorn_server = uvicorn.Server(self._uvicorn_config)

get_completed_rollout(rollout_id) async

Retrieves a specific completed rollout by its ID.

Source code in agentlightning/server.py
async def get_completed_rollout(self, rollout_id: str) -> Optional[Rollout]:
    """
    Retrieves a specific completed rollout by its ID.
    """
    if not self._store:
        raise RuntimeError("Store not initialized. The server may not be running.")
    return await self._store.retrieve_rollout(rollout_id)

poll_completed_rollout(rollout_id, timeout=None) async

Polls for a completed rollout by its ID, waiting up to timeout seconds.

Source code in agentlightning/server.py
async def poll_completed_rollout(self, rollout_id: str, timeout: Optional[float] = None) -> Optional[Rollout]:
    """
    Polls for a completed rollout by its ID, waiting up to `timeout` seconds.
    """
    start_time = time.time()
    while True:
        rollout = await self.get_completed_rollout(rollout_id)
        if rollout:
            return rollout
        if timeout and (time.time() - start_time) >= timeout:
            return None
        await asyncio.sleep(1)

queue_task(sample, mode=None, resources_id=None, metadata=None) async

Adds a task to the queue for a client to process.

Source code in agentlightning/server.py
async def queue_task(
    self,
    sample: Any,
    mode: Literal["train", "val", "test"] | None = None,
    resources_id: str | None = None,
    metadata: Dict[str, Any] | None = None,
) -> str:
    """
    Adds a task to the queue for a client to process.
    """
    if not self._store:
        raise RuntimeError("Store not initialized. The server may not be running.")
    return await self._store.add_task(sample, mode=mode, resources_id=resources_id, metadata=metadata)

retrieve_completed_rollouts() async

Retrieves all available completed trajectories and clears the internal store.

Source code in agentlightning/server.py
async def retrieve_completed_rollouts(self) -> List[Rollout]:
    """
    Retrieves all available completed trajectories and clears the internal store.
    """
    if not self._store:
        raise RuntimeError("Store not initialized. The server may not be running.")
    return await self._store.retrieve_completed_rollouts()

run_forever() async

Runs the server indefinitely until stopped. This is useful when async start and stop methods do not work.

Source code in agentlightning/server.py
async def run_forever(self):
    """
    Runs the server indefinitely until stopped.
    This is useful when async start and stop methods do not work.
    """
    await self._uvicorn_server.serve()

start() async

Starts the FastAPI server in the background.

Source code in agentlightning/server.py
async def start(self):
    """Starts the FastAPI server in the background."""
    logger.info(f"Starting server at {self.endpoint}")
    asyncio.create_task(self._uvicorn_server.serve())
    await asyncio.sleep(1)  # Allow time for server to start up.

stop() async

Gracefully stops the running FastAPI server.

Source code in agentlightning/server.py
async def stop(self):
    """Gracefully stops the running FastAPI server."""
    if self._uvicorn_server.started:
        logger.info("Stopping server...")
        self._uvicorn_server.should_exit = True
        await asyncio.sleep(1)  # Allow time for graceful shutdown.
        logger.info("Server stopped.")

update_resources(resources) async

Updates the resources, creating a new version and setting it as the latest.

Source code in agentlightning/server.py
async def update_resources(self, resources: NamedResources) -> str:
    """
    Updates the resources, creating a new version and setting it as the latest.
    """
    if not self._store:
        raise RuntimeError("Store not initialized. The server may not be running.")
    resources_id = f"res-{uuid.uuid4()}"
    update = ResourcesUpdate(resources_id=resources_id, resources=resources)
    await self._store.update_resources(update)
    return resources_id

ServerDataStore

A centralized, thread-safe, async, in-memory data store for the server's state. This holds the task queue, versioned resources, and completed rollouts.

Source code in agentlightning/server.py
class ServerDataStore:
    """
    A centralized, thread-safe, async, in-memory data store for the server's state.
    This holds the task queue, versioned resources, and completed rollouts.
    """

    def __init__(self):
        self._task_queue: asyncio.Queue[Task] = asyncio.Queue()
        self._processing_tasks: Dict[str, Task] = {}  # Currently processing tasks
        self._completed_rollouts: Dict[str, Rollout] = {}

        # Store for versioned resources
        self._resource_versions: Dict[str, NamedResources] = {}
        self._latest_resources_id: Optional[str] = None

        # Locks for thread-safe access
        self._results_lock = asyncio.Lock()
        self._resources_lock = asyncio.Lock()

    async def add_task(
        self,
        sample: Any,
        mode: Literal["train", "val", "test"] | None = None,
        resources_id: str | None = None,
        metadata: Dict[str, Any] | None = None,
    ) -> str:
        """
        Adds a new task to the queue with specific metadata and returns its unique ID.
        """
        rollout_id = f"rollout-{uuid.uuid4()}"
        task = Task(
            rollout_id=rollout_id,
            input=sample,
            mode=mode,
            resources_id=resources_id,
            create_time=time.time(),
            num_claims=0,
            metadata=metadata or {},
        )
        await self._task_queue.put(task)
        logger.info(f"Task queued: {rollout_id} (mode: {mode}, resources_id: {resources_id})")
        return rollout_id

    async def get_next_task(self) -> Optional[Task]:
        """
        Retrieves the next task from the queue without blocking.
        Returns None if the queue is empty.
        """
        try:
            async with self._results_lock:
                task = self._task_queue.get_nowait()
                task = task.model_copy(
                    update={
                        "last_claim_time": time.time(),
                        "num_claims": (task.num_claims or 0) + 1,
                    }
                )
                self._processing_tasks[task.rollout_id] = task
                if task.num_claims == 1:
                    logger.debug(f"Next task retrieved: {task.rollout_id}")
                else:
                    logger.info(f"Task {task.rollout_id} re-claimed (attempt {task.num_claims})")
                return task
        except asyncio.QueueEmpty:
            return None

    async def update_resources(self, update: ResourcesUpdate):
        """
        Safely stores a new version of named resources and sets it as the latest.
        """
        # TODO: evict old resources if necessary.
        async with self._resources_lock:
            self._resource_versions[update.resources_id] = update.resources
            self._latest_resources_id = update.resources_id
            logger.info(f"Resources updated. New version '{update.resources_id}' is now latest.")

    async def get_resources_by_id(self, resources_id: str) -> Optional[ResourcesUpdate]:
        """
        Safely retrieves a specific version of named resources by its ID.
        """
        async with self._resources_lock:
            resources = self._resource_versions.get(resources_id)
            if resources:
                return ResourcesUpdate(resources_id=resources_id, resources=resources)
            return None

    async def get_latest_resources(self) -> Optional[ResourcesUpdate]:
        """
        Safely retrieves the latest version of named resources.
        """
        if self._latest_resources_id:
            return await self.get_resources_by_id(self._latest_resources_id)
        return None

    async def store_rollout(self, rollout: Rollout):
        """
        Safely stores a completed rollout from a client.
        """
        async with self._results_lock:
            self._processing_tasks.pop(rollout.rollout_id, None)
            self._completed_rollouts[rollout.rollout_id] = rollout
            logger.info(f"Rollout received and stored: {rollout.rollout_id}")

    async def retrieve_rollout(self, rollout_id: str) -> Optional[Rollout]:
        """
        Safely retrieves a single rollout by its ID, removing it from the store.
        """
        async with self._results_lock:
            return self._completed_rollouts.pop(rollout_id, None)

    async def retrieve_completed_rollouts(self) -> List[Rollout]:
        """
        Retrieves all completed rollouts and clears the store.
        """
        async with self._results_lock:
            rollouts = list(self._completed_rollouts.values())
            self._completed_rollouts.clear()
            return rollouts

    def get_processing_tasks(self) -> Dict[str, Task]:
        """Returns a copy of currently processing tasks for timeout checking."""
        return self._processing_tasks.copy()

    async def requeue_task(self, task: Task):
        """Requeues a task that has timed out and removes it from processing."""
        logger.warning(f"Requeuing task {task.rollout_id} after timeout (attempt {task.num_claims})")
        async with self._results_lock:
            # Remove from processing tasks
            self._processing_tasks.pop(task.rollout_id, None)
            self._task_queue.put_nowait(task)

add_task(sample, mode=None, resources_id=None, metadata=None) async

Adds a new task to the queue with specific metadata and returns its unique ID.

Source code in agentlightning/server.py
async def add_task(
    self,
    sample: Any,
    mode: Literal["train", "val", "test"] | None = None,
    resources_id: str | None = None,
    metadata: Dict[str, Any] | None = None,
) -> str:
    """
    Adds a new task to the queue with specific metadata and returns its unique ID.
    """
    rollout_id = f"rollout-{uuid.uuid4()}"
    task = Task(
        rollout_id=rollout_id,
        input=sample,
        mode=mode,
        resources_id=resources_id,
        create_time=time.time(),
        num_claims=0,
        metadata=metadata or {},
    )
    await self._task_queue.put(task)
    logger.info(f"Task queued: {rollout_id} (mode: {mode}, resources_id: {resources_id})")
    return rollout_id

get_latest_resources() async

Safely retrieves the latest version of named resources.

Source code in agentlightning/server.py
async def get_latest_resources(self) -> Optional[ResourcesUpdate]:
    """
    Safely retrieves the latest version of named resources.
    """
    if self._latest_resources_id:
        return await self.get_resources_by_id(self._latest_resources_id)
    return None

get_next_task() async

Retrieves the next task from the queue without blocking. Returns None if the queue is empty.

Source code in agentlightning/server.py
async def get_next_task(self) -> Optional[Task]:
    """
    Retrieves the next task from the queue without blocking.
    Returns None if the queue is empty.
    """
    try:
        async with self._results_lock:
            task = self._task_queue.get_nowait()
            task = task.model_copy(
                update={
                    "last_claim_time": time.time(),
                    "num_claims": (task.num_claims or 0) + 1,
                }
            )
            self._processing_tasks[task.rollout_id] = task
            if task.num_claims == 1:
                logger.debug(f"Next task retrieved: {task.rollout_id}")
            else:
                logger.info(f"Task {task.rollout_id} re-claimed (attempt {task.num_claims})")
            return task
    except asyncio.QueueEmpty:
        return None

get_processing_tasks()

Returns a copy of currently processing tasks for timeout checking.

Source code in agentlightning/server.py
def get_processing_tasks(self) -> Dict[str, Task]:
    """Returns a copy of currently processing tasks for timeout checking."""
    return self._processing_tasks.copy()

get_resources_by_id(resources_id) async

Safely retrieves a specific version of named resources by its ID.

Source code in agentlightning/server.py
async def get_resources_by_id(self, resources_id: str) -> Optional[ResourcesUpdate]:
    """
    Safely retrieves a specific version of named resources by its ID.
    """
    async with self._resources_lock:
        resources = self._resource_versions.get(resources_id)
        if resources:
            return ResourcesUpdate(resources_id=resources_id, resources=resources)
        return None

requeue_task(task) async

Requeues a task that has timed out and removes it from processing.

Source code in agentlightning/server.py
async def requeue_task(self, task: Task):
    """Requeues a task that has timed out and removes it from processing."""
    logger.warning(f"Requeuing task {task.rollout_id} after timeout (attempt {task.num_claims})")
    async with self._results_lock:
        # Remove from processing tasks
        self._processing_tasks.pop(task.rollout_id, None)
        self._task_queue.put_nowait(task)

retrieve_completed_rollouts() async

Retrieves all completed rollouts and clears the store.

Source code in agentlightning/server.py
async def retrieve_completed_rollouts(self) -> List[Rollout]:
    """
    Retrieves all completed rollouts and clears the store.
    """
    async with self._results_lock:
        rollouts = list(self._completed_rollouts.values())
        self._completed_rollouts.clear()
        return rollouts

retrieve_rollout(rollout_id) async

Safely retrieves a single rollout by its ID, removing it from the store.

Source code in agentlightning/server.py
async def retrieve_rollout(self, rollout_id: str) -> Optional[Rollout]:
    """
    Safely retrieves a single rollout by its ID, removing it from the store.
    """
    async with self._results_lock:
        return self._completed_rollouts.pop(rollout_id, None)

store_rollout(rollout) async

Safely stores a completed rollout from a client.

Source code in agentlightning/server.py
async def store_rollout(self, rollout: Rollout):
    """
    Safely stores a completed rollout from a client.
    """
    async with self._results_lock:
        self._processing_tasks.pop(rollout.rollout_id, None)
        self._completed_rollouts[rollout.rollout_id] = rollout
        logger.info(f"Rollout received and stored: {rollout.rollout_id}")

update_resources(update) async

Safely stores a new version of named resources and sets it as the latest.

Source code in agentlightning/server.py
async def update_resources(self, update: ResourcesUpdate):
    """
    Safely stores a new version of named resources and sets it as the latest.
    """
    # TODO: evict old resources if necessary.
    async with self._resources_lock:
        self._resource_versions[update.resources_id] = update.resources
        self._latest_resources_id = update.resources_id
        logger.info(f"Resources updated. New version '{update.resources_id}' is now latest.")

Utilities

agentlightning.config

This module has not been carefully reviewed and may contain unintentional bugs and issues. Always review the parsed construction arguments before using them.

lightning_cli(*classes)

lightning_cli(cls1: Type[_C1]) -> _C1
lightning_cli(
    cls1: Type[_C1], cls2: Type[_C2]
) -> Tuple[_C1, _C2]
lightning_cli(
    cls1: Type[_C1], cls2: Type[_C2], cls3: Type[_C3]
) -> Tuple[_C1, _C2, _C3]
lightning_cli(
    cls1: Type[_C1],
    cls2: Type[_C2],
    cls3: Type[_C3],
    cls4: Type[_C4],
) -> Tuple[_C1, _C2, _C3, _C4]
lightning_cli(
    *classes: Type[CliConfigurable],
) -> Tuple[CliConfigurable, ...]

Parses command-line arguments to configure and instantiate provided CliConfigurable classes.

Parameters:

- *classes (Type[CliConfigurable]): One or more classes that inherit from CliConfigurable. Each class's __init__ parameters are exposed as command-line arguments.

Returns:

- CliConfigurable | Tuple[CliConfigurable, ...]: The instantiated objects, in the same order as the input classes (a single instance when only one class is given, otherwise a tuple).
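
A minimal sketch, assuming CliConfigurable can be imported from agentlightning.config and that a plain typed __init__ is sufficient; ServerConfig is a hypothetical class and the exact flag spelling shown in the comment is an assumption:

from agentlightning.config import CliConfigurable, lightning_cli


class ServerConfig(CliConfigurable):  # hypothetical example class
    def __init__(self, host: str = "127.0.0.1", port: int = 8000):
        self.host = host
        self.port = port


# e.g. `python serve.py --host 0.0.0.0 --port 9000` (flag names are illustrative)
config = lightning_cli(ServerConfig)  # a single class returns a single instance
print(config.host, config.port)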

Source code in agentlightning/config.py
def lightning_cli(*classes: Type[CliConfigurable]) -> CliConfigurable | Tuple[CliConfigurable, ...]:
    """
    Parses command-line arguments to configure and instantiate provided CliConfigurable classes.

    Args:
        *classes: One or more classes that inherit from CliConfigurable. Each class's
                  __init__ parameters will be exposed as command-line arguments.

    Returns:
        A tuple of instantiated objects, corresponding to the input classes in order.
    """
    if not classes:
        return tuple()  # Return an empty tuple if no classes are provided

    parser = _create_argument_parser()

    # This map will store {cls: {init_param_name: argparse_dest_name}}
    class_arg_configs_maps: Dict[Type[CliConfigurable], Dict[str, str]] = {}

    for cls in classes:
        _add_arguments_for_class(parser, cls, class_arg_configs_maps)

    parsed_args = parser.parse_args()  # Uses sys.argv[1:] by default

    # Correctly handle single class case for return type matching overloads
    instances = _instantiate_classes(parsed_args, classes, class_arg_configs_maps)
    if len(classes) == 1:
        return instances[0]
    return instances

nullable_float(value)

Converts specific string values (case-insensitive) to None, otherwise returns the float.
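
A minimal sketch of using nullable_float as an argparse type (the --temperature flag is a hypothetical example):

import argparse

from agentlightning.config import nullable_float

parser = argparse.ArgumentParser()
parser.add_argument("--temperature", type=nullable_float, default=None)

print(parser.parse_args(["--temperature", "none"]).temperature)  # None
print(parser.parse_args(["--temperature", "0.7"]).temperature)   # 0.7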

Source code in agentlightning/config.py
def nullable_float(value: str) -> float | None:
    """Converts specific string values (case-insensitive) to None, otherwise returns the float."""
    if value.lower() in ["none", "null", "~", "nil"]:  # Define keywords for None
        return None
    try:
        return float(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"Invalid float value: '{value}'")

nullable_int(value)

Converts specific string values (case-insensitive) to None, otherwise returns the integer.

Source code in agentlightning/config.py
def nullable_int(value: str) -> int | None:
    """Converts specific string values (case-insensitive) to None, otherwise returns the integer."""
    if value.lower() in ["none", "null", "~", "nil"]:  # Define keywords for None
        return None
    try:
        return int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"Invalid integer value: '{value}'")

nullable_str(value)

Converts specific string values (case-insensitive) to None, otherwise returns the string.

Source code in agentlightning/config.py
def nullable_str(value: str) -> str | None:
    """Converts specific string values (case-insensitive) to None, otherwise returns the string."""
    if value.lower() in ["none", "null", "~", "nil"]:  # Define keywords for None
        return None
    return value

agentlightning.types

NamedResources = Dict[str, ResourceUnion] module-attribute

A dictionary type mapping resource names to resource instances.

Example:

resources: NamedResources = {
    'main_llm': LLM(
        endpoint="http://localhost:8080",
        model="llama3",
        sampling_parameters={'temperature': 0.7, 'max_tokens': 100}
    ),
    'system_prompt': PromptTemplate(
        template="You are a helpful assistant.",
        engine='f-string'
    )
}

GenericResponse

Bases: BaseModel

A generic response message that can be used for various purposes.

Source code in agentlightning/types.py
class GenericResponse(BaseModel):
    """
    A generic response message that can be used for various purposes.
    """

    status: str = "success"
    message: Optional[str] = None
    data: Optional[Dict[str, Any]] = None

LLM

Bases: Resource

Provide an LLM endpoint and model name as a resource.

Attributes:

- endpoint (str): The URL of the LLM API endpoint.
- model (str): The identifier for the model to be used (e.g., 'gpt-4o').
- sampling_parameters (SamplingParameters): A dictionary of hyperparameters for model inference, such as temperature, top_p, etc.

Source code in agentlightning/types.py
class LLM(Resource):
    """
    Provide an LLM endpoint and model name as a resource.

    Attributes:
        endpoint (str): The URL of the LLM API endpoint.
        model (str): The identifier for the model to be used (e.g., 'gpt-4o').
        sampling_parameters (SamplingParameters): A dictionary of hyperparameters
            for model inference, such as temperature, top_p, etc.
    """

    resource_type: Literal["llm"] = "llm"
    endpoint: str
    model: str
    sampling_parameters: Dict[str, Any] = Field(default_factory=dict)

ParallelWorkerBase

Base class for objects that can be parallelized across multiple worker processes.

This class defines the standard lifecycle for parallel processing:

Main Process
  1. init() - Initialize the object in the main process
  2. spawn workers and call init_worker() in each worker
  3. run() - Execute the main workload in parallel across workers
  4. teardown_worker() - Clean up resources in each worker
  5. teardown() - Final cleanup in the main process

Subclasses should implement the run() method and optionally override the lifecycle methods for custom initialization and cleanup behavior.

Source code in agentlightning/types.py
class ParallelWorkerBase:
    """Base class for objects that can be parallelized across multiple worker processes.

    This class defines the standard lifecycle for parallel processing:

    Main Process:
        1. init() - Initialize the object in the main process
        2. spawn workers and call init_worker() in each worker
        3. run() - Execute the main workload in parallel across workers
        4. teardown_worker() - Clean up resources in each worker
        5. teardown() - Final cleanup in the main process

    Subclasses should implement the run() method and optionally override
    the lifecycle methods for custom initialization and cleanup behavior.
    """

    def __init__(self) -> None:
        """Initialize the base class. This method can be overridden by subclasses."""
        self.worker_id: Optional[int] = None

    def init(self, *args: Any, **kwargs: Any) -> None:
        pass

    def init_worker(self, worker_id: int, *args: Any, **kwargs: Any) -> None:
        self.worker_id = worker_id

    def run(self, *args: Any, **kwargs: Any) -> Any:
        pass

    def teardown_worker(self, worker_id: int, *args: Any, **kwargs: Any) -> None:
        pass

    def teardown(self, *args: Any, **kwargs: Any) -> None:
        pass

__init__()

Initialize the base class. This method can be overridden by subclasses.

Source code in agentlightning/types.py
def __init__(self) -> None:
    """Initialize the base class. This method can be overridden by subclasses."""
    self.worker_id: Optional[int] = None
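
Example

A sketch of a subclass that follows the lifecycle described above; SumWorker and its workload are hypothetical:

from typing import Any, List

from agentlightning.types import ParallelWorkerBase


class SumWorker(ParallelWorkerBase):
    """Toy worker that sums a list of numbers inside each worker process."""

    def init(self, *args: Any, **kwargs: Any) -> None:
        # 1. Runs once in the main process, before workers are spawned.
        self.chunk_size = 4

    def init_worker(self, worker_id: int, *args: Any, **kwargs: Any) -> None:
        # 2. Runs in each worker; the base implementation records worker_id.
        super().init_worker(worker_id)

    def run(self, numbers: List[int]) -> int:
        # 3. The actual workload, executed in parallel across workers.
        return sum(numbers)

    def teardown_worker(self, worker_id: int, *args: Any, **kwargs: Any) -> None:
        # 4. Per-worker cleanup (close clients, flush logs, ...).
        pass

    def teardown(self, *args: Any, **kwargs: Any) -> None:
        # 5. Final cleanup in the main process.
        pass

Note that ParallelWorkerBase does not spawn processes by itself; the orchestrating component (for example, a runner or trainer) is expected to call these hooks in the order listed above.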

PromptTemplate

Bases: Resource

A prompt template as a resource.

Attributes:

template (str): The template string. The format depends on the engine.

engine (Literal['jinja', 'f-string', 'poml']): The templating engine to use for rendering the prompt. Users may supply their own customized engines, but training algorithms can only operate reliably on a subset of them.

Source code in agentlightning/types.py
class PromptTemplate(Resource):
    """
    A prompt template as a resource.

    Attributes:
        template (str): The template string. The format depends on the engine.
        engine (Literal['jinja', 'f-string', 'poml']): The templating engine
            to use for rendering the prompt. I imagine users can use their own
            customized engines, but algos can only well operate on a subset of them.
    """

    resource_type: Literal["prompt_template"] = "prompt_template"
    template: str
    engine: Literal["jinja", "f-string", "poml"]
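
Example

A construction sketch. The rendering line uses plain str.format purely to illustrate the 'f-string' engine; it is not an agentlightning API call:

from agentlightning.types import PromptTemplate

prompt = PromptTemplate(
    template="You are a helpful assistant. Question: {question}",
    engine="f-string",
)

# With the 'f-string' engine, the template can be filled via str.format-style
# substitution; 'jinja' and 'poml' templates need their respective renderers.
rendered = prompt.template.format(question="What does the agent need to do?")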

Resource

Bases: BaseModel

Base class for all tunable resources.

Source code in agentlightning/types.py
class Resource(BaseModel):
    """
    Base class for all tunable resources.
    """

    resource_type: Any
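
Example

A sketch of declaring a custom resource by subclassing Resource with its own resource_type discriminator; DatasetResource is hypothetical. Note that NamedResources is typed as Dict[str, ResourceUnion], so a custom subclass may also need to be added to that union before it can travel between server and clients:

from typing import Literal

from agentlightning.types import Resource


class DatasetResource(Resource):
    """A hypothetical resource pointing at a dataset location."""

    resource_type: Literal["dataset"] = "dataset"
    path: str


dataset = DatasetResource(path="s3://my-bucket/train.jsonl")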

ResourcesUpdate

Bases: BaseModel

A resource update message to be sent from the server to clients.

This message contains a dictionary of resources that clients should use for subsequent tasks. It is used to update the resources available to clients dynamically.

Source code in agentlightning/types.py
class ResourcesUpdate(BaseModel):
    """
    A resource update message to be sent from the server to clients.

    This message contains a dictionary of resources that clients should use
    for subsequent tasks. It is used to update the resources available to
    clients dynamically.
    """

    resources_id: str
    resources: NamedResources
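
Example

A sketch that versions a NamedResources dictionary under an id chosen by the server; the id and resource values are placeholders:

from agentlightning.types import LLM, PromptTemplate, ResourcesUpdate

update = ResourcesUpdate(
    resources_id="resources-v1",
    resources={
        "main_llm": LLM(endpoint="http://localhost:8080", model="llama3"),
        "system_prompt": PromptTemplate(
            template="You are a helpful assistant.",
            engine="f-string",
        ),
    },
)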

Rollout

Bases: BaseModel

The standard reporting object from client to server.

Source code in agentlightning/types.py
class Rollout(BaseModel):
    """The standard reporting object from client to server."""

    rollout_id: str

    # Primary, high-level feedback
    final_reward: Optional[float] = None

    # Structured, sequential feedback for RL-style optimization
    triplets: Optional[List[Triplet]] = None

    # Optional, rich-context data for deep analysis
    trace: Optional[List[Dict[str, Any]]] = Field(
        default=None,
        description="A list of spans that conform to the OpenTelemetry JSON format. "
        "Users of the opentelemetry-sdk can generate this by calling "
        "json.loads(readable_span.to_json()).",
    )
    logs: Optional[List[str]] = None

    # A bucket for any other relevant information
    metadata: Dict[str, Any] = Field(default_factory=dict)
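
Example

A minimal reporting sketch. The rollout_id would normally echo the id of the Task being answered; all values here are placeholders:

from agentlightning.types import Rollout, Triplet

rollout = Rollout(
    rollout_id="ro-123",
    final_reward=1.0,
    triplets=[
        # Each turn may also carry its own reward (see Triplet below).
        Triplet(prompt="What is 2 + 2?", response="4", reward=1.0),
    ],
    metadata={"agent_version": "0.1"},
)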

Task

Bases: BaseModel

A task (rollout request) to be processed by the client agent.

Source code in agentlightning/types.py
class Task(BaseModel):
    """A task (rollout request) to be processed by the client agent."""

    rollout_id: str
    input: TaskInput

    mode: Optional[Literal["train", "val", "test"]] = None
    resources_id: Optional[str] = None

    # Optional fields for tracking task lifecycle
    create_time: Optional[float] = None
    last_claim_time: Optional[float] = None
    num_claims: Optional[int] = None

    # Allow additional metadata fields
    metadata: Dict[str, Any] = Field(default_factory=dict)
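
Example

Tasks are normally created and queued by the server, so constructing one by hand is mostly useful in tests. A sketch, assuming TaskInput accepts a plain dictionary payload:

from agentlightning.types import Task

task = Task(
    rollout_id="ro-123",
    input={"question": "What is 2 + 2?"},
    mode="train",
    resources_id="resources-v1",
)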

Triplet

Bases: BaseModel

A standard structure for a single turn in a trajectory.

Source code in agentlightning/types.py
class Triplet(BaseModel):
    """A standard structure for a single turn in a trajectory."""

    prompt: Any
    response: Any
    reward: Optional[float] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)

agentlightning.logging

agentlightning.instrumentation