API Reference — pytest Plugin

RAMPART's pytest integration. Activates automatically when installed.

_collection

Result collection infrastructure for the pytest plugin.

Provides the ContextVar-based mechanism for collecting Result objects produced during test execution. The pytest plugin activates a collector per test; execution event handlers write into it automatically.

ResultCollectionHandler

Bases: ExecutionEventHandler

Default ExecutionEventHandler installed on every BaseExecution.

Writes the Result into the active per-test collector on ON_POST_EXECUTE. No-op for all other events. No-op when no collector is active (safe to use outside pytest).

on_event async

Python
on_event(*, event_data)

Record result on post-execute. Ignore all other events.

Parameters:

Name Type Description Default
event_data ExecutionEventData

The event data.

required
Source code in rampart/pytest_plugin/_collection.py
Python
async def on_event(self, *, event_data: ExecutionEventData) -> None:
    """Record result on post-execute. Ignore all other events.

    Args:
        event_data (ExecutionEventData): The event data.
    """
    if event_data.event is not ExecutionEvent.ON_POST_EXECUTE:
        return
    if event_data.result is None:
        return
    collector = _active_collector.get()
    if collector is not None:
        collector.record(result=event_data.result)

ResultCollector

Python
ResultCollector()

Accumulates Result objects produced during a single test.

Framework-internal. Never referenced by test authors.

Source code in rampart/pytest_plugin/_collection.py
Python
def __init__(self) -> None:
    self._results: list[Result] = []

results property

Python
results

All results recorded so far.

record

Python
record(*, result)

Record a result.

Parameters:

Name Type Description Default
result Result

The result to record.

required
Source code in rampart/pytest_plugin/_collection.py
Python
def record(self, *, result: Result) -> None:
    """Record a result.

    Args:
        result (Result): The result to record.
    """
    self._results.append(result)

record_result

Python
record_result(result)

Record a Result into the active test's collector.

For building-block tests that construct Results manually rather than via Attacks.* or Probes.* factories. No-op when called outside a pytest test context (e.g., in library usage or scripts).

Re-exported from rampart at the top level — consumers import it as:

Python
from rampart import record_result

Parameters:

Name Type Description Default
result Result

The result to record.

required
Source code in rampart/pytest_plugin/_collection.py
Python
def record_result(result: Result) -> None:
    """Record a Result into the active test's collector.

    For building-block tests that construct Results manually rather
    than via Attacks.* or Probes.* factories. No-op when called
    outside a pytest test context (e.g., in library usage or scripts).

    Re-exported from rampart at the top level — consumers import it as:

        from rampart import record_result

    Args:
        result (Result): The result to record.
    """
    collector = _active_collector.get()
    if collector is not None:
        collector.record(result=result)

_session

Session-scoped state for the RAMPART pytest plugin.

Accumulates Result objects, computes trial group aggregates, and builds the final TestRunReport.

RampartSession

Python
RampartSession(*, sinks=None)

Session-scoped state for the RAMPART plugin.

Accumulates Result objects from all tests, stores trial group aggregates, tracks session duration, and builds the final TestRunReport. Holds configured sinks for report emission.

Parameters:

Name Type Description Default
sinks list[ReportSink]

Report sinks to emit to at session end. Defaults to an empty list (terminal-only output).

None
Source code in rampart/pytest_plugin/_session.py
Python
def __init__(self, *, sinks: list[ReportSink] | None = None) -> None:
    self._results: list[Result] = []
    self._results_by_nodeid: dict[str, list[Result]] = {}
    self._trial_groups: dict[str, TrialGroupResult] = {}
    self._trial_specs: dict[str, TrialSpec] = {}
    self._sinks: list[ReportSink] = sinks or []
    self._duration_seconds: float = 0.0
    self._cached_report: TestRunReport | None = None
    self._emitted: bool = False
    self._incomplete: bool = False
    self._incomplete_reasons: list[str] = []
    self._report_metadata: dict[str, object] = {}

sinks property

Python
sinks

Configured report sinks.

results_by_nodeid property

Python
results_by_nodeid

Read-only view of results grouped by pytest node ID.

is_emitted property

Python
is_emitted

True once report emission has been attempted (idempotency guard).

is_incomplete property

Python
is_incomplete

True if any worker failed to deliver complete results.

incomplete_reasons property

Python
incomplete_reasons

The recorded reasons the run is incomplete (empty if complete).

has_results property

Python
has_results

True if any results have been collected.

trial_groups property

Python
trial_groups

Trial group aggregates, keyed by base node ID.

trial_specs property

Python
trial_specs

Read-only view of registered trial specs, keyed by clone node ID.

add_sinks

Python
add_sinks(*, sinks)

Register additional sinks for report emission.

Called by the fixture-based bootstrap to add team-provided sinks.

Parameters:

Name Type Description Default
sinks list[ReportSink]

Sinks to append.

required

Raises:

Type Description
TypeError

If any item does not satisfy ReportSink.

Source code in rampart/pytest_plugin/_session.py
Python
def add_sinks(self, *, sinks: list[ReportSink]) -> None:
    """Register additional sinks for report emission.

    Called by the fixture-based bootstrap to add team-provided
    sinks.

    Args:
        sinks (list[ReportSink]): Sinks to append.

    Raises:
        TypeError: If any item does not satisfy ReportSink.
    """
    for sink in sinks:
        if not isinstance(sink, ReportSink):
            msg = (
                f"Expected ReportSink, got {type(sink).__name__}. "
                "Sinks must implement: "
                "async def emit_async(*, report: TestRunReport) -> None"
            )
            raise TypeError(
                msg,
            )
        self._sinks.append(sink)
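
The `isinstance` check above suggests that `ReportSink` can be tested structurally at runtime. The sketch below assumes a `typing.Protocol` marked `@runtime_checkable`; that is an assumption about how `ReportSink` is defined, not something the source confirms. `SketchReportSink`, `ListSink`, and `NotASink` are hypothetical names, and the report is a plain dict rather than a `TestRunReport`.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SketchReportSink(Protocol):
    """Hypothetical stand-in for ReportSink."""

    async def emit_async(self, *, report: Any) -> None: ...


class ListSink:
    """Collects reports in memory; satisfies the protocol structurally."""

    def __init__(self) -> None:
        self.reports: list[Any] = []

    async def emit_async(self, *, report: Any) -> None:
        self.reports.append(report)


class NotASink:
    """Has no emit_async method, so the isinstance check rejects it."""


sink = ListSink()
asyncio.run(sink.emit_async(report={"total_runs": 0}))
accepted = isinstance(sink, SketchReportSink)
rejected = isinstance(NotASink(), SketchReportSink)
```

A runtime-checkable protocol only verifies that the method exists. It does not check the signature or that the method is a coroutine, so a sink with a mismatched `emit_async` would pass this check and fail later at emission time.
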

set_duration

Python
set_duration(*, duration_seconds)

Set the total session duration.

Called by the plugin at session finish with the elapsed time since pytest_configure.

Parameters:

Name Type Description Default
duration_seconds float

Total wall-clock seconds.

required
Source code in rampart/pytest_plugin/_session.py
Python
def set_duration(self, *, duration_seconds: float) -> None:
    """Set the total session duration.

    Called by the plugin at session finish with the elapsed time
    since pytest_configure.

    Args:
        duration_seconds (float): Total wall-clock seconds.
    """
    self._duration_seconds = duration_seconds

absorb

Python
absorb(*, node, collector)

Absorb results from a completed test's collector.

Tags each result with the short test name (extracted from the node ID), the full node ID, its index within the test, and the harm category from @pytest.mark.harm. The nodeid and index give a total, deterministic ordering for the terminal summary and report regardless of xdist worker completion order.

Results are shallow-copied before tagging to avoid mutating objects the test body may still reference.

Parameters:

Name Type Description Default
node Item

The test item that just completed.

required
collector ResultCollector

The test's result collector.

required
Source code in rampart/pytest_plugin/_session.py
Python
def absorb(self, *, node: pytest.Item, collector: ResultCollector) -> None:
    """Absorb results from a completed test's collector.

    Tags each result with the short test name (extracted from the
    node ID), the full node ID, its index within the test, and the
    harm category from ``@pytest.mark.harm``. The nodeid and index
    give a total, deterministic ordering for the terminal summary and
    report regardless of xdist worker completion order.

    Results are shallow-copied before tagging to avoid mutating
    objects the test body may still reference.

    Args:
        node (pytest.Item): The test item that just completed.
        collector (ResultCollector): The test's result collector.
    """
    test_name = node.nodeid.split("::")[-1] if "::" in node.nodeid else node.nodeid
    harm_marker = node.get_closest_marker("harm")
    harm_category = (
        harm_marker.args[0] if harm_marker and harm_marker.args else None
    )

    collected = collector.results
    tagged: list[Result] = []
    for result_index, original_result in enumerate(collected):
        # Shallow copy is sufficient because we reconstruct all
        # mutable fields we modify (currently metadata and harm_category).
        result = copy.copy(original_result)
        result.metadata = {
            **result.metadata,
            "_pytest_test_name": test_name,
            "_pytest_nodeid": node.nodeid,
            "_rampart_result_index": result_index,
        }
        if harm_category is not None and result.harm_category is None:
            result.harm_category = harm_category
        tagged.append(result)
    self._results.extend(tagged)
    self._results_by_nodeid[node.nodeid] = tagged
    self._cached_report = None
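
The copy-then-rebind pattern in `absorb` can be shown in isolation. This is a minimal sketch assuming a simplified result type; `SketchResult` and `tag` are hypothetical and carry only the fields needed to show that the caller's object is left untouched.

```python
import copy
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for Result with only the fields used here."""

    status: str
    harm_category: Optional[str] = None
    metadata: dict[str, Any] = field(default_factory=dict)


def tag(
    original: SketchResult, *, nodeid: str, index: int, harm: Optional[str]
) -> SketchResult:
    tagged = copy.copy(original)
    # Rebind metadata to a new dict instead of mutating the shared one.
    tagged.metadata = {
        **tagged.metadata,
        "_pytest_nodeid": nodeid,
        "_rampart_result_index": index,
    }
    # A harm category set by the test wins over the marker value.
    if harm is not None and tagged.harm_category is None:
        tagged.harm_category = harm
    return tagged


original = SketchResult(status="SAFE", metadata={"attempt": 1})
tagged = tag(original, nodeid="tests/test_x.py::test_a", index=0, harm="exfiltration")
```

Because `copy.copy` shares the metadata dict between both objects, building a new dict matters: an in-place `update` on the copy would also change the original.
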

record_trial_group

Python
record_trial_group(
    *, base_nodeid, clone_nodeids, threshold
)

Record aggregate statistics for a trial group.

Semantics:

- Any UNSAFE result across all trials -> group FAILS
- threshold is the minimum pass rate (SAFE / total). e.g. 0.8 means at least 80% of runs must be SAFE.
- ERROR results count against the pass rate (they're not SAFE).
- Clones with zero results (skipped or crashed before producing a Result) are tracked as no_result and count against the pass rate.

Parameters:

Name Type Description Default
base_nodeid str

The original test's node ID.

required
clone_nodeids Sequence[str]

Pytest node IDs of all clones in this trial group.

required
threshold float

Minimum pass rate required.

required
Source code in rampart/pytest_plugin/_session.py
Python
def record_trial_group(
    self,
    *,
    base_nodeid: str,
    clone_nodeids: Sequence[str],
    threshold: float,
) -> None:
    """Record aggregate statistics for a trial group.

    Semantics:
    - Any UNSAFE result across all trials -> group FAILS
    - threshold is the minimum pass rate (SAFE / total).
      e.g. 0.8 means at least 80% of runs must be SAFE.
    - ERROR results count against the pass rate (they're not SAFE).
    - Clones with zero results (skipped or crashed before producing
      a Result) are tracked as ``no_result`` and count against
      the pass rate.

    Args:
        base_nodeid (str): The original test's node ID.
        clone_nodeids (Sequence[str]): Pytest node IDs of all clones
            in this trial group.
        threshold (float): Minimum pass rate required.
    """
    if not clone_nodeids:
        return

    total = len(clone_nodeids)
    unsafe_count = 0
    error_count = 0
    safe_count = 0
    no_result_count = 0

    for nodeid in clone_nodeids:
        node_results = self._results_by_nodeid.get(nodeid, [])
        if not node_results:
            no_result_count += 1
            continue
        has_unsafe = any(r.status == SafetyStatus.UNSAFE for r in node_results)
        has_error = any(r.status == SafetyStatus.ERROR for r in node_results)
        has_safe = any(r.status == SafetyStatus.SAFE for r in node_results)
        if has_unsafe:
            unsafe_count += 1
        elif has_error:
            error_count += 1
        elif has_safe:
            safe_count += 1

    pass_rate = safe_count / total if total > 0 else 0.0
    passed = unsafe_count == 0 and pass_rate >= threshold

    self._trial_groups[base_nodeid] = TrialGroupResult(
        total=total,
        safe=safe_count,
        unsafe=unsafe_count,
        errors=error_count,
        no_result=no_result_count,
        threshold=threshold,
        pass_rate=pass_rate,
        passed=passed,
    )

register_trial_spec

Python
register_trial_spec(
    *, clone_nodeid, base_nodeid, threshold
)

Record trial metadata for a cloned item at collection time.

Called from pytest_collection_modifyitems whenever a @pytest.mark.trial test is expanded into clones. Stores the data needed for session-end aggregation in a form that survives the xdist worker→controller boundary.

Identical re-registration (same key, same spec) is a no-op so that repeated collection passes (e.g., in workers and the controller) converge safely.

Parameters:

Name Type Description Default
clone_nodeid str

Node ID of the cloned item.

required
base_nodeid str

Node ID of the original (uncloned) item.

required
threshold float

Pass-rate threshold from the trial marker.

required
Source code in rampart/pytest_plugin/_session.py
Python
def register_trial_spec(
    self,
    *,
    clone_nodeid: str,
    base_nodeid: str,
    threshold: float,
) -> None:
    """Record trial metadata for a cloned item at collection time.

    Called from ``pytest_collection_modifyitems`` whenever a
    ``@pytest.mark.trial`` test is expanded into clones. Stores
    the data needed for session-end aggregation in a form that
    survives the xdist worker→controller boundary.

    Identical re-registration (same key, same spec) is a no-op so
    that repeated collection passes (e.g., in workers and the
    controller) converge safely.

    Args:
        clone_nodeid (str): Node ID of the cloned item.
        base_nodeid (str): Node ID of the original (uncloned) item.
        threshold (float): Pass-rate threshold from the trial marker.
    """
    self._trial_specs[clone_nodeid] = TrialSpec(
        base_nodeid=base_nodeid,
        threshold=threshold,
    )

merge_trial_specs

Python
merge_trial_specs(*, trial_specs)

Merge trial specs received from an xdist worker payload.

Idempotent: re-merging identical specs is a no-op. Spec values from workers should match the controller's own collection because the same plugin code runs in every process; we merge defensively so the controller can aggregate correctly even when its own collection state is unavailable.

Parameters:

Name Type Description Default
trial_specs Mapping[str, TrialSpec]

Specs keyed by clone node ID.

required
Source code in rampart/pytest_plugin/_session.py
Python
def merge_trial_specs(
    self,
    *,
    trial_specs: Mapping[str, TrialSpec],
) -> None:
    """Merge trial specs received from an xdist worker payload.

    Idempotent: re-merging identical specs is a no-op. Spec values
    from workers should match the controller's own collection
    because the same plugin code runs in every process; we merge
    defensively so the controller can aggregate correctly even
    when its own collection state is unavailable.

    Args:
        trial_specs (Mapping[str, TrialSpec]): Specs keyed by
            clone node ID.
    """
    for clone_nodeid, spec in trial_specs.items():
        self._trial_specs.setdefault(clone_nodeid, spec)
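
The merge above relies on `dict.setdefault`, which keeps the first value stored under a key. A minimal sketch of that behavior, using tuples in place of `TrialSpec` and a made-up clone node ID format:

```python
# Tuples of (base_nodeid, threshold) stand in for TrialSpec.
# The clone node ID below is a hypothetical format chosen for illustration.
specs: dict[str, tuple[str, float]] = {}

clone_id = "tests/test_x.py::test_a[trial-0]"
first = specs.setdefault(clone_id, ("tests/test_x.py::test_a", 0.8))
# Merging the same key again returns the stored value and changes nothing,
# even if the incoming spec differs.
second = specs.setdefault(clone_id, ("tests/test_x.py::test_a", 0.5))
```

This differs from `register_trial_spec`, which assigns unconditionally: a locally registered spec is replaced on re-registration, while a merged worker spec never overrides one already present.
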

merge_worker_results

Python
merge_worker_results(*, results_by_nodeid)

Merge an xdist worker's results into this session.

Extends both the flat _results list and the _results_by_nodeid mapping. Invalidates any cached report so the next build_report() reflects the merged data.

Parameters:

Name Type Description Default
results_by_nodeid dict[str, list[Result]]

Worker results grouped by pytest node ID.

required
Source code in rampart/pytest_plugin/_session.py
Python
def merge_worker_results(
    self,
    *,
    results_by_nodeid: dict[str, list[Result]],
) -> None:
    """Merge an xdist worker's results into this session.

    Extends both the flat ``_results`` list and the
    ``_results_by_nodeid`` mapping. Invalidates any cached report
    so the next ``build_report()`` reflects the merged data.

    Args:
        results_by_nodeid (dict[str, list[Result]]): Worker results
            grouped by pytest node ID.
    """
    for nodeid, results in results_by_nodeid.items():
        self._results.extend(results)
        self._results_by_nodeid.setdefault(nodeid, []).extend(results)
    self._cached_report = None

mark_emitted

Python
mark_emitted()

Mark the session as having attempted report emission.

Source code in rampart/pytest_plugin/_session.py
Python
def mark_emitted(self) -> None:
    """Mark the session as having attempted report emission."""
    self._emitted = True

mark_incomplete

Python
mark_incomplete(*, reason)

Record that a worker failed to deliver complete results.

Parameters:

Name Type Description Default
reason str

A short human-readable explanation surfaced in the report metadata.

required
Source code in rampart/pytest_plugin/_session.py
Python
def mark_incomplete(self, *, reason: str) -> None:
    """Record that a worker failed to deliver complete results.

    Args:
        reason (str): A short human-readable explanation surfaced
            in the report metadata.
    """
    self._incomplete = True
    self._incomplete_reasons.append(reason)
    self._cached_report = None

set_report_metadata

Python
set_report_metadata(*, metadata)

Attach run-level metadata that will appear on TestRunReport.

Used by the plugin to surface xdist run-mode information (active, worker count, dist mode). Subsequent calls merge into existing metadata.

Parameters:

Name Type Description Default
metadata dict[str, object]

Key/value pairs to attach.

required
Source code in rampart/pytest_plugin/_session.py
Python
def set_report_metadata(self, *, metadata: dict[str, object]) -> None:
    """Attach run-level metadata that will appear on ``TestRunReport``.

    Used by the plugin to surface xdist run-mode information
    (active, worker count, dist mode). Subsequent calls merge into
    existing metadata.

    Args:
        metadata (dict[str, object]): Key/value pairs to attach.
    """
    self._report_metadata.update(metadata)
    self._cached_report = None

build_report

Python
build_report()

Build a TestRunReport from all collected results.

The report is cached and reused on subsequent calls. The cache is invalidated when new results are absorbed or merged or when metadata is updated.

Results are sorted by (_pytest_nodeid, _rampart_result_index, _rampart_source_worker) for a total, deterministic ordering across xdist worker completion orders. _pytest_nodeid falls back to _pytest_test_name and _rampart_source_worker is absent (constant) outside xdist, so single-process ordering is unaffected.

These leading-underscore keys are RAMPART scheduling bookkeeping, namespaced to avoid colliding with user-supplied result metadata.

Returns:

Name Type Description
TestRunReport TestRunReport

Aggregated test run results.

Source code in rampart/pytest_plugin/_session.py
Python
def build_report(self) -> TestRunReport:
    """Build a TestRunReport from all collected results.

    The report is cached and reused on subsequent calls. The
    cache is invalidated when new results are absorbed or merged
    or when metadata is updated.

    Results are sorted by ``(_pytest_nodeid, _rampart_result_index,
    _rampart_source_worker)`` for a total, deterministic ordering across
    xdist worker completion orders. ``_pytest_nodeid`` falls back to
    ``_pytest_test_name`` and ``_rampart_source_worker`` is absent
    (constant) outside xdist, so single-process ordering is unaffected.

    These leading-underscore keys are RAMPART scheduling bookkeeping,
    namespaced to avoid colliding with user-supplied result metadata.

    Returns:
        TestRunReport: Aggregated test run results.
    """
    if self._cached_report is not None:
        return self._cached_report
    sorted_results = sorted(self._results, key=_result_sort_key)
    counts = Counter(r.status for r in sorted_results)
    metadata: dict[str, Any] = dict(self._report_metadata)
    if self._incomplete:
        metadata["incomplete"] = True
        metadata["incomplete_reasons"] = list(self._incomplete_reasons)
    self._cached_report = TestRunReport(
        results=sorted_results,
        total_runs=len(sorted_results),
        passed=counts[SafetyStatus.SAFE],
        failed=counts[SafetyStatus.UNSAFE],
        undetermined=counts[SafetyStatus.UNDETERMINED],
        errors=counts[SafetyStatus.ERROR],
        duration_seconds=self._duration_seconds,
        metadata=metadata,
    )
    return self._cached_report
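
The source of `_result_sort_key` is not shown on this page. The sketch below is one way a key matching the documented ordering could look; `sketch_sort_key` and its defaults for missing keys are assumptions, and it operates on bare metadata dicts rather than `Result` objects.

```python
from typing import Any


def sketch_sort_key(metadata: dict[str, Any]) -> tuple[str, int, str]:
    """Hypothetical key: node ID (or test name), result index, source worker."""
    nodeid = metadata.get("_pytest_nodeid") or metadata.get("_pytest_test_name", "")
    index = metadata.get("_rampart_result_index", 0)
    worker = metadata.get("_rampart_source_worker", "")
    return (str(nodeid), int(index), str(worker))


# Results in the order workers happened to finish.
arrival_order = [
    {"_pytest_nodeid": "t.py::test_b", "_rampart_result_index": 0, "_rampart_source_worker": "gw1"},
    {"_pytest_nodeid": "t.py::test_a", "_rampart_result_index": 1, "_rampart_source_worker": "gw0"},
    {"_pytest_nodeid": "t.py::test_a", "_rampart_result_index": 0, "_rampart_source_worker": "gw0"},
    {"_pytest_test_name": "test_c"},  # no node ID: falls back to the test name
]
ordered = sorted(arrival_order, key=sketch_sort_key)
```

Because every component of the key has a default, results without xdist metadata still sort deterministically among themselves.
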

TrialGroupResult dataclass

Python
TrialGroupResult(
    *,
    total,
    safe,
    unsafe,
    errors,
    no_result,
    threshold,
    pass_rate,
    passed,
)

Aggregate statistics for a trial group.

verdict property

Python
verdict

Human-readable verdict: PASSED or FAILED.

terminal_label property

Python
terminal_label

Short label for terminal output: PASS or FAIL.

detail property

Python
detail

Summary detail string for terminal output (e.g. '8/10 safe, 2 no-result').

has_unsafe property

Python
has_unsafe

True if any trial produced an UNSAFE result.

Parallel Execution Hooks

When pytest-xdist is installed, the plugin registers pytest_testnodedown (as an optional hook) to merge worker results into the controller session. See Parallel Execution for the data flow and trust boundary.

_xdist

xdist support for RAMPART's pytest plugin.

Provides serialization, deserialization, and controller-side merge logic for running RAMPART under pytest-xdist. Workers serialize their Result objects into config.workeroutput; the controller merges worker payloads in pytest_testnodedown and emits a single unified report at session end.

Trust boundary: worker payloads may contain attacker-controlled content (agent responses, payload text). Serialization is strictly JSON-safe primitives; deserialization validates schema version, enum values, and metadata depth; ANSI escapes are stripped from free text as defense-in-depth.

SCHEMA_VERSION module-attribute

Python
SCHEMA_VERSION = 'rampart.xdist.v1'

WORKEROUTPUT_KEY module-attribute

Python
WORKEROUTPUT_KEY = 'rampart_xdist_v1'

SIZE_LIMIT_OPTION module-attribute

Python
SIZE_LIMIT_OPTION = 'rampart_xdist_max_bytes'

DEFAULT_SIZE_LIMIT_BYTES module-attribute

Python
DEFAULT_SIZE_LIMIT_BYTES = 64 * 1024 * 1024

WorkerOutputError

Bases: Exception

Base error for xdist worker output processing failures.

SchemaVersionError

Bases: WorkerOutputError

Raised when a worker payload has missing or unknown schema version.

SizeLimitError

Bases: WorkerOutputError

Raised when a worker payload exceeds the configured size cap.

is_xdist_worker

Python
is_xdist_worker(*, config)

Return True when this process is a pytest-xdist worker.

Detection is attribute-based; no xdist import required, so this function is safe to call when pytest-xdist is not installed.

Parameters:

Name Type Description Default
config Config

The pytest configuration object.

required

Returns:

Name Type Description
bool bool

True if running in an xdist worker process.

Source code in rampart/pytest_plugin/_xdist.py
Python
def is_xdist_worker(*, config: pytest.Config) -> bool:
    """Return True when this process is a pytest-xdist worker.

    Detection is attribute-based; no xdist import required, so this
    function is safe to call when pytest-xdist is not installed.

    Args:
        config (pytest.Config): The pytest configuration object.

    Returns:
        bool: True if running in an xdist worker process.
    """
    return hasattr(config, "workerinput")

is_xdist_controller

Python
is_xdist_controller(*, config)

Return True when this process is the pytest-xdist controller.

The controller is the non-worker process that owns an active distribution: a --dist mode other than "no" plus at least one way of spawning execution endpoints (--numprocesses workers or explicit --tx gateways). Keying off distribution rather than the worker count alone keeps -d/--tx runs (no -n) on the controller path while excluding a bare --dist with no endpoints.

Parameters:

Name Type Description Default
config Config

The pytest configuration object.

required

Returns:

Name Type Description
bool bool

True if running in the xdist controller process.

Source code in rampart/pytest_plugin/_xdist.py
Python
def is_xdist_controller(*, config: pytest.Config) -> bool:
    """Return True when this process is the pytest-xdist controller.

    The controller is the non-worker process that owns an active
    distribution: a ``--dist`` mode other than ``"no"`` plus at least one
    way of spawning execution endpoints (``--numprocesses`` workers or
    explicit ``--tx`` gateways). Keying off distribution rather than the
    worker count alone keeps ``-d``/``--tx`` runs (no ``-n``) on the
    controller path while excluding a bare ``--dist`` with no endpoints.

    Args:
        config (pytest.Config): The pytest configuration object.

    Returns:
        bool: True if running in the xdist controller process.
    """
    if is_xdist_worker(config=config):
        return False
    if get_dist_mode(config=config) == "no":
        return False
    numprocesses = getattr(config.option, "numprocesses", None)
    tx = getattr(config.option, "tx", None)
    return bool(numprocesses) or bool(tx)
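
The decision logic can be exercised without pytest or xdist by faking the config object. In this sketch `fake_config`, `sketch_is_worker`, and `sketch_is_controller` are hypothetical; the fake configs only carry the attributes the checks read and do not reflect the defaults a real xdist installation sets.

```python
from types import SimpleNamespace


def sketch_is_worker(config) -> bool:
    return hasattr(config, "workerinput")


def sketch_is_controller(config) -> bool:
    if sketch_is_worker(config):
        return False
    if getattr(config.option, "dist", "no") == "no":
        return False
    numprocesses = getattr(config.option, "numprocesses", None)
    tx = getattr(config.option, "tx", None)
    return bool(numprocesses) or bool(tx)


def fake_config(*, worker: bool = False, **options):
    """Build a minimal object shaped like the parts of pytest.Config used here."""
    config = SimpleNamespace(option=SimpleNamespace(**options))
    if worker:
        config.workerinput = {}
    return config


plain_run = fake_config()                              # xdist not in use
n_workers = fake_config(dist="load", numprocesses=4)   # typical -n run
tx_only = fake_config(dist="load", tx=["popen"])       # --tx without -n
bare_dist = fake_config(dist="load")                   # --dist but no endpoints
worker = fake_config(worker=True, dist="load", numprocesses=4)
```

Only `n_workers` and `tx_only` count as controllers: the worker is excluded by its `workerinput` attribute, and the bare `--dist` case has nothing to distribute to.
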

get_dist_mode

Python
get_dist_mode(*, config)

Return the active --dist mode string.

Parameters:

Name Type Description Default
config Config

The pytest configuration object.

required

Returns:

Name Type Description
str str

The dist mode (e.g., "load", "loadgroup", "no").

Source code in rampart/pytest_plugin/_xdist.py
Python
def get_dist_mode(*, config: pytest.Config) -> str:
    """Return the active ``--dist`` mode string.

    Args:
        config (pytest.Config): The pytest configuration object.

    Returns:
        str: The dist mode (e.g., ``"load"``, ``"loadgroup"``, ``"no"``).
    """
    return cast("str", getattr(config.option, "dist", "no"))

get_worker_count

Python
get_worker_count(*, config)

Return the number of xdist workers configured.

Parameters:

Name Type Description Default
config Config

The pytest configuration object.

required

Returns:

Name Type Description
int int

Number of workers (0 when xdist is not active).

Source code in rampart/pytest_plugin/_xdist.py
Python
def get_worker_count(*, config: pytest.Config) -> int:
    """Return the number of xdist workers configured.

    Args:
        config (pytest.Config): The pytest configuration object.

    Returns:
        int: Number of workers (0 when xdist is not active).
    """
    numprocesses = getattr(config.option, "numprocesses", 0)
    return int(numprocesses) if numprocesses else 0

serialize_worker_data

Python
serialize_worker_data(*, session)

Serialize a worker's RampartSession state for transport to the controller.

Produces a JSON-safe dict containing the schema version, the package version (for cross-version diagnostics), the worker's _results_by_nodeid mapping serialized to primitive types, and trial specs registered during collection.

Parameters:

Name Type Description Default
session RampartSession

The worker's session state.

required

Returns:

Type Description
dict[str, Any]

A JSON-safe payload ready to write to config.workeroutput.

Source code in rampart/pytest_plugin/_xdist.py
Python
def serialize_worker_data(*, session: RampartSession) -> dict[str, Any]:
    """Serialize a worker's RampartSession state for transport to the controller.

    Produces a JSON-safe dict containing the schema version, the
    package version (for cross-version diagnostics), the worker's
    ``_results_by_nodeid`` mapping serialized to primitive types,
    and trial specs registered during collection.

    Args:
        session (RampartSession): The worker's session state.

    Returns:
        dict[str, Any]: A JSON-safe payload ready to write to
            ``config.workeroutput``.
    """
    serialized: dict[str, list[dict[str, Any]]] = {}
    for nodeid, results in session.results_by_nodeid.items():
        serialized[nodeid] = [
            _serialize_result(result=r, nodeid=nodeid) for r in results
        ]
    return {
        "schema": SCHEMA_VERSION,
        "results_by_nodeid": serialized,
        "trial_specs": [
            {
                "clone_nodeid": clone_nodeid,
                "base_nodeid": spec.base_nodeid,
                "threshold": _safe_float(value=spec.threshold) or 0.0,
            }
            for clone_nodeid, spec in session.trial_specs.items()
        ],
    }

deserialize_worker_data

Python
deserialize_worker_data(*, data)

Deserialize a worker payload back into a results_by_nodeid mapping.

Performs strict schema validation: missing schema key, unknown versions, and malformed enum values all raise WorkerOutputError (or subclass). Caller should catch and mark the run incomplete rather than letting the exception propagate to pytest.

Each result's metadata["_pytest_nodeid"] and metadata["_rampart_result_index"] are set authoritatively from the outer mapping key and list position so cross-worker ordering is total and independent of any (untrusted) serialized values.

Parameters:

Name Type Description Default
data object

The deserialized JSON object from node.workeroutput.

required

Returns:

Type Description
dict[str, list[Result]]

Results grouped by nodeid.

Raises:

Type Description
SchemaVersionError

Missing or unknown schema version.

WorkerOutputError

Malformed payload (type errors, bad enums).

Source code in rampart/pytest_plugin/_xdist.py
Python
def deserialize_worker_data(*, data: object) -> dict[str, list[Result]]:
    """Deserialize a worker payload back into a ``results_by_nodeid`` mapping.

    Performs strict schema validation: missing ``schema`` key, unknown
    versions, and malformed enum values all raise ``WorkerOutputError``
    (or subclass). Caller should catch and mark the run incomplete
    rather than letting the exception propagate to pytest.

    Each result's ``metadata["_pytest_nodeid"]`` and
    ``metadata["_rampart_result_index"]`` are set authoritatively from the
    outer mapping key and list position so cross-worker ordering is total
    and independent of any (untrusted) serialized values.

    Args:
        data (object): The deserialized JSON object from
            ``node.workeroutput``.

    Returns:
        dict[str, list[Result]]: Results grouped by nodeid.

    Raises:
        SchemaVersionError: Missing or unknown schema version.
        WorkerOutputError: Malformed payload (type errors, bad enums).
    """
    typed = _validate_schema(data=data)
    raw_results = typed.get("results_by_nodeid", {})
    if not isinstance(raw_results, dict):
        msg = f"Expected dict for results_by_nodeid, got {type(raw_results).__name__}."
        raise WorkerOutputError(msg)
    out: dict[str, list[Result]] = {}
    for nodeid, results_data in cast("dict[Any, Any]", raw_results).items():
        if not isinstance(results_data, list):
            continue
        nodeid_str = str(nodeid)
        deserialized: list[Result] = []
        for index, raw_result in enumerate(cast("list[Any]", results_data)):
            result = _deserialize_result(data=raw_result)
            result.metadata["_pytest_nodeid"] = nodeid_str
            result.metadata["_rampart_result_index"] = index
            deserialized.append(result)
        out[nodeid_str] = deserialized
    return out
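
The recommended caller pattern, catching the base error and recording the run as incomplete, might look like the sketch below. The exception classes, `validate`, and `merge_or_mark` are hypothetical stand-ins; the real schema validation is performed by the private `_validate_schema`, whose source is not shown here.

```python
class SketchWorkerOutputError(Exception):
    """Stand-in for WorkerOutputError."""


class SketchSchemaVersionError(SketchWorkerOutputError):
    """Stand-in for SchemaVersionError."""


EXPECTED_SCHEMA = "rampart.xdist.v1"  # value of SCHEMA_VERSION in the source


def validate(data: object) -> dict:
    if not isinstance(data, dict):
        raise SketchWorkerOutputError(f"Expected dict, got {type(data).__name__}.")
    if data.get("schema") != EXPECTED_SCHEMA:
        raise SketchSchemaVersionError(f"Unknown schema: {data.get('schema')!r}")
    return data


def merge_or_mark(data: object, incomplete_reasons: list[str]) -> bool:
    """Return True on success; on failure record a reason instead of raising."""
    try:
        validate(data)
    except SketchWorkerOutputError as exc:  # also catches the subclass
        incomplete_reasons.append(str(exc))
        return False
    return True


reasons: list[str] = []
ok = merge_or_mark({"schema": "rampart.xdist.v1", "results_by_nodeid": {}}, reasons)
bad_version = merge_or_mark({"schema": "v0"}, reasons)
bad_type = merge_or_mark("oops", reasons)
```

Catching the base class keeps a single malformed worker payload from aborting the whole session while still leaving a trace in the report metadata.
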

finalize_worker

Python
finalize_worker(*, config, session)

Serialize the worker's session state into config.workeroutput.

Called from pytest_sessionfinish on each xdist worker. The worker skips sink emission entirely; the controller is responsible for the final report.

Parameters:

Name | Type | Description | Default
config | Config | The pytest configuration object. | required
session | RampartSession | The worker's session state. | required

Raises:

Type | Description
SizeLimitError | If the serialized payload exceeds the configured cap. The truncation marker is still written to workeroutput before the exception is raised so the controller can record the run as incomplete.

Source code in rampart/pytest_plugin/_xdist.py
Python
def finalize_worker(*, config: pytest.Config, session: RampartSession) -> None:
    """Serialize the worker's session state into ``config.workeroutput``.

    Called from ``pytest_sessionfinish`` on each xdist worker. The
    worker skips sink emission entirely; the controller is responsible
    for the final report.

    Args:
        config (pytest.Config): The pytest configuration object.
        session (RampartSession): The worker's session state.

    Raises:
        SizeLimitError: If the serialized payload exceeds the
            configured cap. The truncation marker is still written to
            ``workeroutput`` before the exception is raised so the
            controller can record the run as incomplete.
    """
    if not is_xdist_worker(config=config):
        return
    payload = serialize_worker_data(session=session)
    encoded = json.dumps(payload, default=str)
    size = len(encoded.encode("utf-8"))
    limit = _size_limit(config=config)
    workeroutput = cast(
        "dict[str, Any]",
        config.workeroutput,  # ty: ignore[unresolved-attribute]
    )
    if size > limit:
        workeroutput[WORKEROUTPUT_KEY] = {
            "schema": SCHEMA_VERSION,
            _TRUNCATED_MARKER: True,
            "size_bytes": size,
            "limit_bytes": limit,
        }
        msg = (
            f"Worker payload size {size} bytes exceeds cap of {limit}; "
            f"truncated. Increase --{SIZE_LIMIT_OPTION.replace('_', '-')} "
            f"(or the {SIZE_LIMIT_OPTION} ini option) to raise the cap."
        )
        raise SizeLimitError(msg)
    logger.debug("Worker payload size: %d bytes", size)
    workeroutput[WORKEROUTPUT_KEY] = payload
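
The "write the marker, then raise" pattern used by `finalize_worker` can be tried in isolation. The sketch below is self-contained and makes several assumptions: `store_with_cap`, the `"demo"` key, and the `"truncated"` field are hypothetical names, and `SizeLimitError` is redefined locally as a stand-in for the plugin's exception.

```python
import json
from typing import Any


class SizeLimitError(Exception):
    """Local stand-in for the plugin's exception of the same name."""


def store_with_cap(*, payload: dict[str, Any], out: dict[str, Any], limit: int) -> None:
    """Store payload in out, or a small marker if it exceeds limit bytes."""
    size = len(json.dumps(payload, default=str).encode("utf-8"))
    if size > limit:
        # Write the marker before raising so a reader of `out` can tell
        # "truncated" apart from "never written".
        out["demo"] = {"truncated": True, "size_bytes": size, "limit_bytes": limit}
        raise SizeLimitError(f"payload of {size} bytes exceeds cap of {limit}")
    out["demo"] = payload


channel: dict[str, Any] = {}
try:
    store_with_cap(payload={"results": ["x" * 200]}, out=channel, limit=64)
except SizeLimitError:
    oversized = True
else:
    oversized = False
```

A caller that catches the exception still finds the marker in `channel`, which is what lets the receiving side record the run as incomplete instead of treating the output as missing.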

handle_testnodedown

Python
handle_testnodedown(*, session, node, error)

Merge a finished worker's results into the controller session.

Called from pytest_testnodedown on the controller for each worker that completes. Failures (missing payload, deserialization errors, worker crashes) are recorded via mark_incomplete rather than raised, so a single bad worker does not abort report emission.

Parameters:

Name | Type | Description | Default
session | RampartSession | The controller's session state. | required
node | object | The xdist node object (has workeroutput attribute). | required
error | object | The shutdown error from xdist, or None on clean exit. | required

Source code in rampart/pytest_plugin/_xdist.py
Python
def handle_testnodedown(
    *,
    session: RampartSession,
    node: object,
    error: object,
) -> None:
    """Merge a finished worker's results into the controller session.

    Called from ``pytest_testnodedown`` on the controller for each
    worker that completes. Failures (missing payload, deserialization
    errors, worker crashes) are recorded via ``mark_incomplete`` rather
    than raised, so a single bad worker does not abort report emission.

    Args:
        session (RampartSession): The controller's session state.
        node (object): The xdist node object (has a ``workeroutput``
            attribute).
        error (object): The shutdown error from xdist, or ``None`` on
            clean exit.
    """
    worker_id = getattr(node, "gateway", None)
    worker_id_str = str(getattr(worker_id, "id", node)) if worker_id else str(node)
    if error is not None:
        logger.warning(
            "Worker %s reported shutdown error; report will be incomplete: %s",
            worker_id_str,
            error,
        )
        session.mark_incomplete(reason=f"worker {worker_id_str} error: {error}")
        return
    workeroutput = getattr(node, "workeroutput", None)
    if not isinstance(workeroutput, dict):
        logger.warning(
            "Worker %s exited without workeroutput; report will be incomplete.",
            worker_id_str,
        )
        session.mark_incomplete(reason=f"worker {worker_id_str} missing workeroutput")
        return
    payload: Any = cast("dict[str, Any]", workeroutput).get(WORKEROUTPUT_KEY)
    if payload is None:
        logger.warning(
            "Worker %s did not produce RAMPART output; report will be incomplete.",
            worker_id_str,
        )
        session.mark_incomplete(reason=f"worker {worker_id_str} missing RAMPART output")
        return
    typed_payload_dict: dict[str, Any] | None = (
        cast("dict[str, Any]", payload) if isinstance(payload, dict) else None
    )
    if typed_payload_dict is not None and typed_payload_dict.get(_TRUNCATED_MARKER):
        logger.error(
            "Worker %s payload was truncated due to size cap; "
            "report will be incomplete.",
            worker_id_str,
        )
        session.mark_incomplete(
            reason=f"worker {worker_id_str} payload truncated (size cap)",
        )
        return
    try:
        results_by_nodeid = deserialize_worker_data(data=cast("object", payload))
    except WorkerOutputError as exc:
        logger.exception(
            "Failed to deserialize worker %s output; report will be incomplete.",
            worker_id_str,
        )
        session.mark_incomplete(
            reason=f"worker {worker_id_str} deserialization failed: {exc}",
        )
        return
    trial_specs = _safe_deserialize_trial_specs(
        payload=cast("object", payload),
        worker_id_str=worker_id_str,
    )
    _tag_source_worker(
        results_by_nodeid=results_by_nodeid,
        worker_id_str=worker_id_str,
    )
    session.merge_worker_results(results_by_nodeid=results_by_nodeid)
    if trial_specs:
        session.merge_trial_specs(trial_specs=trial_specs)
    logger.info(
        "Merged %d result group(s) from worker %s.",
        len(results_by_nodeid),
        worker_id_str,
    )
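
The order of the early-exit checks in `handle_testnodedown` can be condensed into a small classifier. This is a sketch under stated assumptions rather than the plugin's code: `incomplete_reason`, `KEY`, and the `"truncated"` field are hypothetical, and `SimpleNamespace` objects stand in for xdist nodes.

```python
from types import SimpleNamespace
from typing import Any, Optional

KEY = "demo_output"  # hypothetical; the plugin reads WORKEROUTPUT_KEY


def incomplete_reason(*, node: object, error: object) -> Optional[str]:
    """Return why a worker's output is unusable, or None if it can be merged."""
    if error is not None:
        return f"error: {error}"
    workeroutput = getattr(node, "workeroutput", None)
    if not isinstance(workeroutput, dict):
        return "missing workeroutput"
    payload: Any = workeroutput.get(KEY)
    if payload is None:
        return "missing output"
    if isinstance(payload, dict) and payload.get("truncated"):
        return "payload truncated"
    return None


# One stand-in node per branch, checked in the same order as above.
reasons = [
    incomplete_reason(node=SimpleNamespace(), error=None),
    incomplete_reason(node=SimpleNamespace(workeroutput={}), error=None),
    incomplete_reason(
        node=SimpleNamespace(workeroutput={KEY: {"truncated": True}}),
        error=None,
    ),
    incomplete_reason(
        node=SimpleNamespace(workeroutput={KEY: {"results_by_nodeid": {}}}),
        error=None,
    ),
]
```

Returning a reason instead of raising mirrors the documented behavior: each failure is recorded and the remaining workers are still processed.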

discover_sinks_from_conftest

Python
discover_sinks_from_conftest(*, config)

Discover rampart_sinks definitions from registered conftest modules.

Workers run the standard _rampart_sink_bootstrap fixture to register sinks via pytest's fixture machinery. The controller executes no tests, so fixtures never run there. This function scans registered plugins for a module-level rampart_sinks attribute and resolves it:

  • If callable with zero arguments, invoke it and use the return value.
  • If a list, use it directly.
  • Otherwise, log a warning and skip.

Sinks that depend on other fixtures cannot be discovered this way. Such configurations should register sinks via the pytest_rampart_sinks hook, which is resolved identically on the controller and in every worker.

Parameters:

Name | Type | Description | Default
config | Config | The pytest configuration object. | required

Returns:

Type | Description
list[ReportSink] | Discovered sinks (may be empty).

Source code in rampart/pytest_plugin/_xdist.py
Python
def discover_sinks_from_conftest(*, config: pytest.Config) -> list[ReportSink]:
    """Discover ``rampart_sinks`` definitions from registered conftest modules.

    Workers run the standard ``_rampart_sink_bootstrap`` fixture to
    register sinks via pytest's fixture machinery. The controller
    executes no tests, so fixtures never run there. This function
    scans registered plugins for a module-level ``rampart_sinks``
    attribute and resolves it:

    - If callable with zero arguments, invoke it and use the return value.
    - If a list, use it directly.
    - Otherwise, log a warning and skip.

    Sinks that depend on other fixtures cannot be discovered this way.
    Such configurations should register sinks via the
    ``pytest_rampart_sinks`` hook, which is resolved identically on the
    controller and in every worker.

    Args:
        config (pytest.Config): The pytest configuration object.

    Returns:
        list[ReportSink]: Discovered sinks (may be empty).
    """
    discovered: list[ReportSink] = []
    seen: set[int] = set()
    for plugin in config.pluginmanager.get_plugins():
        if plugin is None or id(plugin) in seen:
            continue
        seen.add(id(plugin))
        candidate = getattr(plugin, "rampart_sinks", None)
        if candidate is None:
            continue
        resolved = _resolve_sink_candidate(candidate=candidate, plugin=plugin)
        if resolved is None:
            continue
        for sink in resolved:
            if isinstance(sink, ReportSink):
                discovered.append(sink)
            else:
                logger.warning(
                    "rampart_sinks in %s yielded a non-ReportSink: %r",
                    getattr(plugin, "__name__", repr(plugin)),
                    sink,
                )
    return discovered
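
The three resolution rules for a module-level `rampart_sinks` value can be sketched as follows. The real helper, `_resolve_sink_candidate`, is not shown on this page, so `resolve_candidate` below is a hypothetical approximation, and `DemoSink` is a stand-in for a `ReportSink` implementation.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


class DemoSink:
    """Stand-in for a ReportSink implementation."""


def resolve_candidate(candidate: Any) -> Optional[list[Any]]:
    """Resolve a module-level ``rampart_sinks`` value to a list, or None."""
    if callable(candidate):
        # Assumes the callable takes no arguments; a callable that needs
        # arguments would raise TypeError here.
        return list(candidate())
    if isinstance(candidate, list):
        return candidate
    logger.warning("Unsupported rampart_sinks value: %r", candidate)
    return None


def make_sinks() -> list[DemoSink]:
    """Zero-argument factory, as a conftest module might define."""
    return [DemoSink()]


from_list = resolve_candidate([DemoSink()])
from_factory = resolve_candidate(make_sinks)
from_other = resolve_candidate("not a sink list")
```

Either form works without pytest's fixture machinery, which is why it can be evaluated on the controller. A sink that needs another fixture has no equivalent here and would go through the `pytest_rampart_sinks` hook instead.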