Binary Classification with VowpalWabbit on Criteo Dataset
SparkML Vector input
Read dataset
import pyspark.sql.types as T
from pyspark.sql import functions as F
schema = T.StructType(
[
T.StructField("label", T.IntegerType(), True),
*[T.StructField("i" + str(i), T.IntegerType(), True) for i in range(1, 13)],
*[T.StructField("s" + str(i), T.StringType(), True) for i in range(26)],
]
)
df = (
spark.read.format("csv")
.option("header", False)
.option("delimiter", "\t")
.schema(schema)
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/criteo_day0_1k.csv.gz")
)
# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()
display(df)
Reformat into VW-native format
See VW docs for format details
# create VW string format
cols = [
F.col("label"),
F.lit("|"),
*[F.col("i" + str(i)) for i in range(1, 13)],
*[F.col("s" + str(i)) for i in range(26)],
]
df = df.select(F.concat_ws(" ", *cols).alias("value"))
display(df)
Split the dataset into train and test
train, test = df.randomSplit([0.6, 0.4], seed=1)
Model Training
from synapse.ml.vw import VowpalWabbitGeneric
# number of partitions determines data parallelism
train = train.repartition(2)
model = VowpalWabbitGeneric(
numPasses=5,
useBarrierExecutionMode=False,
passThroughArgs="--holdout_off --loss_function logistic --link logistic",
).fit(train)
Model Prediction
predictions = model.transform(test)
predictions = predictions.withColumn(
"prediction", F.col("prediction").cast("double")
).withColumn("label", F.substring("value", 0, 1).cast("double"))
display(predictions)
from synapse.ml.train import ComputeModelStatistics
metrics = ComputeModelStatistics(
evaluationMetric="classification", labelCol="label", scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)