Сравнение операторов RxJava 3 и Kotlin Coroutines Flow

0d496e054bf7ad3f2521b915195ef198.png

Привет, Хабр! Меня зовут Константинов Александр, я Android-разработчик в «Студии Олега Чулакова». Сегодня мы сравим операторы RxJava 3 и Flow. Статья будет полезна как для изучения операторов, так и для более легкого перехода с RxJava на Flow. Буду рад вашему фидбеку и комметариям.

Ну что ж, давайте начинать!

В этой статье рассмотрим только самые популярые операторы и примеры кода:

Операторы преобразования

RxJava map / Flow map:

Преобразует элементы, применяя функцию к каждому из них. Работает последовательно.

Flow map

Flow map

Пример кода

//RxJava map
fun main() {
  Observable.range(1, 5)
    .map { it * 2 }
    .subscribe(::println) // Вывод: 2, 4, 6, 8, 10
}
//Flow map
fun main() = runBlocking {
  (1..5).asFlow()
    .map { it * 2 }
    .collect { println(it) } // Вывод: 2, 4, 6, 8, 10
}

RxJava flatMap / Flow flatMapMerge:

Преобразует каждое значение в поток. RxJava flatMap / Flow flatMapMerge, в отличие от RxJava concatMap / Flow flatMapConcat, обрабатывает элементы исходного потока параллельно, а не последовательно. Потоки для каждого элемента запускаются одновременно и эмитят значения в порядке их готовности, а не поочередно, поэтому порядок значений может отличаться и не соответствовать исходному. Следует использовать, когда порядок обработки не критичен и можно выиграть в производительности от параллельной обработки.

Flow flatMapMerge

Flow flatMapMerge

Пример кода

//RxJava flatMap
fun main() {
    Observable.just(1, 2, 3)
        .flatMap { i ->
            Observable.just(i * 10)
                .concatWith(Observable.just(i * 20).delay(10, TimeUnit.MILLISECONDS))
        }
        .blockingSubscribe(::println)
}
//Flow flatMapMerge
fun main() = runBlocking {
  flowOf(1, 2, 3)
    .flatMapMerge { flow { emit(it * 10); delay(10); emit(it * 20) } }
    .collect { println(it) }
}

RxJava concatMap / Flow flatMapConcat:

Преобразует каждое значение в поток и объединяет их последовательно. Берет каждый элемент из исходного потока и, для каждого элемента, создает новый поток. Эти потоки затем объединяются в один линейный поток. Важно отметить, что объединение происходит поочередно: следующий поток начинается только после того, как завершен предыдущий.

Flow flatMapConcat

Flow flatMapConcat

Пример кода

//RxJava concatMap
fun main() {
  Observable.just(1, 2, 3)
    .concatMap { i -> Observable.just(i * 10, i * 20) }
    .subscribe(::println) // Вывод: 10, 20, 20, 40, 30, 60
}
//Flow flatMapConcat
fun main() = runBlocking {
  flowOf(1, 2, 3)
    .flatMapConcat { flowOf(it * 10, it * 20) }
    .collect { println(it) } // Вывод: 10, 20, 20, 40, 30, 60
}

RxJava buffer / Flow buffer:

Группирует элементы в списки фиксированного размера.

Используется для обработки элементов в буферах (или пакетами), что позволяет повысить производительность, особенно в случаях, когда обработка каждого элемента по отдельности блокирует лишние ресурсы.

Пример кода

//RxJava buffer
fun main() {
  Observable.just(1, 2, 3, 4, 5)
    .doOnNext { println("Emitting $it") }
    .buffer(2) // Буферизуем элементы по 2
    .subscribe { buffer ->
      println("Collected $buffer")
    }
  Thread.sleep(1000)
}
//Flow buffer
fun main() = runBlocking {
  flowOf(1, 2, 3, 4, 5)
    .onEach { println("Emitting $it") }
    .buffer() // Буферизуем все элементы
    .collect { value ->
      println("Collected $value")
    }
}

RxJava scan / Flow scan:

Последовательно применяет функцию к элементам, возвращая промежуточные результаты. Используется для накопления значений с возможностью учета предыдущих накопленных значений.

Flow scan

Flow scan

Пример кода

//RxJava scan
fun main() {
    Observable.fromArray(1, 2, 3)
        .scan(0) { accumulator, value ->
            accumulator + value
        }
        .subscribe { result ->
            println(result) // Вывод: 0, 1, 3, 6
        }
}
//Flow scan
fun main() = runBlocking {
    // Создаем Flow от 1 до 3
    val flow = (1..3).asFlow()

    // Применяем оператор scan
    flow.scan(0) { accumulator, value ->
        accumulator + value
    }.collect { result ->
        println(result) // Вывод: 0, 1, 3, 6
    }
}

RxJava groupBy / Аналог на Flow отсутствует:

Разбивает поток на несколько потоков, сгруппированных по ключам.

Пример кода

fun main() {
  val observable = Observable.just(
    "one", "two", "three", "four", "five", "six"
  )

  observable.groupBy { it.length }
    .flatMapSingle { group ->
      group.toList().map { list -> Pair(group.key, list) }
    }
    .subscribe { pair ->
      println("Length: ${pair.first}, Words: ${pair.second}")
    }
}

Фильтрация

RxJava distinctUntilChanged / Flow distinctUntilChanged

Исключает повторяющиеся последовательные элементы.

Flow distinctUntilChanged

Flow distinctUntilChanged

Пример кода

//RxJava distinctUntilChanged
fun main() {
  Observable.just(1, 1, 2, 2, 3, 1)
    .distinctUntilChanged()
    .subscribe(::println) // Вывод: 1, 2, 3, 1
}
//Flow distinctUntilChanged
fun main() = runBlocking {
  flowOf(1, 1, 2, 2, 3, 1)
    .distinctUntilChanged()
    .collect { println(it) } // Вывод: 1, 2, 3, 1
}

RxJava filter / Flow filter:

Отбирает элементы, соответствующие условию.

Flow filter

Flow filter

Пример кода

//RxJava filter
fun main() {
  Observable.range(1, 5)
    .filter { it % 2 == 0 }
    .subscribe(::println) // Вывод: 2, 4
}
//Flow filter
fun main() = runBlocking {
  (1..5).asFlow()
    .filter { it % 2 == 0 }
    .collect { println(it) } // Вывод: 2, 4
}

RxJava take / Flow take:

Берет первые N элементов из входящего потока.

Flow take

Flow take

Пример кода

//RxJava take
fun main() {
  Observable.range(1, 10)
    .take(3)
    .subscribe(::println) // Вывод: 1, 2, 3
}
//Flow take
fun main() = runBlocking {
  (1..10).asFlow()
    .take(3)
    .collect { println(it) } // Вывод: 1, 2, 3
}

RxJava skip / Flow drop:

Пропускает первые N элементов.

Flow drop

Flow drop

Пример кода

//Flow drop
fun main() = runBlocking {
  flowOf(1, 2, 3, 4, 5)
    .drop(2) // Пропускаем первые 2 элемента
    .collect { value ->
      println(value) // Выведет 3, 4, 5
    }
}
//RxJava skip
fun main() {
  Observable.just(1, 2, 3, 4, 5)
    .skip(2) // Пропускаем первые 2 элемента
    .subscribe { value ->
      println(value) // Выведет 3, 4, 5
    }
}

Комбинирование

RxJava merge / Flow merge:

Объединяет заданные потоки в один поток. Все потоки объединяются одновременно, без ограничения на количество одновременно собираемых потоков.

Flow merge

Flow merge

Пример кода

//Flow merge
fun main() = runBlocking {
  val flow1 = flow {
    repeat(3) { 
      emit("From flow1: $it")
      delay(500L) // Эмиссия каждые 500 мс
    }
  }

  val flow2 = flow {
    repeat(3) { 
      emit("From flow2: $it")
      delay(1000L) // Эмиссия каждые 1000 мс
    }
  }

  val startTime = System.currentTimeMillis()
  merge(flow1, flow2).collect { value ->
    println("$value at ${System.currentTimeMillis() - startTime} ms")
  }
}
//RxJava merge
fun main() {
    val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
        .take(3)
        .map { "From observable1: $it" }

    val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
        .take(3)
        .map { "From observable2: $it" }

    val startTime = System.currentTimeMillis()
    Observable.merge(observable1, observable2)
        .subscribe { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms")
        }

    // Ожидание завершения потоков
    Thread.sleep(4000)
}

RxJava zip / Flow zip:

Объединяет потоки, применяя функцию комбинирования к элементам. Элемент для которого нет пары не испускается.

Flow zip

Flow zip

Пример кода

//RxJava zip
fun main() {
  val nums = Observable.range(1, 3)
  val letters = Observable.just("A", "B", "C", "D")
  
  Observable.zip(nums, letters) { n, l -> "$n -> $l" }
    .subscribe(::println) 
  // Вывод: 1 -> A, 2 -> B, 3 -> C //D - не эмиттится
}
//Fllow zip
fun main() = runBlocking {
  val numbers = (1..3).asFlow()
  val letters = flowOf("A", "B", "C", "D")
  numbers.zip(letters) { n, l -> "$n -> $l" }
    .collect { println(it) } 
  // Вывод: 1 -> A, 2 -> B, 3 -> C //D - не эмиттится
}

RxJava combineLatest / Flow combine:

Объединяет потоки, используя последние элементы из каждого.

Flow combine

Flow combine

Пример кода

//Flow combine
fun main() = runBlocking {
    val flow1 = flow {
        repeat(3) {
            emit("String: s$it")
            delay(500L) // Эмиссия каждые 500 мс
        }
    }

    val flow2 = flow {
        repeat(3) {
            emit(it)
            delay(1000L) // Эмиссия каждые 1000 мс
        }
    }

    flow1.combine(flow2) { str, num ->
        "$str and Int: $num"
    }.collect { value ->
        println(value)
    }
}
//RxJava combineLatest
fun main() {
    val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
        .take(3)
        .map { "String: s$it" }

    val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
        .take(3)

    Observable.combineLatest(observable1, observable2) { str: String, num: Long ->
        "$str and Int: $num"
    }
        .subscribe { value ->
            println(value)
        }
    Thread.sleep(5000L)
}

Агрегация

RxJava reduce / Flow reduce:

В Kotlin Flow оператор reduce аналогичен одноименному оператору в RxJava. Оба используются для преобразования коллекции значений в одиночное значение, используя начальное значение и функцию агрегирования (аккумулятора). Применяет функцию агрегирования ко всем элементам, возвращает одно значение.

Пример кода

//Flow reduce
fun main() = runBlocking {
  val result = (1..5).asFlow()
    .reduce { accumulator, value ->
      accumulator + value
    }

  println("Flow Reduce Result: $result")
}
//RxJava reduce
fun main() {
    Observable.fromIterable(listOf(1, 2, 3, 4, 5))
        .reduce { accumulator, value ->
            accumulator + value
        }
        .subscribe { result ->
            println("RxJava Reduce Result: $result")
        }
}
  • count:

    Используются для подсчета количества элементов в потоке, которые удовлетворяют определенному условию (или всех элементов, если условие отсутствует)

RxJava count / Flow count:

Используются для подсчета количества элементов в потоке, которые удовлетворяют определенному условию (или всех элементов, если условие отсутствует)

Пример кода

//Flow count
fun main() = runBlocking {
    val count = (1..10).asFlow()
        .count { it % 2 == 0 } // Считаем количество четных чисел

    println("Flow Count Result: $count")
}
//RxJava count
fun main() {
    Observable.fromIterable(1..10)
        .filter { it % 2 == 0 } // Отфильтруем четные числа
        .count() // Подсчитываем количество элементов после фильтрации
        .subscribe { count ->
            println("RxJava Count Result: $count")
        }
}

Управление временем

RxJava debounce / Flow debounce:

Пропускает элементы, если они генерируются слишком быстро.

Flow debounce

Flow debounce

Пример кода

//RxJava debounce
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
  Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(10)
    .debounce(150, TimeUnit.MILLISECONDS) // Испускает только элементы, 
  //за которыми следует 150мс тишины
    .subscribe(::println)

  Thread.sleep(2000)
}
//Flow debounce
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
  (1..10).asFlow()
    .onEach { delay(100L) }
    .debounce(150L) // Испускает только элементы, 
  //за которыми следует 150мс тишины
    .collect { println(it) } 
}

RxJava delay / Flow — другой подход

В RxJava Задерживает эмиссию элементов на указанное время. В Flow используется delay из kotlinx.coroutines.delay внутри построителя flow или внутри операторов.

Пример кода

//RxJava delay
fun main() {
  val startTime = System.currentTimeMillis()

  Observable.range(1, 5)
    .concatMap { item ->
      Observable.just(item)
        .delay(1000, TimeUnit.MILLISECONDS) // Задержка каждого элемента
    }
    .subscribe { value ->
      println("RxJava emitted $value at ${System.currentTimeMillis() - startTime} ms")
    }

  // Чтобы приложение не завершилось раньше времени
  Thread.sleep(6000)
}
//Flow: delay в onEach
fun main() = runBlocking {
    val startTime = System.currentTimeMillis()

    (1..5).asFlow()
        .onEach { delay(1000) } // Задержка перед каждым элементом
        .collect { value ->
            println("Flow emitted $value " +
                    "at ${System.currentTimeMillis() - startTime} ms")
        }
}

Надеюсь статья дала Вам более развернутое понимание, как сопоставляются различые операторы в RxJava и Kotlin Flow и поможет более быстро мигрировать с одоного на другое. Так же в статье постарался использовать много временЫх диаграмм для лучшего поимания принципов работы оператора.

Спасибо за внимание!

© Habrahabr.ru