Version: 0.11.4

Cognitive Services Advanced Guide: Asynchrony, Batching, Multi-Key

Step 1: Imports and Keys

from synapse.ml.core.platform import find_secret

service_key = find_secret("cognitive-api-key")
service_loc = "eastus"

Step 2: Basic Usage

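# Module path is an assumption for this release; older releases use synapse.ml.cognitive
from synapse.ml.services.vision import AnalyzeImage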

# Create a dataframe with the image URLs
base_url = ""
image_df = spark.createDataFrame(
    [(base_url + "objects.jpg",), (base_url + "dog.jpg",), (base_url + "house.jpg",)],
    ["image"],  # column name is an assumption
)

# Run the Computer Vision service. Analyze Image extracts information from/about the images.
analyzer = (
    AnalyzeImage()
    .setLocation(service_loc)
    .setSubscriptionKey(service_key)
    .setVisualFeatures(["Categories", "Color", "Description", "Faces", "Objects", "Tags"])
    .setOutputCol("analysis_results")
    .setImageUrlCol("image")
)

image_results = analyzer.transform(image_df).cache()

First we'll look at the full response objects:

display(image_results)

We can select out just what we need:

display(image_results.select("analysis_results.description.captions.text"))


What's going on under the hood

When we call the cognitive service transformer, we start cognitive service clients on each of your Spark workers. These clients send requests to the cloud and turn the JSON responses into Spark struct types, so that you can access any field that the service returns.
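To make the last point concrete, here is a minimal plain-Python sketch (no Spark and no service call) of what selecting a nested field such as description.captions.text amounts to. The sample payload and the select_path helper are hypothetical and made up for illustration; they only mimic the shape of an image-analysis response and are not part of the library.

```python
import json

# Hypothetical payload shaped like an image-analysis response (illustrative only)
raw_response = """
{
  "description": {
    "tags": ["dog", "grass"],
    "captions": [{"text": "a dog sitting in the grass", "confidence": 0.9}]
  }
}
"""


def select_path(value, path):
    """Follow a dotted path through nested dicts, mapping over lists on the way."""
    for name in path.split("."):
        if isinstance(value, list):
            # A field selected through a list of records yields a list of values
            value = [item[name] for item in value]
        else:
            value = value[name]
    return value


parsed = json.loads(raw_response)
captions = select_path(parsed, "description.captions.text")
print(captions)
```

Because captions is a list of records, selecting text through it gives back a list of strings, which is the same shape you get from the dotted column path used in this guide.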

Step 3: Asynchronous Usage

Apache Spark ordinarily parallelizes a computation across all of its worker threads. When working with services, however, this parallelism doesn't fully maximize throughput, because workers sit idle while requests are processed on the server. The concurrency parameter lets each worker keep several requests in flight, so it stays busy while waiting for responses.
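The effect is easy to see outside Spark. The sketch below is not how the transformer is implemented; it only simulates a slow remote call with time.sleep and compares issuing requests one at a time against keeping several in flight with a thread pool. The names fake_request, run_sequential, and run_concurrent, the latency value, and the file names are all hypothetical. On the transformer itself, the corresponding setting is exposed through setConcurrency.

```python
import time
from concurrent.futures import ThreadPoolExecutor

LATENCY_SECONDS = 0.05  # pretend round-trip time of one service call


def fake_request(url):
    """Stand-in for a remote call: the caller only waits, it does no work."""
    time.sleep(LATENCY_SECONDS)
    return "analyzed " + url


def run_sequential(urls):
    # One request at a time: total time is roughly len(urls) * latency
    return [fake_request(u) for u in urls]


def run_concurrent(urls, concurrency):
    # Up to `concurrency` requests are in flight at once; map keeps input order
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        return list(pool.map(fake_request, urls))


def timed(fn, *args):
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


urls = ["image_%d.jpg" % i for i in range(8)]
seq_result, seq_time = timed(run_sequential, urls)
con_result, con_time = timed(run_concurrent, urls, 4)
print("sequential: %.2fs, concurrent: %.2fs" % (seq_time, con_time))
```

Both runs return the same results in the same order; only the waiting overlaps. That is why raising concurrency can help without adding workers, as long as the service's rate limits allow it.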


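# The concurrency value here is illustrative
image_results = analyzer.setConcurrency(3).transform(image_df).cache()

Faster without extra hardware:

display(image_results.select("analysis_results.description.captions.text"))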

Step 4: Batching

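# Module path is an assumption for this release; older releases use synapse.ml.cognitive
from synapse.ml.services.text import TextSentiment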

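# Create a dataframe
text_df = spark.createDataFrame(
    [
        ("I am so happy today, its sunny!",),
        ("I am frustrated by this rush hour traffic",),
        ("The cognitive services on spark is pretty lit",),
    ],
    ["text"],
)

sentiment = (
    TextSentiment()
    .setLocation(service_loc)
    .setSubscriptionKey(service_key)
    .setTextCol("text")
    .setOutputCol("sentiment")
    .setBatchSize(10)  # batch size value is illustrative
)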
# Show the results of your text query
display(sentiment.transform(text_df).select("text", "sentiment.document.sentiment"))
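Batching groups several rows into one request, which cuts the number of round trips for services that accept multiple documents per call. As a rough illustration, the plain-Python sketch below chunks the three texts into batches of two and sends one simulated request per batch. The helpers make_batches and fake_sentiment_request are hypothetical, and the keyword-based scoring is a toy stand-in for the real service.

```python
def make_batches(rows, batch_size):
    """Split rows into consecutive chunks of at most batch_size items."""
    return [rows[i : i + batch_size] for i in range(0, len(rows), batch_size)]


def fake_sentiment_request(batch):
    """Hypothetical stand-in for one service call that scores a whole batch."""
    return ["positive" if "happy" in text else "unknown" for text in batch]


texts = [
    "I am so happy today, its sunny!",
    "I am frustrated by this rush hour traffic",
    "The cognitive services on spark is pretty lit",
]

batches = make_batches(texts, 2)
# Flatten the per-batch responses back into one result per input row
results = [label for batch in batches for label in fake_sentiment_request(batch)]
print(len(batches), "requests for", len(texts), "rows")
```

The results come back in input order with one entry per row, so batching changes how many requests are sent but not the shape of the output.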

Step 5: Multi-Key

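# Module path is an assumption for this release; older releases use synapse.ml.cognitive
from synapse.ml.services.text import TextSentiment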
from pyspark.sql.functions import udf
import random

service_key_2 = find_secret("cognitive-api-key-2")
keys = [service_key, service_key_2]

# Register as a UDF so that calling it yields a Column and a key is picked per row
@udf
def random_key():
    return keys[random.randint(0, len(keys) - 1)]

image_df2 = image_df.withColumn("key", random_key())

results = analyzer.setSubscriptionKeyCol("key").transform(image_df2)
display(results.select("key", "analysis_results.description.captions.text"))
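The idea behind the key column is that each row carries its own key, so requests are spread across several keys rather than all counting against one; presumably this helps when a single key's quota is the bottleneck. The plain-Python sketch below shows the assignment step on its own, with placeholder key names and a seeded generator so that it is repeatable. The assign_keys helper is hypothetical and only mirrors what random_key does per row.

```python
import random
from collections import Counter

# Placeholder key names, not real credentials
keys = ["key-a", "key-b"]


def assign_keys(rows, keys, rng):
    """Pair each row with a key drawn uniformly at random."""
    return [(row, rng.choice(keys)) for row in rows]


rng = random.Random(0)  # seeded so the sketch is repeatable
rows = ["objects.jpg", "dog.jpg", "house.jpg"] * 100
assigned = assign_keys(rows, keys, rng)

# Count how many rows ended up on each key
usage = Counter(key for _, key in assigned)
print(usage)
```

Random assignment balances load only on average. If an even split matters, a round-robin assignment over the keys is a simple alternative.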

Learn More