Работа с асинхронностью в Dart

f0af16f72dd424bb83aa41ecee2b15dc.png?v=1

Всем привет! Меня зовут Дмитрий Репин, я Flutter-разработчик в Surf.

В этой статье я расскажу, как работать с асинхронностью в Dart: всё о самых важных классах библиотеки dart: async с примерами под катом. Поговорим о том, как в однопоточном языке сходить в сеть или базу данных и при этом не затормозить приложение.

Эта статья написана по материалам моего ролика на YouTube. Посмотрите видео, если больше любите слушать, чем читать.

Dart — однопоточный язык программирования, который выполняется в одном процессе. Что это значит по факту: если мы будем в одном потоке выполнять любую операцию, требующую времени, то приложение просто подвиснет. И всё время, пока мы, например, ждём ответа от сервера или выполнения запроса в БД, пользователь будет страдать и смотреть на лагающий интерфейс.

К счастью, Dart — хитрый однопоточный язык, который предоставляет механизм Event Loop — он даёт возможность откладывать какие-то операции на потом, когда поток будет посвободнее. Такие отложенные операции мы будем называть асинхронными.

Все операции в Dart можно разделить на два типа:

  1. Синхронные — те, что блокируют другие операции до своего выполнения.

  2. Асинхронные, которые позволяют другим операциям выполняться, пока текущая не закончится.

Вместе с операциями, функции тоже можно поделить на синхронные и асинхронные. И синхронные, и асинхронные функции могут содержать синхронные операции. Асинхронной функция начнёт считаться в тот момент, когда в ней появляется хотя бы одна асинхронная операция.

Создание асинхронной функции с помощью класса Future

Чтобы было нагляднее, давайте напишем простую асинхронную функцию. Пусть она будет принимать строку и смотреть на её длину: если она больше 10 символов, функция вернёт число 42, в ином случае — ошибку. Представим, что вычисление длины строки — достаточно тяжеловесная операция чтобы сделать её асинхронной. 

Чтобы сделать код асинхронным, нам понадобится класс Future. В конструктор он принимает функцию, которую необходимо выполнить асинхронно, а также предоставляет два обработчика: then и catchError. Первый обрабатывает успешное выполнение функции, а второй — выполнение функции с ошибкой. В итоге у нас должен получиться такой код:

runSimpleFutureExample({
    String question = 'В чем смысл жизни и всего такого?',
  }) {
    print('Start of future example');
    Future(() {
      print('Вопрос: $question');
      if (question.length > 10) { 
        return 42;
      } else {
        throw Exception('Вы задали недостаточно сложный вопрос.');
      }
    }).then((result) {
      print('Ответ: $result');
    }).catchError((error) {
      print('Ошибка. $error');
    });
    print('Finish of future example');
  }

В начале и конце метода выводятся строки о том, что он начал и закончил работу, а между ними создаётся сам future. В конструктор он принимает метод, который вычисляет длину строки, а затем к Future добавляются обработчики then и catchError, которые обрабатывают выполнение Future. Попробуем запустить этот пример и посмотрим, в каком порядке выводится на консоль результат:

Start of future example

Finish of future example

Вопрос: В чем смысл жизни и всего такого?

Ответ: 42

В результате сначала выводятся строки о начале и завершении метода, а потом выполняется код самого Future. Дело в том, что асинхронный код в Dart выполняется исключительно после синхронного. Соответственно, в синхронном коде мы не можем обработать результат асинхронной операции — при работе с асинхронщиной стоит держать это в голове.

Работать с Future через обработчики then и catchError — не самая хорошая идея, особенно если нужно передать результат одного Future во второй, а второго в третий и т. д., порождая callback hell.

Для таких случаев Dart предоставляет ключевые слова async и await. Словом async помечается функция, исполняющая асинхронные операции через await. Словом await помечаются сами асинхронные операции. К сожалению, при таком подходе мы теряем обработчик catchError, однако никто не мешает нам обрабатывать ошибки через стандартный try/catch. Чтобы понять, как это работает, перепишем наш предыдущий пример с использованием async/await и для удобства вынесем Future в отдельный метод:

 runFutureWithAwaitExample({
    String question = 'В чем смысл жизни и всего такого?',
  }) async {
    print('Start of future example');
    try {
      final result = await _getAnswerForQuestion(
        'В чем смысл жизни и всего такого?',
      );
      print('Ответ: $result');
    } catch (error) {
      print('Ошибка. $error');
    }
    print('Finish of future example');
  }
 
  Future _getAnswerForQuestion(String question) => Future(() {
        print('Вопрос: $question');
        if (question.length > 10) {
          return 42;
        } else {
          throw Exception('Вы задали недостаточно сложный вопрос.');
        }
      });

Здесь мы объявили метод, возвращающий Future. Вызываем его с использованием слова await, передавая в переменную result, с которой мы можем работать дальше, как будто в синхронном коде. Выведем на консоль результат работы метода:

Start of future example

Вопрос: В чем смысл жизни и всего такого?

Ответ: 42

Finish of future example

Как видим, теперь Future обрабатывается внутри метода, а не после его завершения. Но при этом не стоит забывать, что сам метод теперь стал асинхронным и выполнится только после выполнения синхронного кода.

Обработка последовательности асинхронных событий с помощью Stream

Помимо Future, есть ещё один важный класс для работы с асинхронностью — Stream. В отличие от Future, Stream может обрабатывать целый набор асинхронных событий. Давайте сразу разберём на примере:

void runStreamSimpleExample() {
    print('Simple stream example started');
    final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
    stream.listen((number) {
      print('listener: $number');
    });
    print('Simple stream example finished');
  }

У стрима много разных конструкторов, посмотрите их в документации. Здесь используем fromIterable. Создаём в массиве пять чисел, подписываемся на стрим через метод listen и выводим на консоль:

Simple stream example started

Simple stream example finished

listener: 1

listener: 2

listener: 3

listener: 4

listener: 5

Как и в случае с Future, сначала выполняется синхронный код: сообщения о старте и завершения примера. Только потом обрабатывается стрим и выводятся на консоль числа. В этом примере подписка на стрим обрабатывается асинхронно через listen, однако есть способ обрабатывать стрим через знакомые нам ключевые слова async и await. Перепишем пример с их использованием:

void runStreamAwaitedSimpleExample() async {
    print('Simple stream example with await started');
    final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
    await for (final number in stream) {
      print('Number: $number');
    }
    print('Simple stream example with await finished');
  }

Теперь обработка стрима похожа на обычный цикл. Но не стоит обрабатывать слишком большие стримы таким образом: это может затормозить работу программы. Выведем результат на консоль:

Simple stream example started

listener: 1

listener: 2

listener: 3

listener: 4

listener: 5

Simple stream example finished

Тут всё так же, как и с async/await во future. Сама функция стала асинхронной, поэтому теперь обработка Stream происходит до завершения метода. 

Single-subscription и broadcast стримы. Основы StreamController

Представим, что нам нужно подписаться на стрим в двух разных местах программы. Для этого можно сделать какой-нибудь синглтон, который бы предоставлял стрим и который можно было бы дёргать в двух разных частях кода. Чтобы не усложнять, попробуем подписаться на стрим дважды в одном методе, и посмотрим, что получится:

print('Simple stream example started');
    final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
    stream.listen((number) {
      print('listener 1: $number');
    });
    stream.listen((number) {
      print('listener 2: $number');
    });
    print('Simple stream example finished');

И результат:

The following StateError was thrown while handling a gesture.

Bad state: Stream has already been listened to.

Мы получаем ошибку: на стрим уже произведена подписка и подписаться второй раз не выйдет.

Здесь стоит рассказать о том, что существует два типа стримов: single-subscription и broadcast стримы. Мы создали single-subscription стрим. Такие стримы поставляют все данные подписчику разом и только после самой подписки. Так и происходит в нашем примере.

Broadcast стримы отдают свои данные вне зависимости от того, подписан ли кто-нибудь на них или нет. При этом подписчики стрима получают события только с момента подписки, а не с момента старта жизни стрима. При создании стрима стоит посмотреть документацию к конструктору и разобраться, какие ограничения он накладывает и стрим какого типа создаётся через этот конструктор.

Чтобы увидеть разницу, создадим broadcast стрим. В этом примере для создания стрима мы не будем использовать непосредственно сам Stream. Мы будем работать с классом StreamController. Он предоставляет доступ к самому стриму, даёт возможность управлять им и добавлять в него события.

///пример broadcast стрима
  void runBroadcastStreamExample() {
    print('Broadcast stream example started');
    final streamController = StreamController.broadcast();
    streamController.stream.listen((number) {
      print('Listener 1: $number');
    });
    streamController.stream.listen((number) {
      print('Listener 2: $number');
    });
    streamController.sink.add(1);
    streamController.sink.add(2);
    streamController.sink.add(3);
    streamController.sink.add(4);
    streamController.sink.add(5);
    streamController.close();
    print('Broadcast stream example finished');
  }

Мы создали StreamController с broadcast стримом, дважды подписались на него и добавили через sink пять чисел, после чего закрыли стрим. Смотрим, что ушло на консоль:

Broadcast stream example started

Broadcast stream example finished

Listener 1: 1

Listener 2: 1

Listener 1: 2

Listener 2: 2

Listener 1: 3

Listener 2: 3

Listener 1: 4

Listener 2: 4

Listener 1: 5

Listener 2: 5

Проблем не возникло: два подписчика отработали, как и ожидалось. На самом деле у StreamController достаточно много функций, рекомендую почитать документацию к нему.

Отписка от стрима с помощью StreamSubscription

Помимо управления самим стримом часто возникает потребность управлять подпиской на него — для этого в dart: async добавлен StreamSubscription. Кстати, мы уже неявно работали с ним в наших предыдущих примерах, когда подписывались на стрим: на самом деле метод listen возвращает его, и таким образом мы можем выделить подписку в переменную. Модифицируем предыдущий пример так, чтобы вторая подписка отменялась после числа 3:

///пример broadcast стрима
  void runBroadcastStreamExample() {
    print('Broadcast stream example started');
    final streamController = StreamController.broadcast();
    streamController.stream.listen((number) {
      print('Listener 1: $number');
    });
    StreamSubscription sub2;
    sub2 = streamController.stream.listen((number) {
      print('Listener 2: $number');
      if (number == 3) {
        sub2.cancel();
      }
    });
    streamController.sink.add(1);
    streamController.sink.add(2);
    streamController.sink.add(3);
    streamController.sink.add(4);
    streamController.sink.add(5);
    streamController.close();
    print('Broadcast stream example finished');
  }

Тут без сюрпризов, на консоль выводится то, что мы ожидали:

Broadcast stream example started

Broadcast stream example finished

Listener 1: 1

Listener 2: 1

Listener 1: 2

Listener 2: 2

Listener 1: 3

Listener 2: 3

Listener 1: 4

Listener 1: 5

Стоит отметить, что StreamSubscription не только даёт возможность отменять подписку полностью, но и приостанавливать и возобновлять её, а также предоставляет колбэки на события подписки. 

Как упростить работу с асинхронностью

Мы разобрали все самые важные классы для dart: async, но осталось ещё три, о которых я бы хотел рассказать. Возможно, они используются не так часто, но могут быть очень полезны в определённых ситуациях.

Completer

Completer позволяет поставлять Future, отправлять событие о выполнении или событие об ошибке. Это может быть полезно, когда нужно сделать цепочку Future и вернуть результат. 

Разберём на примере. Пусть у нас будет completer, который складывает два числа с трёхсекундной задержкой. Это очень простой пример, но важно понимать, что тут можно сделать большую вложенность из Future и компактно уместить в один класс.

class CompleterTester {
  void runCompleterInitTest() async {
    print('Completer example started');
    var sumCompleter = SumCompleter();
    var sum = await sumCompleter.sum(20, 22);
    print('Completer result: ' + sum.toString());
    print('Completer example finished');
  }
}
 
class SumCompleter {
  Completer completer = Completer();
 
  Future sum(int a, int b) {
    _sumAsync(a, b);
    return completer.future;
  }
 
  void _sumAsync(int a, int b) {
    Future.delayed(Duration(seconds: 3), () {
      return a + b;
    }).then((value) {
      completer.complete(value);
    });
  }
}

Разберём, что тут произошло. Мы создали класс SumCompleter и создали в нём completer. Затем объявили метод sum, который вызывает приватный метод на сложение и возвращает Future этого completer. Затем этот приватный метод выполняет операцию и вызывает метод complete у комплитера. После этого пользователь метода получает результат.

Такой подход позволяет инкапсулировать в себе любую сложную часто используемую асинхронную логику и удобно с ней работать.

StreamIterator

Представьте, что вам нужно управлять переходом к следующему айтему стрима и делать это именно тогда, когда вам нужно. Именно это и делает StreamIterator. Он предоставляет метод moveNext для перехода к следующему элементу стрима. moveNext вернет true, если элемент пришел, и false, если стрим был закрыт. Также StreamIterator предоставляет свойство current для получения текущего элемента. Ну и конечно, метод cancel для отмены подписки. Небольшой пример по работе класса:  

void runStreamIteratorExample() async {
    print('StreamIteratorExample started');
    var stream = Stream.fromIterable([1, 2, 3]);
    var iterator = StreamIterator(stream);
    bool moveResult;
    do {
      moveResult = await iterator.moveNext();
      print('number: ${iterator.current}');
    } while (moveResult);
    print('StreamIteratorExample finished');
  }

С использованием этого StreamIterator мы будем переходить к следующему числу только в то время, когда нам это удобно, выполнив перед этим все нужные действия. 

StreamTransformer

На самом деле это немного переусложнённая штука, но она отлично справляется, если вам нужно много раз трансформировать поток в приложении определённым образом. Самый очевидный пример — разделение файла на строки. Разберём на примере и напишем трансформер, который будет удваивать приходящее в него число:

void runStreamTransformerExample() async {
    print('StreamTransformer example started');
    StreamTransformer doubleTransformer =
        new StreamTransformer.fromHandlers(handleData: (data, EventSink sink) {
      sink.add(data * 2);
    });
 
    StreamController controller = StreamController();
    controller.stream.transform(doubleTransformer).listen((data) {
      print('data: $data');
    });
 
    controller.add(1);
    controller.add(2);
    controller.add(3);
    print('StreamTransformer example finished');
  }

В нашем случае он будет принимать на вход данные, которые нужно трансформировать, и sink, в который мы должны передать трансформированные данные. В теле метода просто удваиваем приходящее значение. Затем остаётся только трансформировать стрим с помощью метода transform. И вуаля: в методе listen значение удваивается. Обратите внимание: трансформер может и не отдать данные в синк, а может отдать два раза. То есть он работает не только как map или where, а наделён гораздо большей функциональностью.

Рекомендую поиграться с этими примерами — они есть на нашем GitHub. Так вы точно поймете, как устроена асинхронность в Dart. Если что-то непонятно, задавайте вопросы — я отвечу в комментариях.

© Habrahabr.ru