Зачем использовать materialize и dematerialize операторы, и что такое Notification в RxJS?

fec1733b619f743e0b7c4e1147dcb392.jpg

Вы когда-нибудь встречали такие операторы, как materialize и dematerialize в RxJS? А что насчет класса Notification? Вероятно, многие слышали, но не до конца представляли, где их можно применить на практике.

В этой статье я расскажу, что делают эти операторы и приведу несколько примеров, которые в будущем вам могут пригодиться.

Однажды, интереса ради, я просматривала документацию RxJS и заметила ранее для себя неизвестные операторы: materialize и dematerialize. Первый вопрос, который у меня возник:, а где их использовать в реальном проекте?

Разберемся вместе?

Materialize

Для начала вспомним, какие типы значений может эмитить объект типа Observable: это next, error и complete. Если вы не помните, что это значит, здесь можно почитать.
Соответственно и про observer, набор коллбэков (onNext, onError, onComplete), тоже советую вспомнить.

Вот что говорится в документации о materialize операторе: «A function that returns an Observable that emits Notification objects that wrap the original emissions from the source Observable with metadata».

Сигнатура оператора:
materialize(): OperatorFunctionNotification & ObservableNotification>

Получается, materialize оборачивает любое испускаемое observable значение в некий Notification объект и эмитит его как »next».

Чем здесь является Notification? Это обертка над observable значением, которая просто добавляет к нему некоторые метаданные:

class Notification {
  static createNext(value: T)
  static createError(err?: any)
  static createComplete(): Notification & CompleteNotification
  constructor(kind: "N" | "E" | "C", value?: T, error?: any)
  get hasValue: boolean
  get kind: 'N' | 'E' | 'C'
  get value?: T
  get error?: any
  observe(observer: PartialObserver): void
  do(nextHandler: (value: T) => void, errorHandler?: (err: any) => void, completeHandler?: () => void): void
  accept(nextOrObserver: NextObserver | ErrorObserver | CompletionObserver | ((value: T) => void), error?: (err: any) => void, complete?: () => void)
  toObservable(): Observable
}

Больше всего нас здесь интересует свойство kind. Именно в нем хранится первоначальный тип значения observable: N — next, E — error, C — complete.

Давайте посмотрим на визуализацию работы оператора:

47fa93d081eee15f79661bbba1cd9c6a.png

Теперь на примере:

of("one”) // поток эмитит одно значение и после завершается
.pipe(materialize()) // оборачиваем в Notification object
.subscribe(x => console.log(x)); // в подписку поочередно прилетает 2 Notification

Output:
{kind: "N”, value: "one”, error: undefined, hasValue: true} // Notification object
{kind: "C”, value: undefined, error: undefined, hasValue: false} // Notification object

Давайте разберемся:

  • поток эмитит «one» → materialize конвертирует в Notification, где kind = «N» (next), а в value записывается передаваемое значение «one».

  • поток завершается → materialize конвертирует в Notification, где kind = «C» (complete), а в value ничего не записывается, т.к. поток завершился.

Из-за того, что materiailze конвертирует значения в Notification объект, observable эмитит их как «next», и мы видим обернутый complete выбросв next обработчике observable.

Еще пример:

throwError(() => new Error("error”)) // эмитим ошибку
.pipe(materialize()) // конвертируем в Notification
.subscribe(x => console.log(x));

Output:
{kind: "E”, value: undefined, error: Error, hasValue: false}

Опять же из-за того, что materiailze все оборачивает в Notification, даже ошибка попадает в next обработчик.

Dematerialize

Но что делать, если мы хотим вернуться к исходному выбросу observable? Здесь нам поможет противоположность materialize оператора — dematerialize, с его помощью происходит обратная конвертация.

Из документации: «Converts an Observable of Notification objects into the emissions that they represent».

Сигнатура оператора:
dematerializeObservableNotification>(): OperatorFunctionValueFromNotification>

Немного визуализации:

18b2cbf3084742047a10fb0f0a7a5446.png

Работа оператора в коде:

// 1 пример
of("one”) // эмитится одно значение, и завершается поток
.pipe(
	materialize(), // конвертируем в Notification oбъект
  dematerialize()) // конвертируем Notification в исходное значение потока
.subscribe(x => console.log(x));

Output: "one" // получили то, что изначально эмитил поток

// 2 пример
throwError(() => new Error("error”)) // эмитим ошибку
.pipe(
	materialize(), // конвертируем в Notification oбъект
  dematerialize()) // конвертируем Notification в исходное значение потока
.subscribe(x => console.log(x));

Output: // ничего не логируется next обработчиком, 
ERROR Error: error // видим обычную ошибку

А нужно ли оно мне?

Мы вкратце рассмотрели, что делают операторы materialize и dematerialize. Но где же они могут пригодиться?

Первое, где их можно использовать во время дебага. Например, вам нужно добавить задержку во время эмита ошибки.

Если это сделать следующим образом:  

throwError(() => new Error("message”))
.pipe(delay(3000)) // добавляем задержку
.subscribe({
	next: (x) => { console.log(`${x}`); },
	error: (err) => { console.log(`${err}`); }
	complete: () => { console.log('complete'); }
})

То в этом случае ошибку вы увидите моментально, задержка не сработает. Это происходит потому, что оператор delay работает только для next выбросов.

Но если использовать изученные нами операторы, то ошибка выводится после заданного времени задержки.

throwError(() => new Error('message'))
      .pipe(
      	materialize(), // ошибка конвертируется в Notification объект, это позволяет нам применить delay    y
      	delay(5000), // добавляем задержку
        dematerialize()) // конвертируем обратно в ошибку
      .subscribe({
        next: (x) => { console.log(`${x}`); },
        error: (err) => { console.log(`${err}`); },
        complete: () => { console.log('complete'); },
      });
      
Output: Error: message // сработал error обработчик через 5 секунд

Все из-за того, что ошибка конвертируется в Notification объект с помощью materialize, это позволяет нам применить оператор delay.

После (с помощью dematerialize оператора) мы получаем первоначальную ошибку.

Исходный код

Где же еще они пригодятся?

Скажем, нужно объединить 2 observable. Как только один из них завершится, то результирующий observable тоже должен завершиться.

Если реализовать это с помощью оператора merge:

const result$ = merge(this.first$, this.second$)
      .subscribe({
        next: (x) => { console.log(`${x}`); },
        complete: () => { console.log('complete'); },
      });
 
    this.first$.next(1);
 
    this.second$.complete(); // завершаем один из потоков
 
    this.first$.next(2); // результирующий поток все еще не завершен

Output:
1
2

Мы не увидим в output «complete». Это случилось из-за того, что merge просто так работает:»The output Observable only completes once all input Observables have completed». И один из способов справиться с этим — использовать операторы materialize и dematerialize:

 const result$ = merge(
      this.first$.pipe(materialize()), // конвертируем в Notification 1 поток 
      this.second$.pipe(materialize()) // конвертируем в Notification 2 поток 
      ) 
      .pipe(dematerialize()) // возвращаемся к исходному значению
      .subscribe({
        next: (x) => { console.log(`${x}`); },
        complete: () => { console.log('complete'); },
      });
 
    this.first$.next(1); // Notification {kind: "N"}
 
    this.second$.complete(); // завершаем один из потоков - Notification {kind: "C"}
 
    this.first$.next(2); // этот next уже на сработает

Output: 
1 // next от first$
complete // complete от second$ — завершился результирующий поток

Это то, чего мы добивались. Догадываетесь, почему сработало?

Мы знаем, что при использовании merge оператора результирующий поток не завершится при завершении только одного из переданных в него observable. А materialize оператор конвертирует любое значение observable в Notification объект, что воспринимается как «next» выброс.

Это позволило нам получить «complete» событие от this.second$ observable в pipe (…).
И уже с помощью dematerialize оператора мы получаем исходное значение observable — завершение потока.

Исходный код

Заключение

Я постаралась привести несколько примеров использования materialize/dematerialize, чтобы понять, на что они способны и где полезны.

Уверена, что эти операторы помогут вам лучше понимать и использовать все возможности RxJS, и вы найдете им интересные способы применения.

А вы раньше использовали их?

Полезные ссылки

https://rxjs.dev/api/operators/materialize
https://rxjs.dev/api/operators/dematerialize
https://rxjs.dev/api/index/class/Notification
https://docs.w3cub.com/rxjs/api/index/class/notification
https://docs.w3cub.com/rxjs/api/operators/materialize
https://docs.w3cub.com/rxjs/api/operators/dematerialize
https://docs.w3cub.com/rxjs/api/index/function/merge

© Habrahabr.ru