API Reference — Core Protocols

Protocols and ABCs that define RAMPART's extension points. Implement these to connect your agent, data sources, and custom logic to the framework.

Session & Adapter

adapter

Session and AgentAdapter protocols.

These are the two protocols that product teams implement to connect their agent to the RAMPART framework.

Session

Bases: Protocol

A bounded unit of interaction with the agent.

Sessions are async context managers. Entering returns the session ready for use; exiting guarantees cleanup of any resources the adapter holds (API clients, browser contexts, temporary state).

Fresh state requires a fresh session: to reset, create a new session via the adapter rather than reusing an existing one.

send_async async

Python
send_async(request)

Send a request to the agent and return its response.

The adapter is responsible for populating Response.tool_calls and Response.side_effects with whatever it can observe. Empty lists are valid — they mean "no observations," not "nothing happened." The evaluator system distinguishes between these.

Parameters:

Name     Type     Description                             Default
request  Request  The prompt and/or attachments to send.  required

Returns:

Name      Type      Description
Response  Response  The agent's response with all observable data.

Source code in rampart/core/adapter.py
Python
async def send_async(self, request: Request) -> Response:
    """Send a request to the agent and return its response.

    The adapter is responsible for populating Response.tool_calls
    and Response.side_effects with whatever it can observe. Empty
    lists are valid — they mean "no observations," not "nothing
    happened." The evaluator system distinguishes between these.

    Args:
        request (Request): The prompt and/or attachments to send.

    Returns:
        Response: The agent's response with all observable data.
    """
    ...
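
As a rough illustration of the Session contract, the sketch below implements an async context manager with a send_async method. The Request and Response dataclasses and the EchoSession class are hypothetical stand-ins written for this example; RAMPART's real types may carry different fields.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


# Stand-in types for illustration only; RAMPART's real Request and Response
# are defined by the framework and may carry different fields.
@dataclass
class Request:
    prompt: str


@dataclass
class Response:
    text: str
    tool_calls: list = field(default_factory=list)
    side_effects: list = field(default_factory=list)


class EchoSession:
    """Hypothetical session that echoes prompts back."""

    def __init__(self) -> None:
        self.closed = False

    async def __aenter__(self) -> "EchoSession":
        # Acquire resources here (API client, browser context, ...).
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs even if the body raised, so cleanup is guaranteed.
        self.closed = True

    async def send_async(self, request: Request) -> Response:
        # Empty tool_calls / side_effects mean "no observations".
        return Response(text=f"echo: {request.prompt}")


async def demo() -> tuple[Response, EchoSession]:
    session = EchoSession()
    async with session as s:
        response = await s.send_async(Request(prompt="hello"))
    return response, session
```

Calling asyncio.run(demo()) returns the echoed response together with the session, whose closed flag is set once the async with block exits.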

AgentAdapter

Bases: Protocol

Factory for sessions and source of agent metadata.

Teams implement this to describe their agent and create interaction sessions. The manifest declares what the agent can do; sessions let the framework interact with it.

manifest property

Python
manifest

The agent's declared capabilities.

Returns:

Name         Type         Description
AppManifest  AppManifest  The agent's capability declaration.

observability_profile property

Python
observability_profile

Declares what this adapter can reliably observe.

Used by evaluators to distinguish "nothing happened" from "I can't see what happened."

Returns:

Name                Type                Description
ObservabilityLevel  ObservabilityLevel  What the adapter can observe.

create_session_async async

Python
create_session_async()

Create a fresh interaction session.

Each session is independent — no shared conversation state. Creating a new session is how the framework achieves conversation reset for persistence testing.

Returns:

Name     Type     Description
Session  Session  A new session ready for interaction.

Source code in rampart/core/adapter.py
Python
async def create_session_async(self) -> Session:
    """Create a fresh interaction session.

    Each session is independent — no shared conversation state.
    Creating a new session is how the framework achieves
    conversation reset for persistence testing.

    Returns:
        Session: A new session ready for interaction.
    """
    ...
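
The following sketch shows one way an adapter could satisfy this protocol, with each create_session_async call returning an independent session. AppManifest and ObservabilityLevel are stand-ins here (their fields and the RESPONSE_ONLY member are assumptions), and send_async takes a plain string instead of a Request to keep the example short.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from enum import Enum


# Stand-ins for illustration; the real AppManifest and ObservabilityLevel
# come from RAMPART and their fields/members are not shown on this page.
@dataclass(frozen=True)
class AppManifest:
    name: str
    tools: tuple[str, ...] = ()


class ObservabilityLevel(Enum):
    RESPONSE_ONLY = "response_only"  # hypothetical member


class InMemorySession:
    def __init__(self) -> None:
        self.history: list[str] = []  # per-session conversation state

    async def __aenter__(self) -> "InMemorySession":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.history.clear()

    async def send_async(self, request: str) -> str:
        self.history.append(request)
        return f"turn {len(self.history)}"


class DemoAdapter:
    @property
    def manifest(self) -> AppManifest:
        return AppManifest(name="demo-agent", tools=("search",))

    @property
    def observability_profile(self) -> ObservabilityLevel:
        return ObservabilityLevel.RESPONSE_ONLY

    async def create_session_async(self) -> InMemorySession:
        # A new object per call: no conversation state is shared.
        return InMemorySession()


async def demo() -> tuple[str, str]:
    adapter = DemoAdapter()
    async with await adapter.create_session_async() as first:
        await first.send_async("a")
        last_of_first = await first.send_async("b")
    async with await adapter.create_session_async() as second:
        first_of_second = await second.send_async("c")
    return last_of_first, first_of_second
```

The second session starts counting from turn 1 again, which is the conversation reset described above.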

Evaluator

evaluator

Evaluator protocol, BaseEvaluator ABC, and composition operators.

The evaluator system is the framework's primary analytical capability. Evaluators detect conditions in an EvalContext. They are polarity-free — they answer "did X happen?", not "is X good or bad?"

Evaluator

Bases: Protocol

Detects a condition in an EvalContext.

Evaluators are polarity-free. They answer "did X happen?" — not "is X good or bad?" The Attacks/Probes factories handle the good/bad judgment.

All evaluators are async. Even evaluators with synchronous logic must be async to compose correctly with LLM-based evaluators via & and | operators. A sync evaluator composed with an async LLM judge via | would silently return a coroutine object instead of an EvalResult — this design prevents that structurally.

evaluate_async async

Python
evaluate_async(*, context)

Evaluate the context and return a detection signal.

Parameters:

Name     Type         Description                        Default
context  EvalContext  The interaction data to evaluate.  required

Returns:

Name        Type        Description
EvalResult  EvalResult  What the evaluator detected.

Source code in rampart/core/evaluator.py
Python
async def evaluate_async(self, *, context: EvalContext) -> EvalResult:
    """Evaluate the context and return a detection signal.

    Args:
        context (EvalContext): The interaction data to evaluate.

    Returns:
        EvalResult: What the evaluator detected.
    """
    ...

BaseEvaluator

Bases: ABC

Base class for evaluator implementations.

Provides composition operators (|, &, ~) and common behavior. Subclass this for concrete evaluators. Implement evaluate_async.

evaluate_async abstractmethod async

Python
evaluate_async(*, context)

Evaluate the context. Subclasses implement this.

Source code in rampart/core/evaluator.py
Python
@abstractmethod
async def evaluate_async(self, *, context: EvalContext) -> EvalResult:
    """Evaluate the context. Subclasses implement this."""
    ...
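
To illustrate why every evaluator is async, here is a simplified sketch of an | composition that awaits both operands. SketchEvaluator is a stand-in written for this example, not RAMPART's BaseEvaluator, and the EvalContext.response_text and EvalResult.detected fields are assumptions.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


# Stand-ins for illustration; not RAMPART's actual classes.
@dataclass
class EvalContext:
    response_text: str


@dataclass
class EvalResult:
    detected: bool


class SketchEvaluator(ABC):
    """Simplified stand-in for BaseEvaluator with only the | operator."""

    @abstractmethod
    async def evaluate_async(self, *, context: EvalContext) -> EvalResult: ...

    def __or__(self, other: "SketchEvaluator") -> "SketchEvaluator":
        return _Or(self, other)


class _Or(SketchEvaluator):
    def __init__(self, left: SketchEvaluator, right: SketchEvaluator) -> None:
        self.left, self.right = left, right

    async def evaluate_async(self, *, context: EvalContext) -> EvalResult:
        # Both operands are awaited, so real EvalResult objects are combined,
        # never an un-awaited coroutine object.
        left = await self.left.evaluate_async(context=context)
        right = await self.right.evaluate_async(context=context)
        return EvalResult(detected=left.detected or right.detected)


class Contains(SketchEvaluator):
    def __init__(self, needle: str) -> None:
        self.needle = needle

    async def evaluate_async(self, *, context: EvalContext) -> EvalResult:
        # Synchronous logic, but still declared async so it composes.
        return EvalResult(detected=self.needle in context.response_text)


async def demo(text: str) -> bool:
    either = Contains("password") | Contains("api key")
    result = await either.evaluate_async(context=EvalContext(response_text=text))
    return result.detected
```

Because the composite can always await its operands, a purely string-based check and an LLM-backed judge could be combined the same way.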

Prompt Driver

prompt_driver

PromptDriver protocol and PromptDecision.

Drivers generate the prompts sent to the agent during an execution.

PromptDriver

Bases: Protocol

Generates prompts for agent interaction.

Drivers decide WHAT to send to the agent. They do not own the session, evaluation, or result production — those belong to the execution strategy.

The protocol signature is stateless — drivers receive conversation history and return the next decision. However, some implementations may hold internal state. Refer to specific driver implementations for reuse and lifecycle details.

next_prompt_async returns None when there are no more prompts to send.

next_prompt_async async

Python
next_prompt_async(*, history)

Generate the next prompt decision based on conversation history.

Parameters:

Name     Type        Description                              Default
history  list[Turn]  All turns so far (empty on first call).  required

Returns:

Type                   Description
PromptDecision | None  The next decision, or None to stop.

Source code in rampart/core/prompt_driver.py
Python
async def next_prompt_async(
    self,
    *,
    history: list[Turn],
) -> PromptDecision | None:
    """Generate the next prompt decision based on conversation history.

    Args:
        history (list[Turn]): All turns so far (empty on first call).

    Returns:
        PromptDecision | None: The next decision, or None to stop.
    """
    ...
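
A minimal sketch of a driver, assuming stand-in Turn and PromptDecision dataclasses (their real fields are not documented on this page). ScriptedDriver and run_conversation are hypothetical names; the loop only imitates what an execution strategy would do with a real Session.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


# Stand-ins for illustration; the real Turn and PromptDecision are defined
# by RAMPART and their fields are assumptions here.
@dataclass
class Turn:
    prompt: str
    response: str


@dataclass
class PromptDecision:
    prompt: str


class ScriptedDriver:
    """Hypothetical driver that replays a fixed list of prompts."""

    def __init__(self, prompts: list[str]) -> None:
        self._prompts = list(prompts)

    async def next_prompt_async(self, *, history: list[Turn]) -> PromptDecision | None:
        # The position is derived from history, so the driver keeps no
        # per-conversation state and can be reused across sessions.
        index = len(history)
        if index >= len(self._prompts):
            return None  # no more prompts: the strategy should stop
        return PromptDecision(prompt=self._prompts[index])


async def run_conversation(driver: ScriptedDriver) -> list[Turn]:
    history: list[Turn] = []
    while (decision := await driver.next_prompt_async(history=history)) is not None:
        # A real execution strategy would send the prompt through a Session.
        history.append(Turn(prompt=decision.prompt, response="(stub)"))
    return history
```

Deriving the next prompt from len(history) keeps this driver stateless in the sense described above; a driver that adapts to responses would inspect the turns instead.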

Surface & Injection

injection

Surface and InjectionHandle protocols.

Two protocols serving two audiences: Surface is what surface authors implement; InjectionHandle is what execution strategies consume.

sleep_until_ready is a helper function for surfaces that only need a simple delay-based readiness wait.

Surface

Bases: Protocol

An injectable data source.

Surfaces are fully configured at construction (credentials, target location) and expose a universal inject() signature. Teams implement this to connect their data sources to the framework.

The returned InjectionHandle is what execution strategies depend on. This separation means XPIAExecution never imports or depends on any Surface implementation — it only enters handles.

inject

Python
inject(*, payload)

Prepare an injection of the given payload.

Does not activate the injection — the caller enters the returned handle as an async context manager to activate it.

Parameters:

Name     Type     Description             Default
payload  Payload  The content to inject.  required

Returns:

Name             Type             Description
InjectionHandle  InjectionHandle  Ready to activate via async with.

Source code in rampart/core/injection.py
Python
def inject(self, *, payload: Payload) -> InjectionHandle:
    """Prepare an injection of the given payload.

    Does not activate the injection — the caller enters the
    returned handle as an async context manager to activate it.

    Args:
        payload (Payload): The content to inject.

    Returns:
        InjectionHandle: Ready to activate via async with.
    """
    ...

InjectionHandle

Bases: Protocol

A prepared injection, ready to activate as an async context manager.

Returned by Surface.inject(). Entering activates the injection (writes the payload to the data source); exiting removes it (guaranteed cleanup even on exceptions).

Execution strategies depend only on this protocol — never on Surface or its concrete implementations.

payload_id property

Python
payload_id

The injected payload's identifier, for reporting.

surface_name property

Python
surface_name

The name of the surface this handle injects into (e.g., 'SharePoint').

wait_until_ready async

Python
wait_until_ready()

Block until the injected content is visible to the agent.

Implementations should raise TimeoutError when readiness takes too long, rather than blocking indefinitely.

Source code in rampart/core/injection.py
Python
async def wait_until_ready(self) -> None:
    """Block until the injected content is visible to the agent.

    Implementations should raise `TimeoutError` if readiness
    operations are long-running to prevent indefinite blocking.
    """
    ...

sleep_until_ready async

Python
sleep_until_ready(delay)

Sleep for delay seconds. Default readiness strategy for simple surfaces.

Parameters:

Name   Type   Description                                                 Default
delay  float  Seconds to sleep before the injection is considered ready.  required

Source code in rampart/core/injection.py
Python
async def sleep_until_ready(delay: float) -> None:
    """Sleep for `delay` seconds. Default readiness strategy for simple surfaces.

    Args:
        delay: Seconds to sleep before the injection is considered ready.
    """
    await asyncio.sleep(delay)
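
Putting Surface, InjectionHandle, and sleep_until_ready together, an in-memory sketch might look like the following. DictSurface, DictHandle, and the two-field Payload are hypothetical stand-ins; a real surface would write to an external data source instead of a dict.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class Payload:  # stand-in; the real Payload has more fields
    id: str
    content: str


async def sleep_until_ready(delay: float) -> None:
    # Local copy of the helper shown above so the sketch is self-contained.
    await asyncio.sleep(delay)


class DictHandle:
    """Hypothetical handle that writes into a shared dict on entry."""

    def __init__(self, store: dict[str, str], payload: Payload, delay: float) -> None:
        self._store, self._payload, self._delay = store, payload, delay

    @property
    def payload_id(self) -> str:
        return self._payload.id

    @property
    def surface_name(self) -> str:
        return "InMemoryDict"

    async def __aenter__(self) -> "DictHandle":
        self._store[self._payload.id] = self._payload.content  # activate
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self._store.pop(self._payload.id, None)  # cleanup, even on exceptions

    async def wait_until_ready(self) -> None:
        await sleep_until_ready(self._delay)


class DictSurface:
    """Hypothetical surface, fully configured at construction."""

    def __init__(self, store: dict[str, str], delay: float = 0.0) -> None:
        self._store, self._delay = store, delay

    def inject(self, *, payload: Payload) -> DictHandle:
        # Only prepares the injection; nothing is written yet.
        return DictHandle(self._store, payload, self._delay)


async def demo() -> tuple[bool, bool, bool]:
    store: dict[str, str] = {}
    handle = DictSurface(store).inject(payload=Payload(id="p1", content="x"))
    before = "p1" in store
    async with handle as active:
        await active.wait_until_ready()
        during = "p1" in store
    return before, during, "p1" in store
```

The payload is absent before the handle is entered, present while it is active, and removed again on exit, which matches the prepare/activate/cleanup split described for inject() and InjectionHandle.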

Converter

converter

PayloadConverter protocol — transform payloads before injection.

Converters are the single abstraction for payload transformation across the framework. They operate on Payload objects and produce new Payload objects with potentially different content and format.

Used in two contexts:

  • Post-generation: applied to LLM-generated text payloads in Payloads.generate_async(converters=[...]).
  • Pre-injection: applied directly in test code before passing a payload to a surface.

The PyRIT bridge in pyrit_bridge/converter_bridge.py will adapt PromptConverter to this protocol. Teams can also implement custom converters directly.

PayloadConverter

Bases: Protocol

Transform a payload's content or format.

Two categories of converters:

Text transforms (translation, encoding, obfuscation): Modify content, preserve format as TEXT, artifact stays None.

Format renderers (text -> PDF, DOCX, image): Preserve content for reporting. Set format and artifact on the output payload. Write artifact files into a caller-managed directory or tempdir. Callers needing persistence should pass results to PayloadStore.save() promptly while files exist.

Both categories must preserve payload.id for traceability. Metadata from the source payload should be carried forward, with converter-specific entries added.

Composition example:

Python
# Sequential chaining (manual)
translated = await translator.convert_async(payload=base)
encoded = await encoder.convert_async(payload=translated)

# Sequential chaining (inside Payloads.generate_async)
await Payloads.generate_async(..., converters=[translator, encoder])
# translates first, then encodes — same result as manual chaining

convert_async async

Python
convert_async(*, payload)

Transform the payload content.

Parameters:

Name     Type     Description                Default
payload  Payload  The payload to transform.  required

Returns:

Type     Description
Payload  A new payload with transformed content.

Source code in rampart/core/converter.py
Python
async def convert_async(self, *, payload: Payload) -> Payload:
    """Transform the payload content.

    Args:
        payload: The payload to transform.

    Returns:
        A new payload with transformed content.
    """
    ...
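
As a sketch of a text transform that follows the rules above (same id, format stays TEXT, artifact stays None, metadata carried forward), consider a Base64 converter. The Payload dataclass below is a stand-in containing only the fields this page mentions, and the "converter" metadata key is an assumption.

```python
from __future__ import annotations

import asyncio
import base64
from dataclasses import dataclass, field, replace
from typing import Any


@dataclass(frozen=True)
class Payload:  # stand-in with only the fields this page mentions
    id: str
    content: str
    format: str = "TEXT"
    artifact: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)


class Base64Converter:
    """Hypothetical text transform: encodes content, keeps format TEXT."""

    async def convert_async(self, *, payload: Payload) -> Payload:
        encoded = base64.b64encode(payload.content.encode("utf-8")).decode("ascii")
        # replace() builds a new Payload, so id, format and artifact carry over;
        # metadata is copied and extended rather than mutated in place.
        return replace(
            payload,
            content=encoded,
            metadata={**payload.metadata, "converter": "base64"},
        )


async def demo() -> Payload:
    base = Payload(id="p-1", content="hello", metadata={"source": "manual"})
    return await Base64Converter().convert_async(payload=base)
```

Returning a new object instead of mutating the input keeps the source payload usable for reporting and for chaining further converters.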

Execution

execution

BaseExecution ABC and execution lifecycle infrastructure.

All execution strategies inherit BaseExecution, which owns the execution lifecycle and cross-cutting concerns (result collection, timing, event dispatch, infrastructure error handling).

BaseExecution

Python
BaseExecution(*, event_handlers=None)

Bases: ABC

ABC for all execution strategies.

Owns the execution lifecycle: ON_PRE_EXECUTE → _execute_async → ON_POST_EXECUTE (ON_ERROR fires for unexpected exceptions). Subclasses implement only _execute_async — the skeleton is fixed here.

Cross-cutting concerns (result collection, timing, infrastructure error handling) are handled by the lifecycle skeleton and ExecutionEventHandlers.

Infrastructure resilience is a base-class concern. Any exception from _execute_async is caught and produces a Result with SafetyStatus.ERROR.

Parameters:

Name            Type                                Description                                                        Default
event_handlers  list[ExecutionEventHandler] | None  Additional handlers to register alongside the framework defaults.  None

Initialize with optional extra event handlers.

Source code in rampart/core/execution.py
Python
def __init__(
    self,
    *,
    event_handlers: list[ExecutionEventHandler] | None = None,
) -> None:
    """Initialize with optional extra event handlers."""
    defaults = _default_handler_factory()
    self._handlers: list[ExecutionEventHandler] = defaults + (event_handlers or [])

strategy_name abstractmethod property

Python
strategy_name

Short identifier for this execution strategy.

Used in Result.strategy for reporting and dashboard grouping. Examples: "xpia", "probe", "crescendo", "pair".

execute_async async

Python
execute_async(*, adapter)

Execute the safety test.

Fires lifecycle events and delegates to _execute_async for strategy-specific logic.

All exceptions from _execute_async are caught and converted to a Result with SafetyStatus.ERROR, preventing a single test from crashing the suite.

Parameters:

Name     Type          Description         Default
adapter  AgentAdapter  The agent to test.  required

Returns:

Name    Type    Description
Result  Result  Safety verdict with evidence and diagnostics.

Source code in rampart/core/execution.py
Python
async def execute_async(self, *, adapter: AgentAdapter) -> Result:
    """Execute the safety test.

    Fires lifecycle events and delegates to _execute_async for
    strategy-specific logic.

    All exceptions from _execute_async are caught and converted
    to a Result with SafetyStatus.ERROR, preventing a single test
    from crashing the suite.

    Args:
        adapter (AgentAdapter): The agent to test.

    Returns:
        Result: Safety verdict with evidence and diagnostics.
    """
    start = time.monotonic()
    await self._fire(
        ExecutionEvent.ON_PRE_EXECUTE,
        adapter=adapter,
        elapsed=0.0,
    )

    try:
        result = await self._execute_async(adapter=adapter)
    except Exception as exc:
        error_type = type(exc).__name__
        logger.exception(
            "%s during %s execution",
            error_type,
            self.strategy_name,
        )

        await self._fire(
            ExecutionEvent.ON_ERROR,
            adapter=adapter,
            elapsed=time.monotonic() - start,
            error=exc,
        )

        result = Result(
            safe=False,
            status=SafetyStatus.ERROR,
            summary=f"{error_type}: {exc}",
            strategy=self.strategy_name,
            observability_level=adapter.observability_profile,
            metadata={"error": str(exc), "error_type": error_type},
        )

    elapsed = time.monotonic() - start
    result.duration_seconds = elapsed
    await self._fire(
        ExecutionEvent.ON_POST_EXECUTE,
        adapter=adapter,
        elapsed=elapsed,
        result=result,
    )
    return result

ExecutionEvent

Bases: Enum

Lifecycle events fired during a BaseExecution run.

  • ON_PRE_EXECUTE: Fired before _execute_async is called.
  • ON_POST_EXECUTE: Fired after _execute_async returns a Result (including error results).
  • ON_ERROR: Fired when _execute_async raises any exception. The exception is converted to an ERROR result after handlers are notified.

ExecutionEventData dataclass

Python
ExecutionEventData(
    *,
    event,
    adapter,
    result=None,
    error=None,
    elapsed_seconds=0.0,
)

Data passed to event handlers at each lifecycle point.

Parameters:

Name             Type              Description                                         Default
event            ExecutionEvent    Which lifecycle point fired.                        required
adapter          AgentAdapter      The adapter under test.                             required
result           Result | None     Populated on ON_POST_EXECUTE only.                  None
error            Exception | None  Populated on ON_ERROR only.                         None
elapsed_seconds  float             Wall-clock seconds since execute_async was called.  0.0

ExecutionEventHandler

Bases: ABC

Receives lifecycle events from BaseExecution.

The pytest plugin installs a ResultCollectionHandler on every execution via the _default_handler_factory hook. Teams can register additional handlers via the event_handlers constructor parameter for custom observability.

on_event abstractmethod async

Python
on_event(*, event_data)

Handle an execution lifecycle event.

Parameters:

Name        Type                Description      Default
event_data  ExecutionEventData  The event data.  required

Source code in rampart/core/execution.py
Python
@abstractmethod
async def on_event(self, *, event_data: ExecutionEventData) -> None:
    """Handle an execution lifecycle event.

    Args:
        event_data (ExecutionEventData): The event data.
    """
    ...
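
A custom handler could look roughly like the sketch below, which records the elapsed time for each lifecycle event. The ExecutionEvent, ExecutionEventData, and ExecutionEventHandler definitions are local stand-ins that mirror the signatures on this page (the enum values are assumptions), and TimingHandler is a hypothetical name.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any


# Stand-ins mirroring the signatures above so the sketch runs on its own.
class ExecutionEvent(Enum):
    ON_PRE_EXECUTE = "on_pre_execute"
    ON_POST_EXECUTE = "on_post_execute"
    ON_ERROR = "on_error"


@dataclass
class ExecutionEventData:
    event: ExecutionEvent
    adapter: Any
    result: Any = None
    error: Exception | None = None
    elapsed_seconds: float = 0.0


class ExecutionEventHandler(ABC):
    @abstractmethod
    async def on_event(self, *, event_data: ExecutionEventData) -> None: ...


class TimingHandler(ExecutionEventHandler):
    """Hypothetical handler that records elapsed time per lifecycle event."""

    def __init__(self) -> None:
        self.records: list[tuple[str, float]] = []
        self.errors: list[str] = []

    async def on_event(self, *, event_data: ExecutionEventData) -> None:
        self.records.append((event_data.event.name, event_data.elapsed_seconds))
        if event_data.event is ExecutionEvent.ON_ERROR and event_data.error is not None:
            self.errors.append(type(event_data.error).__name__)


async def demo() -> TimingHandler:
    handler = TimingHandler()
    # Simulates the order in which events fire when _execute_async raises.
    await handler.on_event(
        event_data=ExecutionEventData(event=ExecutionEvent.ON_PRE_EXECUTE, adapter=None)
    )
    await handler.on_event(
        event_data=ExecutionEventData(
            event=ExecutionEvent.ON_ERROR,
            adapter=None,
            error=ValueError("boom"),
            elapsed_seconds=0.5,
        )
    )
    await handler.on_event(
        event_data=ExecutionEventData(
            event=ExecutionEvent.ON_POST_EXECUTE,
            adapter=None,
            result="error-result",
            elapsed_seconds=0.5,
        )
    )
    return handler
```

With the real framework, such a handler would be passed through the event_handlers constructor parameter of an execution strategy rather than invoked by hand.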

ExecutionHandlerFactory

Bases: Protocol

Factory that creates default ExecutionEventHandlers for every BaseExecution.

When a BaseExecution is instantiated it needs a set of framework-level handlers (e.g. the ResultCollectionHandler that funnels results into the reporting pipeline). Rather than forcing every test to wire those handlers manually, a single factory is registered once at startup and called automatically inside BaseExecution.__init__ to supply them.

Lifecycle
  1. The pytest plugin calls register_default_handler_factory(factory) during pytest_configure.
  2. Each BaseExecution.__init__ invokes the factory to obtain a fresh list of handlers, which are prepended to any user-supplied handlers.
  3. The plugin calls clear_default_handler_factory() during pytest_unconfigure to remove the factory.

Any callable with the signature () -> list[ExecutionEventHandler] satisfies this protocol — a plain function, a lambda, or an object with a __call__ method all work.

register_default_handler_factory

Python
register_default_handler_factory(factory)

Install a factory that provides default handlers for every BaseExecution.

Called by the pytest plugin at configure time. The factory is invoked once per BaseExecution.__init__ to supply framework-level handlers (e.g. ResultCollectionHandler) without requiring test authors to wire them manually.

Parameters:

Name     Type                     Description                                                    Default
factory  ExecutionHandlerFactory  Callable returning a list of ExecutionEventHandler instances.  required

Raises:

Type       Description
TypeError  If factory does not satisfy ExecutionHandlerFactory.

Source code in rampart/core/execution.py
Python
def register_default_handler_factory(
    factory: ExecutionHandlerFactory,
) -> None:
    """Install a factory that provides default handlers for every BaseExecution.

    Called by the pytest plugin at configure time. The factory is invoked
    once per BaseExecution.__init__ to supply framework-level handlers
    (e.g. ResultCollectionHandler) without requiring test authors to
    wire them manually.

    Args:
        factory (ExecutionHandlerFactory): Callable returning a list of
            ExecutionEventHandler instances.

    Raises:
        TypeError: If factory does not satisfy ExecutionHandlerFactory.
    """
    if not callable(factory):
        msg = (
            "factory must satisfy ExecutionHandlerFactory (callable returning "
            "list[ExecutionEventHandler])"
        )
        raise TypeError(
            msg,
        )
    _default_handler_factory.factory = factory

clear_default_handler_factory

Python
clear_default_handler_factory()

Remove the installed default handler factory.

Called by the pytest plugin at unconfigure time to restore the module to its clean, no-plugin state.

Source code in rampart/core/execution.py
Python
def clear_default_handler_factory() -> None:
    """Remove the installed default handler factory.

    Called by the pytest plugin at unconfigure time to restore the
    module to its clean, no-plugin state.
    """
    _default_handler_factory.factory = None
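
The register/clear lifecycle can be sketched in isolation as follows. The _FactoryHolder class is an assumed shape for the module-level _default_handler_factory object (callable, with a factory attribute), inferred from the source excerpts above; handlers_for_new_execution is a hypothetical helper that mirrors the list concatenation in BaseExecution.__init__.

```python
from __future__ import annotations

from typing import Callable, Optional


class _FactoryHolder:
    """Assumed shape of the module-level holder: callable, with a .factory slot."""

    def __init__(self) -> None:
        self.factory: Optional[Callable[[], list]] = None

    def __call__(self) -> list:
        # With no factory installed, executions get no default handlers.
        return list(self.factory()) if self.factory is not None else []


_default_handler_factory = _FactoryHolder()


def register_default_handler_factory(factory: Callable[[], list]) -> None:
    if not callable(factory):
        raise TypeError("factory must be a callable returning a list of handlers")
    _default_handler_factory.factory = factory


def clear_default_handler_factory() -> None:
    _default_handler_factory.factory = None


def handlers_for_new_execution(user_handlers: list | None = None) -> list:
    # Mirrors BaseExecution.__init__: defaults first, then user handlers.
    return _default_handler_factory() + (user_handlers or [])
```

Because the factory is called on every construction, each execution receives its own fresh handler list, and clearing the factory returns later executions to user-supplied handlers only.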

Errors

errors

Framework exceptions for RAMPART.

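The framework defines two exceptions: InfrastructureError, which surfaces and adapters should raise for transient infrastructure failures, and DriverError, which a PromptDriver raises when it cannot produce a decision.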

InfrastructureError

Bases: Exception

Raised by surfaces and adapters for transient infrastructure failures.

When a surface cannot write to SharePoint (503, rate limit, timeout), or an adapter cannot reach the agent API, it should raise this exception. BaseExecution.execute_async catches it and produces a Result with SafetyStatus.ERROR.

Teams should raise InfrastructureError (or a subclass) for any failure that is:

  1. Transient: likely to succeed on retry.
  2. External: caused by infrastructure outside the framework's control.
  3. Non-diagnostic: does not indicate anything about agent safety.

Use raise InfrastructureError(...) from original_exception to preserve the causal chain via Python's native __cause__ attribute.
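
The exception chaining described above can be sketched like this. InfrastructureError is redefined locally so the example runs on its own, and write_document is a hypothetical surface operation that simulates a transient outage.

```python
from __future__ import annotations


class InfrastructureError(Exception):
    """Stand-in for RAMPART's InfrastructureError so the sketch is runnable."""


def write_document(fail: bool) -> str:
    # Hypothetical surface operation that may hit a transient outage.
    try:
        if fail:
            raise ConnectionError("503 Service Unavailable")
        return "ok"
    except ConnectionError as original:
        # "from original" stores the low-level error on __cause__.
        raise InfrastructureError("could not write to the data source") from original


def describe_failure() -> tuple[str, str]:
    try:
        write_document(fail=True)
    except InfrastructureError as exc:
        return type(exc.__cause__).__name__, str(exc.__cause__)
    return "", ""
```

The original ConnectionError stays reachable through __cause__, so logs and tracebacks still show the low-level failure behind the framework-level exception.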

DriverError

Bases: Exception

Raised by a PromptDriver when it cannot produce a decision.

BaseExecution.execute_async catches this and produces a Result with SafetyStatus.ERROR.