Advanced Tutorial
In this tutorial we dive deeper into the core concepts, showing how to reconfigure pipelines and build your own meta-pipelines.
Table of Contents

- Operations with InPort/OutPort and Inputs/Outputs
  - InPort & OutPort
  - Inputs & Outputs
  - Pipeline
- Pipeline Definition
- Meta-Pipelines
  - Estimator Example
- Services
Operations with InPort/OutPort and Inputs/Outputs

Consider the `standardization`, `logistic_regression` and `stz_lr` examples from the intermediate tutorial.

InPort & OutPort
Left and right shift operations

You can create dependencies using the left and right shift operators:

```python
standardization.outputs.Z.train >> logistic_regression.inputs.X.train
```

or

```python
logistic_regression.inputs.X.eval << standardization.outputs.Z.eval
```

These operators return the matching dependencies wrapped in a `collections.abc.Sequence` object.
They can be passed as part of the dependencies when combining pipelines.
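To build intuition for how shift operators can yield dependency sequences, here is a minimal sketch. This is a toy model, not rats' actual implementation; the `Dependency`, `OutPort` and `InPort` classes below are hypothetical stand-ins:

```python
from collections.abc import Sequence
from typing import NamedTuple

class Dependency(NamedTuple):
    out_port: str
    in_port: str

class OutPort:
    def __init__(self, name: str) -> None:
        self.name = name

    def __rshift__(self, in_port: "InPort") -> Sequence[Dependency]:
        # out >> in: return the created dependency wrapped in a sequence
        return (Dependency(self.name, in_port.name),)

class InPort:
    def __init__(self, name: str) -> None:
        self.name = name

    def __lshift__(self, out_port: OutPort) -> Sequence[Dependency]:
        # in << out: same dependency, just written in the other direction
        return (Dependency(out_port.name, self.name),)

deps = OutPort("Z.train") >> InPort("X.train")
same = InPort("X.train") << OutPort("Z.train")
assert deps == same == (Dependency("Z.train", "X.train"),)
```

Both directions of the shift produce the same dependency; only the reading order changes.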
Merge InPort

You can merge two `InPort`s into a single one using the `|` operator between entries, which binds the inputs together and exposes a single entry.
For example,
```python
from typing import NamedTuple

from rats.processors import PipelineBuilder

class ReportOut(NamedTuple):
    acc: float

class Report:
    def process(self, probs: float) -> ReportOut: ...

r1 = PipelineBuilder.task(Report, name="r1")
r2 = PipelineBuilder.task(Report, name="r2")
reports = PipelineBuilder.combine(
    pipelines=[r1, r2],
    name="reports",
    inputs={"probs": r1.inputs.probs | r2.inputs.probs},  # merge operation
)

reports.inputs.probs << logistic_regression.outputs.probs  # will return two dependencies
```
We combine `r1` and `r2` into a single pipeline and expose `probs` as a single input, which is the merge of `probs` from `r1` and `r2`.
Therefore, when used in a dependency assignment, a broadcast operation is performed, mapping `probs` from `logistic_regression` to both `r1` and `r2`.
Merge OutPort

If you merge `OutPort`s with the `|` operator, you indicate that the outputs are to be concatenated.
For example,
```python
from typing import NamedTuple, Sequence

from rats.processors import PipelineBuilder

class ReportOut(NamedTuple):
    acc: float

class Report:
    def process(self, probs: float) -> ReportOut: ...

class Summary:
    def process(self, accuracies: Sequence[float]) -> None: ...

r1 = PipelineBuilder.task(Report, name="r1")
r2 = PipelineBuilder.task(Report, name="r2")
summary = PipelineBuilder.task(Summary, name="summary")
reports = PipelineBuilder.combine(
    pipelines=[r1, r2],
    name="reports",
    outputs={"acc": r1.outputs.acc | r2.outputs.acc},  # merge operation
)

# returns a single dependency that is the concatenation of r1.outputs.acc and r2.outputs.acc:
summary.inputs.accuracies << reports.outputs.acc
```
We combine `r1` and `r2` into a single pipeline and expose `acc` as a single output, which is the concatenation of `acc` from `r1` and `r2` into a tuple.
Therefore, when used in a dependency assignment, a concatenation operation is performed, gathering `acc` from `r1` and `r2` into a single `acc` before passing it to `summary`.
Order is preserved in all merge operations.
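The two merge behaviors above can be sketched with a small toy model. This is not rats' implementation; `InPortGroup`, `OutPortGroup` and `Dependency` are hypothetical stand-ins illustrating broadcast vs. concatenation:

```python
from typing import NamedTuple

class Dependency(NamedTuple):
    out_port: str
    in_port: str

class InPortGroup:
    """A toy merged input: binding it broadcasts one output to every member."""

    def __init__(self, *names: str) -> None:
        self.names = names

    def __lshift__(self, out_name: str) -> tuple[Dependency, ...]:
        # one dependency per merged input -> broadcast
        return tuple(Dependency(out_name, n) for n in self.names)

class OutPortGroup:
    """A toy merged output: reading it concatenates member values in order."""

    def __init__(self, *names: str) -> None:
        self.names = names

    def gather(self, values: dict[str, float]) -> tuple[float, ...]:
        # concatenation preserves the merge order
        return tuple(values[n] for n in self.names)

# merged inputs: one upstream output fans out to r1 and r2
probs = InPortGroup("r1.probs", "r2.probs")
deps = probs << "lr.probs"
assert deps == (
    Dependency("lr.probs", "r1.probs"),
    Dependency("lr.probs", "r2.probs"),
)

# merged outputs: r1 and r2 accuracies are gathered into one tuple
acc = OutPortGroup("r1.acc", "r2.acc")
assert acc.gather({"r1.acc": 0.9, "r2.acc": 0.8}) == (0.9, 0.8)
```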
Inputs & Outputs and InPorts[T] & OutPorts[T]

`InPorts[T]` and `OutPorts[T]` are extensions of `namedcollection[T]`, a data structure that implements a nested dictionary with dot-notation access.
Additionally, these types implement operations to create dependencies between them.
They are generic classes, where `T` is the type of the values stored in the collection.
`Inputs` and `Outputs` are aliases of `InPorts[Any]` and `OutPorts[Any]`, respectively, and operations between them are identical.
When annotating the inputs or outputs of a pipeline, you can extend `Inputs` and `Outputs` to specify the type of the values stored in the collection. See the annotations section for further details.
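A nested dictionary with dot-notation access can be sketched in a few lines of plain Python. This is a simplified toy, not the actual `namedcollection` implementation:

```python
from typing import Any

class NamedCollection:
    """Toy namedcollection: dotted keys become nested entries with attribute access."""

    def __init__(self, entries: dict[str, Any]) -> None:
        nested: dict[str, Any] = {}
        for key, value in entries.items():
            node = nested
            *parents, leaf = key.split(".")
            for part in parents:
                node = node.setdefault(part, {})  # build intermediate levels
            node[leaf] = value
        self._entries = nested

    def __getattr__(self, name: str) -> Any:
        # dot-notation access; nested dicts are wrapped so access chains
        value = self._entries[name]
        return NamedCollection(value) if isinstance(value, dict) else value

ports = NamedCollection({"X.train": "train-port", "X.eval": "eval-port"})
assert ports.X.train == "train-port"
assert ports.X.eval == "eval-port"
```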
Left and right shift operations

Similar to port assignments, you can create dependencies using the left and right shift operators:

```python
standardization.outputs.Z >> logistic_regression.inputs.X
```

which is equivalent to:

```python
standardization.outputs.Z.train >> logistic_regression.inputs.X.train
standardization.outputs.Z.eval >> logistic_regression.inputs.X.eval
```

or alternatively, operating with variable collections:

```python
logistic_regression.inputs.X << standardization.outputs.Z
```

which is equivalent to:

```python
logistic_regression.inputs.X.train << standardization.outputs.Z.train
logistic_regression.inputs.X.eval << standardization.outputs.Z.eval
```

Info: The sets of names of the two collections need to be identical. Entries are matched by name to create dependencies, e.g., `Z.train` with `X.train` and `Z.eval` with `X.eval`, respectively, in the above example.

The operation returns a `collections.abc.Sequence` object of the dependencies created.
Rename operations

You can rename entries of `Inputs` and `Outputs` objects with `rename_inputs` and `rename_outputs`:

```python
stz_train = stz_train.rename_inputs({"X": "X.train"})
stz_eval = stz_eval.rename_inputs({"X": "X.eval"})
stz_train.inputs.X.train  # InPort object
stz_eval.inputs.X.eval  # InPort object
```

The syntax is `{"<old_name>": "<new_name>"}`, and the methods return a new `Inputs` or `Outputs` object with renamed entries.
Merge operations

You can merge two collections into a single one using the `|` operator between collections.
For example,

```python
stz_train = stz_train.rename_inputs({"X": "X.train"})
stz_eval = stz_eval.rename_inputs({"X": "X.eval"})
X = stz_train.inputs.X | stz_eval.inputs.X
X.train  # InPort object
X.eval  # InPort object
```

The above example creates a single collection with two entries, `X.train` and `X.eval`.
The entries come from merging `X` from `stz_train` and `stz_eval`, respectively.
If the collections share the same entry names, those entries are merged together as well.
This behavior is the same for `Outputs` objects.
Subtract operations

You can subtract variable names from `Inputs` and `Outputs` objects:

```python
new_inputs_no_x = standardization.inputs - ("X",)
new_inputs_no_x_train = standardization.inputs - ("X.train",)
```

The syntax requires subtracting an `Iterable[str]` (such as `tuple[str, ...]`, `list[str]`, `collections.abc.Set[str]`, etc.).
If what you are trying to subtract does not exist, no error is raised.
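The rename and subtract semantics can be sketched over a flat dict of dotted names. This is a toy illustration of the behavior described above, not the library's implementation:

```python
def rename(entries: dict[str, str], mapping: dict[str, str]) -> dict[str, str]:
    # toy rename: dotted keys are rewritten according to the mapping
    return {mapping.get(k, k): v for k, v in entries.items()}

def subtract(entries: dict[str, str], names: tuple[str, ...]) -> dict[str, str]:
    # toy subtract: drop exact names or whole sub-collections by dotted prefix;
    # absent names are silently ignored, mirroring the no-error behavior above
    return {
        k: v
        for k, v in entries.items()
        if k not in names and not any(k.startswith(n + ".") for n in names)
    }

inputs = {"X.train": "p1", "X.eval": "p2", "Y": "p3"}
assert subtract(inputs, ("X",)) == {"Y": "p3"}  # removes the whole X collection
assert subtract(inputs, ("X.train",)) == {"X.eval": "p2", "Y": "p3"}
assert subtract(inputs, ("missing",)) == inputs  # no error for absent names
assert rename(inputs, {"X.train": "X_train"})["X_train"] == "p1"
```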
Pipeline

Left and right shift operations

The IO attributes above are slightly redundant in the above example, so one can simplify by operating directly with pipelines.
Left / right shifting with pipeline objects creates dependencies between the `inputs` and `outputs` of the pipelines in the direction of the shift, for equally named collections.
Here is an example:

```python
stz = standardization.rename_outputs({"Z": "X"})
stz >> logistic_regression
```

which would be equivalent to

```python
stz.outputs.X >> logistic_regression.inputs.X
```

Beware that the following would produce no dependencies, because there are no shared variable names:

```python
standardization >> logistic_regression  # outputs are named Z, inputs are named X
```
Rename inputs and outputs

You can rename inputs and outputs of a pipeline with `rename_inputs` and `rename_outputs`.
The syntax is `{"<old_name>": "<new_name>"}`.
For example, if we want to rename the `X` input of `stz_train` to `features`:

```python
stz_train = stz_train.rename_inputs({"X": "features"})
```

You can transform single entries into collections, or entries from collections into single entries, via dot notation:

```python
standardization = standardization.rename_inputs(
    {"X.train": "X_train", "X.eval": "X_eval"}
)
standardization.inputs.X_train  # InPort object
standardization.inputs.X_eval  # InPort object
# standardization.inputs.X.train  # raises an error
```
If the new name of an entry already exists, or repeats, a merge operation will be performed:

```python
r1 = PipelineBuilder.task(Report, name="r1")
r2 = PipelineBuilder.task(Report, name="r2")
reports = PipelineBuilder.combine(
    pipelines=[r1, r2],
    name="reports",
    inputs={"probs.r1": r1.inputs.probs, "probs.r2": r2.inputs.probs},
)
reports.inputs.probs  # Inputs collection with two entries
reports = reports.rename_inputs({"probs.r1": "probs", "probs.r2": "probs"})  # rename / merge operation
reports.inputs.probs  # InPort object with two entries merged together (broadcast)
```

See the `InPort` merge section for details on broadcast operations and the `OutPort` merge section for details on concatenation operations.
Decorate pipelines

You can decorate a pipeline, i.e., wrap a pipeline under another name.
This is useful when combining pipelines of the same type that need to be unique. Consider the following example:

```python
class Report:
    def process(self, probs: float) -> None: ...

report = PipelineBuilder.task(Report)
r1 = report.decorate("r1")
r2 = report.decorate("r2")
reports = PipelineBuilder.combine(
    pipelines=[r1, r2],
    name="reports",
    dependencies=(
        one_pipeline.outputs.probs >> r1.inputs.probs,
        two_pipeline.outputs.probs >> r2.inputs.probs,
    ),
)
```

To combine pipelines with the same name, decorate them first and then combine.
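One loose way to picture decoration, assuming it simply namespaces a pipeline's entries under the new name (the real mechanics inside rats may differ):

```python
def decorate(ports: dict[str, str], name: str) -> dict[str, str]:
    # toy decorate: namespace every entry under the new pipeline name,
    # so two copies of the same pipeline no longer collide when combined
    return {f"{name}.{key}": port for key, port in ports.items()}

report_ports = {"probs": "port-id"}
r1_ports = decorate(report_ports, "r1")
r2_ports = decorate(report_ports, "r2")
assert r1_ports == {"r1.probs": "port-id"}
assert not set(r1_ports) & set(r2_ports)  # no name collisions after decoration
```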
Pipeline Definition

A `Pipeline` is a (frozen) dataclass with the following attributes:

- `name` (`str`): name of the pipeline; used as the default value for collection entries; useful to distinguish pipelines when combining.
- `inputs` (`rats.processors.Inputs`): exposes the `Inputs` of a pipeline.
- `outputs` (`rats.processors.Outputs`): exposes the `Outputs` of a pipeline.
`Pipeline`s should not be instantiated directly.
Instead, `Pipeline`s should be created via `Task`s, by combining other `Pipeline`s, or via other constructors, e.g., `Estimator`.
Meta-Pipelines

We will see a few examples of how to use these operations to create and compose pipelines.

Estimator Example
```python
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from itertools import chain

# assumes Pipeline, PipelineBuilder, Dependency and UserOutput are provided by rats.processors

@dataclass(frozen=True, init=False)
class Estimator(Pipeline):
    def __init__(
        self,
        name: str,
        train_pl: Pipeline,
        eval_pl: Pipeline,
        dependencies: Iterable[Sequence[Dependency]] = ((),),
    ) -> None:
        # find shared parameters between train and eval
        in_common = set(train_pl.inputs) & set(eval_pl.inputs)
        out_common = set(train_pl.outputs) & set(eval_pl.outputs)

        # rename shared parameters into train & eval entries of collections
        train_pl = train_pl.rename_inputs({v: v + ".train" for v in in_common})
        train_pl = train_pl.rename_outputs({v: v + ".train" for v in out_common})
        eval_pl = eval_pl.rename_inputs({v: v + ".eval" for v in in_common})
        eval_pl = eval_pl.rename_outputs({v: v + ".eval" for v in out_common})

        # decorate train and eval pipelines for tracking purposes
        train_pl = train_pl.decorate("train")
        eval_pl = eval_pl.decorate("eval")

        # decorate shared dependencies to match the newly decorated train and eval pipelines
        dependencies = (
            dp.decorate("eval", "train") for dp in chain.from_iterable(dependencies)
        )

        # merge the `outputs` of the train and eval pipelines
        outputs: UserOutput = dict(train_pl.outputs | eval_pl.outputs)

        # combine all ingredients into a new pipeline
        p = PipelineBuilder.combine(
            train_pl,
            eval_pl,
            name=name,
            outputs=outputs,
            dependencies=(tuple(dependencies),),
        )
        super().__init__(name, p._dag, p.inputs, p.outputs)
```
A few clarifications:

- We made `Estimator` a (frozen) dataclass extending `Pipeline`. This is because we consider `Estimator` a concept and wanted to give it an entity, but this does not need to be so.
- Arguments include the `train` and `eval` pipelines, the estimator's name, and dependencies.
- The first step is to find the common inputs/outputs between the `train` and `eval` pipelines. Then, the shared inputs/outputs are renamed into collection entries: each shared entry is renamed into a `train` and an `eval` entry, respectively.
- We decorate the `train` and `eval` pipelines before merging. This ensures the combination is valid.
- Outputs are specified by merging the outputs of the `train` and `eval` pipelines, before subtracting specified dependencies.
- Inputs are merged by subtracting the specified dependencies (the default behavior).
- Pipelines are combined, and a new `Pipeline` is returned.
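The first step of the `Estimator`, splitting shared names into `train`/`eval` entries, can be sketched with plain sets and dicts. This toy helper (`split_train_eval` is a hypothetical name) mirrors the rename logic above:

```python
def split_train_eval(
    train_io: dict[str, str], eval_io: dict[str, str]
) -> tuple[dict[str, str], dict[str, str]]:
    # names present in both pipelines become `<name>.train` / `<name>.eval`
    # collection entries; unshared names are left untouched
    common = set(train_io) & set(eval_io)
    renamed_train = {(k + ".train" if k in common else k): v for k, v in train_io.items()}
    renamed_eval = {(k + ".eval" if k in common else k): v for k, v in eval_io.items()}
    return renamed_train, renamed_eval

train_in = {"X": "t1", "shift": "t2"}
eval_in = {"X": "e1", "offset": "e2"}
new_train, new_eval = split_train_eval(train_in, eval_in)
assert new_train == {"X.train": "t1", "shift": "t2"}
assert new_eval == {"X.eval": "e1", "offset": "e2"}
```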
Services
TBD