Open-source · durable functions for Postgres

Durable, crash-proof workflows
built into Postgres

Orchestrate retries, scheduling, parallel fan-out, and conditional branching with a tiny SQL DSL. Built on Postgres + a background worker — no containers, no external services, just Postgres.

live execution

              

Parallel aggregation · df.start( a & b & c ~> … ) — three queries fan out, then join into one result.

🔧 Without pg_durable

-- Goal: run 3 aggregations in parallel, then refresh a dashboard
--       — with retries and crash recovery. Here's the plumbing.

-- 1. Set up job queues and state tables
-- Work queue: one row per job. Workers claim rows through the status and
-- locked_* columns; attempts / max_attempts form the retry budget.
CREATE TABLE job_queue (
    id SERIAL PRIMARY KEY,
    payload JSONB NOT NULL,
    -- Lifecycle state. The functions in this file use 'pending', 'running',
    -- 'completed' and 'failed'. NOT NULL because a NULL status matches none
    -- of the status filters, so the job would never be polled or cleaned up.
    status TEXT NOT NULL DEFAULT 'pending',
    -- NOT NULL on both counters: `attempts + 1 >= max_attempts` evaluates to
    -- NULL when either side is NULL, which would retry the job forever.
    attempts INT NOT NULL DEFAULT 0,
    max_attempts INT NOT NULL DEFAULT 3,
    -- Polling order and retention cut-off both depend on this value.
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- Claim markers; both NULL while the job is unclaimed.
    locked_at TIMESTAMPTZ,
    locked_by TEXT
);

-- Outcome log: result payloads and error messages recorded against a job.
CREATE TABLE job_results (
    id           SERIAL,
    job_id       INTEGER,                  -- owning job (nullable)
    result       JSONB,
    error        TEXT,
    completed_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    FOREIGN KEY (job_id) REFERENCES job_queue (id)
);

-- Progress record: at most one row per job (job_id is the primary key).
CREATE TABLE job_state (
    job_id       INTEGER,
    current_step INTEGER DEFAULT 0,
    step_data    JSONB DEFAULT '{}',
    updated_at   TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (job_id),
    FOREIGN KEY (job_id) REFERENCES job_queue (id)
);

-- 2. Write a polling worker function
-- Claims the oldest runnable job, runs it through execute_step(), and
-- records the outcome.
--
-- A job is runnable when it is 'pending' and either unlocked or locked more
-- than 5 minutes ago. FOR UPDATE SKIP LOCKED lets concurrent pollers pass
-- over rows another session is already claiming.
--
-- On failure the attempt counter is bumped, the job goes back to 'pending'
-- (or to 'failed' once max_attempts is reached), and the error text is
-- written to job_results so the failure reason is not lost.
--
-- Returns nothing; returns immediately when no job is runnable.
-- NOTE(review): execute_step() is not defined in the visible part of this
-- file; it must exist before this function is called.
CREATE OR REPLACE FUNCTION poll_and_execute()
RETURNS void AS $$
DECLARE
    job RECORD;
BEGIN
    -- Only id and payload are used below, so fetch just those.
    SELECT id, payload INTO job
    FROM job_queue
    WHERE status = 'pending'
      AND (locked_at IS NULL
           OR locked_at < now() - interval '5 min')
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED;

    -- FOUND reflects whether the SELECT INTO above produced a row.
    IF NOT FOUND THEN
        RETURN;
    END IF;

    UPDATE job_queue
    SET status = 'running',
        locked_at = now(),
        locked_by = pg_backend_pid()::text
    WHERE id = job.id;

    -- 3. Execute with manual retry logic
    -- The inner block is a subtransaction: if execute_step() raises, its
    -- changes are rolled back before the handler runs.
    BEGIN
        PERFORM execute_step(job.id, job.payload);
        UPDATE job_queue SET status = 'completed'
        WHERE id = job.id;
    EXCEPTION WHEN OTHERS THEN
        UPDATE job_queue
        SET attempts = attempts + 1,
            status = CASE
              WHEN attempts + 1 >= max_attempts
              THEN 'failed' ELSE 'pending' END,
            locked_at = NULL
        WHERE id = job.id;

        -- Keep the failure reason instead of silently swallowing it.
        INSERT INTO job_results (job_id, error)
        VALUES (job.id, SQLERRM);
    END;
END;
$$ LANGUAGE plpgsql;

-- 4. Track step coordination manually
-- Per-job step ledger: the statement each step runs, its current state,
-- and the steps that must complete before it may start.
CREATE TABLE workflow_steps (
    id           SERIAL,
    job_id       INTEGER,
    step_order   INTEGER NOT NULL,      -- pending steps are visited in this order
    step_name    TEXT NOT NULL,
    step_query   TEXT NOT NULL,         -- SQL text run with EXECUTE
    status       TEXT DEFAULT 'pending',
    result       JSONB,
    error        TEXT,                  -- SQLERRM captured when the step fails
    started_at   TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,  -- set on completion and on failure
    depends_on   INTEGER[],             -- workflow_steps.id values that must be 'completed'
    PRIMARY KEY (id),
    FOREIGN KEY (job_id) REFERENCES job_queue (id)
);

-- Runs every pending step of a job whose dependencies are all 'completed'.
--
-- Steps are visited in step_order. A step with unmet dependencies is
-- skipped and stays 'pending'. The first step that raises is marked
-- 'failed' with its error text, and the function returns without visiting
-- the remaining steps.
--
--   p_job_id  job whose steps should be advanced
CREATE OR REPLACE FUNCTION advance_workflow(p_job_id INT)
RETURNS void AS $$
DECLARE
    candidate RECORD;
    has_unmet_dep BOOLEAN;
BEGIN
    FOR candidate IN
        SELECT ws.id, ws.step_query, ws.depends_on
        FROM workflow_steps AS ws
        WHERE ws.job_id = p_job_id
          AND ws.status = 'pending'
        ORDER BY ws.step_order
    LOOP
        -- True when at least one listed dependency has no 'completed' row.
        -- A NULL or empty depends_on unnests to zero rows, so it is false.
        has_unmet_dep := EXISTS (
            SELECT 1
            FROM unnest(candidate.depends_on) AS dep(dep_step_id)
            WHERE NOT EXISTS (
                SELECT 1
                FROM workflow_steps AS done
                WHERE done.id = dep.dep_step_id
                  AND done.status = 'completed'
            )
        );

        IF has_unmet_dep THEN
            CONTINUE;
        END IF;

        -- Marked outside the guarded block so a failing step still keeps
        -- its started_at.
        UPDATE workflow_steps
        SET started_at = now(),
            status = 'running'
        WHERE id = candidate.id;

        BEGIN
            EXECUTE candidate.step_query;

            UPDATE workflow_steps
            SET completed_at = now(),
                status = 'completed'
            WHERE id = candidate.id;
        EXCEPTION WHEN OTHERS THEN
            UPDATE workflow_steps
            SET completed_at = now(),
                error = SQLERRM,
                status = 'failed'
            WHERE id = candidate.id;

            -- Stop at the first failure.
            RETURN;
        END;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- 5. Build crash recovery from scratch
-- Re-queues jobs whose worker backend has disappeared.
--
-- A job counts as crashed when it is 'running', its lock is older than
-- 10 minutes, and no session in pg_stat_activity has the pid stored in
-- locked_by. For each such job this function:
--   * releases the lock and bumps attempts, returning the job to 'pending'
--     (or 'failed' once max_attempts is reached, matching the retry budget
--     applied in poll_and_execute, so a job that keeps crashing its worker
--     is not retried forever);
--   * resets its 'running' steps to 'pending';
--   * writes a job_results row naming the interrupted step(s).
CREATE OR REPLACE FUNCTION recover_crashed_jobs()
RETURNS void AS $$
DECLARE
    crashed RECORD;
    interrupted_steps TEXT;
BEGIN
    FOR crashed IN
        SELECT jq.id FROM job_queue jq
        WHERE jq.status = 'running'
          AND jq.locked_at < now() - interval '10 min'
          AND NOT EXISTS (
              -- Compare as text: casting locked_by to int would raise on a
              -- non-numeric worker id and abort the whole recovery pass.
              SELECT 1 FROM pg_stat_activity sa
              WHERE sa.pid::text = jq.locked_by
          )
    LOOP
        UPDATE job_queue
        SET status = CASE
              WHEN attempts + 1 >= max_attempts
              THEN 'failed' ELSE 'pending' END,
            locked_at = NULL,
            locked_by = NULL,
            attempts = attempts + 1
        WHERE id = crashed.id;

        -- Capture the step names while resetting them. Looking them up after
        -- the reset finds nothing, which made the recorded message NULL.
        WITH reset AS (
            UPDATE workflow_steps
            SET status = 'pending',
                started_at = NULL,
                error = NULL
            WHERE job_id = crashed.id
              AND status = 'running'
            RETURNING step_name
        )
        SELECT string_agg(step_name, ', ' ORDER BY step_name)
        INTO interrupted_steps
        FROM reset;

        -- COALESCE keeps the message non-NULL when no step was running.
        INSERT INTO job_results (job_id, error)
        VALUES (crashed.id,
            'Recovered from crash at step ' ||
            COALESCE(interrupted_steps, '(unknown)'));
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- 6. Custom status tracking and monitoring
-- Summarises one job: its queue status, step counts by state, one step that
-- is currently 'running', time since the job was created, and the error of
-- the most recently completed failed step.
--
--   p_job_id  job to report on
-- Returns one row, or no rows when the job id does not exist.
CREATE OR REPLACE FUNCTION get_job_status(p_job_id INT)
RETURNS TABLE (
    job_status TEXT,
    total_steps INT,
    completed_steps INT,
    failed_steps INT,
    current_step TEXT,
    elapsed_time INTERVAL,
    last_error TEXT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        jq.status,
        tally.n_total,
        tally.n_completed,
        tally.n_failed,
        running.step_name,
        now() - jq.created_at,
        latest_failure.error
    FROM job_queue AS jq
    -- An aggregate without GROUP BY always yields exactly one row, so a job
    -- with no steps still reports zeros.
    CROSS JOIN (
        SELECT
            count(*)::int                                          AS n_total,
            (count(*) FILTER (WHERE ws.status = 'completed'))::int AS n_completed,
            (count(*) FILTER (WHERE ws.status = 'failed'))::int    AS n_failed
        FROM workflow_steps AS ws
        WHERE ws.job_id = p_job_id
    ) AS tally
    -- Any one running step; which one is unspecified when several run.
    LEFT JOIN (
        SELECT ws.step_name
        FROM workflow_steps AS ws
        WHERE ws.job_id = p_job_id
          AND ws.status = 'running'
        LIMIT 1
    ) AS running ON true
    LEFT JOIN (
        SELECT ws.error
        FROM workflow_steps AS ws
        WHERE ws.job_id = p_job_id
          AND ws.status = 'failed'
        ORDER BY ws.completed_at DESC
        LIMIT 1
    ) AS latest_failure ON true
    WHERE jq.id = p_job_id;
END;
$$ LANGUAGE plpgsql;

-- 7. Parallel execution coordinator
-- Runs a group of steps belonging to one job and fails the job if any of
-- them fails.
--
-- Despite the name, the steps run one after another inside the calling
-- session (a FOREACH loop). Each runs in its own guarded block, so one
-- failure does not stop the remaining steps.
--
--   p_job_id    job that owns the steps
--   p_step_ids  workflow_steps.id values to run; NULL is treated as empty
--
-- A step id that does not belong to p_job_id is never executed and its row
-- is left untouched; it raises a WARNING and counts as a failure.
CREATE OR REPLACE FUNCTION run_parallel_steps(
    p_job_id INT,
    p_step_ids INT[]
) RETURNS void AS $$
DECLARE
    step_id INT;
    step RECORD;
    failed BOOLEAN := false;
BEGIN
    -- FOREACH raises on a NULL array, so substitute an empty one.
    FOREACH step_id IN ARRAY COALESCE(p_step_ids, ARRAY[]::INT[]) LOOP
        SELECT id, step_query INTO step FROM workflow_steps
        WHERE id = step_id AND job_id = p_job_id;

        -- Without this guard the updates below would mark a row owned by
        -- another job as running and then as failed.
        IF NOT FOUND THEN
            RAISE WARNING 'step % does not belong to job %; skipping',
                step_id, p_job_id;
            failed := true;
            CONTINUE;
        END IF;

        UPDATE workflow_steps
        SET status = 'running', started_at = now()
        WHERE id = step_id AND job_id = p_job_id;

        BEGIN
            EXECUTE step.step_query;
            UPDATE workflow_steps
            SET status = 'completed',
                completed_at = now()
            WHERE id = step_id AND job_id = p_job_id;
        EXCEPTION WHEN OTHERS THEN
            UPDATE workflow_steps
            SET status = 'failed',
                error = SQLERRM,
                completed_at = now()
            WHERE id = step_id AND job_id = p_job_id;
            failed := true;
        END;
    END LOOP;

    IF failed THEN
        UPDATE job_queue SET status = 'failed'
        WHERE id = p_job_id;
    END IF;
END;
$$ LANGUAGE plpgsql;

-- 8. Variable passing between steps
-- Named values shared between the steps of a job; one value per
-- (job, variable name).
CREATE TABLE step_variables (
    job_id      INTEGER,
    var_name    TEXT NOT NULL,
    var_value   JSONB,
    set_by_step INTEGER,               -- step that last wrote the value
    created_at  TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (job_id, var_name),
    FOREIGN KEY (job_id) REFERENCES job_queue (id)
);

-- Stores a variable for a job, overwriting any value already held under
-- the same name.
--
--   p_job_id  job the variable belongs to
--   p_name    variable name
--   p_value   value to store
--   p_step    step recorded as the writer
-- created_at keeps the time of the first write; an overwrite does not
-- touch it.
CREATE OR REPLACE FUNCTION set_step_var(
    p_job_id INT, p_name TEXT,
    p_value JSONB, p_step INT
) RETURNS void AS $$
BEGIN
    INSERT INTO step_variables
        (job_id, var_name, var_value, set_by_step)
    VALUES
        (p_job_id, p_name, p_value, p_step)
    ON CONFLICT (job_id, var_name) DO UPDATE
    SET (var_value, set_by_step) =
        (EXCLUDED.var_value, EXCLUDED.set_by_step);
END;
$$ LANGUAGE plpgsql;

-- Expands $name placeholders in a query using the job's step variables.
--
--   p_job_id  job whose variables are used
--   p_query   text containing $name placeholders
-- Returns p_query with every occurrence of '$' || var_name replaced by the
-- variable's JSONB value rendered as text. A variable whose value is SQL
-- NULL is rendered as the keyword NULL.
--
-- NOTE(review): values are spliced in as raw text. A JSON string renders
-- with double quotes ("abc"), which SQL reads as an identifier, and nothing
-- is escaped. Do not feed untrusted values through this function; prefer
-- EXECUTE ... USING where the caller can bind parameters.
CREATE OR REPLACE FUNCTION substitute_vars(
    p_job_id INT, p_query TEXT
) RETURNS TEXT AS $$
DECLARE
    v RECORD;
    result TEXT := p_query;
BEGIN
    FOR v IN
        SELECT var_name, var_value
        FROM step_variables
        WHERE job_id = p_job_id
        -- Longest names first, so $order does not clobber the prefix of
        -- $order_id; the name tie-break makes the result deterministic.
        ORDER BY length(var_name) DESC, var_name
    LOOP
        -- replace() returns NULL if any argument is NULL, which would wipe
        -- the whole query for a single NULL variable.
        result := replace(result,
            '$' || v.var_name,
            COALESCE(v.var_value::text, 'NULL'));
    END LOOP;
    RETURN result;
END;
$$ LANGUAGE plpgsql;

-- 9. Scheduling and cron support
-- Recurring job definitions: a cron expression plus the payload to enqueue.
CREATE TABLE scheduled_jobs (
    id          SERIAL,
    cron_expr   TEXT NOT NULL,
    job_payload JSONB NOT NULL,
    last_run    TIMESTAMP WITH TIME ZONE,
    next_run    TIMESTAMP WITH TIME ZONE,  -- NULL is treated as "due now"
    enabled     BOOLEAN DEFAULT true,
    PRIMARY KEY (id)
);

-- Enqueues a job for every enabled schedule that is due (next_run is NULL
-- or not in the future) and stamps last_run on those schedules.
--
-- next_run is NOT advanced here, so a due schedule stays due on the next
-- call.
CREATE OR REPLACE FUNCTION check_scheduled_jobs()
RETURNS void AS $$
BEGIN
    WITH due AS (
        SELECT s.id, s.job_payload
        FROM scheduled_jobs AS s
        WHERE s.enabled
          AND (s.next_run IS NULL
               OR s.next_run <= now())
    ),
    -- Data-modifying CTEs always run to completion, even though the main
    -- statement never reads from this one.
    enqueued AS (
        INSERT INTO job_queue (payload)
        SELECT due.job_payload
        FROM due
    )
    UPDATE scheduled_jobs AS target
    SET last_run = now()
    FROM due
    WHERE target.id = due.id;
    -- next_run calculation requires
    -- external cron parser library...
END;
$$ LANGUAGE plpgsql;

-- 10. Cleanup and maintenance
-- Deletes finished jobs older than the retention window, together with
-- every row that references them.
--
--   p_retention  age after which a 'completed' or 'failed' job is removed
--                (measured from created_at); defaults to 30 days
-- Returns the number of job_queue rows deleted.
CREATE OR REPLACE FUNCTION cleanup_old_jobs(
    p_retention INTERVAL DEFAULT '30 days'
) RETURNS INT AS $$
DECLARE
    expired_ids INT[];
    deleted INT;
BEGIN
    -- Resolve the expired set once so every DELETE below works on exactly
    -- the same jobs. Yields an empty array when nothing has expired.
    expired_ids := ARRAY(
        SELECT id FROM job_queue
        WHERE status IN ('completed', 'failed')
          AND created_at < now() - p_retention
    );

    -- Child tables first: each has a foreign key to job_queue(id) with no
    -- ON DELETE action, so leftover rows would block the parent delete.
    DELETE FROM step_variables WHERE job_id = ANY (expired_ids);
    DELETE FROM workflow_steps WHERE job_id = ANY (expired_ids);
    DELETE FROM job_results    WHERE job_id = ANY (expired_ids);
    -- job_state references job_queue too; leaving its rows behind made the
    -- delete below fail with a foreign-key violation.
    DELETE FROM job_state      WHERE job_id = ANY (expired_ids);

    DELETE FROM job_queue WHERE id = ANY (expired_ids);
    GET DIAGNOSTICS deleted = ROW_COUNT;
    RETURN deleted;
END;
$$ LANGUAGE plpgsql;

-- 11. ...and only NOW can you wire up the actual workflow
-- Creates the job and its four steps in a single statement.
WITH job AS (
    INSERT INTO job_queue (payload)
    VALUES ('{"name":"refresh-dashboard"}') RETURNING id
),
-- Fan-out steps: no dependencies, so all three are runnable immediately.
fan_out AS (
    INSERT INTO workflow_steps
        (job_id, step_order, step_name, step_query)
    SELECT job.id, 1, v.name, v.query
    FROM job
    CROSS JOIN (VALUES
        ('count_users',  'SELECT count(*) FROM users'),
        ('count_orders', 'SELECT count(*) FROM orders'),
        ('sum_revenue',  'SELECT sum(amount) FROM orders')
    ) AS v(name, query)
    RETURNING id, job_id
)
-- Join step: depends on the ids the fan-out rows were actually assigned.
-- A literal ARRAY[1,2,3] only matches on a brand-new table; for any later
-- job it would point at another job's steps.
INSERT INTO workflow_steps
    (job_id, step_order, step_name, step_query, depends_on)
SELECT fan_out.job_id, 2, 'refresh_dash',
       'REFRESH MATERIALIZED VIEW metrics',
       array_agg(fan_out.id ORDER BY fan_out.id)
FROM fan_out
GROUP BY fan_out.job_id;
-- ...then schedule the worker, poll, coordinate the parallel
-- steps, handle failures, recover crashes — see all of the above.

300+ lines of boilerplate

  • 🔧 Queue setup & configuration
  • 🔄 Worker management & polling
  • 📊 Message handling & state tracking
  • ❌ Error handling & retries
  • 🔗 Manual step coordination

⚡ With pg_durable

-- Parallel aggregation: 3 queries fan out, then refresh the dashboard
-- Operator meanings below follow the legend used elsewhere on this page
-- ("& parallel", "~> sequence").
SELECT df.start(
    -- Three independent aggregations combined with & (parallel).
    'SELECT count(*) FROM users'     &
    'SELECT count(*) FROM orders'    &
    'SELECT sum(amount) FROM orders'

    -- ~> sequences the next step after the parallel group.
    -- NOTE(review): 'refresh dashboard' is not a SQL statement; the
    -- hand-rolled version runs 'REFRESH MATERIALIZED VIEW metrics'.
    -- Confirm whether this is a placeholder.
    ~> 'refresh dashboard',

    -- Second argument to df.start(); presumably a label for the workflow.
    -- TODO confirm against the pg_durable documentation.
    'metrics'
);

✏️ You write the SQL. pg_durable handles everything else.

Queue management, state tracking, crash recovery, step coordination, and retries — pg_durable is the orchestration engine.

-- Installs the pg_durable extension into the current database.
CREATE EXTENSION pg_durable;

Enable in any PostgreSQL 17 database. View full setup guide →

Why pg_durable

🛡️

Durable by default

Every step checkpoints state to PostgreSQL. Workflows survive crashes, restarts, and connection drops. Resume exactly where you left off.

Learn more →
🔁

Automatic retries

Built-in retry logic for flaky operations. When a step fails, only that step retries — the rest of your workflow continues. No manual error handling code needed.

Learn more →
🔎

Full observability in SQL

All workflow state lives in Postgres tables. Query execution history, inspect step outputs, and debug failures with standard SQL. No external dashboards.

Learn more →

Parallel execution

Fan out independent work with the & operator or df.join(). Run aggregations, API calls, or ETL steps concurrently with automatic coordination.

Learn more →

What you can build

From data pipelines to database maintenance to cloud-connected workflows — here are the patterns pg_durable handles, each backed by a copy-paste-ready scenario.

📚 Full Scenarios & Use Cases Guide

Every pattern below, written out end to end

One consolidated guide: core orchestration patterns (ETL, parallel aggregation, scheduling, branching), standard operational scenarios (vacuum, bloat & wraparound remediation), and Azure integration examples (Functions, HTTP, human approval) — all with runnable SQL.

🔗 ETL Pipelines

Chain cleanup → transform → load with sequential guarantees. Each step waits for the previous one. Failures stop the pipeline cleanly.

~> sequence |=> variables

📊 Parallel Aggregation

Count users + sum revenue + check inventory simultaneously. Fan out to multiple queries and wait for all to complete.

& parallel df.join()

📦 Order Processing

Capture an order ID, pass it through validation, processing, and completion steps. Variables flow between steps automatically.

|=> capture $var substitution df.sleep()

⏰ Scheduled Jobs

Poll APIs, archive records, or sync data on a cron schedule. Loops run forever and survive restarts.

@> loop df.wait_for_schedule()

🔀 Conditional Branching

Check pending jobs, row counts, or flags — then process or skip based on the result. Branch logic lives in SQL, not application code.

df.if() ?> conditional

✅ Multi-step Validation

Fetch data, validate schema, check business rules, then approve or reject. Each step is checkpointed — failures don't lose progress.

~> sequence df.if() |=> variables

🧹 Database Maintenance

Detect autovacuum blockers, table bloat, or wraparound risk, surface findings for review, wait for approval, then remediate — durably, even across restarts.

?> conditional df.wait_for_signal() @> loop

☁️ Azure Functions & HTTP

Call Azure Functions or any allowlisted HTTPS endpoint straight from SQL with df.http() — chunk documents, enrich rows, or classify records inline.

df.http() ~> sequence

🙋 Human-in-the-Loop Approval

Auto-approve routine work and pause high-stakes actions (large invoices, destructive ops) until a human signals approval — like the invoice-approval example.

df.wait_for_signal() df.if() @> loop
🤖 AI-assisted authoring

Let your AI assistant write the SQL

You describe the workflow in plain English — Copilot writes correct durable-function SQL.

Skip the syntax. Just describe what you want.

This repo ships a reusable agent skill, pg-durable-sql, that teaches GitHub Copilot and other agents how to generate correct durable-function SQL — operators, variable substitution, loops, parallel joins, and more.

📦 100% open source

Open-source durable functions for Postgres

No waitlist, no lock-in. Clone, build, and run durable functions in your own PostgreSQL today.

Bring durable orchestration to any PostgreSQL. Open source

pg_durable is fully open source today. Clone the repo, build the extension, and run durable functions in your own PostgreSQL — on your laptop, your server, or your cloud.

Run it in the cloud (Preview)

pg_durable, fully managed on Azure HorizonDB

Azure HorizonDB is Microsoft's new PostgreSQL cloud service — engineered for performance and built with pg_durable inside. Keep the durable workflows you write here, and add enterprise scale, security, and AI without managing a single server.

Up to 3× faster

Outscales self-managed Postgres with auto-scaling storage to 128 TB and scale-out compute up to 3,072 vCores.

🛡️

Enterprise protection

Real-time threat detection with Microsoft Defender and identity management through Microsoft Entra ID.

🧠

Built for AI

Filtered DiskANN vector search, semantic ranking, and a curated set of in-database AI models.

🔗

Azure-native

Near-real-time mirroring to Microsoft Fabric, VS Code integration, and GitHub Copilot — one ecosystem.

Built-in AI pipeline

Postgres-native AI pipeline, built on pg_durable

HorizonDB layers a managed, end-to-end AI pipeline on top of pg_durable's durable execution — every stage is checkpointed, retried, and crash-safe, from raw data to ready-to-query vectors.

1. Ingest — Load docs & data
2. Chunk — Split content
3. Embed — Vectorize
4. Index — DiskANN store
5. Serve — Search & rank
Run it in the cloud: Azure HorizonDB (Preview)