Загадка трубы, или AsyncPipe в Angular

733dfefabd3daf68ae6ed1cfb02666d3.jpg

Всем привет. Меня зовут Дима, я фронтенд-разработчик в Тинькофф.

У нас в проектах повсеместно используется AsyncPipe для отображения асинхронных данных в шаблонах. Недавно мне захотелось разобраться, как он работает изнутри. Сегодня расскажу, что я узнал.

Что такое AsyncPipe

AsyncPipe дает возможность подписаться на Observable или Promise в шаблоне компонента и возвращать последнее полученное значение. Взглянем на пример:

import { Component } from '@angular/core';
import { interval } from 'rxjs';

@Component({
  selector: 'my-app',
  template: `: {{interval$ | async}}`
})
export class AppComponent {
  readonly interval$ = interval(1000);
}

Код на StackBlitz

interval(1000) создает новый Observable, который возвращает каждую секунду новое число. С помощью AsyncPipe подписываемся на него в шаблоне и выводим значение. Такой подход является более емким и элегантным, чем подписка в OnInit.

Но разве не нужно отписаться от Observable, чтобы не получить утечку памяти? Не стоит беспокоиться: AsyncPipe берет эту работу на себя. Он автоматически отписывается от Observable по уничтожению компонента. AsyncPipe — своего рода швейцарский нож для отображения асинхронных данных.

Можно использовать конструкцию data$ | async as data в связке со структурными директивами, такими как *ngIf и *ngFor, чтобы не подписываться два раза на одни и те же данные:

import { Component } from '@angular/core';
import { HttpClient } from '@angular/common/http';

@Component({
  selector: 'my-app',
  template: `
    
name: {{ user.name }}, id: {{user.id }}
` }) export class AppComponent { readonly user$ = this.http.get<{name: string, id: string}>( '' ); constructor(private readonly http: HttpClient){} }

Код на StackBlitz

Мы подписались на user$ с помощью async в шаблоне и поместили полученное значение из user$ в переменную user, которую можем переиспользовать внутри тега

.

Можно сразу обращаться к свойству асинхронного объекта, если обернуть выражение obs$ | async в круглые скобки:

*ngIf="(user$ | async).name as name"

К сожалению, в рантайме выпадет ошибка в консоль о том, что невозможно прочитать свойство name. Почему это происходит, разберемся дальше. А чтобы избежать этого, воспользуемся Optional Chaining и добавим ? перед обращением к свойству:

*ngIf="(user$ | async)?.name as name"

Мы рассмотрели, что такое AsyncPipe и способы его использования. Дальше предлагаю разобраться, как он устроен под капотом. Давайте поэтапно напишем свою реализацию, основанную на оригинальном пайпе.

Внутрь AsyncPipe

Начнем с контракта. Для упрощения не будем брать во внимание Promise. Получается, нам нужно принимать Observable и возвращать значения, которые он получает. Звучит несложно. Набросаем черновой вариант:

@Pipe({ name: 'myAsync' })
export class MyAsyncPipe implements PipeTransform {
  private lastValue?: any;
  private observable?: Observable;

  transform(observable: Observable): T | undefined {
    if (!this.observable) {
      this.observable = observable;
      observable.subscribe(value => this.lastValue = value);
    }
    return this.lastValue;
  }
}

Идея такая: в первом вызове transform() подписываемся на Observable и возвращаем undefined. Когда значение из Observable придет, следующий вызов transform() вернет актуальное значение. В оригинальном пайпе первый вызов возвращает null, именно поэтому возникает проблема с обращением к вложенным свойствам.

Тут есть загвоздка с тем, как часто вызывается метод transform() у пайпов в Angular. По умолчанию пайп вызывает данный метод, только если изменилось входное значение. Это замечательное свойство, которое позволяет не пересчитывать на каждый чих результат функции. Подробнее можно прочитать здесь. Но, так как наш объект Observable передается только один раз, нужно установить pure: false в метаданные пайпа, чтобы сообщить Angular, что мы хотим вызывать transform() на каждый цикл обнаружения изменений:

@Pipe({ name: 'myAsync', pure: false })

Код на StackBlitz

Теперь давайте добавим отписку на уничтожение компонента. Можно реализовать интерфейс OnDestroy для пайпа и там отписаться:

@Pipe({ name: 'myAsync', pure: false })
export class MyAsyncPipe implements PipeTransform, OnDestroy {
  private lastValue?: any;
  private observable?: Observable;
  private subscription?: Subscription;

  ngOnDestroy() {
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }

  transform(observable: Observable): T | undefined {
    if (!this.observable) {
      this.observable = observable;
      this.subscription = observable.subscribe(
        value => this.lastValue = value
      );
    }
    return this.lastValue;
  }
}

Код на StackBlitz

myAsync уже имеет право на жизнь. Правда, если у компонента стратегия changeDetection будет равна OnPush, то данные не будут обновляться. Цикл обнаружения изменений не дойдет до компонента, потому что input-свойства компонента не изменились и, следовательно, не вызывается метод transform() у нашего пайпа.

Чтобы исправить это поведение, заинжектим ChangeDetectorRef и будем вызывать markForCheck(), когда нам будут приходить новые данные из Observable:

@Pipe({ name: 'myAsync', pure: false })
export class MyAsyncPipe implements PipeTransform, OnDestroy {
  ....
  constructor(private readonly ref: ChangeDetectorRef) {}
  ...

  transform(observable: Observable): T | undefined {
    if (!this.observable) {
      this.observable = observable;
      this.subscription = observable.subscribe(value => {
        this.lastValue = value;
        this.ref.markForCheck();
      });
    }
    return this.lastValue;
  }
}

Код на StackBlitz

Теперь наш пайп соответствует определению в документации. Выделю еще один момент, о котором нужно рассказать. В текущей реализации myAsync, если передать сначала один Observable, а затем еще один, то последний будет проигнорирован. Лучше отписаться от первого Observable и подписаться на новый. Давайте реализуем:

@Pipe({ name: 'myAsync', pure: false })
export class MyAsyncPipe implements PipeTransform, OnDestroy {
  ...
  ngOnDestroy() {
    this.dispose();
  }

  transform(observable: Observable): T | undefined {
    if (!this.observable) {
      ...
      return this.lastValue;
    }

    if (observable !== this.observable) {
      this.dispose();
      return this.transform(observable);
    }

    return this.lastValue;
  }

  private dispose() {
    if (this.subscription) {
      this.subscription.unsubscribe();
      this.subscription = undefined;
    }
    this.observable = undefined;
    this.lastValue = undefined;
  }
}

Код на StackBlitz

Мы вынесли отписку и обнуление переменных в метод dispose(). Вызываем его на хук OnDestroy, как и раньше. Если приходит новый Observable, то отписываемся от старого и вызываем transform() для нового значения, чтобы подписаться. Слегка запутанно.

Зато код практически идентичен оригинальному AsyncPipe. Там еще обвязка на Promise и проверка передаваемого значения, но смысл аналогичен нашему пайпу.

Вместо заключения

Мы рассмотрели возможности использования AsyncPipe и узнали, что он представляет собой внутри. Надеюсь, мне удалось немного пролить свет на эту технологию. И помните: с большим количеством подписок приходит и большая ответственность!

© Habrahabr.ru