Параллельные запросы в Kotlin для автоматизации сборки данных
Всем привет! В своей работе я часто использую Kotlin для автоматизации. Деятельность моя не связана напрямую с программированием, но Котлин здорово упрощает рабочие задачи.
Недавно нужно было собрать данные немаленького размера, дабы сделать анализ, поэтому решил написать небольшой скрипт, для получения данных и сохранения их в Excel. С последним пунктом проблем не возникло — почитал про Apache POI, взял пару примеров из официальной документации, доработав под себя. Чего не скажешь про запросы в Сеть.
Источник отдавал пачками json и надо было как-то быстро эти «пачки» собирать, преобразовывая в текст и записывая в файл таблицу.
Асинхронный метод
Начать решил с простой асинхронщины. Немного поковыряв HttpUrlConnection, отправил туда, где ему и место, заменив на HttpClient из Java.
Для тестов взял сервис https://jsonplaceholder.typicode.com/, который мне подсказал один знакомый разработчик. Сохранил ссылку, которая выдает Json с комментариями в переменную, дабы не дублировать и начал тесты.
const val URL = "https://jsonplaceholder.typicode.com/comments"
Функция была готова и даже работала. Данные приходили.
fun getDataAsync(url: String): String? {
val httpClient = HttpClient.newBuilder()
.build()
val httpRequest = HttpRequest.newBuilder()
.uri(URI.create(link)).build()
return httpClient.sendAsync(httpRequest, BodyHandlers.ofString())
.join().body()
}
Теперь надо было проверить скорость работы. Вооружившись measureTimeMillis я запустил код.
val asyncTime = measureTimeMillis {
val res = (1..10)
.toList()
.map {getDataAsync("$URL/$it")}
res.forEach { println(it) }
}
println("Асинхронный запрос время $asyncTime мс")
Все работало как надо, но хотелось быстрее. Немного покопавшись в Интернете, я набрел на решение, в котором задачи выполняются параллельно.
Parallel Map
Автор в своем блоге пишет, что код ниже выполняется параллельно с использованием корутин. Ну что, я давно хотел их попробовать, а тут представилась возможность.
suspend fun Iterable.pmap(f: suspend (A) -> B): List =
coroutineScope {
map { async { f(it) } }.awaitAll()
}
Если я все верно понял, то здесь расширяется стандартная коллекция (класс Iterable) функцией pmap, в которую передается лямбда. В лямбду поочередно приходит параметр A. Затем после окончания прохода по списку async дожидается выполнения всех элементов списка, и с помощью .awaitAll () выдает результат в виде списка. Причем для каждого элемента функция с модификатором suspend, то есть блокироваться она не будет.
Пришло время тестов, и сказать, что я был разочарован — значит не сказать ничего.
val parmapTime = measureTimeMillis {
runBlocking {
val res = (1..10)
.toList()
.pmap { getDataAsync("$URL/$it") }
println(mapResult)
}
}
println("Время pmap $parmapTime мс")
Средний результат был в районе — 1523 мс, что не сильно то отличалось по скорости от первого решения. Задачи может и работали параллельно благодаря map и async, но уж очень медленно.
Parallel Map v 2.0
После работы, вооружившись малиновым чаем, я сел читать документацию по корутинам и через некоторое время переписал реализацию автора.
suspend fun Iterable.parMap(func: suspend (T) -> V): Iterable =
coroutineScope {
map { element ->
async(Dispatchers.IO) { func(element) }
}.awaitAll()
}
val parMapTime = measureTimeMillis {
runBlocking {
val res = (1..10)
.toList()
.parMap { getDataAsync("$URL/$it") }
}
println(res)
}
println("Параллельная map время $parMapTime мс")
После добавления контекста Dispatchers.IO задача выполнялась в 2 раза быстрее ~ 610 мс. Другое дело! Остановившись на этом варианте и дописав все до полноценного рабочего скрипта (проверка ошибок, запись в excel и т.д.) я успокоился. Но мысль в голове о том, что можно еще что-то улучшить не покидала меня.
Java ParallelStream
Через несколько дней, в одном из постов на stackowerflow прочитал о parallelStream. Не откладывая дело в долгий ящик, после работы вновь запустил IDEA.
val javaParallelTime = measureTimeMillis {
val res = (1..10).toList()
.parallelStream()
.map { getDataAsync("$URL/$it") }
res.forEach { println(it) }
}
println("Java parallelSrtream время $javaParallelTime мс")
Код выполнялся даже чуть быстрее, чем моя реализация. Но радость длилась ровно до того момента, когда пришло время обрабатывать ошибки. Точки останова насколько я понял в stream нет. Иногда, у меня получалось так, что все считалось до конца, вываливалась ошибка и в виде результата «прилетал» то неполный, то пустой Json.
Может, я делал что-то не так, но с async таких проблем не возникло. Там можно контролировать данные на каждом шаге итерации и удобно обрабатывать ошибки.
Выводы
Результаты можно посмотреть в таблице ниже. Для себя я однозначно решил оставить async await. В основном конечно из-за более простой обработки ошибок. Да и за пределы корутин тут выходить не надо.
Метод | Время (ms) |
---|---|
Асинхронный метод | 1487 |
Реализация pmap из Сети | 1523 |
Мой вариант — parallelMap | 610 |
Java.parallelStream | 578 |
В дальнейшем, есть мысли оформить это в небольшую библиотеку и использовать в личных целях, и конечно переписать все это с «индусского кода» на человеческий, на сколько хватит возможностей. А потом залить все это на vds.
Надеюсь мой опыт кому-нибудь пригодится. Буду рад конструктивной критике и советам! Всем спасибо