Реактивное программирование со Spring Boot 2. Часть 1

cjhrpraqlfoay-vyjj7h0z_31ju.jpeg


Не так давно вышла новая версия самого популярного фреймворка на Java: Spring Framework 5. Новая версия принесла много нового. Одно из самых больших нововведений — модель реактивного программирования. Совсем скоро выйдет Spring Boot 2, который существенно упростит создание микросервисов c данным подходом.

Если вы, как и я, хотите разобраться подробнее, что это такое и как это используется, то добро пожаловать под кат. Статья делится на две части — теоретическую и практическую. Сейчас мы постараемся разобраться, что значит быть реактивным. После чего попробуем использовать полученные знания для написания собственного микросервиса (часть 2).

Что такое реактивность?


Для начала рассмотрим понятие реактивности. И тут нужно сделать сразу четкое разраничение в определениях.

Реактивная система


Реактивная система — архитектурный паттерн, который удовлетворяет некоторому набору правил (reactive manifesto). Данный монифест был разработан в 2013 году для устранения неопределенности. Дело в том, что на тот момент в Европе и США термин «reactive» являлся слишком избочным. Каждый понимал по-своему какую систему можно назвать реактивной. Это рождало огромную путаницу и в итоге был создан манифест, который устанавливает четкие критерии реактивной системы.

Посмотрим на картинку из манифеста и разберем более подробно, что означает каждый пункт:

image

  • Responsive. Данный принцип говорит нам о том, что разрабатываемая система должна отвечать быстро и за определенное, заранее заданное, время. Кроме того система должна быть достаточно гибкой для самодиагностики и починки.

    Что это значит на практикте? Традиционно при запросе некоторого сервиса мы идем в базу данных, вынимаем необходимый объем информации и отдаем ее пользователю. Здесь все хорошо, если наша система достаточно быстрая и база данных не очень большая. Но что, если время формирования ответа гораздно больше ожидаемого? Кроме того, у пользователя мог пропасть интернет на несколько миллисекунд. Тогда все усилия по выборке данных и формированию ответа пропадают. Вспомните gmail или facebook. Когда у вас плохой интернет, вы не получаете ошибку, а просто ждете результат больше обычного. Кроме того, этот пункт говорит нам о том, что ответы и запросы должны быть упорядочены и последовательны.

  • Resilient. Система остается в рабочем состоянии даже, если один из компонентов отказал. Другими словами, компоненты нашей системы должны быть досточно гибкими и изолированами друг от друга. Достигается это путем репликаций. Если, например, одна реплика PostgreSQL отказала, необходимо сделать так, чтобы всегда была доступна другая. Кромен того, наше приложение должно работать во множестве экземпляров.
  • Elastic. Данный принцип говорит о том, что система должна занимать оптимальное количество ресурсов в каждый промежуток времени. Если у нас высокая нагрузка, то необходимо увеличить количество экзепляров приложения. В случае малой нагрузки, ресурсы свободных машин должны быть очищены. Типичный инструменты реализации данного принципа: Kubernetes.
  • Message Driven. Здесь начинается наиболее важный пункт для Java-разработчика. Именно этим вопросом должно озаботиться наше приложение. Общение между сервисами должно происходить через асинхронные сообщения. Это значит, что каждый элемент системы запрашивает информацию из другого элемента, но не ожидает получение результата сразу же. Вместо этого он продолжает выполняеть свои задачи. Это позволяет увеличить пользу от системных ресурсов и управлять более гибко возникающими ошибками. Обычно такой результат достигается через реактивное программирование.


Реактивное программирование


Если верить википедии, то реактивное программирование — парадигма программирования, ориентированная на потоки данных. Очень скоро мы разберем как это работает на практике, но вначале посмотрим на чем основана данная парадигма.

Основная концепция реактивного программирования базируется на неблокирующем вводе/ввыоде. Обычно при обращении к некоторому ресурсу (базе данных, файле на диске, удаленному серверу и т.д.) мы получаем результат сразу же (часто в той же строчке). При неблокирующем обращении к ресурсу наш поток не останавливается на обращении и продолжает выполнение. Результат мы получаем позже и по необходимости.

Практика


Отлично! Теперь приступим к реализации реактивного программирования в Java. Единственное, следует заметить, что мы будем использовать Spring WebFlux. Это новый фреймворк для реактивного программирования. Возникает вопрос: почему команда Spring не использовала для этих целей Spring Web MVC? Дело в том, что далеко не все модули в этом фреймворке можно использовать для работы в реактивном режиме. Останается много кода и сторонних библитек, например, Tomcat, которые основы на декларативном программировании и потоках.

В процессе работы над фреймворком была разработана небольшая спецификация для асинхронной работы. В дальнейшем эту спецификацию решили включить в Java 9. Однако я буду использовать Java 8 и Spring Boot 2 для простоты.

Основные концепции


В новом подходе у нас есть два основных класса для работы в реактивном режиме:

  • Mono
    Класс Mono нужен для работы с единственным объектом. Давайте посмотрим как будет выглядеть простое приложение с использованием Mono. Для этого создадим проект и сущность User в нем (все настройки и примеры можно найти на моем профиле в github):
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class User {
        private String firstName;
        private String lastName;
    }

    Далее создадим класс с тестами и подготовленными пользователями:
    public class HabrreactiveApplicationTests {
    
    	private User peter = new User("Peter", "Griffin");
    	private User lois = new User("Lois", "Griffin");
    	private User brain = new User("Brain", "Griffin");
    }

    Напишем тест:
    	@Test
    	public void mono() {
    		// Создаем объект
    		Mono monoPeter = Mono.just(peter);
    
    		// Блокируем текущий поток до тех пор пока не получим объект
    		User peter2 = monoPeter.block();
    
    		// Проверяем, что мы получили ожидаемый объект
    		assertEquals(peter, peter2);
    	}

    Как видно из примера — использовать реактивный подход довольно просто.

    Кроме того, у класса Mono есть множество методов на любой случай жизни. Например, есть всем известный метод map для преобразования одного типа в другой:

    	@Test
    	public void blockMono() {
    		Mono monoPeter = Mono.just(peter);
    
    		// Блокируем текущий поток до тех пока мы не получим и не обработаем данные
    		String name = monoPeter.map(User::getFirstName).block();
    		assertEquals(name, "Peter");
    	}

  • Flux

    Данный класс схож с Mono, но предоставляет возможность асинхронной работы со множеством объектов:

    	@Test
    	public void flux() {
    		// Создаем поток данных для выгрузки наших
    		Flux fluxUsers = Flux.just(peter, lois, brain);
    
    		// Получаем данные и обрабатываем по мере поступления
    		fluxUsers.subscribe(System.out::println);
    	}

    Как и в случае с Mono у Flux есть набор полезных методов:
    	@Test
    	public void fluxFilter() {
    		Flux userFlux = Flux.just(peter, lois, brain);
    
    		// Фильтруем и оставляем одного Питера
    		userFlux
    				.filter(user -> user.getFirstName().equals("Peter"))
    				.subscribe(user -> assertEquals(user, peter));
    	}
    
    	@Test
    	public void fluxMap() {
    		Flux userFlux = Flux.just(peter, lois, brain);
    
    		// Преобразуем тип User в String
    		userFlux
    				.map(User::getFirstName)
    				.subscribe(System.out::println);
    	}

    Здесь следует подчеркнуть одну особенность. В отличае от стандартных (не демонов) потоков, при завершении работы основного потока выполнения, сбор наших данных останавливается и программа завершается. Это можно легко продемострировать. Следующий код ничего не выведет на консоль:
           @Test
    	public void fluxDelayElements() {
    		Flux userFlux = Flux.just(peter, lois, brain);
    
    		// Ожидаем получение данных 1 секунду и только после этого производим обработку событий
    		userFlux.delayElements(Duration.ofSeconds(1))
    				.subscribe(System.out::println);
    	}

    Этого можно избежать с помощью класса CountDownLatch:
    	@Test
    	public void fluxDelayElementsCountDownLatch() throws Exception {
    		// Создаем счечик и заводим его на единицу
    		CountDownLatch countDownLatch = new CountDownLatch(1);
    
    		Flux userFlux = Flux.just(peter, lois, brain);
    
    		// Запускаем userFlux со срабатыванием по прошествию одной секунды
    		// и устанавлием сбрасывание счетчика при завершении
    		userFlux
    				.delayElements(Duration.ofSeconds(1))
    				.doOnComplete(countDownLatch::countDown)
    				.subscribe(System.out::println); // вывод каждую секунду
    
    		// Ожидаем сброса счетчика
    		countDownLatch.await();
    	}


Все это очень просто и эффективно по ресурсам. Представьте чего можно достить при комбинировании вызовов методов стрима.

В данной статье мы рассмотрели понятие рективностивной системы и реактивного программирования. Кроме того, мы поняли как связаны эти понятия. В следующей части мы пойдем дальше и попробуем построить свой сервис на основе полученных знаний.

P.S. Предлагаю разобрать систему обмена сообщениями от mail.ru. Как вы считаете, такое приложение можно назвать системой сообщений согласно манифесту? Пишите свои мысли в комментариях. Очень интересно.

© Habrahabr.ru