Beginners Tutorial¶
This tutorial is also available in notebook form. See Tutorial Notebooks.
"Hello World" Example¶
Let's start writing the most simple pipeline that takes no inputs or outputs and prints
"Hello World"
on screen.
The first step is to write a processor's class with the printing functionality.
We need to define a process
method with no input and no output.
To create a pipeline and run it, we can use the following lines:
from rats.processors import RatsProcessorsServices
from rats.processors.ux import PipelineBuilder
hello_world = PipelineBuilder.task(
HelloWorld, "hello_world"
) # creates a pipeline of single node
runner_factory = rats_service_provider.get_service(
runner = runner_factory(hello_world) # creates a runner for the given pipeline
runner() # runs the pipeline
We will dive into details at the end of this and next tutorials.
"Diamond" Pipeline Example¶
Consider now we have four classes, i.e., A
, B
, C
and D
with some inputs and outputs and we
want to connect them following a diamond-based shape:
We have the following classes and declared outputs:
from typing import Any, NamedTuple
class AOutput(NamedTuple):
Z1: Any
Z2: Any
class A:
def process(self) -> AOutput: ...
class BOutput(NamedTuple):
Z: float
class B:
def process(self, X: Any) -> BOutput: ...
class COutput(NamedTuple):
Z: Any
class C:
def process(self, X: Any) -> COutput: ...
class D:
def process(self, X1: Any, X2: Any) -> None: ...
Processors can have arbitrary inputs, in this case we have processors with empty, single, single and two inputs, respectively.
Processors declare their outputs too.
To declare outputs, the process
method needs to return a mapping with variable names and values,
i.e., NamedTuple
or None
is a
python built-in type to declare variable names and types, which we can use to
specify that processor A
returns two outputs, B
and C
return a single output, and D
returns nothing. Similar declaration can be achieved with
python's built-in.
Once we have written our processor classes, we need to create a pipeline and run it. We start by creating a single node pipeline per processor, which we refer as a task. Then we combine all 4 tasks into a single pipeline by declaring the input/output dependencies that exist between tasks:
from rats.processors.ux import PipelineBuilder
from rats.processors.dag import display_dag
a = PipelineBuilder.task(A, "A")
b = PipelineBuilder.task(B, "B")
c = PipelineBuilder.task(C, "C")
d = PipelineBuilder.task(D, "D")
diamond = PipelineBuilder.combine(
pipelines=[a, b, c, d],
b.inputs.X << a.outputs.Z1,
c.inputs.X << a.outputs.Z2,
d.inputs.X1 << b.outputs.Z,
d.inputs.X2 << c.outputs.Z,
display_dag(diamond) # displays the pipeline
We have seen in these examplea that pipelines, whether made from a single node or multiple, expose
inputs and outputs, and we can access them directly to create dependencies between different
The left-shift and right-shift notation, i.e., <<
, >>
, respectively, are the operators that
create dependencies, which will return a tuple holding the edge information.
If the user accesses an input or output that does not exist, or confuses the direction of the dependency, a run-time error will be raised. This is why we need to declare the inputs and outputs of a processor's class.
Standardized Logistic Regression Example¶
We are now going to build a standardized logistic example where we have two sources for input, i.e., train and eval. We will also show how to build pipelines of more than a single node, and how to connect these into larger pipelines.
This is the resulting pipeline we want to build:
We start by writing the necessary processor classes, which declare inputs and outputs.
Note that processors can depend on parameters both on the __init__
and process
methods, as
shown below.
from typing import NamedTuple
class StandardizeTrainOut(NamedTuple):
mean: float
scale: float
Z_train: float
class StandardizeTrain:
def process(self, X_train: float) -> StandardizeTrainOut: ...
class StandardizeEvalOut(NamedTuple):
Z_eval: float
class StandardizeEval:
def __init__(self, mean: float, scale: float) -> None: ...
def process(self, X_eval: float) -> StandardizeEvalOut: ...
class LogisticRegressionTrainOut(NamedTuple):
model: tuple[float, ...]
Z_train: float
class LogisticRegressionTrain:
def process(self, X_train: float, Y_train: float) -> LogisticRegressionTrainOut: ...
class LogisticRegressionEvalOut(NamedTuple):
Z_eval: float
class LogisticRegressionEval:
def __init__(self, model: tuple[float, ...]) -> None: ...
def process(self, X_eval: float, Y_eval: float) -> LogisticRegressionEvalOut: ...
We create single node pipelines, aka. tasks, and combine them, similar to how we did with in the
diamond pipeline example, to create standardization
and logistic_regression
from rats.processors.ux import PipelineBuilder
from rats.processors.dag import display_dag
stz_train = PipelineBuilder.task(StandardizeTrain, "stz_train")
stz_eval = PipelineBuilder.task(StandardizeEval, "stz_eval")
lr_train = PipelineBuilder.task(LogisticRegressionTrain, "lr_train")
lr_eval = PipelineBuilder.task(LogisticRegressionEval, "lr_eval")
standardization = PipelineBuilder.combine(
pipelines=[stz_train, stz_eval],
stz_eval.inputs.mean << stz_train.outputs.mean,
stz_eval.inputs.scale << stz_train.outputs.scale,
logistic_regression = PipelineBuilder.combine(
pipelines=[lr_train, lr_eval],
dependencies=(lr_eval.inputs.model << lr_train.outputs.model,),
The next step is to combine both pipelines into a single one and connect all dependencies between them.
standardized_lr = PipelineBuilder.combine(
pipelines=[standardization, logistic_regression],
logistic_regression.inputs.X_train << standardization.outputs.Z_train,
logistic_regression.inputs.X_eval << standardization.outputs.Z_eval,
display_dag(standardized_lr) # displays the pipeline
There are a few points to clarify here. First, combining pipelines of a single node, two nodes or more, are synthactically the same. It does not matter how many nodes a pipeline has, as long as inputs and outputs and dependencies are correctly specified when combining.
Second, pipelines can be combined in different order and the resulting pipeline can be the same. In this example, we first built the standardization and logistic_regression pipelines, and then compose these together. It needn't have been so and we show below two alternatives that yield the same result.
Third, the processor classes that we defined above do not have conflicting output names, i.e., all output variable names are different. This simplifies the exposition of the example, but in the intermediate tutorial we explain what happens when we combine pipelines that expose the same output variable names.
A Second Alternative to Standardized Logistic Regression Example¶
As explained before, we built standardized_lr
by first building standardization
and then combining them.
An alternative is to first build the train
and eval
pipelines separately, and then combine.
The following code yields the same result as before:
train_pipeline = PipelineBuilder.combine(
pipelines=[stz_train, lr_train],
dependencies=(lr_train.inputs.X_train << stz_train.outputs.Z_train,),
eval_pipeline = PipelineBuilder.combine(
pipelines=[stz_eval, lr_eval],
dependencies=(lr_eval.inputs.X_eval << stz_eval.outputs.Z_eval,),
standardized_lr = PipelineBuilder.combine(
pipelines=[train_pipeline, eval_pipeline],
eval_pipeline.inputs.mean << train_pipeline.outputs.mean,
eval_pipeline.inputs.scale << train_pipeline.outputs.scale,
eval_pipeline.inputs.model << train_pipeline.outputs.model,
There is no difference between standardize_lr
pipelines obtained from these two procedures, the
resulting pipelines are exactly the same.
A Third Alternative to Standardized Logistic Regression Example¶
A third alternative is to build the whole pipeline in a single combine operation. The only drawback to doing it this way is that pipelines are less modular, in a conceptual sense, to share between users, whereas the first and second approach encapsulate the concepts and make those explicit to share, via standardization / logistic_regression, or via train / eval pipelines, respectively.
standardized_lr = PipelineBuilder.combine(
pipelines=[stz_train, lr_train, stz_eval, lr_eval],
stz_eval.inputs.mean << stz_train.outputs.mean,
stz_eval.inputs.scale << stz_train.outputs.scale,
lr_eval.inputs.model << lr_train.outputs.model,
lr_train.inputs.X_train << stz_train.outputs.Z_train,
lr_eval.inputs.X_eval << stz_eval.outputs.Z_eval,
Rats services¶
The Rats services framework is a dependency injection plugin system. It allows code in different python libraries to expose service interfaces, and to define how service objects implementing these interfaces are initialized.
For now, it is sufficient to understand that:
- Different components expose services by providing classes with service ids as class members.
For example
is exposed byrats.processors
. Its memberRatsProcessorsServices.PIPELINE_RUNNER_FACTORY
is the ID of the service used to run pipelines. - The
interface exposed
provides access to service objects given service ids. - In habitats jupyter kernels, the global
implements theIProvideServices
interface. Therefore the following code gets the pipeline runner factory service:
from rats.processors import RatsProcessorsServices
runner_factory = rats_service_provider.get_service(
Running pipelines¶
takes a pipeline, and returns a runner capable of orchestrating the pipeline:
hello_world_runner = runner_factory(hello_world)
standardized_lr_runner = runner_factory(standardized_lr)
If the pipeline takes no inputs, e.g. our hello_world
example pipeline, one can run it by calling
the runner immediately:
If the pipeline requires inputs, e.g. our standardized_lr
example pipeline, they need to provided
as a dictionary to the runner. The runner returns an outputs object that allows access to the
pipeline's outputs.
The following pseudocode illustrates running standardized_lr
, but recall that we have not
implemented the methods of the different processors we defined above, so this isn't runnable code.