DuckDB Extension

openaivec.duckdb_ext

DuckDB integration for openaivec.

Provides helpers that bridge openaivec's batched AI capabilities with DuckDB:

  • UDF registration – register responses, embeddings and task functions directly as DuckDB scalar UDFs so SQL queries can invoke the OpenAI API transparently.
  • Persistent caching – pass DuckDBCacheBackend as the _cache field of BatchCache for cross-session cache persistence.
  • Vector similarity – similarity_search performs top-k cosine similarity queries against an embedding table using DuckDB's built-in list_cosine_similarity.
  • Schema → DDL – pydantic_to_duckdb_ddl converts a Pydantic model to a CREATE TABLE statement for immediate SQL analysis of structured-output results.

Quick Start

import duckdb
from openaivec.duckdb_ext import register_responses_udf, register_embeddings_udf

conn = duckdb.connect()
register_responses_udf(conn, "translate", instructions="Translate to French")
register_embeddings_udf(conn, "embed")

conn.sql("SELECT translate(review) FROM products")
conn.sql("SELECT text, embed(text) FROM documents")

Functions

register_responses_udf

register_responses_udf(
    conn: DuckDBPyConnection,
    name: str,
    *,
    instructions: str,
    response_format: type = str,
    model_name: str | None = None,
    batch_size: int = 64,
    max_concurrency: int = 8,
    **api_kwargs: Any,
) -> None

Register a DuckDB Arrow-based UDF that calls the OpenAI Responses API.

The UDF processes rows in vectorized batches via AsyncBatchResponses, leveraging deduplication and concurrency for throughput.

When response_format is a Pydantic BaseModel, the UDF returns a DuckDB STRUCT whose fields match the model, allowing direct field access in SQL (e.g. SELECT udf(text).sentiment FROM ...). When response_format is str, the UDF returns VARCHAR.

Parameters:

Name Type Description Default
conn DuckDBPyConnection

An open DuckDB connection.

required
name str

UDF name visible in SQL.

required
instructions str

System prompt for the model.

required
response_format type

str for plain text or a Pydantic BaseModel for structured output as a DuckDB STRUCT. Defaults to str.

str
model_name str | None

Model or deployment name. Defaults to the container-registered ResponsesModelName.

None
batch_size int

Rows per API batch. Defaults to 64.

64
max_concurrency int

Maximum concurrent API requests. Defaults to 8.

8
**api_kwargs Any

Extra parameters forwarded to the OpenAI API.

{}
Example

import duckdb
from pydantic import BaseModel
from openaivec.duckdb_ext import register_responses_udf

class Sentiment(BaseModel):
    label: str
    score: float

conn = duckdb.connect()
register_responses_udf(conn, "sentiment", instructions="Analyze sentiment", response_format=Sentiment)

conn.sql("SELECT sentiment(text).label, sentiment(text).score FROM docs")
Source code in src/openaivec/duckdb_ext.py
def register_responses_udf(
    conn: duckdb.DuckDBPyConnection,
    name: str,
    *,
    instructions: str,
    response_format: type = str,
    model_name: str | None = None,
    batch_size: int = 64,
    max_concurrency: int = 8,
    **api_kwargs: Any,
) -> None:
    """Register a DuckDB Arrow-based UDF that calls the OpenAI Responses API.

    The UDF processes rows in vectorized batches via ``AsyncBatchResponses``,
    leveraging deduplication and concurrency for throughput.

    When ``response_format`` is a Pydantic ``BaseModel``, the UDF returns a
    DuckDB ``STRUCT`` whose fields match the model, allowing direct field
    access in SQL (e.g. ``SELECT udf(text).sentiment FROM ...``).
    When ``response_format`` is ``str``, the UDF returns ``VARCHAR``.

    Args:
        conn (duckdb.DuckDBPyConnection): An open DuckDB connection.
        name (str): UDF name visible in SQL.
        instructions (str): System prompt for the model.
        response_format (type): ``str`` for plain text or a Pydantic ``BaseModel``
            for structured output as a DuckDB STRUCT. Defaults to ``str``.
        model_name (str | None): Model or deployment name. Defaults to the
            container-registered ``ResponsesModelName``.
        batch_size (int): Rows per API batch. Defaults to 64.
        max_concurrency (int): Maximum concurrent API requests. Defaults to 8.
        **api_kwargs: Extra parameters forwarded to the OpenAI API.

    Example:
        >>> import duckdb
        >>> from pydantic import BaseModel
        >>> from openaivec.duckdb_ext import register_responses_udf
        >>> class Sentiment(BaseModel):
        ...     label: str
        ...     score: float
        >>> conn = duckdb.connect()
        >>> register_responses_udf(conn, "sentiment", instructions="Analyze sentiment", response_format=Sentiment)
        >>> # conn.sql("SELECT sentiment(text).label, sentiment(text).score FROM docs")
    """

    _model_name = model_name or CONTAINER.resolve(ResponsesModelName).value
    async_client = CONTAINER.resolve(AsyncOpenAI)

    cache: AsyncBatchCache = AsyncBatchCache(
        batch_size=batch_size,
        max_concurrency=max_concurrency,
        max_cache_size=DEFAULT_MANAGED_CACHE_SIZE,
        show_progress=False,
    )
    batch_client = AsyncBatchResponses(
        client=async_client,
        model_name=_model_name,
        system_message=instructions,
        response_format=response_format,
        cache=cache,
        api_kwargs=api_kwargs,
    )

    is_structured = isinstance(response_format, type) and issubclass(response_format, BaseModel)
    return_type = _pydantic_to_struct_type(response_format) if is_structured else duckdb.sqltype("VARCHAR")

    def _batch_udf(arrow_batch: pa.Array) -> pa.Array:
        texts = arrow_batch.to_pylist()
        non_null_indices = [i for i, t in enumerate(texts) if t is not None]
        non_null_texts = [texts[i] for i in non_null_indices]

        if not non_null_texts:
            if is_structured:
                return pa.nulls(len(texts))
            return pa.array([None] * len(texts), type=pa.string())

        results = _run_async(batch_client.parse(non_null_texts))

        out = [None] * len(texts)
        for idx, result in zip(non_null_indices, results):
            if is_structured and isinstance(result, BaseModel):
                out[idx] = _serialize_for_duckdb(result.model_dump())
            elif result is not None:
                out[idx] = str(result)

        return pa.array(out)

    conn.create_function(name, _batch_udf, [duckdb.sqltype("VARCHAR")], return_type, type="arrow")
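The non-null bookkeeping in `_batch_udf` above is a gather/scatter pattern: pull out the non-null rows, process them as one batch, then write each result back to its original position so nulls pass through untouched. A dependency-free sketch of the same idea, with plain Python lists standing in for Arrow arrays and a hypothetical `process_batch` callable standing in for the batched API call:

```python
def batch_with_nulls(texts, process_batch):
    """Apply process_batch to the non-null inputs only, preserving row positions."""
    # Gather: indices and values of non-null rows.
    non_null_indices = [i for i, t in enumerate(texts) if t is not None]
    non_null_texts = [texts[i] for i in non_null_indices]

    if not non_null_texts:
        return [None] * len(texts)

    results = process_batch(non_null_texts)

    # Scatter: write each result back to its original position.
    out = [None] * len(texts)
    for idx, result in zip(non_null_indices, results):
        out[idx] = result
    return out

# Hypothetical "model" that upper-cases each input.
print(batch_with_nulls(["a", None, "b"], lambda xs: [x.upper() for x in xs]))
# → ['A', None, 'B']
```

This keeps the batch sent to the API dense (no wasted calls on nulls) while the output stays aligned with the input rows, which is what DuckDB's Arrow UDF contract requires.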

register_embeddings_udf

register_embeddings_udf(
    conn: DuckDBPyConnection,
    name: str,
    *,
    model_name: str | None = None,
    batch_size: int = 128,
    max_concurrency: int = 8,
    **api_kwargs: Any,
) -> None

Register a DuckDB Arrow-based UDF that returns embedding vectors.

The UDF processes rows in vectorized batches via AsyncBatchEmbeddings, leveraging deduplication and concurrency for throughput.

Parameters:

Name Type Description Default
conn DuckDBPyConnection

An open DuckDB connection.

required
name str

UDF name visible in SQL.

required
model_name str | None

Embeddings model or deployment name.

None
batch_size int

Rows per API batch. Defaults to 128.

128
max_concurrency int

Maximum concurrent API requests. Defaults to 8.

8
**api_kwargs Any

Extra parameters forwarded to the OpenAI API.

{}
Example

import duckdb
from openaivec.duckdb_ext import register_embeddings_udf

conn = duckdb.connect()
register_embeddings_udf(conn, "embed")

conn.sql("SELECT embed(text) FROM docs")
Source code in src/openaivec/duckdb_ext.py
def register_embeddings_udf(
    conn: duckdb.DuckDBPyConnection,
    name: str,
    *,
    model_name: str | None = None,
    batch_size: int = 128,
    max_concurrency: int = 8,
    **api_kwargs: Any,
) -> None:
    """Register a DuckDB Arrow-based UDF that returns embedding vectors.

    The UDF processes rows in vectorized batches via ``AsyncBatchEmbeddings``,
    leveraging deduplication and concurrency for throughput.

    Args:
        conn (duckdb.DuckDBPyConnection): An open DuckDB connection.
        name (str): UDF name visible in SQL.
        model_name (str | None): Embeddings model or deployment name.
        batch_size (int): Rows per API batch. Defaults to 128.
        max_concurrency (int): Maximum concurrent API requests. Defaults to 8.
        **api_kwargs: Extra parameters forwarded to the OpenAI API.

    Example:
        >>> import duckdb
        >>> from openaivec.duckdb_ext import register_embeddings_udf
        >>> conn = duckdb.connect()
        >>> register_embeddings_udf(conn, "embed")
        >>> # conn.sql("SELECT embed(text) FROM docs")
    """

    _model_name = model_name or CONTAINER.resolve(EmbeddingsModelName).value
    async_client = CONTAINER.resolve(AsyncOpenAI)

    cache: AsyncBatchCache[str, np.ndarray] = AsyncBatchCache(
        batch_size=batch_size,
        max_concurrency=max_concurrency,
        max_cache_size=DEFAULT_MANAGED_CACHE_SIZE,
        show_progress=False,
    )
    batch_client = AsyncBatchEmbeddings(
        client=async_client,
        model_name=_model_name,
        cache=cache,
        api_kwargs=api_kwargs,
    )

    def _batch_udf(arrow_batch: pa.Array) -> pa.Array:
        texts = arrow_batch.to_pylist()
        non_null_indices = [i for i, t in enumerate(texts) if t is not None]
        non_null_texts = [texts[i] for i in non_null_indices]

        if not non_null_texts:
            return pa.array([None] * len(texts))

        results = _run_async(batch_client.create(non_null_texts))

        out: list[list[float] | None] = [None] * len(texts)
        for idx, vec in zip(non_null_indices, results):
            out[idx] = vec.tolist()

        return pa.array(out, type=pa.list_(pa.float32()))

    conn.create_function(name, _batch_udf, [duckdb.sqltype("VARCHAR")], duckdb.list_type("FLOAT"), type="arrow")

register_task_udf

register_task_udf(
    conn: DuckDBPyConnection,
    name: str,
    *,
    task: PreparedTask[ResponseFormat],
    model_name: str | None = None,
    batch_size: int = 64,
    max_concurrency: int = 8,
    **api_kwargs: Any,
) -> None

Register a DuckDB UDF backed by a PreparedTask.

Parameters:

Name Type Description Default
conn DuckDBPyConnection

An open DuckDB connection.

required
name str

UDF name visible in SQL.

required
task PreparedTask

Pre-configured task with instructions and response format.

required
model_name str | None

Model or deployment name.

None
batch_size int

Rows per API batch. Defaults to 64.

64
max_concurrency int

Maximum concurrent API requests. Defaults to 8.

8
**api_kwargs Any

Extra parameters forwarded to the OpenAI API.

{}
Source code in src/openaivec/duckdb_ext.py
def register_task_udf(
    conn: duckdb.DuckDBPyConnection,
    name: str,
    *,
    task: PreparedTask[ResponseFormat],
    model_name: str | None = None,
    batch_size: int = 64,
    max_concurrency: int = 8,
    **api_kwargs: Any,
) -> None:
    """Register a DuckDB UDF backed by a ``PreparedTask``.

    Args:
        conn (duckdb.DuckDBPyConnection): An open DuckDB connection.
        name (str): UDF name visible in SQL.
        task (PreparedTask): Pre-configured task with instructions and response format.
        model_name (str | None): Model or deployment name.
        batch_size (int): Rows per API batch. Defaults to 64.
        max_concurrency (int): Maximum concurrent API requests. Defaults to 8.
        **api_kwargs: Extra parameters forwarded to the OpenAI API.
    """
    register_responses_udf(
        conn,
        name,
        instructions=task.instructions,
        response_format=task.response_format,
        model_name=model_name,
        batch_size=batch_size,
        max_concurrency=max_concurrency,
        **api_kwargs,
    )

similarity_search

similarity_search(
    conn: DuckDBPyConnection,
    target_table: str,
    query_table: str,
    *,
    target_column: str = "embedding",
    query_column: str = "embedding",
    target_text_column: str = "text",
    query_text_column: str = "text",
    top_k: int = 10,
) -> duckdb.DuckDBPyRelation

Perform top-k cosine similarity search between two DuckDB tables.

Uses DuckDB's built-in list_cosine_similarity for efficient vector comparison without leaving SQL.

Parameters:

Name Type Description Default
conn DuckDBPyConnection

An open DuckDB connection.

required
target_table str

Table containing candidate embeddings.

required
query_table str

Table containing query embeddings.

required
target_column str

Embedding column in target_table.

'embedding'
query_column str

Embedding column in query_table.

'embedding'
target_text_column str

Text identifier column in target_table.

'text'
query_text_column str

Text identifier column in query_table.

'text'
top_k int

Number of results per query.

10

Returns:

Type Description
DuckDBPyRelation

A DuckDB relation with columns query_text, target_text, score, ordered by descending similarity.

Example

import duckdb
from openaivec.duckdb_ext import similarity_search

conn = duckdb.connect()
# (after populating docs and queries tables with embeddings)
results = similarity_search(conn, "docs", "queries", top_k=5)
results.df()

Source code in src/openaivec/duckdb_ext.py
def similarity_search(
    conn: duckdb.DuckDBPyConnection,
    target_table: str,
    query_table: str,
    *,
    target_column: str = "embedding",
    query_column: str = "embedding",
    target_text_column: str = "text",
    query_text_column: str = "text",
    top_k: int = 10,
) -> duckdb.DuckDBPyRelation:
    """Perform top-k cosine similarity search between two DuckDB tables.

    Uses DuckDB's built-in ``list_cosine_similarity`` for efficient
    vector comparison without leaving SQL.

    Args:
        conn (duckdb.DuckDBPyConnection): An open DuckDB connection.
        target_table (str): Table containing candidate embeddings.
        query_table (str): Table containing query embeddings.
        target_column (str): Embedding column in *target_table*.
        query_column (str): Embedding column in *query_table*.
        target_text_column (str): Text identifier column in *target_table*.
        query_text_column (str): Text identifier column in *query_table*.
        top_k (int): Number of results per query.

    Returns:
        duckdb.DuckDBPyRelation: A DuckDB relation with columns ``query_text``,
        ``target_text``, ``score`` ordered by descending similarity.

    Example:
        >>> import duckdb
        >>> from openaivec.duckdb_ext import similarity_search
        >>> conn = duckdb.connect()
        >>> # (after populating docs and queries tables with embeddings)
        >>> results = similarity_search(conn, "docs", "queries", top_k=5)
        >>> results.df()
    """
    sql = f"""
        SELECT
            q.{query_text_column} AS query_text,
            t.{target_text_column} AS target_text,
            list_cosine_similarity(
                t.{target_column}::FLOAT[],
                q.{query_column}::FLOAT[]
            ) AS score
        FROM {query_table} q
        CROSS JOIN {target_table} t
        QUALIFY row_number() OVER (
            PARTITION BY q.{query_text_column}
            ORDER BY list_cosine_similarity(
                t.{target_column}::FLOAT[],
                q.{query_column}::FLOAT[]
            ) DESC
        ) <= {top_k}
        ORDER BY q.{query_text_column}, score DESC
    """
    return conn.sql(sql)
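The QUALIFY query above ranks every target against each query by cosine similarity and keeps the top_k rows per query. For intuition, here is the same logic in plain Python (a sketch only; in practice the comparison runs entirely inside DuckDB via list_cosine_similarity):

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def similarity_search_py(targets, queries, top_k=10):
    """targets/queries: lists of (text, embedding) pairs.
    Returns (query_text, target_text, score) rows, top_k per query."""
    rows = []
    for q_text, q_vec in queries:
        scored = [(q_text, t_text, cosine(t_vec, q_vec)) for t_text, t_vec in targets]
        # Mirrors: row_number() OVER (PARTITION BY query ORDER BY score DESC) <= top_k
        scored.sort(key=lambda r: r[2], reverse=True)
        rows.extend(scored[:top_k])
    return rows

targets = [("x-axis", [1.0, 0.0]), ("diagonal", [1.0, 1.0]), ("y-axis", [0.0, 1.0])]
queries = [("q", [1.0, 0.0])]
print(similarity_search_py(targets, queries, top_k=2))
# → [('q', 'x-axis', 1.0), ('q', 'diagonal', 0.7071067811865475)]
```

Note the SQL version is a CROSS JOIN, so cost grows with |queries| × |targets|; for large tables, consider DuckDB's vector index support instead of the brute-force scan.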

pydantic_to_duckdb_ddl

pydantic_to_duckdb_ddl(
    model: type[BaseModel], table_name: str
) -> str

Generate a CREATE TABLE DDL statement from a Pydantic model.

Parameters:

Name Type Description Default
model type[BaseModel]

The Pydantic model class.

required
table_name str

Name for the DuckDB table.

required

Returns:

Name Type Description
str str

A CREATE TABLE IF NOT EXISTS statement.

Example

from pydantic import BaseModel
from openaivec.duckdb_ext import pydantic_to_duckdb_ddl

class Review(BaseModel):
    sentiment: str
    rating: int
    tags: list[str]

print(pydantic_to_duckdb_ddl(Review, "reviews"))
CREATE TABLE IF NOT EXISTS reviews (
    sentiment VARCHAR,
    rating INTEGER,
    tags VARCHAR[]
)

Source code in src/openaivec/duckdb_ext.py
def pydantic_to_duckdb_ddl(model: type[BaseModel], table_name: str) -> str:
    """Generate a ``CREATE TABLE`` DDL statement from a Pydantic model.

    Args:
        model (type[BaseModel]): The Pydantic model class.
        table_name (str): Name for the DuckDB table.

    Returns:
        str: A ``CREATE TABLE IF NOT EXISTS`` statement.

    Example:
        >>> from pydantic import BaseModel
        >>> from openaivec.duckdb_ext import pydantic_to_duckdb_ddl
        >>> class Review(BaseModel):
        ...     sentiment: str
        ...     rating: int
        ...     tags: list[str]
        >>> print(pydantic_to_duckdb_ddl(Review, "reviews"))
        CREATE TABLE IF NOT EXISTS reviews (
            sentiment VARCHAR,
            rating INTEGER,
            tags VARCHAR[]
        )
    """
    columns: list[str] = []
    for field_name, field_info in model.model_fields.items():
        col_type = _python_type_to_duckdb(field_info.annotation) if field_info.annotation else "VARCHAR"
        columns.append(f"    {field_name} {col_type}")
    body = ",\n".join(columns)
    return f"CREATE TABLE IF NOT EXISTS {table_name} (\n{body}\n)"
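The `_python_type_to_duckdb` helper referenced above is internal to openaivec and not shown here. A plausible, dependency-free sketch of the mapping it would implement, consistent with the VARCHAR/INTEGER/VARCHAR[] output in the example (the actual helper's coverage may differ):

```python
from typing import get_args, get_origin

# Hypothetical scalar mapping; openaivec's real table may cover more types.
_SCALARS = {str: "VARCHAR", int: "INTEGER", float: "DOUBLE", bool: "BOOLEAN"}

def python_type_to_duckdb(tp) -> str:
    """Map a Python annotation to a DuckDB column type string."""
    if get_origin(tp) is list:  # list[T] -> T[]
        (inner,) = get_args(tp)
        return python_type_to_duckdb(inner) + "[]"
    return _SCALARS.get(tp, "VARCHAR")  # unknown types fall back to VARCHAR

print(python_type_to_duckdb(list[str]))  # → VARCHAR[]
print(python_type_to_duckdb(int))        # → INTEGER
```

Recursing on list arguments is what turns `tags: list[str]` into `VARCHAR[]` in the generated DDL, and it composes naturally for nested lists (`list[list[int]]` → `INTEGER[][]`).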