The last time I wrote about createAtomSubscriber
and told you about its extreme usefulness. I propose to write a Subscriber
primitive for reactive subscription/unsubscription/resubscription to data sources. Let’s start with an example of using such a construction.
Imagine that we have a receiveData
function that generates data after passing it a text id
and a reactive variable $id
. Using Subscriber
we will create a reactive subscription. When getId
returns a value other than undefined
and subscriber.data
is under observation, receiveData
will be started and, using push
, we will save the last received value.
When the value returned by getId
changes (it is important that it is reactive), the subscription will be canceled. If the new value is not undefined
, then a resubscription will occur.
When subscriber.data
goes out of observation, an unsubscription always occurs, regardless of getId
.
function receiveData(id: string, cb: (data: string) => void) {
let counter = 0;
const timeout = setInterval(() => cb(`${id}-${counter++}`), 1000);
return () => clearInterval(timeout);
}
let $id = observable.box<string | undefined>(undefined);
const subscriber = new Subscriber({
getId: () => $id.get(),
subscribe: (id, push) => {
const cancel = receiveData(id, (data) => push(data));
return cancel;
},
});
const cancel = autorun(() => console.log('XXX', subscriber.data));
runInAction(() => $id.set('DATA'));
// we start seeing messages like XXX DATA-{number}
cancel?.();
Let’s pay attention to the main points of the Subscriber
implementation. The data
property returns the data that we store in the reactive variable $data
and at the same time activates the atom. The latter, when activated, calls the internal createSubscription
method, which, inside autorun
, monitors the state of getId
and, if there is a value, creates a new subscription, while always cleaning up the old one. I would also like to note the use of the untracked
method from mobx, with which we wrap the passed subscribe
to exclude any possible reactive effects during its implementation.
import {
IAtom,
autorun,
observable,
runInAction,
makeObservable,
computed,
untracked,
} from 'mobx';
export interface ISubscriberProps<TArgs, T> {
getId(): TArgs | undefined;
subscribe(id: TArgs, fn: (d: T) => void): (() => void) | void;
}
export class Subscriber<TArgs, T> {
private readonly $data = observable.box<T | undefined>(undefined, {
deep: false,
});
private readonly atom: IAtom;
constructor(private opts: ISubscriberProps<TArgs, T>) {
this.atom = createAtomSubscriber(`atom`, () => {
const cancel = this.createSubscription();
return () => cancel();
});
makeObservable(this, { data: computed });
}
get data() {
this.atom.reportObserved();
return this.$data.get();
}
private createSubscription() {
let cancelSubscribe: undefined | (() => void);
const cancel = autorun(() => {
cancelSubscribe?.();
cancelSubscribe = undefined;
const id = this.opts.getId();
if (id === undefined) {
return;
}
const cancelInner = untracked(() =>
this.opts.subscribe(id, (data) =>
runInAction(() => this.$data.set(data))
)
);
cancelSubscribe = () => cancelInner?.();
});
return () => {
cancelSubscribe?.();
cancel();
};
}
}