Spark Extension¶
openaivec.spark_ext ¶
Asynchronous Spark UDFs for the OpenAI and Azure OpenAI APIs.
This module provides functions (responses_udf, task_udf, embeddings_udf,
count_tokens_udf, split_to_chunks_udf, similarity_udf, parse_udf)
for creating asynchronous Spark UDFs that communicate with either the public
OpenAI API or Azure OpenAI.
It supports UDFs for generating responses, creating embeddings, parsing text,
and computing similarities asynchronously. The UDFs operate on Spark DataFrames
and leverage asyncio for improved performance in I/O-bound operations.
Performance Optimization: All AI-powered UDFs (responses_udf, task_udf, embeddings_udf, parse_udf)
automatically cache duplicate inputs within each partition, significantly reducing
API calls and costs when processing datasets with overlapping content.
Setup¶
First, obtain a Spark session and configure authentication:
```python
from pyspark.sql import SparkSession
from openaivec.spark_ext import setup, setup_azure

spark = SparkSession.builder.getOrCreate()

# Option 1: Using OpenAI
setup(
    spark,
    api_key="your-openai-api-key",
    responses_model_name="gpt-4.1-mini",  # Optional: set default model
    embeddings_model_name="text-embedding-3-small",  # Optional: set default model
)

# Option 2: Using Azure OpenAI
# setup_azure(
#     spark,
#     api_key="your-azure-openai-api-key",
#     base_url="https://YOUR-RESOURCE-NAME.services.ai.azure.com/openai/v1/",
#     api_version="v1",
#     responses_model_name="my-gpt4-deployment",  # Optional: set default deployment
#     embeddings_model_name="my-embedding-deployment",  # Optional: set default deployment
# )

# Option 3: Using Azure OpenAI with Entra ID (no API key)
# Set AZURE_OPENAI_BASE_URL and AZURE_OPENAI_API_VERSION in your environment.
# openaivec automatically uses DefaultAzureCredential when AZURE_OPENAI_API_KEY is not set.
```
Next, create UDFs and register them:
```python
from openaivec.spark_ext import (
    responses_udf,
    task_udf,
    embeddings_udf,
    count_tokens_udf,
    split_to_chunks_udf,
    similarity_udf,
)
from pydantic import BaseModel

# Define a Pydantic model for structured responses (optional)
class Translation(BaseModel):
    en: str
    fr: str
    # ... other languages

# Register the asynchronous responses UDF with performance tuning
spark.udf.register(
    "translate_async",
    responses_udf(
        instructions="Translate the text to multiple languages.",
        response_format=Translation,
        model_name="gpt-4.1-mini",  # For Azure: deployment name; for OpenAI: model name
        batch_size=64,  # Rows per API request within a partition
        max_concurrency=8,  # Concurrent requests PER EXECUTOR
    ),
)

# Or use a predefined task with task_udf
from openaivec.task import nlp

spark.udf.register(
    "sentiment_async",
    task_udf(nlp.sentiment_analysis()),
)

# Register the asynchronous embeddings UDF with performance tuning
spark.udf.register(
    "embed_async",
    embeddings_udf(
        model_name="text-embedding-3-small",  # For Azure: deployment name; for OpenAI: model name
        batch_size=128,  # Larger batches for embeddings
        max_concurrency=8,  # Concurrent requests PER EXECUTOR
    ),
)

# Register token counting, text chunking, and similarity UDFs
spark.udf.register("count_tokens", count_tokens_udf())
spark.udf.register("split_chunks", split_to_chunks_udf(max_tokens=512, sep=[".", "!", "?"]))
spark.udf.register("compute_similarity", similarity_udf())
```
You can now invoke the UDFs from Spark SQL:
```sql
SELECT
    text,
    translate_async(text) AS translation,
    sentiment_async(text) AS sentiment,
    embed_async(text) AS embedding,
    count_tokens(text) AS token_count,
    split_chunks(text) AS chunks,
    compute_similarity(embed_async(text1), embed_async(text2)) AS similarity
FROM your_table;
```
Performance Considerations¶
When using these UDFs in distributed Spark environments:
- `batch_size`: Controls rows processed per API request within each partition. Recommended: 32-128 for responses, 64-256 for embeddings.
- `max_concurrency`: Sets concurrent API requests PER EXECUTOR, not per cluster. Total cluster concurrency = max_concurrency × number_of_executors. Recommended: 4-12 per executor to avoid overwhelming OpenAI rate limits.
- Rate Limit Management: Monitor OpenAI API usage when scaling executors. Consider your OpenAI tier limits and adjust max_concurrency accordingly.
Example for a 5-executor cluster with max_concurrency=8: Total concurrent requests = 8 × 5 = 40 simultaneous API calls.
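The capacity arithmetic above is simple enough to encode when planning a rollout; a minimal sketch (the executor count and settings are illustrative, not defaults):

```python
def total_concurrency(max_concurrency: int, num_executors: int) -> int:
    """Upper bound on simultaneous OpenAI API calls across the cluster."""
    return max_concurrency * num_executors

# 5 executors, max_concurrency=8 => 40 simultaneous API calls
print(total_concurrency(max_concurrency=8, num_executors=5))
```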
Note: AI-powered UDFs run one reusable asyncio event loop per invocation and use partition-local caches to avoid duplicate remote calls inside a partition.
Classes¶
Functions¶
setup ¶
```python
setup(
    spark: SparkSession,
    api_key: str,
    responses_model_name: str | None = None,
    embeddings_model_name: str | None = None,
)
```
Setup OpenAI authentication and default model names in the Spark environment.

1. Configures the OpenAI API key in the SparkContext environment.
2. Configures the OpenAI API key in the local process environment.
3. Optionally registers default model names for responses and embeddings in the DI container.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | The Spark session to configure. | required |
| `api_key` | `str` | OpenAI API key for authentication. | required |
| `responses_model_name` | `str \| None` | Default model name for response generation. If provided, registers `ResponsesModelName` in the DI container. | `None` |
| `embeddings_model_name` | `str \| None` | Default model name for embeddings. If provided, registers `EmbeddingsModelName` in the DI container. | `None` |
Example
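A minimal usage sketch, mirroring the Setup section above (the API key and model names are placeholders):

```python
from pyspark.sql import SparkSession
from openaivec.spark_ext import setup

spark = SparkSession.builder.getOrCreate()
setup(
    spark,
    api_key="your-openai-api-key",
    responses_model_name="gpt-4.1-mini",
    embeddings_model_name="text-embedding-3-small",
)
```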
Source code in src/openaivec/spark_ext.py
setup_azure ¶
```python
setup_azure(
    spark: SparkSession,
    api_key: str | None = None,
    base_url: str | None = None,
    api_version: str = "v1",
    responses_model_name: str | None = None,
    embeddings_model_name: str | None = None,
)
```
Setup Azure OpenAI authentication and default model names in the Spark environment.

1. Configures the Azure OpenAI base URL and API version in the SparkContext environment.
2. Optionally configures the Azure OpenAI API key in the SparkContext environment.
3. Configures the Azure OpenAI base URL and API version in the local process environment.
4. Optionally configures the Azure OpenAI API key in the local process environment.
5. Optionally registers default model names for responses and embeddings in the DI container.
Note
For API-key authentication, provide api_key. For Entra ID authentication,
omit api_key and configure only base_url and api_version.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | The Spark session to configure. | required |
| `api_key` | `str \| None` | Azure OpenAI API key for authentication. When not provided, `AZURE_OPENAI_API_KEY` is cleared and Entra ID can be used. | `None` |
| `base_url` | `str \| None` | Base URL for the Azure OpenAI resource. Required. | `None` |
| `api_version` | `str` | API version to use. Defaults to `"v1"`. | `'v1'` |
| `responses_model_name` | `str \| None` | Default model name for response generation. If provided, registers `ResponsesModelName` in the DI container. | `None` |
| `embeddings_model_name` | `str \| None` | Default model name for embeddings. If provided, registers `EmbeddingsModelName` in the DI container. | `None` |
Example
```python
from pyspark.sql import SparkSession
from openaivec.spark_ext import setup_azure

spark = SparkSession.builder.getOrCreate()
setup_azure(
    spark,
    api_key="azure-key",
    base_url="https://YOUR-RESOURCE-NAME.services.ai.azure.com/openai/v1/",
    api_version="v1",
    responses_model_name="gpt4-deployment",
    embeddings_model_name="embedding-deployment",
)
```
Raises:

| Type | Description |
|---|---|
| `ValueError` | If `base_url` is not provided. |
Source code in src/openaivec/spark_ext.py
setup_entra_id ¶
```python
setup_entra_id(
    spark: SparkSession,
    base_url: str,
    tenant_id: str,
    client_id: str,
    client_secret: str | None = None,
    kv_url: str | None = None,
    kv_secret_name: str | None = None,
    api_version: str = "v1",
    responses_model_name: str | None = None,
    embeddings_model_name: str | None = None,
)
```
Setup Entra ID (Service Principal) authentication for Spark environment.
Configures Azure OpenAI with Service Principal credentials using
DefaultAzureCredential (via EnvironmentCredential).
Propagates credentials to both the local process and Spark executors
via sc.environment.
The client secret can be provided directly via client_secret, or
retrieved from Key Vault via kv_url and kv_secret_name (requires
notebookutils on the Fabric driver).
Note
Any existing OPENAI_API_KEY or AZURE_OPENAI_API_KEY is
cleared to ensure the Entra ID path is used.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | The Spark session to configure. | required |
| `base_url` | `str` | Base URL for the Azure OpenAI resource (e.g. `https://YOUR-RESOURCE.services.ai.azure.com/openai/v1/`). | required |
| `tenant_id` | `str` | Entra ID tenant ID. | required |
| `client_id` | `str` | Service Principal (App Registration) client ID. | required |
| `client_secret` | `str \| None` | Service Principal client secret. If `None`, the secret is retrieved from Key Vault via `kv_url` and `kv_secret_name`. | `None` |
| `kv_url` | `str \| None` | Azure Key Vault URL for secret retrieval. Required when `client_secret` is not provided. | `None` |
| `kv_secret_name` | `str \| None` | Secret name in Key Vault. Required when `client_secret` is not provided. | `None` |
| `api_version` | `str` | API version to use. Defaults to `"v1"`. | `'v1'` |
| `responses_model_name` | `str \| None` | Default model name for response generation. If provided, registers `ResponsesModelName` in the DI container. | `None` |
| `embeddings_model_name` | `str \| None` | Default model name for embeddings. If provided, registers `EmbeddingsModelName` in the DI container. | `None` |
Raises:

| Type | Description |
|---|---|
| `ValueError` | If neither `client_secret` nor both `kv_url` and `kv_secret_name` are provided. |
Example
```python
from pyspark.sql import SparkSession
from openaivec.spark_ext import setup_entra_id

spark = SparkSession.builder.getOrCreate()

# Option 1: Provide client_secret directly
setup_entra_id(
    spark,
    base_url="https://YOUR-RESOURCE.services.ai.azure.com/openai/v1/",
    tenant_id="your-tenant-id",
    client_id="your-client-id",
    client_secret="your-secret",
)

# Option 2: Retrieve from Key Vault (Fabric driver only)
setup_entra_id(
    spark,
    base_url="https://YOUR-RESOURCE.services.ai.azure.com/openai/v1/",
    tenant_id="your-tenant-id",
    client_id="your-client-id",
    kv_url="https://YOUR-KEYVAULT.vault.azure.net/",
    kv_secret_name="your-secret-name",
)
```
Source code in src/openaivec/spark_ext.py
responses_udf ¶
```python
responses_udf(
    instructions: str,
    response_format: type[ResponseFormat] = str,
    model_name: str | None = None,
    batch_size: int | None = None,
    max_concurrency: int = 8,
    multimodal: bool = False,
    **api_kwargs,
) -> UserDefinedFunction
```
Create an asynchronous Spark pandas UDF for generating responses.
Configures and builds UDFs that use AsyncBatchResponses on a single
reusable event loop per UDF invocation. Each partition maintains its own
bounded cache to eliminate duplicate API calls within the partition while
avoiding repeated event-loop setup per Arrow batch.
Note

Authentication must be configured via SparkContext environment variables. Set the appropriate variables on the SparkContext:

```python
# For OpenAI:
sc.environment["OPENAI_API_KEY"] = "your-openai-api-key"

# For Azure OpenAI (API key auth):
sc.environment["AZURE_OPENAI_API_KEY"] = "your-azure-openai-api-key"
sc.environment["AZURE_OPENAI_BASE_URL"] = "https://YOUR-RESOURCE-NAME.services.ai.azure.com/openai/v1/"
sc.environment["AZURE_OPENAI_API_VERSION"] = "v1"

# For Azure OpenAI (Entra ID auth):
sc.environment["AZURE_OPENAI_BASE_URL"] = "https://YOUR-RESOURCE-NAME.services.ai.azure.com/openai/v1/"
sc.environment["AZURE_OPENAI_API_VERSION"] = "v1"
# Do not set AZURE_OPENAI_API_KEY
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `instructions` | `str` | The system prompt or instructions for the model. | required |
| `response_format` | `type[ResponseFormat]` | The desired output format. Either `str` for plain text or a Pydantic `BaseModel` subclass for structured output. | `str` |
| `model_name` | `str \| None` | For Azure OpenAI, use your deployment name (e.g., "my-gpt4-deployment"). For OpenAI, use the model name (e.g., "gpt-4.1-mini"). Defaults to the model registered in the DI container via `ResponsesModelName` if not provided. | `None` |
| `batch_size` | `int \| None` | Number of rows per async batch request within each partition. Larger values reduce API call overhead but increase memory usage. Defaults to `None` (automatic batch size optimization that dynamically adjusts based on execution time, targeting 30-60 seconds per batch). Set to a positive integer (e.g., 32-128) for a fixed batch size. | `None` |
| `max_concurrency` | `int` | Maximum number of concurrent API requests PER EXECUTOR. Total cluster concurrency = max_concurrency × number_of_executors. Higher values increase throughput but may hit OpenAI rate limits. Recommended: 4-12 per executor. Defaults to 8. | `8` |
| `**api_kwargs` | | Additional OpenAI API parameters (e.g. `temperature`, `top_p`, `seed`, `max_output_tokens`) forwarded verbatim to the underlying API calls. | `{}` |
Returns:

| Name | Type | Description |
|---|---|---|
| UserDefinedFunction | `UserDefinedFunction` | A Spark pandas UDF configured to generate responses asynchronously. Output schema is `StringType` for `str` response format or a struct derived from `response_format` for `BaseModel`. |
Raises:

| Type | Description |
|---|---|
| `ValueError` | If |
Example
```python
from pyspark.sql import SparkSession
from openaivec.spark_ext import responses_udf, setup

spark = SparkSession.builder.getOrCreate()
setup(spark, api_key="sk-***", responses_model_name="gpt-4.1-mini")

udf = responses_udf("Reply with one word.")
spark.udf.register("short_answer", udf)
df = spark.createDataFrame([("hello",), ("bye",)], ["text"])
df.selectExpr("short_answer(text) as reply").show()
```
Note

For optimal performance in distributed environments:

- Automatic Caching: Duplicate inputs within each partition are cached, significantly reducing API calls and costs on datasets with repeated content.
- Monitor OpenAI API rate limits when scaling executor count.
- Consider your OpenAI tier limits: total_requests = max_concurrency × executors.
- Use the Spark UI to optimize partition sizes relative to batch_size.
- Multimodal: Local file paths are not accessible from executors. Use HTTP(S) URLs or pre-encoded data URIs when `multimodal=True`.
Source code in src/openaivec/spark_ext.py
task_udf ¶
```python
task_udf(
    task: PreparedTask[ResponseFormat],
    model_name: str | None = None,
    batch_size: int | None = None,
    max_concurrency: int = 8,
    multimodal: bool = False,
    **api_kwargs,
) -> UserDefinedFunction
```
Create an asynchronous Spark pandas UDF from a predefined task.
This function allows users to create UDFs from predefined tasks such as sentiment analysis, translation, or other common NLP operations defined in the openaivec.task module. Each partition maintains its own cache to eliminate duplicate API calls within the partition, significantly reducing API usage and costs when processing datasets with overlapping content.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `PreparedTask` | A predefined task configuration containing instructions and response format. | required |
| `model_name` | `str \| None` | For Azure OpenAI, use your deployment name (e.g., "my-gpt4-deployment"). For OpenAI, use the model name (e.g., "gpt-4.1-mini"). Defaults to the model registered in the DI container via `ResponsesModelName` if not provided. | `None` |
| `batch_size` | `int \| None` | Number of rows per async batch request within each partition. Larger values reduce API call overhead but increase memory usage. Defaults to `None` (automatic batch size optimization that dynamically adjusts based on execution time, targeting 30-60 seconds per batch). Set to a positive integer (e.g., 32-128) for a fixed batch size. | `None` |
| `max_concurrency` | `int` | Maximum number of concurrent API requests PER EXECUTOR. Total cluster concurrency = max_concurrency × number_of_executors. Higher values increase throughput but may hit OpenAI rate limits. Recommended: 4-12 per executor. Defaults to 8. | `8` |
Additional Keyword Args
Arbitrary OpenAI Responses API parameters (e.g. temperature, top_p,
frequency_penalty, presence_penalty, seed, max_output_tokens, etc.)
are forwarded verbatim to the underlying API calls. These parameters are applied to
all API requests made by the UDF.
Returns:

| Name | Type | Description |
|---|---|---|
| UserDefinedFunction | `UserDefinedFunction` | A Spark pandas UDF configured to execute the specified task asynchronously with automatic caching for duplicate inputs within each partition. Output schema is `StringType` for `str` response format or a struct derived from the task's response format for `BaseModel`. |
Example
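A sketch using the predefined sentiment-analysis task (credentials are placeholders; requires a live Spark session and API access):

```python
from pyspark.sql import SparkSession
from openaivec.spark_ext import setup, task_udf
from openaivec.task import nlp

spark = SparkSession.builder.getOrCreate()
setup(spark, api_key="sk-***", responses_model_name="gpt-4.1-mini")

spark.udf.register("sentiment", task_udf(nlp.sentiment_analysis()))
df = spark.createDataFrame([("great product",), ("bad service",)], ["text"])
df.selectExpr("sentiment(text) AS sentiment").show()
```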
Note
Automatic Caching: Duplicate inputs within each partition are cached, reducing API calls and costs significantly on datasets with repeated content.
Source code in src/openaivec/spark_ext.py
infer_schema ¶
```python
infer_schema(
    instructions: str,
    example_table_name: str,
    example_field_name: str,
    max_examples: int = 100,
) -> SchemaInferenceOutput
```
Infer the schema for a response format based on example data.
This function retrieves examples from a Spark table and infers the schema for the response format using the provided instructions. It is useful when you want to dynamically generate a schema based on existing data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `instructions` | `str` | Instructions for the model to infer the schema. | required |
| `example_table_name` | `str` | Name of the Spark table containing example data. | required |
| `example_field_name` | `str` | Name of the field in the table to use as examples. | required |
| `max_examples` | `int` | Maximum number of examples to retrieve for schema inference. | `100` |
Returns:

| Name | Type | Description |
|---|---|---|
| InferredSchema | `SchemaInferenceOutput` | An object containing the inferred schema and response format. |
Example
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.createDataFrame(
    [("great product",), ("bad service",)],
    ["text"],
).createOrReplaceTempView("examples")

infer_schema(
    instructions="Classify sentiment as positive or negative.",
    example_table_name="examples",
    example_field_name="text",
    max_examples=2,
)
```
Source code in src/openaivec/spark_ext.py
parse_udf ¶
```python
parse_udf(
    instructions: str,
    response_format: type[ResponseFormat] | None = None,
    example_table_name: str | None = None,
    example_field_name: str | None = None,
    max_examples: int = 100,
    model_name: str | None = None,
    batch_size: int | None = None,
    max_concurrency: int = 8,
    multimodal: bool = False,
    **api_kwargs,
) -> UserDefinedFunction
```
Create an asynchronous Spark pandas UDF for parsing responses.

This function allows users to create UDFs that parse responses based on provided instructions and either a predefined response format or example data. It supports both structured responses using Pydantic models and plain text responses. Each partition maintains its own cache to eliminate duplicate API calls within the partition, significantly reducing API usage and costs when processing datasets with overlapping content.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `instructions` | `str` | The system prompt or instructions for the model. | required |
| `response_format` | `type[ResponseFormat] \| None` | The desired output format. Either `str` for plain text or a Pydantic `BaseModel` subclass for structured output. If `None`, the format is inferred from example data. | `None` |
| `example_table_name` | `str \| None` | Name of the Spark table containing example data. If provided, `example_field_name` must also be provided. | `None` |
| `example_field_name` | `str \| None` | Name of the field in the table to use as examples. If provided, `example_table_name` must also be provided. | `None` |
| `max_examples` | `int` | Maximum number of examples to retrieve for schema inference. Defaults to 100. | `100` |
| `model_name` | `str \| None` | For Azure OpenAI, use your deployment name (e.g., "my-gpt4-deployment"). For OpenAI, use the model name (e.g., "gpt-4.1-mini"). Defaults to the model registered in the DI container via `ResponsesModelName` if not provided. | `None` |
| `batch_size` | `int \| None` | Number of rows per async batch request within each partition. Larger values reduce API call overhead but increase memory usage. Defaults to `None` (automatic batch size optimization that dynamically adjusts based on execution time, targeting 30-60 seconds per batch). Set to a positive integer (e.g., 32-128) for a fixed batch size. | `None` |
| `max_concurrency` | `int` | Maximum number of concurrent API requests PER EXECUTOR. Total cluster concurrency = max_concurrency × number_of_executors. Higher values increase throughput but may hit OpenAI rate limits. Recommended: 4-12 per executor. Defaults to 8. | `8` |
| `**api_kwargs` | | Additional OpenAI API parameters (e.g. `temperature`, `top_p`, `seed`, `max_output_tokens`) forwarded verbatim to the underlying API calls. | `{}` |
Example

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.createDataFrame(
    [("Order #123 delivered",), ("Order #456 delayed",)],
    ["body"],
).createOrReplaceTempView("messages")

udf = parse_udf(
    instructions="Extract order id as `order_id` and status as `status`.",
    example_table_name="messages",
    example_field_name="body",
)
spark.udf.register("parse_ticket", udf)
spark.sql("SELECT parse_ticket(body) AS parsed FROM messages").show()
```
Returns:

| Name | Type | Description |
|---|---|---|
| UserDefinedFunction | `UserDefinedFunction` | A Spark pandas UDF configured to parse text asynchronously. Output schema is `StringType` for `str` response format or a struct derived from the `response_format` for `BaseModel`. |
Raises:

| Type | Description |
|---|---|
| `ValueError` | If neither `response_format` nor both `example_table_name` and `example_field_name` are provided. |
Source code in src/openaivec/spark_ext.py
embeddings_udf ¶
```python
embeddings_udf(
    model_name: str | None = None,
    batch_size: int | None = None,
    max_concurrency: int = 8,
    **api_kwargs,
) -> UserDefinedFunction
```
Create an asynchronous Spark pandas UDF for generating embeddings.
Configures and builds UDFs that use AsyncBatchEmbeddings on a single
reusable event loop per UDF invocation. Each partition maintains its own
bounded cache to eliminate duplicate API calls within the partition while
avoiding repeated event-loop setup per Arrow batch.
Note

Authentication must be configured via SparkContext environment variables. Set the appropriate variables on the SparkContext:

```python
# For OpenAI:
sc.environment["OPENAI_API_KEY"] = "your-openai-api-key"

# For Azure OpenAI (API key auth):
sc.environment["AZURE_OPENAI_API_KEY"] = "your-azure-openai-api-key"
sc.environment["AZURE_OPENAI_BASE_URL"] = "https://YOUR-RESOURCE-NAME.services.ai.azure.com/openai/v1/"
sc.environment["AZURE_OPENAI_API_VERSION"] = "v1"

# For Azure OpenAI (Entra ID auth):
sc.environment["AZURE_OPENAI_BASE_URL"] = "https://YOUR-RESOURCE-NAME.services.ai.azure.com/openai/v1/"
sc.environment["AZURE_OPENAI_API_VERSION"] = "v1"
# Do not set AZURE_OPENAI_API_KEY
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `model_name` | `str \| None` | For Azure OpenAI, use your deployment name (e.g., "my-embedding-deployment"). For OpenAI, use the model name (e.g., "text-embedding-3-small"). Defaults to the model registered in the DI container via `EmbeddingsModelName` if not provided. | `None` |
| `batch_size` | `int \| None` | Number of rows per async batch request within each partition. Larger values reduce API call overhead but increase memory usage. Defaults to `None` (automatic batch size optimization that dynamically adjusts based on execution time, targeting 30-60 seconds per batch). Set to a positive integer (e.g., 64-256) for a fixed batch size. Embeddings typically handle larger batches efficiently. | `None` |
| `max_concurrency` | `int` | Maximum number of concurrent API requests PER EXECUTOR. Total cluster concurrency = max_concurrency × number_of_executors. Higher values increase throughput but may hit OpenAI rate limits. Recommended: 4-12 per executor. Defaults to 8. | `8` |
| `**api_kwargs` | | Additional OpenAI API parameters (e.g., `dimensions` for text-embedding-3 models). | `{}` |
Returns:

| Name | Type | Description |
|---|---|---|
| UserDefinedFunction | `UserDefinedFunction` | A Spark pandas UDF configured to generate embeddings asynchronously with automatic caching for duplicate inputs within each partition, returning an `ArrayType(FloatType())` column. |
Note

For optimal performance in distributed environments:

- Automatic Caching: Duplicate inputs within each partition are cached, significantly reducing API calls and costs on datasets with repeated content.
- Monitor OpenAI API rate limits when scaling executor count.
- Consider your OpenAI tier limits: total_requests = max_concurrency × executors.
- The Embeddings API typically has higher throughput than chat completions.
- Use a larger batch_size for embeddings compared to response generation.
Source code in src/openaivec/spark_ext.py
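A usage sketch mirroring the responses_udf example (credentials are placeholders; requires a live Spark session and API access):

```python
from pyspark.sql import SparkSession
from openaivec.spark_ext import embeddings_udf, setup

spark = SparkSession.builder.getOrCreate()
setup(spark, api_key="sk-***", embeddings_model_name="text-embedding-3-small")

spark.udf.register("embed", embeddings_udf(batch_size=128))
df = spark.createDataFrame([("hello",), ("bye",)], ["text"])
df.selectExpr("embed(text) AS embedding").show(truncate=False)
```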
split_to_chunks_udf ¶
Create a pandas UDF that splits text into token-bounded chunks.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `max_tokens` | `int` | Maximum tokens allowed per chunk. | required |
| `sep` | `list[str]` | Ordered list of separator strings used when splitting the text. | required |

Returns:

| Type | Description |
|---|---|
| `UserDefinedFunction` | A pandas UDF producing an `ArrayType(StringType())` column of chunks. |
Source code in src/openaivec/spark_ext.py
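The chunking behaviour can be pictured with a simplified, pure-Python sketch: split on the separators, then greedily pack segments up to the token budget. The real UDF counts tokens with tiktoken; here `len(text.split())` stands in as the token count, so this is only illustrative:

```python
import re

def split_to_chunks(text: str, max_tokens: int, sep: list[str]) -> list[str]:
    # Split on any separator, keeping each separator attached to its segment.
    pattern = "(" + "|".join(re.escape(s) for s in sep) + ")"
    parts = re.split(pattern, text)
    segments = ["".join(parts[i:i + 2]).strip() for i in range(0, len(parts), 2)]
    segments = [s for s in segments if s]

    chunks: list[str] = []
    current = ""
    for seg in segments:
        candidate = (current + " " + seg).strip()
        # Stand-in token count; the real UDF uses tiktoken.
        # The first segment is always accepted, even if over budget.
        if len(candidate.split()) <= max_tokens or not current:
            current = candidate
        else:
            chunks.append(current)
            current = seg
    if current:
        chunks.append(current)
    return chunks

print(split_to_chunks("One. Two. Three!", max_tokens=2, sep=[".", "!"]))
```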
count_tokens_udf ¶
Create a pandas UDF that counts tokens for every string cell.

The UDF uses tiktoken to approximate tokenisation and caches the resulting `Encoding` object per executor.
Returns:

| Type | Description |
|---|---|
| `UserDefinedFunction` | A pandas UDF producing an `IntegerType` column of token counts. |
Source code in src/openaivec/spark_ext.py
similarity_udf ¶
Create a pandas-UDF that computes cosine similarity between embedding vectors.
Returns:

| Name | Type | Description |
|---|---|---|
| UserDefinedFunction | `UserDefinedFunction` | A Spark pandas UDF that takes two embedding vector columns and returns their cosine similarity as a `FloatType` column. |
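The returned value is standard cosine similarity; a pure-Python sketch of the underlying computation (the actual UDF operates on whole columns, this is per-pair and illustrative only):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # identical direction -> 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # orthogonal -> 0.0
```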