Azure OpenAI for Big Data

The Azure OpenAI service can be used to solve a large number of natural language tasks by prompting the completion API. To make it easier to scale your prompting workflows from a few examples to large datasets, we have integrated the Azure OpenAI service with the distributed machine learning library SynapseML. This integration makes it easy to use the Apache Spark distributed computing framework to process millions of prompts with the OpenAI service. This tutorial shows how to apply large language models at a distributed scale using Azure OpenAI and Azure Synapse Analytics.

Step 1: Prerequisites

The key prerequisites for this quickstart are a working Azure OpenAI resource and an Apache Spark cluster with SynapseML installed. We suggest creating a Synapse workspace, but Azure Databricks, HDInsight, Spark on Kubernetes, or even a Python environment with the pyspark package will also work.

  1. An Azure OpenAI resource – request access here before creating a resource
  2. Create a Synapse workspace
  3. Create a serverless Apache Spark pool

Step 2: Import this guide as a notebook

The next step is to add this code to your Spark cluster. You can either create a notebook in your Spark platform and copy the code into it to run the demo, or download the notebook and import it into Synapse Analytics:

  1. Download this demo as a notebook (click Raw, then save the file)
  2. Import the notebook into the Synapse Workspace or if using Databricks into the Databricks Workspace
  3. Install SynapseML on your cluster. Please see the installation instructions for Synapse at the bottom of the SynapseML website. Note that this requires pasting an additional cell at the top of the notebook you just imported
  4. Connect your notebook to a cluster and follow along, editing and running the cells below.

Step 3: Fill in your service information

Next, edit the cell in the notebook to point to your service. In particular, set the service_name, deployment_name, and key variables to match those of your OpenAI service:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
if running_on_synapse():
    from notebookutils.visualization import display

# Fill in the following lines with your service information
service_name = "synapseml-openai"
deployment_name = "text-davinci-001"
key = find_secret("openai-api-key") # please replace this with your key as a string

assert key is not None and service_name is not None
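
If you would rather not paste the key into the notebook, one option is to read the settings from environment variables. The following is a minimal sketch, not part of SynapseML: the helper load_service_config and the environment variable names are hypothetical and chosen only for illustration, and the defaults mirror the placeholder values used above.

```python
import os


def load_service_config(env=os.environ):
    """Read Azure OpenAI settings from environment variables.

    The variable names below are hypothetical; use whatever names
    your own environment defines.
    """
    service_name = env.get("AZURE_OPENAI_SERVICE_NAME", "synapseml-openai")
    deployment_name = env.get("AZURE_OPENAI_DEPLOYMENT", "text-davinci-001")
    key = env.get("AZURE_OPENAI_KEY")
    if not key:
        # Fail early with a clear message instead of sending unauthenticated requests
        raise ValueError("AZURE_OPENAI_KEY is not set")
    return {
        "service_name": service_name,
        "deployment_name": deployment_name,
        "key": key,
        # Same URL pattern that this guide passes to setUrl
        "url": "https://{}.openai.azure.com/".format(service_name),
    }
```

Calling load_service_config() with no arguments reads from the process environment; passing a plain dictionary makes the helper easy to try out without touching real settings.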

Step 4: Create a dataset of prompts

Next, create a dataframe consisting of a series of rows, with one prompt per row.

You can also load data directly from ADLS or other databases. For more information on loading and preparing Spark dataframes, see the Apache Spark data loading guide.

df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code thats",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

Step 5: Create the OpenAICompletion Apache Spark Client

To apply the OpenAI Completion service to the dataframe you just created, create an OpenAICompletion object, which serves as a distributed client. Parameters of the service can be set either with a single value or by a column of the dataframe, using the appropriate setters on the OpenAICompletion object. Here we are setting maxTokens to 200. A token is around 4 characters, and this limit applies to the sum of the prompt and the result. We are also setting the promptCol parameter to the name of the prompt column in the dataframe.

from synapse.ml.cognitive import OpenAICompletion

completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setUrl("https://{}.openai.azure.com/".format(service_name))
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)
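
To get a feel for token counts before sending prompts, the rule of thumb above can be turned into a rough estimate. This is a sketch only: it uses the 4-characters-per-token heuristic rather than a real tokenizer, and it assumes the 2048-token request limit quoted in the batching section of this guide. The function names are hypothetical.

```python
import math

CHARS_PER_TOKEN = 4         # rule of thumb from this guide, not a real tokenizer
REQUEST_TOKEN_LIMIT = 2048  # assumed from the limit quoted in the batching section


def estimate_tokens(text):
    """Roughly estimate how many tokens a piece of text uses."""
    return math.ceil(len(text) / CHARS_PER_TOKEN)


def fits_token_budget(prompt, max_tokens=200, limit=REQUEST_TOKEN_LIMIT):
    """Check whether the estimated prompt size plus max_tokens stays within the limit."""
    return estimate_tokens(prompt) + max_tokens <= limit
```

Because the estimate is approximate, treat a result close to the limit as a warning and leave some headroom.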

Step 6: Transform the dataframe with the OpenAICompletion Client

Now that you have the dataframe and the completion client, you can transform your input dataset and add a column called completions with all of the information the service adds. We will select out just the text for simplicity.

from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

Your output should look something like this. Note that the completion text will be different.

prompt | error | text
Hello my name is | null | Makaveli I'm eighteen years old and I want to be a rapper when I grow up I love writing and making music I'm from Los Angeles, CA
The best code is code thats | null | understandable This is a subjective statement, and there is no definitive answer.
SynapseML is | null | A machine learning algorithm that is able to learn how to predict the future outcome of events.

Additional Usage Examples

Improve throughput with request batching

The example above makes several requests to the service, one for each prompt. To complete multiple prompts in a single request, use batch mode. First, create a dataframe with a list of prompts per row. Then, in the OpenAICompletion object, set the batchPrompt column with setBatchPromptCol instead of setting the prompt column with setPromptCol.

Note that as of this writing there is a limit of 20 prompts in a single request, as well as a hard limit of 2048 "tokens", or approximately 1500 words.

batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")
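
If you start from a long flat list of prompts, the 20-prompt limit noted above means the list has to be split before it is placed in a batch column. A minimal sketch in plain Python follows; chunk_prompts is a hypothetical helper, not a SynapseML function, and the default of 20 is taken from the limit quoted in this guide.

```python
def chunk_prompts(prompts, max_batch_size=20):
    """Split a flat list of prompts into lists of at most max_batch_size items."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    return [
        prompts[i : i + max_batch_size]
        for i in range(0, len(prompts), max_batch_size)
    ]


# Example: 45 prompts become three batches of 20, 20, and 5 prompts
batches = chunk_prompts(["prompt {}".format(i) for i in range(45)])
```

Each inner list can then become one row of the batchPrompt column, following the same createDataFrame pattern used above. This sketch only caps the number of prompts per request; it does not check the token limit.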

Next we create the OpenAICompletion object. Rather than setting the prompt column, set the batchPrompt column if your column is of type Array[String].

batch_completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setUrl("https://{}.openai.azure.com/".format(service_name))
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

In the call to transform, one request is made per row. Since there are multiple prompts in a single row, each request is sent with all prompts in that row. The results contain one row for each row of the input dataframe.

completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

Using an automatic minibatcher

If your data is in column format, with one prompt per row, you can transpose it to row format using SynapseML's FixedMiniBatchTransformer.

from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

completed_autobatch_df = (
    # Force a single partition so that our small 3-row dataframe forms a single batch;
    # you can remove this step for large datasets
    df.coalesce(1)
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

Prompt engineering for translation

The Azure OpenAI service can solve many different natural language tasks through prompt engineering. Here we show an example of prompting for language translation:

translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))
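
The translation prompts above follow a simple few-shot pattern: labeled example pairs, then the new source text, then an empty target label for the model to complete. As a sketch, prompts in that shape can be assembled programmatically; build_few_shot_prompt is a hypothetical helper written for this illustration, not part of SynapseML or the OpenAI service.

```python
def build_few_shot_prompt(examples, query, source_label, target_label="English"):
    """Assemble a few-shot prompt from (source, target) example pairs.

    The prompt ends with an empty target label so the model
    completes the translation of the query.
    """
    lines = []
    for source_text, target_text in examples:
        lines.append("{}: {}".format(source_label, source_text))
        lines.append("{}: {}".format(target_label, target_text))
    lines.append("{}: {}".format(source_label, query))
    lines.append("{}:".format(target_label))
    return "\n".join(lines)


# Rebuild the Japanese example from this guide
prompt = build_few_shot_prompt(
    examples=[("Ookina hako", "Big box")],
    query="Midori tako",
    source_label="Japanese",
)
```

The resulting strings can be placed in a prompt column in the same way as the hand-written prompts above.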

Prompt for question answering

Here, we prompt GPT-3 for general-knowledge question answering:

qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))