Spark Basics
This notebook walks through openaivec's Spark integration in order: the main input, the generated outputs, and the practical benefits.
1. Setup
Initialize Spark and configure OpenAI authentication once. If the built-in %%sql magic is unavailable, this notebook registers a compatible fallback backed by spark.sql.
In [4]:
import os

from IPython import get_ipython
from pyspark.sql import SparkSession

from openaivec.spark import responses_udf, setup, task_udf
from openaivec.task import nlp

spark = SparkSession.builder.getOrCreate()

api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("Set OPENAI_API_KEY before running this notebook.")

setup(spark, api_key=api_key)

# Register a %%sql fallback backed by spark.sql when no sql cell magic exists.
ip = get_ipython()
if ip and "sql" not in ip.magics_manager.magics["cell"]:
    def _spark_sql_magic(line: str, cell: str):
        return spark.sql(cell).show(truncate=False)

    ip.register_magic_function(_spark_sql_magic, "cell", "sql")
2. Input: Spark DataFrame
Prepare a DataFrame of fruit names. This is the main input to Spark UDFs.
In [5]:
fruits = spark.createDataFrame(
    [("apple",), ("banana",), ("lemon",), ("grapefruit",)],
    ["name"],
)
fruits.createOrReplaceTempView("fruits")
In [6]:
%%sql
SELECT name
FROM fruits
+----------+
|name      |
+----------+
|apple     |
|banana    |
|lemon     |
|grapefruit|
+----------+
3. Output A: plain-text response column
Use responses_udf to generate one short text output per row.
In [7]:
spark.udf.register(
    "describe_fruit",
    responses_udf(
        instructions="Describe the fruit in one short sentence.",
        batch_size=64,
        max_concurrency=4,
    ),
)
Out[7]:
<pyspark.sql.udf.UserDefinedFunction at 0x118592a10>
In [8]:
Copied!
%%sql
SELECT
    name,
    describe_fruit(name) AS description
FROM fruits
Processing batches: 100%|██████████| 1/1 [00:01<00:00, 1.91s/item]

+----------+-----------------------------------------------------------------------------------+
|name      |description                                                                        |
+----------+-----------------------------------------------------------------------------------+
|apple     |An apple is a round fruit with red, green, or yellow skin and a sweet, crisp flesh.|
|banana    |A banana is a long, curved yellow fruit with soft, sweet flesh.                    |
|lemon     |A lemon is a yellow citrus fruit known for its sour taste.                         |
|grapefruit|Grapefruit is a large citrus fruit with a tangy and slightly bitter taste.         |
+----------+-----------------------------------------------------------------------------------+
4. Output B: structured task column
Use a predefined task to return typed fields with a stable schema.
In [9]:
spark.udf.register(
    "analyze_sentiment",
    task_udf(
        nlp.sentiment_analysis(),
        batch_size=64,
        max_concurrency=4,
    ),
)
Out[9]:
<pyspark.sql.udf.UserDefinedFunction at 0x118593d60>
In [10]:
%%sql
WITH analyzed AS (
    SELECT
        name,
        analyze_sentiment(name) AS result
    FROM fruits
)
SELECT
    name,
    result.sentiment AS sentiment,
    result.confidence AS confidence
FROM analyzed
Processing batches: 100%|██████████| 1/1 [00:01<00:00, 1.89s/item]

+----------+---------+----------+
|name      |sentiment|confidence|
+----------+---------+----------+
|apple     |neutral  |0.95      |
|banana    |neutral  |0.99      |
|lemon     |neutral  |0.99      |
|grapefruit|neutral  |0.99      |
+----------+---------+----------+
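The struct column behaves like a typed record, which is why the SQL above can read result.sentiment and result.confidence with dot access. As a rough mental model (SentimentResult below is a hypothetical stand-in for illustration, not the actual class openaivec uses), every row carries the same fixed fields:

```python
from dataclasses import dataclass

# Hypothetical mirror of the struct produced by nlp.sentiment_analysis():
# a stable schema means each row has exactly these fields with these types.
@dataclass(frozen=True)
class SentimentResult:
    sentiment: str     # e.g. "neutral"
    confidence: float  # model confidence in [0.0, 1.0]

rows = [
    SentimentResult("neutral", 0.95),
    SentimentResult("neutral", 0.99),
]
# Dot access mirrors the SQL projection: result.sentiment, result.confidence
print([r.sentiment for r in rows])  # -> ['neutral', 'neutral']
```

Because the fields and types are fixed up front, downstream SQL or DataFrame logic can project them without any per-row inspection.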
5. Benefits

Main input
- Spark DataFrame columns
- UDF instructions or predefined tasks

Main output
- New plain-text columns (responses_udf)
- New structured columns (task_udf)

Why this helps
- Keeps Spark pipelines while adding LLM processing
- Preserves schema for downstream SQL/DataFrame logic
- Scales with batch_size and max_concurrency tuning
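The last point can be made concrete with a little arithmetic: batch_size sets how many rows go into one API request, and max_concurrency caps how many requests are in flight at once. A minimal sketch (request_plan is a hypothetical helper for illustration; openaivec performs this batching internally):

```python
import math

def request_plan(num_rows: int, batch_size: int, max_concurrency: int) -> tuple[int, int]:
    """Rough estimate of total API requests and peak concurrent requests."""
    batches = math.ceil(num_rows / batch_size)
    return batches, min(batches, max_concurrency)

# The 4-row fruits table fits in a single batch of 64.
print(request_plan(4, batch_size=64, max_concurrency=4))       # -> (1, 1)
# 10,000 rows would issue 157 batches, with at most 4 in flight.
print(request_plan(10_000, batch_size=64, max_concurrency=4))  # -> (157, 4)
```

Larger batch_size means fewer, bigger requests; higher max_concurrency trades throughput against rate limits.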