В прошлый раз я писал про createAtomSubscriber
и рассказывал про его чрезвычайную полезность. Предлагаю написать примитив Subscriber
для реактивной подписки/отписки/переподписки на источники данных. Начнем с примера использования подобной конструкции.
Представим, что у нас есть функция receiveData
, которая генерирует данные, после передачи ей текстового id
и реактивная переменную $id
. С помощью Subscriber
создадим реактивную подписку. Когда getId
будет возвращать значение отличное от undefined
и subscriber.data
будет под наблюдением - у нас запустится receiveData
и, с помощью push
, мы будет сохранять последнее полученное значение.
Когда значение, возвращаемое getId
, изменится (важно чтобы оно было реактивным), произойдет отписка. Если новое значение будет не undefined
, то произойдет переподписка.
Когда subscriber.data
выходит из под наблюдения всегда происходит отписка, вне зависимости от getId
.
function receiveData(id: string, cb: (data: string) => void) {
let counter = 0;
const timeout = setInterval(() => cb(`${id}-${counter++}`), 1000);
return () => clearInterval(timeout);
}
let $id = observable.box<string | undefined>(undefined);
const subscriber = new Subscriber({
getId: () => $id.get(),
subscribe: (id, push) => {
const cancel = receiveData(id, (data) => push(data));
return cancel;
}
});
const cancel = autorun(() => console.log('XXX', subscriber.data));
runInAction(() => $id.set('DATA'));
// мы начинаем видеть сообщения типа XXX DATA-{число}
cancel?.();
Обратим внимание на основные моменты реализации Subscriber
. Свойство data
возвращает нам данные, которые у нас сохраняются в реактивную переменную $data
и одновременно активизирует атом. Последний при активации вызывает внутренний метод createSubscription
, который внутри autorun
следит за изменением состояния getId
и при наличии значения создает новую подписку, при этом всегда подчищая старую. Также хочется отметить использования метода untracked
из mobx, которым мы оборачиваем передаваемый subscribe
, чтобы исключить любые возможные реактивные воздействия при его реализации.
import { IAtom, autorun, observable, runInAction, makeObservable, computed, untracked } from 'mobx';
export interface ISubscriberProps<TArgs, T> {
getId(): TArgs | undefined;
subscribe(id: TArgs, fn: (d: T) => void): (() => void) | void;
}
export class Subscriber<TArgs, T> {
private readonly $data = observable.box<T | undefined>(undefined, { deep: false });
private readonly atom: IAtom;
constructor(private opts: ISubscriberProps<TArgs, T>) {
this.atom = createAtomSubscriber(`atom`, () => {
const cancel = this.createSubscription();
return () => cancel();
});
makeObservable(this, { data: computed });
}
get data() {
this.atom.reportObserved();
return this.$data.get();
}
private createSubscription() {
let cancelSubscribe: undefined | (() => void);
const cancel = autorun(() => {
cancelSubscribe?.();
cancelSubscribe = undefined;
const id = this.opts.getId();
if (id === undefined) { return; }
const cancelInner = untracked(
() => this.opts.subscribe(id, data =>
runInAction(() => this.$data.set(data))));
cancelSubscribe = () => cancelInner?.();
});
return () => {
cancelSubscribe?.();
cancel();
};
}
}