Tutorial 39: DLP with Attribute Ratchets
Time: 15 minutes · Level: Intermediate · Prerequisites: Tutorial 37 (multi-stage pipeline)
What You'll Build
A Data Loss Prevention (DLP) system where an agent's permissions tighten automatically after it touches sensitive data and cannot be reset for the rest of the session.
The Problem
Without session state, each tool call is evaluated independently:
Agent reads confidential document → ✅ allowed
Agent sends email with that content → ✅ allowed (policy doesn't know about the read!)
With attribute ratchets:
Agent reads confidential document → ✅ allowed, sensitivity ratchets to "confidential"
Agent tries to send email externally → ❌ blocked (session.data_sensitivity == "confidential")
Agent tries to reset sensitivity → ❌ ignored (monotonic: can only go up)
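The "can only go up" behaviour can be modelled in a few lines of plain Python. This is a minimal sketch for intuition only: the `Ratchet` class is a hypothetical stand-in, not the agentmesh implementation, and it assumes that re-setting the current level counts as success.

```python
class Ratchet:
    """Hypothetical stand-in for a monotonic session attribute (not the agentmesh class)."""

    def __init__(self, ordering, initial=None):
        self.ordering = list(ordering)
        # Assumption: default to the lowest level when no initial value is given.
        self.value = initial if initial is not None else self.ordering[0]

    def set(self, new_value):
        """Move to new_value only if it does not rank below the current value."""
        if self.ordering.index(new_value) < self.ordering.index(self.value):
            return False  # downgrade attempt: ignored, value unchanged
        self.value = new_value
        return True


sensitivity = Ratchet(["public", "internal", "confidential", "restricted"])
print(sensitivity.set("confidential"))  # True
print(sensitivity.set("public"))        # False
print(sensitivity.value)                # confidential
```

The position of a value in `ordering` is its rank, so "tighter" means "a larger index". A value that is not in the ordering raises `ValueError` in this sketch.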
Step 1: Define Session Attributes
from agentmesh.governance import SessionState, SessionAttribute

state = SessionState([
    SessionAttribute(
        name="data_sensitivity",
        ordering=["public", "internal", "confidential", "restricted"],
        monotonic=True,
        initial="public",
    ),
    SessionAttribute(
        name="data_jurisdiction",
        ordering=["domestic", "eu", "cross_border"],
        monotonic=True,
        initial="domestic",
    ),
])

print(f"Initial sensitivity: {state.get('data_sensitivity')}")
# → "public"
Step 2: Create DLP Policy
# dlp-policy.yaml
apiVersion: governance.toolkit/v1
name: dlp-policy
agents: ["*"]
default_action: allow
rules:
  # Block email when handling sensitive data
  - name: block-email-sensitive
    stage: pre_tool
    condition: "action.type == 'send_email' and session.data_sensitivity in ['confidential', 'restricted']"
    action: deny
    description: "Cannot send emails after accessing confidential data"
    priority: 900

  # Block file export for restricted data
  - name: block-export-restricted
    stage: pre_tool
    condition: "action.type == 'export' and session.data_sensitivity == 'restricted'"
    action: deny
    description: "Restricted data cannot be exported"
    priority: 1000

  # Require approval for cross-border transfers
  - name: approve-cross-border
    stage: pre_tool
    condition: "action.type == 'transfer' and session.data_jurisdiction == 'cross_border'"
    action: require_approval
    approvers: ["data-protection-officer"]
    priority: 800

  # Allow read operations (but they may ratchet sensitivity)
  - name: allow-read
    stage: pre_tool
    condition: "action.type == 'read'"
    action: allow
    priority: 100
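To see how rules like these could combine, here is a rough model of rule resolution in plain Python. It rests on several assumptions that the policy file does not confirm: that session attributes appear under a `session` key in the context (suggested by the `session.` prefix in the conditions), that a deny-overrides strategy lets any matching deny beat any matching allow, and that priority picks the winner among the remaining candidates. Conditions are written as Python callables rather than parsed strings, and `decide` is a hypothetical name.

```python
# Hypothetical model of rule resolution; the real engine parses the condition strings itself.
RULES = [
    {
        "name": "block-email-sensitive",
        "priority": 900,
        "action": "deny",
        "when": lambda ctx: ctx["action"]["type"] == "send_email"
        and ctx["session"]["data_sensitivity"] in ("confidential", "restricted"),
    },
    {
        "name": "allow-read",
        "priority": 100,
        "action": "allow",
        "when": lambda ctx: ctx["action"]["type"] == "read",
    },
]


def decide(ctx, rules=RULES, default_action="allow"):
    """Return (action, rule_name): a matching deny wins, else the highest-priority match."""
    matches = [r for r in rules if r["when"](ctx)]
    if not matches:
        return default_action, None  # nothing matched: fall back to default_action
    denies = [r for r in matches if r["action"] == "deny"]
    winner = max(denies or matches, key=lambda r: r["priority"])
    return winner["action"], winner["name"]


email_after_read = {"action": {"type": "send_email"}, "session": {"data_sensitivity": "confidential"}}
email_clean = {"action": {"type": "send_email"}, "session": {"data_sensitivity": "public"}}
print(decide(email_after_read))  # ('deny', 'block-email-sensitive')
print(decide(email_clean))       # ('allow', None)
```

The same `send_email` action gets a different verdict depending only on the session attribute, which is the point of the ratchet.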
Step 3: Simulate an Agent Session
from agentmesh.governance import PolicyEngine, SessionState, SessionAttribute

engine = PolicyEngine(conflict_strategy="deny_overrides")
engine.load_yaml_file("dlp-policy.yaml")

state = SessionState([
    SessionAttribute(
        name="data_sensitivity",
        ordering=["public", "internal", "confidential", "restricted"],
        monotonic=True,
    ),
])

# ── Turn 1: Agent reads a public document ──────────────────
ctx1 = {"action": {"type": "read"}, "resource": {"type": "document", "classification": "public"}}
state.inject_context(ctx1)
result1 = engine.evaluate("*", ctx1)
print(f"1. Read public doc: {result1.action}") # → allow
# Simulate: tool returns classification = public (no ratchet)

# ── Turn 2: Agent reads a confidential report ──────────────
ctx2 = {"action": {"type": "read"}, "resource": {"type": "document", "classification": "confidential"}}
state.inject_context(ctx2)
result2 = engine.evaluate("*", ctx2)
print(f"2. Read confidential report: {result2.action}") # → allow
# Simulate: tool reports this document is confidential
state.set("data_sensitivity", "confidential")
print(f" → Sensitivity ratcheted to: {state.get('data_sensitivity')}")

# ── Turn 3: Agent tries to email the content ───────────────
ctx3 = {"action": {"type": "send_email"}}
state.inject_context(ctx3)
result3 = engine.evaluate("*", ctx3)
print(f"3. Send email: {result3.action}") # → DENY!
print(f" Rule: {result3.matched_rule}") # → block-email-sensitive

# ── Turn 4: Agent tries to "forget" the sensitivity ────────
reset_ok = state.set("data_sensitivity", "public")
print(f"4. Reset sensitivity: {reset_ok}") # → False (monotonic!)
print(f" Still: {state.get('data_sensitivity')}") # → confidential

# ── Turn 5: Sensitivity can still go UP ────────────────────
state.set("data_sensitivity", "restricted")
print(f"5. Ratcheted to: {state.get('data_sensitivity')}") # → restricted
Output:
1. Read public doc: allow
2. Read confidential report: allow
 → Sensitivity ratcheted to: confidential
3. Send email: deny
 Rule: block-email-sensitive
4. Reset sensitivity: False
 Still: confidential
5. Ratcheted to: restricted
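The simulation above raises the sensitivity by hand after the read. In a real pipeline that value would more plausibly be derived from what the tool returned. The sketch below shows one way to compute the next level from the `resource.classification` field used in the contexts above. The function name is hypothetical, and leaving the level unchanged on an unknown label is an assumption of this sketch, not documented library behaviour.

```python
ORDERING = ["public", "internal", "confidential", "restricted"]


def ratchet_from_classification(current, ctx, ordering=ORDERING):
    """Return the higher of the current level and the resource's classification."""
    observed = ctx.get("resource", {}).get("classification", current)
    if observed not in ordering:
        return current  # unknown label: level left unchanged in this sketch
    # max() with key=ordering.index picks whichever value ranks later in the ordering.
    return max(current, observed, key=ordering.index)


level = "public"
for classification in ("public", "confidential", "internal"):
    ctx = {"action": {"type": "read"}, "resource": {"type": "document", "classification": classification}}
    level = ratchet_from_classification(level, ctx)
    print(classification, "->", level)
# public -> public
# confidential -> confidential
# internal -> confidential
```

The result could then be passed to `state.set("data_sensitivity", level)` as in Turn 2. A stricter deployment might prefer to treat an unknown label as the highest level instead of ignoring it.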
Step 4: Parse from YAML
Define session attributes directly in your policy YAML:
state = SessionState.from_policy_yaml("""
session_attributes:
  - name: data_sensitivity
    ordering: [public, internal, confidential, restricted]
    monotonic: true
    initial: public
  - name: user_verified
    ordering: [unverified, email_verified, mfa_verified]
    monotonic: true
    initial: unverified
""")
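Because the attributes now live in a hand-edited file, a quick sanity check can catch typos before they reach the engine. The sketch below works on plain dictionaries shaped like the parsed `session_attributes` list. `validate_attributes` is a hypothetical helper and not part of agentmesh, and the checks it performs are assumptions about what makes a definition sensible.

```python
def validate_attributes(definitions):
    """Return a list of human-readable problems; an empty list means no issues were found."""
    problems = []
    seen = set()
    for attr in definitions:
        name = attr.get("name", "<unnamed>")
        ordering = attr.get("ordering", [])
        if name in seen:
            problems.append(f"{name}: defined more than once")
        seen.add(name)
        if len(ordering) < 2:
            problems.append(f"{name}: ordering needs at least two levels")
        if len(set(ordering)) != len(ordering):
            problems.append(f"{name}: ordering contains duplicates")
        if "initial" in attr and attr["initial"] not in ordering:
            problems.append(f"{name}: initial value {attr['initial']!r} is not in ordering")
    return problems


definitions = [
    {"name": "data_sensitivity", "ordering": ["public", "internal", "confidential", "restricted"],
     "monotonic": True, "initial": "public"},
    {"name": "user_verified", "ordering": ["unverified", "email_verified", "mfa_verified"],
     "monotonic": True, "initial": "verified"},  # deliberate typo for the demo
]
print(validate_attributes(definitions))
# ["user_verified: initial value 'verified' is not in ordering"]
```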
Step 5: Reset Between Sessions
# End of session: reset for next user
state.reset()
print(state.get("data_sensitivity")) # → "public" (back to initial)
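If a reset is skipped, for example because a session ends with an exception, the next user would inherit the previous session's ratcheted level. One way to guard against that is to tie the reset to a context manager. The sketch below uses `contextlib.contextmanager` from the standard library. `session_scope` and `DemoState` are hypothetical names, and `DemoState` only mimics the `get`/`set`/`reset` calls used in this tutorial without any monotonic check.

```python
from contextlib import contextmanager


class DemoState:
    """Hypothetical stand-in exposing only get/set/reset (no monotonic check)."""

    def __init__(self, initial):
        self._initial = dict(initial)
        self._values = dict(initial)

    def get(self, name):
        return self._values[name]

    def set(self, name, value):
        self._values[name] = value  # only the reset behaviour matters in this sketch
        return True

    def reset(self):
        self._values = dict(self._initial)


@contextmanager
def session_scope(state):
    """Yield state for one session and always reset it afterwards, even on error."""
    try:
        yield state
    finally:
        state.reset()


demo = DemoState({"data_sensitivity": "public"})
with session_scope(demo) as s:
    s.set("data_sensitivity", "confidential")
    print(s.get("data_sensitivity"))  # confidential
print(demo.get("data_sensitivity"))   # public
```

Any object with a `reset()` method works with `session_scope`, so the same wrapper should apply to the tutorial's `state` object.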
Real-World DLP Patterns
| Attribute | Ordering | Use Case |
|---|---|---|
| data_sensitivity | public → restricted | Document classification ratchet |
| data_jurisdiction | domestic → cross_border | GDPR/data residency |
| auth_level | anonymous → mfa_verified | Progressive authentication |
| risk_score | low → critical | Cumulative risk escalation |
| compliance_status | clean → flagged → blocked | Compliance state machine |
What to Try Next
- Tutorial 37: Multi-stage pipeline (post_tool sets sensitivity, pre_tool enforces it)
- Tutorial 41: Combine ratchets with advisory layer for ML-based classification