Intro to Subscriptions

What are Subscriptions?

The avionics framework largely adopts a paradigm in which code that needs access to data (a consumer) has the data pushed to it, as opposed to one in which consumers actively query data. For data to be pushed to a consumer, the consumer must explicitly enter into a contract with a data source saying, "I, the consumer, want to be notified when you, the data source, have something substantive to tell me." The details of the "something substantive" in the contract are left up to the data source and can take many forms, including but not limited to a state change in a value or a triggered event.

A subscription, then, is just such a contract between a consumer and a data source. Within the framework, subscriptions are represented by the Subscription interface.

Creating Subscriptions

In general, consumers do not directly create subscriptions. Instead, subscriptions are created by invoking methods on data sources to set up notification contracts. Any data source that supports notifying subscribers can be made to generate subscriptions. The rest of this section describes some common framework data sources that generate subscriptions.
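
To make the idea of a data source handing out subscriptions concrete, here is a minimal, self-contained sketch. It does not use the framework: SimpleSubscription, ToySubscription, and ToyDataSource are hypothetical stand-ins written only for illustration, and the framework's Subscription interface has more members than are modeled here.

```typescript
// Simplified stand-in for the framework's Subscription interface (assumption for illustration).
interface SimpleSubscription {
  readonly isAlive: boolean;
  readonly isPaused: boolean;
  pause(): this;
  resume(): this;
  destroy(): void;
}

// Hypothetical subscription: holds the consumer's handler and the contract state.
class ToySubscription implements SimpleSubscription {
  public isAlive = true;
  public isPaused = false;
  private readonly handler: (value: number) => void;
  private readonly onDestroy: (sub: ToySubscription) => void;

  constructor(handler: (value: number) => void, onDestroy: (sub: ToySubscription) => void) {
    this.handler = handler;
    this.onDestroy = onDestroy;
  }

  // Called by the data source; only live, resumed subscriptions are notified.
  public notify(value: number): void {
    if (this.isAlive && !this.isPaused) { this.handler(value); }
  }

  public pause(): this {
    if (!this.isAlive) { throw new Error('Subscription is dead'); }
    this.isPaused = true;
    return this;
  }

  public resume(): this {
    if (!this.isAlive) { throw new Error('Subscription is dead'); }
    this.isPaused = false;
    return this;
  }

  public destroy(): void {
    this.isAlive = false;
    this.onDestroy(this);
  }
}

// Hypothetical data source: it, not the consumer, creates the subscription.
class ToyDataSource {
  private readonly subs: ToySubscription[] = [];

  public subscribe(handler: (value: number) => void): SimpleSubscription {
    const created = new ToySubscription(handler, destroyed => {
      const index = this.subs.indexOf(destroyed);
      if (index >= 0) { this.subs.splice(index, 1); }
    });
    this.subs.push(created);
    return created;
  }

  public push(value: number): void {
    for (const current of this.subs.slice()) { current.notify(value); }
  }
}

const received: number[] = [];
const source = new ToyDataSource();
const contract = source.subscribe(v => { received.push(v); });

source.push(1);     // Delivered.
contract.pause();
source.push(2);     // Not delivered while paused.
contract.resume();
source.push(3);     // Delivered.
contract.destroy();
source.push(4);     // Not delivered; the contract has ended.
```

The consumer never constructs ToySubscription itself. It only receives the object returned by subscribe(), which mirrors how the framework data sources described below hand back Subscription objects.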

Event Bus

Setting up a handler for an event bus topic creates a subscription for that handler:

const sub: Subscription = bus.getSubscriber<MyEvents>().on('my_event').handle(() => {
  console.log('my_event was published!');
});

In the above example, sub is a Subscription object that represents the new contract created by handle() wherein the event bus will call the handler function whenever the my_event topic is published.

Subscribables

Subscribables are a collection of classes that provide an observable value. Subscribing to one of these objects will create a subscription:

const sub: Subscription = subscribable.sub(v => {
  console.log(`Value changed to ${v}!`);
});

In the above example, sub is a Subscription object that represents the new contract created by sub() wherein the subscribable will call the handler function whenever its value changes.

SubEvent

SubEvent is a class that supports publishing events and can be thought of as a more localized version of the event bus. Subscribing to a SubEvent will create a subscription:

const sub: Subscription = subEvent.on((sender, data) => {
  console.log(`An event was triggered with data ${data}!`);
});

In the above example, sub is a Subscription object that represents the new contract created by on() wherein the SubEvent will call the handler function whenever an event is triggered by calling the SubEvent's notify() method.

Managing Subscriptions

Once a subscription is created, a consumer can use it to manage the new notification contract established between it and the data source. Subscriptions support three basic operations: pause, resume, and destroy.

Pause/Resume

Pausing/resuming a subscription controls whether notifications are received from its data source. A paused subscription will not receive any notifications, while a resumed subscription will. Subscriptions can be paused and resumed at will, and there is no limit to how many times a subscription can toggle between the two states. The pause() and resume() methods are used to pause and resume subscriptions, respectively:

// Subject is a Subscribable.
const subject = Subject.create(0);

const sub = subject.sub(v => {
  console.log(`Value changed to ${v}!`);
});

sub.pause();
subject.set(5); // No console message will be logged.

sub.resume();
subject.set(10); // 'Value changed to 10!' will be logged.

Generally, newly created subscriptions are initialized as resumed, but some data sources allow you to initialize a subscription in the paused state instead:

const subEvent = new SubEvent<void, number>();

// The second argument passed to on() controls whether the new subscription is initially paused.
const sub = subEvent.on((sender, data) => {
  console.log(`An event was triggered with data ${data}!`);
}, true);

subEvent.notify(undefined, 5); // No console message will be logged.

sub.resume();
subEvent.notify(undefined, 10); // 'An event was triggered with data 10!' will be logged.

Initial Notify on Resume

When resuming certain subscriptions, you can control whether an initial notification is sent to the consumer immediately upon resumption. This initial notify operation is primarily useful for subscriptions in which the data source notifies the consumer about state changes. In such cases, initial notify can be used to "sync" the consumer with the current state of the data source as soon as the subscription is resumed, rather than waiting (potentially forever) for the state to change again:

// Subject is a Subscribable.
const subject = Subject.create(0);

const sub = subject.sub(v => {
  console.log(`Value changed to ${v}!`);
});

sub.pause();
subject.set(5); // No console message will be logged.

sub.resume(true); // 'Value changed to 5!' will be logged.

Not all subscriptions support initial notify on resume. In particular, subscriptions for event notifications that are not tied to some persistent state typically do not support initial notify. For example:

const subEvent = new SubEvent<void, number>();

const sub = subEvent.on((sender, data) => {
  console.log(`An event was triggered with data ${data}!`);
});

sub.pause();
subEvent.notify(undefined, 5); // No console message will be logged.

sub.resume(true); // No console message will be logged.

The canInitialNotify property on a Subscription object reports whether that particular subscription supports initial notify on resume. If canInitialNotify is false, then initial notify on resume will always fail on that subscription. If the property is true, then initial notify may work but is not guaranteed to do so under all circumstances. For example, all subscriptions created by setting up event bus handlers report canInitialNotify as true, but initial notify will only work if the subscription's associated topic has cached data published to it:

const sub1 = bus.getSubscriber<MyEvents>().on('my_event_1').handle(v => {
  console.log(`my_event_1 was published with data ${v}!`);
});
const sub2 = bus.getSubscriber<MyEvents>().on('my_event_2').handle(v => {
  console.log(`my_event_2 was published with data ${v}!`);
});

sub1.pause();
sub2.pause();

// The last argument passed to pub() controls whether published data is cached.
bus.getPublisher<MyEvents>().pub('my_event_1', 5, false, false);
bus.getPublisher<MyEvents>().pub('my_event_2', 5, false, true);

sub1.resume(true); // No console message will be logged.
sub2.resume(true); // 'my_event_2 was published with data 5!' will be logged.
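
Because canInitialNotify being true does not guarantee that a notification is actually delivered, a consumer that needs to know whether it was synced has to observe its own handler. The following is a minimal sketch of that idea. ResumableSubscription, ToyTopicSubscription, and resumeAndCheckSync are hypothetical names made up for this example; ToyTopicSubscription only imitates the cached-topic behavior described above and is not the event bus.

```typescript
// Only the members this sketch relies on (a simplified stand-in for Subscription).
interface ResumableSubscription {
  readonly canInitialNotify: boolean;
  pause(): this;
  resume(initialNotify?: boolean): this;
}

// Hypothetical stand-in for a topic subscription: it reports canInitialNotify as true,
// but can only perform an initial notify if a value was cached.
class ToyTopicSubscription implements ResumableSubscription {
  public readonly canInitialNotify = true;
  private isPaused = true; // Created as paused to keep the example short.
  private cached: number | undefined = undefined;
  private readonly handler: (value: number) => void;

  constructor(handler: (value: number) => void) {
    this.handler = handler;
  }

  public publish(value: number, cache: boolean): void {
    if (cache) { this.cached = value; }
    if (!this.isPaused) { this.handler(value); }
  }

  public pause(): this {
    this.isPaused = true;
    return this;
  }

  public resume(initialNotify = false): this {
    if (!this.isPaused) { return this; }
    this.isPaused = false;
    if (initialNotify && this.cached !== undefined) { this.handler(this.cached); }
    return this;
  }
}

// Resumes a subscription with initial notify and reports whether the handler actually ran.
// getNotifyCount is supplied by the consumer and returns how often its handler has been called.
function resumeAndCheckSync(target: ResumableSubscription, getNotifyCount: () => number): boolean {
  if (!target.canInitialNotify) {
    target.resume();
    return false;
  }
  const before = getNotifyCount();
  target.resume(true);
  return getNotifyCount() > before;
}

let uncachedCount = 0;
const uncachedSub = new ToyTopicSubscription(() => { uncachedCount++; });
uncachedSub.publish(5, false); // Published while paused and not cached.
const uncachedSynced = resumeAndCheckSync(uncachedSub, () => uncachedCount);

let cachedCount = 0;
const cachedSub = new ToyTopicSubscription(() => { cachedCount++; });
cachedSub.publish(5, true); // Published while paused, but cached.
const cachedSynced = resumeAndCheckSync(cachedSub, () => cachedCount);
```

In this sketch uncachedSynced ends up false and cachedSynced ends up true. A consumer could use that result to fall back to a default value when no initial notification arrives.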

Note: Some subscriptions always trigger an initial notification when resumed, even if false is passed as the initialNotify argument to resume(). Examples of subscriptions that behave like this include ConsumerSubject and mapped subscribables.

Some data sources allow you to specify whether subscriptions should be resumed with initial notify immediately upon creation, while others always perform an initial notify on newly created resumed subscriptions:

// Subject is a Subscribable.
const subject = Subject.create(0);

// The second argument passed to sub() controls whether to perform an initial notify if the subscription is created as resumed.
subject.sub(v => {
  console.log(`Value changed to ${v}!`);
}, true); // 'Value changed to 0!' will be logged.

// ----------------
// ----------------

bus.getPublisher<MyEvents>().pub('my_event', 5, false, true);

// handle() always performs an initial notify if the subscription is created as resumed.
bus.getSubscriber<MyEvents>().on('my_event').handle(v => {
  console.log(`my_event was published with data ${v}!`);
}); // 'my_event was published with data 5!' will be logged.

Tip: If you want to set up an event bus handler without an initial notify when the subscription is created, then create the subscription as paused and immediately resume the subscription without initial notify:

bus.getSubscriber<MyEvents>().on('my_event').handle(v => {
  console.log(`my_event was published with data ${v}!`);
}, true).resume();

Destroy

Subscription objects can be destroyed by calling their destroy() methods. Destroying a subscription permanently stops notifications from being received from its data source. Once destroyed, a subscription can no longer be paused or resumed; attempting to do so throws a runtime error.

When a subscription is created, a strong reference to the subscription is created and stored within the subscription's data source. As long as that reference exists (and assuming the data source has not been garbage collected), the subscription and anything it references (which usually includes things like associated handler functions) cannot be garbage collected. Therefore, it is imperative that subscriptions be destroyed once they are at the end of their useful lives. Failure to do so may result in memory leaks.
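
One way to make sure nothing is forgotten is to collect every subscription a consumer creates and destroy them together when the consumer is torn down. The sketch below illustrates that pattern under stated assumptions: Destroyable, ToyValueSource, and SubscriptionBag are hypothetical names invented for this example, and ToyValueSource merely imitates a data source that holds a strong reference to each handler.

```typescript
// Only the member this sketch relies on (a simplified stand-in for Subscription).
interface Destroyable {
  destroy(): void;
}

// Hypothetical data source that keeps a strong reference to each handler until destroy() is called.
class ToyValueSource {
  public readonly handlers: ((value: number) => void)[] = [];

  public sub(handler: (value: number) => void): Destroyable {
    this.handlers.push(handler);
    return {
      destroy: () => {
        const index = this.handlers.indexOf(handler);
        if (index >= 0) { this.handlers.splice(index, 1); }
      }
    };
  }

  public set(value: number): void {
    for (const handler of this.handlers.slice()) { handler(value); }
  }
}

// Hypothetical helper that collects subscriptions so they can be destroyed together.
class SubscriptionBag {
  private readonly subs: Destroyable[] = [];

  public add<T extends Destroyable>(created: T): T {
    this.subs.push(created);
    return created;
  }

  public destroyAll(): void {
    for (const current of this.subs) { current.destroy(); }
    this.subs.length = 0;
  }
}

const speed = new ToyValueSource();
const heading = new ToyValueSource();
const messages: string[] = [];

const bag = new SubscriptionBag();
bag.add(speed.sub(v => { messages.push(`speed ${v}`); }));
bag.add(heading.sub(v => { messages.push(`heading ${v}`); }));

speed.set(120);
heading.set(270);

// End of the consumer's life: release every reference held by the data sources.
bag.destroyAll();
speed.set(130); // No handler runs, so nothing is recorded.
```

After destroyAll(), neither toy data source holds a handler any longer, so the handlers and anything they capture become eligible for garbage collection.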

Note: Although destroying and re-creating subscriptions achieves the same end result as pausing/resuming, the former strategy incurs additional performance and memory allocation overhead. Therefore, it is recommended to pause/resume subscriptions whenever possible and only destroy them when they never need to be resumed again.
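
The recommendation above can be sketched with a view that needs updates only while it is visible. This is an illustrative, self-contained example rather than framework code: ToySubscription, ToySubject, createPausedSub(), and AltitudeLabel are all hypothetical, and createPausedSub() is deliberately not the signature of the framework's sub() method.

```typescript
// Simplified stand-in for the framework's Subscription (assumption for illustration).
interface ToySubscription {
  pause(): void;
  resume(initialNotify?: boolean): void;
  destroy(): void;
}

// Hypothetical observable value that counts how many subscriptions it has created.
class ToySubject {
  public subscriptionsCreated = 0;
  private value: number;
  private readonly entries: { handler: (value: number) => void; paused: boolean }[] = [];

  constructor(initial: number) {
    this.value = initial;
  }

  public set(value: number): void {
    if (value === this.value) { return; }
    this.value = value;
    for (const entry of this.entries.slice()) {
      if (!entry.paused) { entry.handler(value); }
    }
  }

  // Creates a subscription that starts in the paused state.
  public createPausedSub(handler: (value: number) => void): ToySubscription {
    const entry = { handler, paused: true };
    this.entries.push(entry);
    this.subscriptionsCreated++;
    return {
      pause: () => { entry.paused = true; },
      resume: (initialNotify = false) => {
        if (!entry.paused) { return; }
        entry.paused = false;
        if (initialNotify) { handler(this.value); }
      },
      destroy: () => {
        entry.paused = true;
        const index = this.entries.indexOf(entry);
        if (index >= 0) { this.entries.splice(index, 1); }
      }
    };
  }
}

// Hypothetical view that reuses one subscription across show/hide cycles.
class AltitudeLabel {
  public text = '';
  private readonly altitudeSub: ToySubscription;

  constructor(altitudeSource: ToySubject) {
    // Created once, initially paused; never re-created.
    this.altitudeSub = altitudeSource.createPausedSub(v => { this.text = `${v} ft`; });
  }

  public onShow(): void { this.altitudeSub.resume(true); } // Sync with the current value.
  public onHide(): void { this.altitudeSub.pause(); }
  public onDestroy(): void { this.altitudeSub.destroy(); } // Only when never needed again.
}

const altitude = new ToySubject(1000);
const label = new AltitudeLabel(altitude);

label.onShow();     // text is synced to '1000 ft'.
label.onHide();
altitude.set(2500); // Ignored while hidden.
label.onShow();     // text is synced to '2500 ft'.
label.onDestroy();
```

Across both show/hide cycles only one subscription is ever created. Resuming with initial notify in onShow() keeps the label from displaying a stale value after updates were skipped while it was hidden.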