Subject объекты в RxDart и чем они полезны Flutter-разработчику

Dart в совокупности с пакетом Async обладает неплохим функционалом в части работы со стримами. Однако ограничения всё ещё есть. Чтобы сделать стримы более удобными, используется пакет RxDart.

ReactiveX (Rx) появился в 2010 году для .NET, а после был портирован почти на все  современные языки программирования и стал стандартом. Версию для Dart опубликовали в 2015 году, и на данный момент она входит в число Flutter Favorite пакетов — её максимально поддерживает комьюнити.

Меня зовут Виталий, я Flutter Team Lead в Surf, и эта небольшая статья станет первой в цикле публикаций на тему RxDart.

Тезисы

  • Subject — объект, на который можно подписаться и слушать переданные в него значения, аналог StreamController в Dart.

  • PublishSubject — Subject, который является аналогом стандартного широковещательного контроллера StreamController.broadcast().

  • ReplaySubject — Subject, который хранит все переданные ранее значения и при подписке возвращает сразу все прошлые значения.

  • BehaviorSubject — Subject, который хранит в себе последнеепереданное значение, и при подписке на этот Subject, сразу возвращает слушателю это значение. Может быть инициализирован только с начальным значением.

Что предлагает Dart из «коробки»

Из «коробки» Dart предоставляет для работы со Stream класс StreamController, который позволяет управлять стримами. 

Существует два вида подписки:

— single subscription — может быть только один слушатель, который гарантированно получит все сообщения, поступившие после подписки на стрим;

— broadcast — слушателей может быть много, но они также будут получать сообщения, которые попадают в стрим после подписки.

Для StreamController.broadcast() можно провести аналогию с радио — активным слушателям информация поставляется в «прямом эфире», и если подключиться к нему позже остальных, то послушать упущенное никак нельзя.

import 'dart:async';

void _firstListener(int value) => print('first: $value');
void _secondListener(int value) => print('second: $value');

void main() {
  // создаем контроллер
  final streamController = StreamController.broadcast();
  
  // добавляем первый слушатель, перед добавлением значения
  streamController.stream.listen(_firstListener);
  
  // добавляем значение
  streamController.add(1);
  // _firstListener выведет 'first: 1'
  
  // добавляем второй слушатель
  streamController.stream.listen(_secondListener);
  // ничего не выведется
}

Что предлагает RxDart

Пакет добавляет три специализированных контроллера для работы со Stream — разновидности Subject:  

  • PublishSubject

  • ReplaySubject

  • BehaviorSubject

Рассмотрим их по отдельности.

PublishSubject

4e4a2d41f04d6d38915dad6fdfa01183.png

PublishSubject — широковещательный («broadcast» или «hot») контроллер, аналог стандартного широковещательного контроллера StreamController.broadcast(), о котором писали выше. Останавливаться здесь не будем. 

ReplaySubject

320360debb17a1cfd22237311edd93ea.png

ReplaySubject — также широковещательный контроллер, который стоит использовать, если слушателю нужно передать все прошлые переданные события. Для новых слушателей он «проигрывает» все прошлые события начиная с первого. 

Аналог — прямой эфир трансляции, который можно перематывать на ранние моменты.

import 'package:rxdart/subjects.dart';

// ... _firstListener и _secondListener

void main() {
  // создаём replay subject
  final replaySubject = ReplaySubject();
  
  replaySubject..add(1)..add(2)..add(3)..add(4);
  
  // добавляем первый слушатель, перед добавлением значения
  replaySubject.stream.listen(_firstListener);
  // _firstListener выведет:
  // first: 1 
  // first: 2 
  // first: 3 
  // first: 4
  
  // добавляем второй слушатель
  replaySubject.stream.listen(_secondListener);
  // _secondListener выведет:
  // second: 1 
  // second: 2 
  // second: 3 
  // second: 4
}

BehaviorSubject

bbebaea14867b626eae11e0a37330d0f.png

BehaviorSubject — широковещательный контроллер, который хранит в себе последнее значение или ошибку, и при подписке на этот стрим сразу возвращает слушателю последнее событие, переданное в контроллер. 

Аналог — прямая трансляция без возможности перемотки. Когда подключаешься, начинаешь просмотр с самого последнего кадра.

import 'package:rxdart/subjects.dart';

// ... _firstListener и _secondListener

void main() {
  // создаём behavior subject
  final behaviorSubject = BehaviorSubject();
  
  // добавляем первый слушатель, перед добавлением значения
  behaviorSubject.stream.listen(_firstListener);
  
  // добавляем значение
  behaviorSubject.add(1);
  // _firstListener выведет 'first: 1'
  
  // добавляем второй слушатель
  behaviorSubject.stream.listen(_secondListener);
  // _secondListener выведет 'second: 1'
}

Опционально можно передать слушателю исходное значение при подписке — с помощью конструктора BehaviorSubject.seeded, для Rx это более «нативный» способ объявления BehaviorSubject.

import 'package:rxdart/subjects.dart';

// ... _firstListener и _secondListener

void main() {
  // создаём behavior subject
  final behaviorSubject = BehaviorSubject.seeded(1);
  
  // добавляем первый слушатель, перед добавлением значения
  behaviorSubject.stream.listen(_firstListener);
  // _firstListener выведет 'first: 1'
  
  // добавляем второй слушатель
  behaviorSubject.stream.listen(_secondListener);
  // _secondListener выведет 'second: 1'
}

BehaviorSubject.seededможно использовать, когда в стрим требуется передать исходное значение и слушателям необходимо «среагировать» на него.

Например, состояние корзины с товарами хранить в BehaviorSubject, , а на экране корзины связать вывод её содержимого напрямую с состоянием в условном классе CartService.

import 'package:rxdart/subjects.dart';

class Product {
  final String title;

  const Product(this.title);
}

class CartState {
  final List products;

  const CartState({required this.products});
	
  factory CartState.empty() => const CartState(products: []);
}

class CartService {
  final _cartState = BehaviorSubject.seeded(CartState.empty());

  Stream get cartStateStreamed => _cartState.stream;

  void addProduct(Product product) {
    _cartState.add(
      CartState(
        products: [
          ..._cartState.value.products,
          product,
        ],
      ),
    );
  }
}

// где-то в приложении объявляем сервис для работы с корзиной
final service = CartService();

// подписываемся на состояние корзины, для обновления счётчика товаров, 
// например в BottomAppBar, он будет обновляться при изменении состояния корзины
service.cartStateStreamed.listen((cartState) {
  print('Число товаров: ${cartState.products.length}');
});

// добавляем товары
service..addProduct(const Product("Капуста"))..addProduct(const Product("Картошка"));

// чуть позже на экране содержимого корзины подписываемся на состояние и выводим названия товаров
service.cartStateStreamed.listen((cartState) {
  print('Названия товаров: ${cartState.products.map((p) => p.title).join(',')}');
});

В некоторых случаях BehaviorSubjectможет заменить ValueNotifier, который не оповещает слушателей о своём последнем  значении при подписке. BehaviorSubject позволяет аналогично обратиться к последнему значению стрима через геттер BehaviorSubject.value.

Заключение

Если вы хотите применять пакет RxDart в своих проектах и делать их более эффективными, не забывайте про документацию.

Также стоит ознакомиться с документацией для RxJS — пакета для JavaScript, актуального и для RxDart, делая скидку на отличный от стека Flutter язык. В этом пакете классная визуализация принципов Rx, так как Rx пакеты соблюдают общий контракт для всех методов и классов.

На этом не прощаемся — продолжим писать по теме и искать возможности повышения эффективности вашей работы.

Больше полезного про Flutter — в Telegram-канале Surf Flutter Team. Кейсы, лучшие практики, новости и вакансии в команду Flutter Surf в одном месте. Присоединяйтесь.

© Habrahabr.ru