Параллельные запросы в Kotlin для автоматизации сборки данных

4e04b48047711a87bbbbf39f91f38a96.jpg?v=1

Всем привет! В своей работе я часто использую Kotlin для автоматизации. Деятельность моя не связана напрямую с программированием, но Котлин здорово упрощает рабочие задачи.

Недавно нужно было собрать данные немаленького размера, дабы сделать анализ, поэтому решил написать небольшой скрипт, для получения данных и сохранения их в Excel. С последним пунктом проблем не возникло — почитал про Apache POI, взял пару примеров из официальной документации, доработав под себя. Чего не скажешь про запросы в Сеть.

Источник отдавал пачками json и надо было как-то быстро эти «пачки» собирать, преобразовывая в текст и записывая в файл таблицу.

Асинхронный метод

Начать решил с простой асинхронщины. Немного поковыряв HttpUrlConnection, отправил туда, где ему и место, заменив на HttpClient из Java.

Для тестов взял сервис https://jsonplaceholder.typicode.com/, который мне подсказал один знакомый разработчик. Сохранил ссылку, которая выдает Json с комментариями в переменную, дабы не дублировать и начал тесты.

const val URL = "https://jsonplaceholder.typicode.com/comments"

Функция была готова и даже работала. Данные приходили.

fun getDataAsync(url: String): String? {
    val httpClient = HttpClient.newBuilder()
        .build()
    val httpRequest = HttpRequest.newBuilder()
        .uri(URI.create(link)).build()

    return httpClient.sendAsync(httpRequest, BodyHandlers.ofString())
        .join().body()
}

Теперь надо было проверить скорость работы. Вооружившись measureTimeMillis я запустил код.

val asyncTime = measureTimeMillis { 
    val res = (1..10)
        .toList()
        .map {getDataAsync("$URL/$it")}
    res.forEach { println(it) }
}
println("Асинхронный запрос время $asyncTime мс")

Все работало как надо, но хотелось быстрее. Немного покопавшись в Интернете, я набрел на решение, в котором задачи выполняются параллельно.

Parallel Map

Автор в своем блоге пишет, что код ниже выполняется параллельно с использованием корутин. Ну что, я давно хотел их попробовать, а тут представилась возможность.

suspend fun  Iterable.pmap(f: suspend (A) -> B): List =
    coroutineScope {
        map { async { f(it) } }.awaitAll()
    }

Если я все верно понял, то здесь расширяется стандартная коллекция (класс Iterable) функцией pmap, в которую передается лямбда. В лямбду поочередно приходит параметр A. Затем после окончания прохода по списку async дожидается выполнения всех элементов списка, и с помощью .awaitAll () выдает результат в виде списка. Причем для каждого элемента функция с модификатором suspend, то есть блокироваться она не будет.

Пришло время тестов, и сказать, что я был разочарован — значит не сказать ничего.

val parmapTime = measureTimeMillis {
    runBlocking {
        val res = (1..10)
            .toList()
            .pmap { getDataAsync("$URL/$it") }
        println(mapResult)
    }
}
println("Время pmap $parmapTime мс")

Средний результат был в районе — 1523 мс, что не сильно то отличалось по скорости от первого решения. Задачи может и работали параллельно благодаря map и async, но уж очень медленно.

Parallel Map v 2.0

После работы, вооружившись малиновым чаем, я сел читать документацию по корутинам и через некоторое время переписал реализацию автора.

suspend fun  Iterable.parMap(func: suspend (T) -> V): Iterable =
    coroutineScope {
        map { element -> 
            async(Dispatchers.IO) { func(element) } 
        }.awaitAll() 
    }

val parMapTime = measureTimeMillis {
    runBlocking {
        val res = (1..10)
            .toList()
            .parMap { getDataAsync("$URL/$it") }
    }
    println(res)
}
println("Параллельная map время $parMapTime мс")

После добавления контекста Dispatchers.IO задача выполнялась в 2 раза быстрее ~ 610 мс. Другое дело! Остановившись на этом варианте и дописав все до полноценного рабочего скрипта (проверка ошибок, запись в excel и т.д.) я успокоился. Но мысль в голове о том, что можно еще что-то улучшить не покидала меня.

Java ParallelStream

Через несколько дней, в одном из постов на stackowerflow прочитал о parallelStream. Не откладывая дело в долгий ящик, после работы вновь запустил IDEA.

val javaParallelTime = measureTimeMillis { 
    val res = (1..10).toList()
        .parallelStream()
        .map { getDataAsync("$URL/$it") }
    res.forEach { println(it) }
}
println("Java parallelSrtream время $javaParallelTime мс")

Код выполнялся даже чуть быстрее, чем моя реализация. Но радость длилась ровно до того момента, когда пришло время обрабатывать ошибки. Точки останова насколько я понял в stream нет. Иногда, у меня получалось так, что все считалось до конца, вываливалась ошибка и в виде результата «прилетал» то неполный, то пустой Json.

Может, я делал что-то не так, но с async таких проблем не возникло. Там можно контролировать данные на каждом шаге итерации и удобно обрабатывать ошибки.

Выводы

Результаты можно посмотреть в таблице ниже. Для себя я однозначно решил оставить async await. В основном конечно из-за более простой обработки ошибок. Да и за пределы корутин тут выходить не надо.

Метод

Время (ms)

Асинхронный метод

1487

Реализация pmap из Сети

1523

Мой вариант — parallelMap

610

Java.parallelStream

578

В дальнейшем, есть мысли оформить это в небольшую библиотеку и использовать в личных целях, и конечно переписать все это с «индусского кода» на человеческий, на сколько хватит возможностей. А потом залить все это на vds.

Надеюсь мой опыт кому-нибудь пригодится. Буду рад конструктивной критике и советам! Всем спасибо

© Habrahabr.ru