Skip to main content
Version: 0.10.2

AutoML

FindBestModel

from synapse.ml.automl import *
from synapse.ml.train import *
from pyspark.ml.classification import RandomForestClassifier

# Toy training data: a 0/1 "Label" column followed by four numeric columns.
df = spark.createDataFrame(
    [
        (0, 2, 0.50, 0.60, 0),
        (1, 3, 0.40, 0.50, 1),
        (0, 4, 0.78, 0.99, 2),
        (1, 5, 0.12, 0.34, 3),
        (0, 1, 0.50, 0.60, 0),
        (1, 3, 0.40, 0.50, 1),
        (0, 3, 0.78, 0.99, 2),
        (1, 4, 0.12, 0.34, 3),
        (0, 0, 0.50, 0.60, 0),
        (1, 2, 0.40, 0.50, 1),
        (0, 3, 0.78, 0.99, 2),
        (1, 4, 0.12, 0.34, 3),
    ],
    ["Label", "col1", "col2", "col3", "col4"],
)

# Mock candidate: a TrainClassifier wrapping an explicitly configured
# RandomForestClassifier (fixed seed 0).
randomForestClassifier = (
    TrainClassifier()
    .setModel(
        RandomForestClassifier()
        .setMaxBins(32)
        .setMaxDepth(5)
        .setMinInfoGain(0.0)
        .setMinInstancesPerNode(1)
        .setNumTrees(20)
        .setSubsamplingRate(1.0)
        .setSeed(0)
    )
    .setFeaturesCol("mlfeatures")
    .setLabelCol("Label")
)
model = randomForestClassifier.fit(df)

# The same fitted model is supplied twice, so both candidates are identical;
# they are compared on the "accuracy" metric.
findBestModel = (
    FindBestModel()
    .setModels([model, model])
    .setEvaluationMetric("accuracy")
)
bestModel = findBestModel.fit(df)
bestModel.transform(df).show()
Python API: FindBestModel | Scala API: FindBestModel | .NET API: FindBestModel | Source: FindBestModel

TuneHyperparameters

from synapse.ml.automl import *
from synapse.ml.train import *
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier


# Small sample with a "Label" column and nine feature columns.
# NOTE(review): every row below has Label 0 — confirm a single-class sample
# is intended for this example.
df = spark.createDataFrame(
    [
        (0, 1, 1, 1, 1, 1, 1.0, 3, 1, 1),
        (0, 1, 1, 1, 1, 2, 1.0, 1, 1, 1),
        (0, 1, 1, 1, 1, 2, 1.0, 2, 1, 1),
        (0, 1, 2, 3, 1, 2, 1.0, 3, 1, 1),
        (0, 3, 1, 1, 1, 2, 1.0, 3, 1, 1),
    ],
    [
        "Label",
        "Clump_Thickness",
        "Uniformity_of_Cell_Size",
        "Uniformity_of_Cell_Shape",
        "Marginal_Adhesion",
        "Single_Epithelial_Cell_Size",
        "Bare_Nuclei",
        "Bland_Chromatin",
        "Normal_Nucleoli",
        "Mitoses",
    ],
)

# Three Spark ML classifiers, each wrapped in a TrainClassifier that reads
# the "Label" column.
logReg = LogisticRegression()
randForest = RandomForestClassifier()
gbt = GBTClassifier()
smlmodels = [logReg, randForest, gbt]
mmlmodels = [
    TrainClassifier(model=estimator, labelCol="Label")
    for estimator in smlmodels
]

# One entry per (estimator, param) pair, each with a range or a discrete set
# of candidate values.
paramBuilder = (
    HyperparamBuilder()
    .addHyperparam(logReg, logReg.regParam, RangeHyperParam(0.1, 0.3))
    .addHyperparam(randForest, randForest.numTrees, DiscreteHyperParam([5, 10]))
    .addHyperparam(randForest, randForest.maxDepth, DiscreteHyperParam([3, 5]))
    .addHyperparam(gbt, gbt.maxBins, RangeHyperParam(8, 16))
    .addHyperparam(gbt, gbt.maxDepth, DiscreteHyperParam([3, 5]))
)
searchSpace = paramBuilder.build()
# The search space is a list of params to tuples of estimator and hyperparam
randomSpace = RandomSpace(searchSpace)

# Two runs per wrapped model, 2-fold evaluation, fixed seed.
bestModel = TuneHyperparameters(
    evaluationMetric="accuracy",
    models=mmlmodels,
    numFolds=2,
    numRuns=len(mmlmodels) * 2,
    parallelism=2,
    paramSpace=randomSpace.space(),
    seed=0,
).fit(df)
Python API: TuneHyperparameters | Scala API: TuneHyperparameters | .NET API: TuneHyperparameters | Source: TuneHyperparameters

Featurize

CleanMissingData

from synapse.ml.featurize import *

# Sample data with None values scattered across all five columns, including
# one row that is entirely None.
dataset = spark.createDataFrame(
    [
        (0, 2, 0.50, 0.60, 0),
        (1, 3, 0.40, None, None),
        (0, 4, 0.78, 0.99, 2),
        (1, 5, 0.12, 0.34, 3),
        (0, 1, 0.50, 0.60, 0),
        (None, None, None, None, None),
        (0, 3, 0.78, 0.99, 2),
        (1, 4, 0.12, 0.34, 3),
        (0, None, 0.50, 0.60, 0),
        (1, 2, 0.40, 0.50, None),
        (0, 3, None, 0.99, 2),
        (1, 4, 0.12, 0.34, 3),
    ],
    ["col1", "col2", "col3", "col4", "col5"],
)

# Input and output column lists are the same, with cleaning mode "Mean".
# The stage is only configured here; this snippet does not fit or apply it.
cmd = (
    CleanMissingData()
    .setInputCols(dataset.columns)
    .setOutputCols(dataset.columns)
    .setCleaningMode("Mean")
)
Python API: CleanMissingData | Scala API: CleanMissingData | .NET API: CleanMissingData | Source: CleanMissingData

CountSelector

from synapse.ml.featurize import *
from pyspark.ml.linalg import Vectors

# Two identical rows: "col1" holds a size-3 sparse vector (entries at
# indices 0 and 2), "col2" a dense vector.
df = spark.createDataFrame(
    [
        (Vectors.sparse(3, [(0, 1.0), (2, 2.0)]), Vectors.dense(1.0, 0.1, 0)),
        (Vectors.sparse(3, [(0, 1.0), (2, 2.0)]), Vectors.dense(1.0, 0.1, 0)),
    ],
    ["col1", "col2"],
)

# Reads the sparse column "col1" and writes the result to "col3".
cs = (
    CountSelector()
    .setInputCol("col1")
    .setOutputCol("col3")
)

cs.fit(df).transform(df).show()
Python API: CountSelector | Scala API: CountSelector | .NET API: CountSelector | Source: CountSelector

Featurize

from synapse.ml.featurize import *

# Each row has five values: a 0/1 label, three numeric values and a text
# string. The schema previously listed only four names, leaving the text
# column without an explicit name; it is now named "col4", matching the
# column naming used by the other sample datasets on this page.
dataset = spark.createDataFrame(
    [
        (0, 2, 0.50, 0.60, "pokemon are everywhere"),
        (1, 3, 0.40, 0.50, "they are in the woods"),
        (0, 4, 0.78, 0.99, "they are in the water"),
        (1, 5, 0.12, 0.34, "they are in the fields"),
        (0, 3, 0.78, 0.99, "pokemon - gotta catch em all"),
    ],
    ["Label", "col1", "col2", "col3", "col4"],
)

# Featurize the three numeric columns into the single output column
# "testColumn".
# NOTE(review): the text column "col4" is not listed in the input columns —
# confirm whether it should be featurized as well.
feat = (
    Featurize()
    .setNumFeatures(10)
    .setOutputCol("testColumn")
    .setInputCols(["col1", "col2", "col3"])
    .setOneHotEncodeCategoricals(False)
)

feat.fit(dataset).transform(dataset).show()
Python API: Featurize | Scala API: Featurize | .NET API: Featurize | Source: Featurize

ValueIndexer

from synapse.ml.featurize import *

# Three rows whose columns are named after the kind of value they hold.
df = spark.createDataFrame(
    [
        (-3, 24, 0.32534, True, "piano"),
        (1, 5, 5.67, False, "piano"),
        (-3, 5, 0.32534, False, "guitar"),
    ],
    ["int", "long", "double", "bool", "string"],
)

# Index the "string" column into a new "string_cat" column.
vi = (
    ValueIndexer()
    .setInputCol("string")
    .setOutputCol("string_cat")
)

vi.fit(df).transform(df).show()
Python API: ValueIndexer | Scala API: ValueIndexer | .NET API: ValueIndexer | Source: ValueIndexer

Featurize Text

TextFeaturizer

from synapse.ml.featurize.text import *

# Four labelled sentences, the last of which is an empty string.
dfRaw = spark.createDataFrame(
    [
        (0, "Hi I"),
        (1, "I wish for snow today"),
        (2, "we Cant go to the park, because of the snow!"),
        (3, ""),
    ],
    ["label", "sentence"],
)

# Featurize "sentence" into "features" with numFeatures set to 20.
tfRaw = (
    TextFeaturizer()
    .setInputCol("sentence")
    .setOutputCol("features")
    .setNumFeatures(20)
)

tfRaw.fit(dfRaw).transform(dfRaw).show()
Python API: TextFeaturizer | Scala API: TextFeaturizer | .NET API: TextFeaturizer | Source: TextFeaturizer

Isolation Forest

IsolationForest

from synapse.ml.isolationforest import *

# Configure (but do not fit) an IsolationForest estimator. It reads the
# "features" column and is set to write "predictedLabel" and "outlierScore".
# The contamination error is expressed as 1% of the 0.02 contamination value.
isolationForest = (
    IsolationForest()
    .setNumEstimators(100)
    .setBootstrap(False)
    .setMaxSamples(256)
    .setMaxFeatures(1.0)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(0.02)
    .setContaminationError(0.02 * 0.01)
    .setRandomSeed(1)
)
Python API: IsolationForest | Scala API: IsolationForest | .NET API: IsolationForest | Source: IsolationForest

NN

ConditionalKNN

from synapse.ml.nn import *

# Configure a ConditionalKNN estimator: features are read from "features"
# and the output column is "matches". Nothing is fitted here.
cknn = (
    ConditionalKNN()
    .setOutputCol("matches")
    .setFeaturesCol("features")
)
Python API: ConditionalKNN | Scala API: ConditionalKNN | .NET API: ConditionalKNN | Source: ConditionalKNN

KNN

from synapse.ml.nn import *

# Configure a KNN estimator whose output column is "matches"; nothing is
# fitted here.
knn = KNN().setOutputCol("matches")
Python API: KNN | Scala API: KNN | .NET API: KNN | Source: KNN

Recommendation

RecommendationIndexer, RankingEvaluator, RankingAdapter and RankingTrainValidationSplit

from synapse.ml.recommendation import *
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import *

# (customer, movie, rating) triples for four customers; duplicates are
# dropped and the result is cached because it is reused below.
ratings = (
    spark.createDataFrame(
        [
            ("11", "Movie 01", 2),
            ("11", "Movie 03", 1),
            ("11", "Movie 04", 5),
            ("11", "Movie 05", 3),
            ("11", "Movie 06", 4),
            ("11", "Movie 07", 1),
            ("11", "Movie 08", 5),
            ("11", "Movie 09", 3),
            ("22", "Movie 01", 4),
            ("22", "Movie 02", 5),
            ("22", "Movie 03", 1),
            ("22", "Movie 05", 3),
            ("22", "Movie 06", 3),
            ("22", "Movie 07", 5),
            ("22", "Movie 08", 1),
            ("22", "Movie 10", 3),
            ("33", "Movie 01", 4),
            ("33", "Movie 03", 1),
            ("33", "Movie 04", 5),
            ("33", "Movie 05", 3),
            ("33", "Movie 06", 4),
            ("33", "Movie 08", 1),
            ("33", "Movie 09", 5),
            ("33", "Movie 10", 3),
            ("44", "Movie 01", 4),
            ("44", "Movie 02", 5),
            ("44", "Movie 03", 1),
            ("44", "Movie 05", 3),
            ("44", "Movie 06", 4),
            ("44", "Movie 07", 5),
            ("44", "Movie 08", 1),
            ("44", "Movie 10", 3),
        ],
        ["customerIDOrg", "itemIDOrg", "rating"],
    )
    .dropDuplicates()
    .cache()
)

# Map the original string user/item columns to the "customerID" and
# "itemID" columns used by the recommender below.
recommendationIndexer = (
    RecommendationIndexer()
    .setUserInputCol("customerIDOrg")
    .setUserOutputCol("customerID")
    .setItemInputCol("itemIDOrg")
    .setItemOutputCol("itemID")
    .setRatingCol("rating")
)

transformedDf = (
    recommendationIndexer.fit(ratings)
    .transform(ratings)
    .cache()
)

# ALS recommender reading the indexed columns, with a fixed seed.
als = (
    ALS()
    .setNumUserBlocks(1)
    .setNumItemBlocks(1)
    .setUserCol("customerID")
    .setItemCol("itemID")
    .setRatingCol("rating")
    .setSeed(0)
)

evaluator = (
    RankingEvaluator()
    .setK(3)
    .setNItems(10)
)

# The adapter takes its k from the evaluator so the two stay in agreement.
adapter = (
    RankingAdapter()
    .setK(evaluator.getK())
    .setRecommender(als)
)

adapter.fit(transformedDf).transform(transformedDf).show()

# Single-point grid: only regParam = 1.0 is tried.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.regParam, [1.0])
    .build()
)

# Train/validation split with a 0.8 train ratio; user and item column names
# are taken from the indexer's output columns.
tvRecommendationSplit = (
    RankingTrainValidationSplit()
    .setEstimator(als)
    .setEvaluator(evaluator)
    .setEstimatorParamMaps(paramGrid)
    .setTrainRatio(0.8)
    .setUserCol(recommendationIndexer.getUserOutputCol())
    .setItemCol(recommendationIndexer.getItemOutputCol())
    .setRatingCol("rating")
)

tvRecommendationSplit.fit(transformedDf).transform(transformedDf).show()
Python API: RecommendationIndexer | Scala API: RecommendationIndexer | .NET API: RecommendationIndexer | Source: RecommendationIndexer
Python API: RankingEvaluator | Scala API: RankingEvaluator | .NET API: RankingEvaluator | Source: RankingEvaluator
Python API: RankingAdapter | Scala API: RankingAdapter | .NET API: RankingAdapter | Source: RankingAdapter
Python API: RankingTrainValidationSplit | Scala API: RankingTrainValidationSplit | .NET API: RankingTrainValidationSplit | Source: RankingTrainValidationSplit

SAR

from synapse.ml.recommendation import *

# (customer, movie, rating) triples for four customers; duplicates are
# dropped and the result is cached because it is reused below.
ratings = (
    spark.createDataFrame(
        [
            ("11", "Movie 01", 2),
            ("11", "Movie 03", 1),
            ("11", "Movie 04", 5),
            ("11", "Movie 05", 3),
            ("11", "Movie 06", 4),
            ("11", "Movie 07", 1),
            ("11", "Movie 08", 5),
            ("11", "Movie 09", 3),
            ("22", "Movie 01", 4),
            ("22", "Movie 02", 5),
            ("22", "Movie 03", 1),
            ("22", "Movie 05", 3),
            ("22", "Movie 06", 3),
            ("22", "Movie 07", 5),
            ("22", "Movie 08", 1),
            ("22", "Movie 10", 3),
            ("33", "Movie 01", 4),
            ("33", "Movie 03", 1),
            ("33", "Movie 04", 5),
            ("33", "Movie 05", 3),
            ("33", "Movie 06", 4),
            ("33", "Movie 08", 1),
            ("33", "Movie 09", 5),
            ("33", "Movie 10", 3),
            ("44", "Movie 01", 4),
            ("44", "Movie 02", 5),
            ("44", "Movie 03", 1),
            ("44", "Movie 05", 3),
            ("44", "Movie 06", 4),
            ("44", "Movie 07", 5),
            ("44", "Movie 08", 1),
            ("44", "Movie 10", 3),
        ],
        ["customerIDOrg", "itemIDOrg", "rating"],
    )
    .dropDuplicates()
    .cache()
)

# Map the original string user/item columns to the "customerID" and
# "itemID" columns that SAR is configured to read.
recommendationIndexer = (
    RecommendationIndexer()
    .setUserInputCol("customerIDOrg")
    .setUserOutputCol("customerID")
    .setItemInputCol("itemIDOrg")
    .setItemOutputCol("itemID")
    .setRatingCol("rating")
)

# SAR recommender configuration.
# The similarity function name is spelled "jaccard"; it was previously
# misspelled "jacccard", which presumably was not recognised as the Jaccard
# option.
# NOTE(review): `ratings` has no "timestamp" column although the time column
# is set to "timestamp" — confirm SAR tolerates a missing time column.
algo = (
    SAR()
    .setUserCol("customerID")
    .setItemCol("itemID")
    .setRatingCol("rating")
    .setTimeCol("timestamp")
    .setSupportThreshold(1)
    .setSimilarityFunction("jaccard")
    .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy")
)

adapter = (
    RankingAdapter()
    .setK(5)
    .setRecommender(algo)
)

# Index the ratings once and cache them, since both fit and transform read them.
res1 = recommendationIndexer.fit(ratings).transform(ratings).cache()

adapter.fit(res1).transform(res1).show()
Python API: SAR | Scala API: SAR | .NET API: SAR | Source: SAR

Stages

ClassBalancer

from synapse.ml.stages import *

# Ten rows whose "label" values (0.0, 1.0, 2.0) occur with unequal frequency.
df = spark.createDataFrame(
    [
        (0, 1.0, "Hi I"),
        (1, 1.0, "I wish for snow today"),
        (2, 2.0, "I wish for snow today"),
        (3, 2.0, "I wish for snow today"),
        (4, 2.0, "I wish for snow today"),
        (5, 2.0, "I wish for snow today"),
        (6, 0.0, "I wish for snow today"),
        (7, 1.0, "I wish for snow today"),
        (8, 0.0, "we Cant go to the park, because of the snow!"),
        (9, 2.0, ""),
    ],
    ["index", "label", "sentence"],
)

# The balancer reads the "label" column.
cb = ClassBalancer().setInputCol("label")

cb.fit(df).transform(df).show()
Python API: ClassBalancer | Scala API: ClassBalancer | .NET API: ClassBalancer | Source: ClassBalancer

MultiColumnAdapter

from synapse.ml.stages import *
from pyspark.ml.feature import Tokenizer

# Four rows with two text columns, "words1" and "words2".
df = spark.createDataFrame(
    [
        (0, "This is a test", "this is one too"),
        (1, "could be a test", "bar"),
        (2, "foo", "bar"),
        (3, "foo", "maybe not"),
    ],
    ["label", "words1", "words2"],
)

# A single Tokenizer is the base stage; the adapter is given two input
# columns and two matching output columns.
stage1 = Tokenizer()
mca = (
    MultiColumnAdapter()
    .setBaseStage(stage1)
    .setInputCols(["words1", "words2"])
    .setOutputCols(["output1", "output2"])
)

mca.fit(df).transform(df).show()
Python API: MultiColumnAdapter | Scala API: MultiColumnAdapter | .NET API: MultiColumnAdapter | Source: MultiColumnAdapter

Timer

from synapse.ml.stages import *
from pyspark.ml.feature import *

# Four labelled sentences, the last of which is an empty string.
df = spark.createDataFrame(
    [
        (0, "Hi I"),
        (1, "I wish for snow today"),
        (2, "we Cant go to the park, because of the snow!"),
        (3, ""),
    ],
    ["label", "sentence"],
)

tok = (
    Tokenizer()
    .setInputCol("sentence")
    .setOutputCol("tokens")
)

# Wrap the tokenizer in a Timer and run it to produce the "tokens" column.
df2 = Timer().setStage(tok).fit(df).transform(df)

# Hash the tokens directly (not wrapped in a Timer) into the "hash" column.
df3 = (
    HashingTF()
    .setInputCol("tokens")
    .setOutputCol("hash")
    .transform(df2)
)

# Wrap the IDF stage in a second Timer and run it on the hashed data.
idf = IDF().setInputCol("hash").setOutputCol("idf")
timer = Timer().setStage(idf)

timer.fit(df3).transform(df3).show()
Python API: Timer | Scala API: Timer | .NET API: Timer | Source: Timer

Train

TrainClassifier

from synapse.ml.train import *
from pyspark.ml.classification import LogisticRegression

# Toy training data: a 0/1 "Label" column followed by four numeric columns.
df = spark.createDataFrame(
    [
        (0, 2, 0.50, 0.60, 0),
        (1, 3, 0.40, 0.50, 1),
        (0, 4, 0.78, 0.99, 2),
        (1, 5, 0.12, 0.34, 3),
        (0, 1, 0.50, 0.60, 0),
        (1, 3, 0.40, 0.50, 1),
        (0, 3, 0.78, 0.99, 2),
        (1, 4, 0.12, 0.34, 3),
        (0, 0, 0.50, 0.60, 0),
        (1, 2, 0.40, 0.50, 1),
        (0, 3, 0.78, 0.99, 2),
        (1, 4, 0.12, 0.34, 3),
    ],
    ["Label", "col1", "col2", "col3", "col4"],
)

# Wrap a default LogisticRegression and point it at the "Label" column.
tc = (
    TrainClassifier()
    .setModel(LogisticRegression())
    .setLabelCol("Label")
)

tc.fit(df).transform(df).show()
Python API: TrainClassifier | Scala API: TrainClassifier | .NET API: TrainClassifier | Source: TrainClassifier

TrainRegressor

from synapse.ml.train import *
from pyspark.ml.regression import LinearRegression

# Toy regression data: a float "label" column followed by four numeric columns.
dataset = spark.createDataFrame(
    [
        (0.0, 2, 0.50, 0.60, 0.0),
        (1.0, 3, 0.40, 0.50, 1.0),
        (2.0, 4, 0.78, 0.99, 2.0),
        (3.0, 5, 0.12, 0.34, 3.0),
        (0.0, 1, 0.50, 0.60, 0.0),
        (1.0, 3, 0.40, 0.50, 1.0),
        (2.0, 3, 0.78, 0.99, 2.0),
        (3.0, 4, 0.12, 0.34, 3.0),
        (0.0, 0, 0.50, 0.60, 0.0),
        (1.0, 2, 0.40, 0.50, 1.0),
        (2.0, 3, 0.78, 0.99, 2.0),
        (3.0, 4, 0.12, 0.34, 3.0),
    ],
    ["label", "col1", "col2", "col3", "col4"],
)

# Linear regression with regParam 0.3 and elasticNetParam 0.8, wrapped in a
# TrainRegressor that reads the "label" column.
linearRegressor = (
    LinearRegression()
    .setRegParam(0.3)
    .setElasticNetParam(0.8)
)
trainRegressor = (
    TrainRegressor()
    .setModel(linearRegressor)
    .setLabelCol("label")
)

trainRegressor.fit(dataset).transform(dataset).show()
Python API: TrainRegressor | Scala API: TrainRegressor | .NET API: TrainRegressor | Source: TrainRegressor