Logistic Regression using sklearn
# type: ignore
# there are multiple typing issues in pandas and the iris dataset, easier to disable typing for
# this notebook
from collections.abc import Sequence
from typing import NamedTuple
import pandas as pd
from sklearn.linear_model import LogisticRegression as LR
from rats import processors as rp
from rats.processors import typing as rpt
Let's build something a little more interesting - logistic regression using sklearn, with safeguards using pandas dataframes.

First we'll create a model class that will hold the bare sklearn `LR` model, but also the meta information needed to verify that a pandas dataframe can be used to predict with the model.
class LRModel(NamedTuple):
    bare_model: LR
    feature_names: tuple[str, ...]
    category_names: tuple[str, ...]
Next, we'll build pipelines in a container class. As before, we need to define NamedTuple classes for the output of each task.
class _SanitizeLabelsOutput(NamedTuple):
    y: pd.Series  # [int], category indices from [0, NumCategories)
    number_of_labels_in_training: int


class _LRTrainOutput(NamedTuple):
    model: LRModel
    number_of_samples_in_training: int


class _LRPredictOutput(NamedTuple):
    logits: pd.DataFrame  # [float], columns are category names
class LRPipelineContainer(rp.PipelineContainer):
    @rp.task
    def sanitize_labels(
        self,
        category_names: Sequence[str],  # order will determine category index in output.
        y: pd.Series,  # [str] category names.
    ) -> _SanitizeLabelsOutput:
        """Remove rows with NaN labels and verify that the remaining labels are expected categories."""
        category_names = tuple(category_names)
        # Remove rows with NaN labels.
        # They are allowed, but not used in training.
        y = y[~y.isna()]
        # Verify that the remaining labels are in the allowed set.
        if not y.isin(category_names).all():
            raise ValueError(f"Labels should be in {category_names}")
        category_name_to_index = {name: i for i, name in enumerate(category_names)}
        y = y.map(category_name_to_index)
        number_of_labels_in_training = len(y)
        return _SanitizeLabelsOutput(
            y=y,
            number_of_labels_in_training=number_of_labels_in_training,
        )
    @rp.task
    def train(
        self,
        category_names: tuple[str, ...],
        x: pd.DataFrame,  # [float], column names become feature names
        y: pd.Series,  # [int], category indices
    ) -> _LRTrainOutput:
        """Train a logistic regression model.

        Samples (x) and labels (y) will be matched by index. Unmatched samples/labels will
        not be used in training.
        """
        label_name = str(y.name)
        if label_name in x.columns:
            raise ValueError(
                f"Label column {label_name} should not be a column of the features dataframe"
            )
        # Join the features and labels.
        j = x.join(y, how="inner")
        number_of_samples_in_training = len(j)
        x = j.drop(label_name, axis=1)
        y = j[label_name]
        lr = LR()
        lr.fit(X=x.to_numpy(), y=y.to_numpy())
        feature_names = tuple(x.columns)
        return _LRTrainOutput(
            LRModel(
                bare_model=lr,
                feature_names=feature_names,
                category_names=category_names,
            ),
            number_of_samples_in_training=number_of_samples_in_training,
        )
    @rp.pipeline
    def training_pipeline(self) -> rpt.UPipeline:
        """Train a logistic regression model.

        Samples not associated with labels or whose labels are NaN will be ignored.
        Labels outside the given category names will raise an error.
        """
        sanitize = self.sanitize_labels()
        train = self.train()
        return self.combine(
            pipelines=(
                sanitize,
                train,
            ),
            dependencies=(sanitize >> train,),
        )
    @rp.task
    def prediction_pipeline(
        self,
        model: LRModel,
        x: pd.DataFrame,
    ) -> _LRPredictOutput:
        # Reorder the columns into the order in which they were used in training.
        x = x[list(model.feature_names)]
        logits_np = model.bare_model.predict_log_proba(X=x.to_numpy())
        logits = pd.DataFrame(logits_np, index=x.index, columns=list(model.category_names))
        return _LRPredictOutput(logits=logits)
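The safeguards above are plain pandas mechanics. Here is a small sketch with toy data (illustrative only, not part of the pipeline) showing the NaN filtering, the index-matched join, and the column reordering:

```python
import pandas as pd

# NaN labels are allowed but dropped before training (as in sanitize_labels).
y = pd.Series(["a", None, "b"], name="label")
y = y[~y.isna()]  # keeps indices 0 and 2

# Samples and labels are matched by index; the inner join drops unmatched
# rows (as in train).
x = pd.DataFrame({"f1": [1.0, 2.0, 3.0], "f2": [4.0, 5.0, 6.0]})
j = x.join(y, how="inner")
print(len(j))  # 2 -- index 1 had a NaN label and is excluded

# Prediction restores the training-time column order (as in prediction_pipeline),
# regardless of how the caller ordered the columns.
feature_names = ("f1", "f2")
x_reversed = x[["f2", "f1"]]
print(x_reversed[list(feature_names)].columns.tolist())  # ['f1', 'f2']
```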
lrpc = LRPipelineContainer()
app = rp.NotebookApp()
The training pipeline is composed of a `sanitize_labels` task followed by a `train` task. Let's look at these sub-pipelines.

Look at the `training_pipeline` method and observe:
- The two sub-pipelines are created by calling the respective methods. At this point, it does not matter that the sub-pipelines are tasks - any pipeline would work. It is important that the sub-pipelines are defined by methods of this container, because that ensures they have distinct names.
- The `combine` method combines the sub-pipelines into another pipeline. The argument `dependencies=(sanitize >> train,)` inspects the outputs of `sanitize` and the inputs of `train`, matches them by name, and creates edges between the matches. Here this means that the `y` output of `sanitize` will be connected to the `y` input of `train` (see the toy sketch after this list).
- Inputs of any sub-pipeline that are not matched with outputs of another sub-pipeline become inputs of the combined pipeline. Here this means that the `y` input of `sanitize`, the `x` input of `train`, and the `category_names` input of both become inputs of the combined pipeline. The `category_names` input will flow to both `sanitize` and `train`, coupling them to always take the same value. There are ways to change that behaviour, but more on that in other tutorials.
- Outputs of any sub-pipeline that are not matched with inputs of another sub-pipeline become outputs of the combined pipeline. There are ways to expose outputs of sub-pipelines that are matched to inputs of other sub-pipelines, but more on that in other tutorials.
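To make these rules concrete, here is a minimal sketch with a toy container (invented for illustration; it uses only the constructs already shown above). The `b` output of `first` matches the `b` input of `second`, so the combined pipeline exposes `a` as its input and `c` as its output:

```python
class _FirstOutput(NamedTuple):
    b: int


class _SecondOutput(NamedTuple):
    c: int


class ToyContainer(rp.PipelineContainer):
    @rp.task
    def first(self, a: int) -> _FirstOutput:
        return _FirstOutput(b=a + 1)

    @rp.task
    def second(self, b: int) -> _SecondOutput:
        return _SecondOutput(c=b * 2)

    @rp.pipeline
    def toy_pipeline(self) -> rpt.UPipeline:
        first = self.first()
        second = self.second()
        # `first >> second` connects the matching `b` ports; `a` becomes an
        # input of the combined pipeline and `c` becomes an output.
        return self.combine(pipelines=(first, second), dependencies=(first >> second,))
```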
Here's the combined training pipeline:
training_pipeline = lrpc.training_pipeline()
print("Training pipeline input ports:", training_pipeline.inputs)
print("Training pipeline output ports:", training_pipeline.outputs)
app.display(training_pipeline)
Training pipeline input ports: InPorts(category_names=InPort[collections.abc.Sequence[str]], y=InPort[pandas.core.series.Series], x=InPort[pandas.core.frame.DataFrame])
Training pipeline output ports: OutPorts(number_of_labels_in_training=OutPort[int], model=OutPort[__main__.LRModel], number_of_samples_in_training=OutPort[int])
The prediction pipeline is a single task:
prediction_pipeline = lrpc.prediction_pipeline()
print("Prediction pipeline input ports:", prediction_pipeline.inputs)
print("Prediction pipeline output ports:", prediction_pipeline.outputs)
app.display(prediction_pipeline)
Prediction pipeline input ports: InPorts(model=InPort[__main__.LRModel], x=InPort[pandas.core.frame.DataFrame])
Prediction pipeline output ports: OutPorts(logits=OutPort[pandas.core.frame.DataFrame])
To run the training pipeline we'll need a samples dataframe and a labels series. We can then run the prediction pipeline with the trained model and a samples dataframe. Let's use the Iris dataset, using category names rather than category indices for labels, and splitting into train and test sets randomly.
from sklearn import datasets

iris = datasets.load_iris()
category_names = tuple(iris["target_names"])
samples = pd.DataFrame(iris["data"], columns=iris["feature_names"])
labels = pd.Series(iris["target"], name="label").map(lambda i: category_names[i])

train_indices = samples.sample(frac=0.8).index
samples_train = samples.loc[train_indices]
labels_train = labels.loc[train_indices]
samples_test = samples.drop(train_indices)[
    iris["feature_names"][::-1]
]  # Reverse the columns to verify that prediction reorders them.
labels_test = labels.drop(train_indices)
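Note that `samples_test` now lists its columns in reverse order; the prediction task will restore the training-time order via `model.feature_names`. A quick sanity check:

```python
print(samples_test.columns.tolist())  # the iris feature names, reversed
```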
training_outputs = app.run(
    training_pipeline,
    inputs=dict(
        category_names=category_names,
        x=samples_train,
        y=labels_train,
    ),
)
prediction_outputs_train = app.run(
    prediction_pipeline,
    inputs=dict(
        model=training_outputs["model"],
        x=samples_train,
    ),
)
prediction_outputs_test = app.run(
    prediction_pipeline,
    inputs=dict(
        model=training_outputs["model"],
        x=samples_test,
    ),
)
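Before looking at the logits, we can peek at the scalar outputs of the training run (using the same dict-style access as `training_outputs["model"]` above):

```python
print("Labels used in training:", training_outputs["number_of_labels_in_training"])
print("Samples used in training:", training_outputs["number_of_samples_in_training"])
```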
Group logits by true label to see how the model is performing:
prediction_outputs_train["logits"].join(labels_train).groupby("label").agg(
    ["mean", "std", "count"]
)
| label | setosa mean | setosa std | setosa count | versicolor mean | versicolor std | versicolor count | virginica mean | virginica std | virginica count |
|---|---|---|---|---|---|---|---|---|---|
| setosa | -0.024784 | 0.012254 | 44 | -3.830012 | 0.503206 | 44 | -16.085735 | 0.993230 | 44 |
| versicolor | -4.437794 | 1.343680 | 37 | -0.212645 | 0.204093 | 37 | -2.533020 | 1.345161 | 37 |
| virginica | -10.097900 | 2.877340 | 39 | -2.691922 | 1.403706 | 39 | -0.171744 | 0.213766 | 39 |
prediction_outputs_test["logits"].join(labels_test).groupby("label").agg(
    ["mean", "std", "count"]
)
| label | setosa mean | setosa std | setosa count | versicolor mean | versicolor std | versicolor count | virginica mean | virginica std | virginica count |
|---|---|---|---|---|---|---|---|---|---|
| setosa | -0.031355 | 0.007162 | 6 | -3.498385 | 0.213893 | 6 | -15.448952 | 0.625208 | 6 |
| versicolor | -4.030292 | 1.248497 | 13 | -0.133180 | 0.090179 | 13 | -3.150326 | 1.414878 | 13 |
| virginica | -9.550154 | 2.666712 | 11 | -2.322399 | 1.333171 | 11 | -0.236352 | 0.272456 | 11 |
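Each row's predicted category is the column with the largest logit, so plain pandas gives a quick accuracy check (a sketch; the exact numbers depend on the random split):

```python
# Predicted label per row = the column name with the highest logit.
predicted_test = prediction_outputs_test["logits"].idxmax(axis=1)
print("Test accuracy:", (predicted_test == labels_test).mean())
```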
Next, we'll build pipelines that incorporate pipelines defined in other containers.