Getting Started

reactivedataflow is a Python library for building reactive data processing graphs. It is designed to work with streaming data sources.

pip install reactivedataflow

The dependencies for this project include rx, networkx, and pydantic.

Legacy

The design of reactivedataflow is inspired by our prior work with datashaper and by the neuron model of neural networks. In a neural network, individual neurons are connected to other neurons through synapses. Traditional neural network topologies contain "hidden" layers of ordinary neurons, along with special neurons designated as input neurons and others designated as output neurons.

In reactivedataflow, we have a similar conceptual framework of Verb Nodes, Input Nodes, and Output Nodes.

Nodes

Nodes are the heart of the system: they process data streams and emit transformed results. A key feature of this system is that the data streams between nodes are polymorphic, meaning that a stream can carry values of any type. Take care that the processing function of a verb node can handle the data types passed to it.
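Because streams can carry any type, a processing function usually needs to guard against payloads it cannot handle. The following is a minimal sketch of that idea in plain Python; `describe_length` is a hypothetical function written for illustration and is not part of reactivedataflow.

```python
from typing import Any


def describe_length(value: Any) -> int:
    """Hypothetical processing function that accepts several payload types."""
    # Sized containers and strings report their length.
    if isinstance(value, (str, bytes, list, tuple, dict)):
        return len(value)
    # Scalars count as a single item.
    if isinstance(value, (int, float)):
        return 1
    # Fail loudly on anything else instead of emitting a misleading result.
    raise TypeError(f"unsupported payload type: {type(value).__name__}")
```

Raising on unknown types is one possible policy; a function could equally skip or pass through such messages, depending on what downstream nodes expect.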

Input Nodes

flowchart LR
    A[rx source observable] --> B((Input Node))
    B --> |default output| C[consumer]
    B --> |default output| D[consumer]
    B --> |default output| E[consumer]

Input nodes are simple nodes that are initialized with a reactivex event stream, and emit data on a single output port.
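The fan-out shown in the diagram can be sketched without the library. This is an illustration only: `SketchInputNode`, `subscribe`, and `attach` are hypothetical names, and a plain Python iterable stands in for the reactivex event stream that a real input node would be initialized with.

```python
from typing import Any, Callable, Iterable, List

Consumer = Callable[[Any], None]


class SketchInputNode:
    """Illustrative stand-in for an input node; not the library's class."""

    def __init__(self) -> None:
        self._consumers: List[Consumer] = []

    def subscribe(self, consumer: Consumer) -> None:
        # Register a downstream consumer of the single default output.
        self._consumers.append(consumer)

    def attach(self, source: Iterable[Any]) -> None:
        # Forward each source message, in order, to every consumer.
        for message in source:
            for consumer in self._consumers:
                consumer(message)


seen_a: List[Any] = []
seen_b: List[Any] = []
node = SketchInputNode()
node.subscribe(seen_a.append)
node.subscribe(seen_b.append)
node.attach([1, "two", 3.0])
```

After `attach` returns, both `seen_a` and `seen_b` hold the same three messages, mirroring how every consumer of the default output receives each emitted value.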

Output Nodes

flowchart LR
    A((node)) --> | named output | B((Output Node))
    B -->  C[graph reader]

Output nodes are simple nodes that are used to observe the output of a verb node. They are initialized with a reference to a verb node, and emit data on a single output port. This is the primary mechanism for reading results from the processing graph.

Verb Nodes

flowchart LR
    A[named inputs] -->|x| B((Verb Node))
    A[named inputs] -->|y| B((Verb Node))
    A[named inputs] -->|z| B((Verb Node))
    C[array input] --> |array input| B
    D[configuration] --> |named static values| B
    B --> |p| E[named outputs]
    B --> |q| E[named outputs]
    B --> |s| E[named outputs]

Verb nodes are composed of a number of "ports" that are used to describe their inputs, outputs, and configuration properties.

  • Input ports represent data streams that are consumed by the verb node. Each message from an input port triggers a re-evaluation of the verb node's processing function, which may emit new messages on any number of output ports.
  • The array input port is a special port type that allows for multiple input streams to be consumed by the verb node. This is useful for cases where multiple data streams are required to be processed together. When any of the input streams emit a message, the verb node will re-evaluate its processing function with the latest messages from all input streams.
  • Configuration ports are used to provide static configuration values, such as system services or algorithmic hyper-parameters, to the verb node. These values are used to parameterize the processing function, and are not expected to change during the lifetime of the verb node.
  • Output ports represent data streams that are emitted by the verb node. Each message emitted on an output port will be sent to any downstream nodes that are connected to the verb node via an input port.
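The interplay of the four port types can be sketched in plain Python. Everything here is hypothetical (`SketchVerbNode`, `on_named`, `on_array`, `scaled_sum`) and does not reflect the library's API. The sketch also makes one assumption the text does not state: it waits until every input port has received at least one message before the first evaluation.

```python
from typing import Any, Callable, Dict, List

_MISSING = object()  # marks a port that has not received a message yet

ProcessFn = Callable[[Dict[str, Any], List[Any], Dict[str, Any]], Dict[str, Any]]


class SketchVerbNode:
    """Illustrative verb node: named inputs, array inputs, config, named outputs."""

    def __init__(self, fn: ProcessFn, input_names: List[str],
                 array_size: int, config: Dict[str, Any]) -> None:
        self._fn = fn
        self._named: Dict[str, Any] = {name: _MISSING for name in input_names}
        self._array: List[Any] = [_MISSING] * array_size
        self._config = dict(config)  # static for the lifetime of the node
        self.emitted: List[Dict[str, Any]] = []  # output-port values per evaluation

    def on_named(self, port: str, message: Any) -> None:
        self._named[port] = message
        self._evaluate()

    def on_array(self, index: int, message: Any) -> None:
        self._array[index] = message
        self._evaluate()

    def _evaluate(self) -> None:
        # Assumption of this sketch: skip evaluation until all ports have a value.
        values = list(self._named.values()) + self._array
        if any(v is _MISSING for v in values):
            return
        # Re-run the processing function with the latest message from every port.
        self.emitted.append(self._fn(dict(self._named), list(self._array), self._config))


def scaled_sum(named: Dict[str, Any], array: List[Any], config: Dict[str, Any]) -> Dict[str, Any]:
    # Emits on a single output port named "p".
    return {"p": (named["x"] + sum(array)) * config["scale"]}


node = SketchVerbNode(scaled_sum, ["x"], array_size=2, config={"scale": 10})
node.on_named("x", 1)  # array inputs still missing: no evaluation
node.on_array(0, 2)    # second array input still missing
node.on_array(1, 3)    # all present: (1 + 2 + 3) * 10 = 60
node.on_named("x", 4)  # re-evaluates with latest values: (4 + 2 + 3) * 10 = 90
```

Each later message replaces only its own port's value, so the function always sees the most recent message from every input alongside the unchanged configuration.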

Edges

Edges connect nodes together in a processing graph. They define the flow of data between nodes and establish the dependencies between them. Each edge represents an event-based reactive data stream and is attached to two nodes: a source and a destination. Edge properties may include:

  • from node (required) - the source node id of the data stream.
  • to node (required) - the destination node id of the data stream.
  • to_port - the name of the input port in the target node. If this is not provided, the edge is treated as an array input.
  • from_port - the name of the output port in the source node. If this is not provided, the default output name is used.
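The defaulting rules for the two optional properties can be sketched as a small data structure. This is an illustration under assumptions: `SketchEdge`, the field names `from_node` and `to_node`, and the `DEFAULT_OUTPUT` value are all hypothetical, and the library's actual default output name may differ.

```python
from dataclasses import dataclass
from typing import Optional

DEFAULT_OUTPUT = "default"  # assumed placeholder, not the library's confirmed value


@dataclass(frozen=True)
class SketchEdge:
    """Illustrative edge record; not the library's edge model."""

    from_node: str                   # required: source node id
    to_node: str                     # required: destination node id
    from_port: Optional[str] = None  # optional: output port on the source
    to_port: Optional[str] = None    # optional: input port on the target

    @property
    def source_port(self) -> str:
        # A missing from_port falls back to the default output name.
        return self.from_port if self.from_port is not None else DEFAULT_OUTPUT

    @property
    def is_array_input(self) -> bool:
        # A missing to_port means the edge feeds the target's array input.
        return self.to_port is None


named = SketchEdge(from_node="load", to_node="join", from_port="rows", to_port="left")
bare = SketchEdge(from_node="load", to_node="concat")
```

Here `named` targets a specific input port, while `bare` relies on both defaults: it reads from the default output of `load` and is delivered to `concat` as an array input.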