OpenCV - Pipeline Image Transformations
This example shows how to manipulate a collection of images. First, the images are downloaded to a local directory. Second, they are copied to your cluster's attached HDFS.
The images are loaded from the directory (for fast prototyping, consider loading only a fraction of the images). Inside the DataFrame, each image is a single field in the image column. The image has sub-fields (path, height, width, OpenCV type, and OpenCV bytes).
from synapse.ml.opencv import toNDArray
from synapse.ml.io import *
imageDir = "wasbs://publicwasb@mmlspark.blob.core.windows.net/sampleImages"
images = spark.read.image().load(imageDir).cache()
images.printSchema()
print(images.count())
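The note above about loading only a fraction of the images can be sketched as a small helper. This is a minimal sketch that assumes the DataFrame offers PySpark's standard sample(withReplacement, fraction, seed) method; the helper name sample_images and its default values are hypothetical and chosen only for illustration.

```python
def sample_images(df, fraction=0.1, seed=42):
    """Return a random subset of the rows of a Spark DataFrame.

    sample_images, fraction and seed are illustrative names and defaults,
    not part of SynapseML.
    """
    if not 0.0 < fraction <= 1.0:
        raise ValueError("fraction must be in (0, 1]")
    # DataFrame.sample keeps each row independently with the given
    # probability, so the result size is only approximately
    # fraction * df.count().
    return df.sample(withReplacement=False, fraction=fraction, seed=seed)
```

With the DataFrame loaded above, a call such as sample_images(images, fraction=0.1) would keep roughly a tenth of the rows, which is usually enough to check a pipeline before running it on the full set.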
We can also stream the images with a similar API. See the Structured Streaming Programming Guide for more details on streaming.
import time
imageStream = spark.readStream.image().load(imageDir)
query = (
    imageStream.select("image.height")
    .writeStream.format("memory")
    .queryName("heights")
    .start()
)
time.sleep(3)
print("Streaming query activity: {}".format(query.isActive))
Wait a few seconds and then try querying for the images below. Note that when streaming from a directory that already contains images, the engine consumes all of them in a single batch. If images are later moved into the directory, the streaming engine picks them up and sends them as another batch.
heights = spark.sql("select * from heights")
print("Streamed {} heights".format(heights.count()))
After we have streamed the images we can stop the query:
from py4j.protocol import Py4JJavaError
try:
    query.stop()
except Py4JJavaError as e:
    print(e)
When collected from the DataFrame, the image data are stored in a Row, which is Spark's way to represent structures (in the current example, each DataFrame row has a single Image, which itself is a Row). Image fields can be addressed by name, and the toNDArray() helper function converts the image into a numpy array for further manipulation.
from synapse.ml.core.platform import running_on_binder
if running_on_binder():
    from IPython import get_ipython
from PIL import Image
import matplotlib.pyplot as plt
data = images.take(3) # take first three rows of the dataframe
im = data[2][0] # the image is in the first column of a given row
print("image type: {}, number of fields: {}".format(type(im), len(im)))
print("image path: {}".format(im.origin))
print("height: {}, width: {}, OpenCV type: {}".format(im.height, im.width, im.mode))
arr = toNDArray(im) # convert to numpy array
print(images.count())
plt.imshow(Image.fromarray(arr, "RGB")) # display the image inside notebook
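To make the Row-to-array conversion more concrete, here is a minimal numpy-only sketch of what such a conversion can look like. It assumes the image bytes are stored row by row as unsigned 8-bit values with the channels interleaved per pixel, and that the struct also carries a channel count. FakeImage and bytes_to_ndarray are hypothetical stand-ins for illustration, not the SynapseML implementation of toNDArray().

```python
from collections import namedtuple

import numpy as np

# Hypothetical stand-in for the image Row; the field names are assumptions.
FakeImage = namedtuple(
    "FakeImage", ["origin", "height", "width", "nChannels", "data"]
)


def bytes_to_ndarray(image):
    """Reshape a flat byte buffer into an array of shape (height, width, nChannels)."""
    flat = np.frombuffer(image.data, dtype=np.uint8)
    expected = image.height * image.width * image.nChannels
    if flat.size != expected:
        raise ValueError("expected {} bytes, got {}".format(expected, flat.size))
    # np.frombuffer over bytes is read-only, so copy to get a writable array.
    return flat.reshape(image.height, image.width, image.nChannels).copy()


# A 2x2 image with 3 channels: 12 bytes in total.
fake = FakeImage("memory://example", 2, 2, 3, bytes(range(12)))
demo_arr = bytes_to_ndarray(fake)
print(demo_arr.shape)            # (2, 2, 3)
print(demo_arr[1, 0].tolist())   # [6, 7, 8]
```

The pixel at row 1, column 0 starts at byte offset (1 * 2 + 0) * 3 = 6, which is why its three channel values are 6, 7 and 8.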
Use ImageTransformer for basic image manipulation: resizing, cropping, etc. Internally, operations are pipelined and backed by the OpenCV implementation.
from synapse.ml.opencv import ImageTransformer
tr = (
    ImageTransformer()  # images are resized and then cropped
    .setOutputCol("transformed")
    .resize(size=(200, 200))
    .crop(0, 0, height=180, width=180)
)
small = tr.transform(images).select("transformed")
im = small.take(3)[2][0] # take third image
plt.imshow(Image.fromarray(toNDArray(im), "RGB")) # display the image inside notebook
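The effect of resizing and then cropping on the array shape can be illustrated without Spark. The sketch below is an approximation under stated assumptions: resize_nearest uses nearest-neighbour sampling rather than whatever interpolation OpenCV applies, and crop treats its first two arguments as the column and row of the top-left corner, which is an assumption about the meaning of the (0, 0) in the call above. Both function names are hypothetical.

```python
import numpy as np


def resize_nearest(arr, height, width):
    """Nearest-neighbour resize of an (H, W, C) array to (height, width, C)."""
    # For every output row/column, pick the source index it maps onto.
    rows = np.arange(height) * arr.shape[0] // height
    cols = np.arange(width) * arr.shape[1] // width
    return arr[rows[:, None], cols[None, :]]


def crop(arr, x, y, height, width):
    """Keep the height-by-width window whose top-left corner is at column x, row y."""
    return arr[y:y + height, x:x + width]


source = np.zeros((300, 400, 3), dtype=np.uint8)
resized = resize_nearest(source, 200, 200)
cropped = crop(resized, 0, 0, height=180, width=180)
print(resized.shape, cropped.shape)  # (200, 200, 3) (180, 180, 3)
```

Because the crop runs after the resize, the final image is 180 by 180 pixels regardless of the size of the source image.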
For advanced image manipulation, use Spark UDFs. The SynapseML package provides conversion functions between the Spark Row and ndarray image representations.
from pyspark.sql.functions import udf
from synapse.ml.opencv import ImageSchema, toNDArray, toImage
def u(row):
    array = toNDArray(row)  # convert Image to numpy ndarray[height, width, 3]
    array[:, :, 2] = 0  # zero out the channel at index 2
    return toImage(array)  # numpy array back to Spark Row structure
noBlueUDF = udf(u, ImageSchema)
noblue = small.withColumn("noblue", noBlueUDF(small["transformed"])).select("noblue")
im = noblue.take(3)[2][0]  # take third image
plt.imshow(Image.fromarray(toNDArray(im), "RGB")) # display the image inside notebook
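The array logic inside a UDF like the one above can be checked on a plain numpy array before it is wrapped for Spark. The sketch below is one way to do that; zero_channel is a hypothetical helper, and which color the channel at index 2 represents depends on the channel order of the decoded array, which this example does not state.

```python
import numpy as np


def zero_channel(array, channel):
    """Return a copy of an (H, W, C) array with one channel set to zero."""
    result = array.copy()  # leave the caller's array untouched
    result[:, :, channel] = 0
    return result


pixels = np.full((2, 2, 3), 255, dtype=np.uint8)
out = zero_channel(pixels, 2)
print(out[0, 0].tolist())     # [255, 255, 0]
print(pixels[0, 0].tolist())  # [255, 255, 255]
```

Working on a copy keeps the helper free of side effects, which makes it easier to reason about when the same input array is reused elsewhere.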
Images can be unrolled into dense 1D vectors suitable for CNTK evaluation.
from synapse.ml.image import UnrollImage
unroller = UnrollImage().setInputCol("noblue").setOutputCol("unrolled")
unrolled = unroller.transform(noblue).select("unrolled")
vector = unrolled.take(1)[0][0]
print(type(vector))
len(vector.toArray())
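As a rough illustration of what unrolling means, the sketch below flattens a small (height, width, channels) array into a 1D vector of doubles. It assumes a channel-major layout, where all values of the first channel come first, then the second, and so on; this ordering is an assumption made for the example, and unroll_chw is a hypothetical name, not the UnrollImage implementation.

```python
import numpy as np


def unroll_chw(arr):
    """Flatten an (H, W, C) image into a 1D float vector, one channel plane after another."""
    # Move the channel axis to the front, then read the values out in order.
    return arr.transpose(2, 0, 1).astype(np.float64).ravel()


demo_image = np.arange(2 * 2 * 3, dtype=np.uint8).reshape(2, 2, 3)
demo_vector = unroll_chw(demo_image)
print(demo_vector.shape)         # (12,)
print(demo_vector[:4].tolist())  # [0.0, 3.0, 6.0, 9.0]
```

Whatever the ordering, the vector length equals height * width * channels, so a 180 by 180 image with 3 channels unrolls to 97200 values.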