Spark Basics¶
This notebook walks through Spark usage in three steps: the main input (a Spark DataFrame), the generated outputs (new plain-text and structured columns), and the practical benefits.
1. Setup¶
Initialize Spark and configure OpenAI authentication once. If the built-in %%sql magic is unavailable, this notebook registers a compatible fallback backed by spark.sql.
In [1]:
import os

from IPython import get_ipython
from pyspark.sql import SparkSession

from openaivec.spark import responses_udf, setup, task_udf
from openaivec.task import nlp

spark = SparkSession.builder.getOrCreate()

api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("Set OPENAI_API_KEY before running this notebook.")

setup(spark, api_key=api_key, responses_model_name="gpt-5.4")

ip = get_ipython()
if ip and "sql" not in ip.magics_manager.magics["cell"]:

    def _spark_sql_magic(line: str, cell: str):
        return spark.sql(cell).show(truncate=False)

    ip.register_magic_function(_spark_sql_magic, "cell", "sql")
2. Input: Spark DataFrame¶
Prepare a DataFrame of fruit names. This is the main input to Spark UDFs.
In [2]:
fruits = spark.createDataFrame(
    [("apple",), ("banana",), ("lemon",), ("grapefruit",)],
    ["name"],
)
fruits.createOrReplaceTempView("fruits")
In [3]:
%%sql
SELECT name
FROM fruits
+----------+
|name      |
+----------+
|apple     |
|banana    |
|lemon     |
|grapefruit|
+----------+
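If the %%sql fallback is not in use, the same query can be expressed with the DataFrame API. The snippet below is a minimal equivalent of the cell above.

# Select the name column directly from the DataFrame instead of the temp view.
fruits.select("name").show(truncate=False)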
3. Output A: plain-text response column¶
Use responses_udf to generate one short text output per row.
In [4]:
spark.udf.register(
    "describe_fruit",
    responses_udf(
        instructions="Describe the fruit in one short sentence.",
        batch_size=64,
        max_concurrency=4,
    ),
)
Out[4]:
<pyspark.sql.udf.UserDefinedFunction at 0x11f5174d0>
In [5]:
%%sql
SELECT
    name,
    describe_fruit(name) AS description
FROM fruits
+----------+---------------------------------------------------------------------------------------+
|name      |description                                                                            |
+----------+---------------------------------------------------------------------------------------+
|apple     |An apple is a crisp, sweet fruit with smooth skin.                                     |
|banana    |A banana is a long, curved yellow fruit with soft, sweet flesh.                        |
|lemon     |A lemon is a bright yellow citrus fruit with a tart, tangy flavor.                     |
|grapefruit|Grapefruit is a large citrus fruit with tart, juicy flesh and a slightly bitter flavor.|
+----------+---------------------------------------------------------------------------------------+
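The registered UDF is not limited to SQL. As a minimal sketch, the same column can be produced with the DataFrame API via a SQL expression, assuming the describe_fruit registration above.

from pyspark.sql import functions as F

# Apply the registered UDF through an expression on the DataFrame.
fruits.withColumn("description", F.expr("describe_fruit(name)")).show(truncate=False)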
4. Output B: structured task column¶
Use a predefined task to return typed fields with a stable schema.
In [6]:
spark.udf.register(
    "analyze_sentiment",
    task_udf(
        nlp.sentiment_analysis(),
        batch_size=64,
        max_concurrency=4,
    ),
)
Out[6]:
<pyspark.sql.udf.UserDefinedFunction at 0x11f56e780>
In [7]:
%%sql
WITH analyzed AS (
    SELECT
        name,
        analyze_sentiment(name) AS result
    FROM fruits
)
SELECT
    name,
    result.sentiment AS sentiment,
    result.confidence AS confidence
FROM analyzed
+----------+---------+----------+
|name      |sentiment|confidence|
+----------+---------+----------+
|apple     |neutral  |0.88      |
|banana    |neutral  |0.96      |
|lemon     |neutral  |0.94      |
|grapefruit|neutral  |0.95      |
+----------+---------+----------+
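The struct returned by the task UDF can also be unpacked without SQL. The sketch below assumes the analyze_sentiment registration above and the sentiment and confidence fields shown in the query.

from pyspark.sql import functions as F

# Add the struct column, then project its fields as plain columns.
analyzed = fruits.withColumn("result", F.expr("analyze_sentiment(name)"))
analyzed.select(
    "name",
    F.col("result.sentiment").alias("sentiment"),
    F.col("result.confidence").alias("confidence"),
).show(truncate=False)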
5. Benefits¶
Main input

- Spark DataFrame columns
- UDF instructions or predefined tasks

Main output

- New plain-text columns (responses_udf)
- New structured columns (task_udf)

Why this helps

- Keeps Spark pipelines while adding LLM processing
- Preserves schema for downstream SQL/DataFrame logic
- Scales with batch_size and max_concurrency tuning (see the sketch after this list)
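As referenced above, batch_size and max_concurrency are the main tuning knobs. The sketch below registers a second UDF with larger values; the name describe_fruit_fast and the numbers are illustrative assumptions, not recommendations.

# Hypothetical re-registration with a larger batch size and higher concurrency.
spark.udf.register(
    "describe_fruit_fast",
    responses_udf(
        instructions="Describe the fruit in one short sentence.",
        batch_size=128,      # larger batches of rows per request
        max_concurrency=8,   # more concurrent requests
    ),
)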