Kotlin Coroutines. Часть 2

Прошло так много времени с прошлой статьи, прошу прощения, что заставил вас ждать. Выпускной курс забирал слишком много времени и не было возможности написать столь серьезный материал.

Итак, начинаем. Сначала кратко расскажу о материале в этой статье. Для разогрева поговорим о скоупе и о том, для чего он вообще нужен, далее перейдем к контексту, там я попробую объяснить сложное на пальцах, и, наконец, затронем к Continuation.

1. Coroutine scope

В этой части хочется раскрыть ответ на вопрос из комментариев:

»Можешь, пожалуйста, рассказать, для чего вообще скоуп существует? То есть буквально не нашел какого-то функционала у самого скоупа. Так почему все это не реализовано прямо на контексте?».

Как это и принято, пойдем к ответу издалека. Напомню, Coroutine scope — это интерфейс, который предоставляет способ управления жизненным циклом сопрограмм. Он определяет набор методов для запуска новых сопрограмм и для отмены существующих.

В первую очередь, стоит сказать, что каждый корутин билдер является расширением класса CoroutineScope, в этом несложно убедиться. Открываем реализацию любого билдера, пусть launch.

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

Итак, CoroutineScope занимается управлением жизненным циклом корутин. А Context, грубо говоря, хранит параметры для работы корутины. В контексте есть объект Job, который также отвечает за управление жизненным циклом, но только одной корутины. Забегая наперед, Job ссылаются друг на друга, когда они вложены друг в друга и родитель ждет выполнения всех дочерних корутин. Согласитесь, начинается путаница. Если оба про жизненный цикл, зачем этот скоуп нужен? Выкинем его, и дело с концом.

Естественно, не все так просто. CoroutineScope является высокоуровневой абстракцией, которая объединяет несколько корутин, обеспечивая структурированное управление их жизненным циклом и контекстом выполнения. Т.е. мы говорим тут про группу корутин. Без скоупа мы не можем гарантировать, что все корутины завершат свою работу до перехода к следующему шагу.

Вы скажете: «Как не можем? Просто надо добавить runBlocking и все!». Но нет. Есть очень важная особенность, которую надо помнить. runBlocking может завершиться, не дождавшись завершения всех вложенных корутин, если они не были явно запущены в его контексте.

fun main() {
    runBlocking {
        // Запускаем корутины
        launch {
            println("Coroutine 1 start!")
            // Вложенная корутина, запущенная в глобальном контексте
            GlobalScope.launch {
                delay(2000L)
                println("Nested coroutine 2 in global context")
            }
            println("Coroutine 1 completed")
        }

        println("RunBlocking scope completed")
    }
println("Bye Bye")
}

А это вывод в консоли. Тут явно видно, что вложенная корутина не закончила свою работу, а вот программа свою уже завершила:

RunBlocking scope completed

Coroutine 1 start!

Coroutine 1 completed

Bye Bye

Мы можем объявить свою собственную область видимости с помощью билдера coroutineScope. Предоставленная область наследует свой coroutineContext от внешней области, используя Job из этого контекста в качестве родителя для нового Job.

Эта функция предназначена для параллельного разделения работы. Когда любая дочерняя сопрограмма в этой области завершается с ошибкой, вся область также завершает работу, отменяя все остальные дочерние сопрограммы. Как только данный блок и все его дочерние сопрограммы завершаются, функция возвращает управление.

И runBlocking, и coroutineScope ожидают завершения всех операций внутри своего блока и всех запущенных дочерних сопрограмм. Разница между ними в том, что runBlocking блокирует текущий поток, а coroutineScope лишь приостанавливает работу, позволяя использовать основной поток для других задач. Поэтому runBlocking — это обычная функция, а coroutineScope — функция приостановки.

Итак, CoroutineContext сам по себе предоставляет только настройки и параметры для выполнения корутин. Он не предоставляет механизмы для управления группой корутин. В то время как CoroutineScope вводит абстракцию, позволяющую управлять жизненным циклом группы корутин, взаимодействующих между собой.

Надеюсь, я смог ответить на вопрос. А теперь мы перейдем к контексту.

2. Dispatcher и CoroutineContext, что о чем и почем?

В контексте сопрограмм Kotlin, Dispatcher отвечает за определение того, в каком потоке или нитях должна выполняться сопрограмма. Диспетчеры являются неотъемлемой частью библиотеки kotlinx.coroutines и используются для управления выполнением сопрограмм.

Существует несколько диспетчеров, предоставляемых библиотекой:

  • Dispatchers.Default: Этот диспетчер предназначен для выполнения операций с интенсивным использованием процессора и имеет пул потоков размером, равным количеству ядер на компьютере, на котором выполняется ваш код (но не менее двух).

  • Dispatchers.IO: Этот диспетчер предназначен для выполнения операций с интенсивным вводом-выводом и имеет больший пул потоков, чем дефолтный.

  • Dispatchers.Main: Этот диспетчер предназначен для запуска сопрограмм в главном потоке приложения Android.

Также можно создавать свои собственные пользовательские диспетчеры.

Чтобы использовать диспетчер, можно указать его при запуске сопрограммы с помощью конструкторов 'launch' или 'async'.

Мини-пример:

val result = async(Dispatchers.IO) {
    // выполнение задачи в другом диспетчере
    fetchData()
}.await()

2.1 Поговорим о CoroutineContext

CoroutineContext — это также, как и скоуп, интерфейс, который предоставляет набор элементов, необходимых для выполнения сопрограмм. Стоит понимать, что это важный элемент, ведь корутина выполняется всегда в определенном контексте, где определены «правила» работы. Правила — это довольно странное определение, оно введено для простоты понимания.

В первую очередь, откроем сам интерфейс CoroutineContext. В нем будет еще один вложенный интерфейс Element, а он нам очень интересен. Как уже сказано, в контексте хранится набор параметров, и каждый из них может быть получен благодаря методу get.

Как гласит официальная документация: It is an indexed set of [Element] instances. An indexed set is a mix between a set and a map. Every element in this set has a unique [Key]. Следовательно, это некий микс. Конечно, создается некая путаница, но как есть. При этом обращаемся мы к элементу, как в мапе.

Исходя из этого, приведем список некоторых элементов, которые имплементируются от CoroutineContext.Element: Job, ContinuationInterceptor, CoroutineExceptionHandler, CoroutineName, CoroutineDispatcher и другие, которые определяют поведение сопрограммы. С некоторыми элементами мы уже знакомы по первой части, посмотрим какие новые вводные имеются.

ContinuationInterceptor — наблюдает за выполнением функций в корутине, а также управляет ими. Перейдем на простую аналогию. Представим поезд в метрополитене, он ездит по своему расписанию, останавливается на каждой станции, выполняет какую-то функцию (открыть, закрыть двери, сменить машиниста, передать привет кому-то — неважно), затем продолжает свой путь. Именно ContinuationInterceptor контролирует, когда корутина должна остановиться и когда продолжит свою работу.

CoroutineExceptionHandler. Когда в корутине возникает исключение, которое не было обработано другим способом, класс перехватывает это исключение. Он позволяет определить специфическое поведение для обработки ошибок в корутинах. Важно: он обрабатывает только те исключения, которые не были обработаны другим образом в корутине.

Когда создаем сопрограмму с помощью одного из конструкторов сопрограмм, предоставляемых библиотекой kotlinx.coroutines, можно указать CoroutineContext в качестве первого параметра. Этот контекст будет использоваться в качестве контекста по умолчанию для сопрограммы, и любые элементы в контексте будут доступны сопрограмме во время ее выполнения.

launch(Dispatchers.Default + Job()) {
    println("Coroutine works in thread ${Thread.currentThread().name}")
}

2.2 Поговорим о создании контекста

В интерфейсе CoroutineContext есть следующий метод:

public operator fun plus(context: CoroutineContext): CoroutineContext =
    if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
        context.fold(this) { acc, element -> …

Метод plus из интерфейса CoroutineContext в Kotlin используется для комбинирования контекстов в один. Когда вызываешь метод plus на одном контексте и передаешь в качестве аргумента другой контекст, он создает новый контекст, который объединяет функциональность обоих контекстов. Имейте в виду, что этот plus не является ассоциативным. Другими словами, context1 + context2 — это не то же самое, что context2 + context1, поскольку все ключи из левого контекста будут заменены ключами из правого контекста. Конечно, это не имеет значения при объединении двух различных элементов (Dispatchers.Default + Job ()), но при объединении нескольких CoroutineContext.Element это становится важным фактором.

Но внутри вызывается метод fold, для чего он нужен?

public fun  fold(initial: R, operation: (R, Element) -> R): R

Если нам нужно что-то сделать для каждого элемента в контексте, мы можем использовать метод fold, аналогичный стандартному методу fold для других коллекций.

Существует несколько способов создания контекста корутины:

  1. Использование предопределенных элементов контекста:

Пример создания контекста с использованием предопределенных элементов:

val context = Dispatchers.Default + Job() + CoroutineExceptionHandler { _, exception ->
    println("Coroutine exception: $exception")
}
  1. Создание пользовательского контекста:

Можно создать пользовательский контекст, комбинируя различные элементы контекста с помощью оператора +.

Пример создания пользовательского контекста:

val customContext = MyCustomDispatcher + Job() + MyCustomExceptionHandler()
  1. Использование функции withContext:

Функция withContext позволяет выполнить блок кода в определенном контексте.

Пример использования withContext:

suspend fun fetchData() {
    val data = withContext(Dispatchers.IO) {
        // Код для загрузки данных в фоновом потоке
    }
}
  1. Наследование от родительской корутины:

Если корутина создается внутри другой корутины, то новая корутина наследует контекст родительской.

Пример:

val parentJob = Job()
GlobalScope.launch(parent = parentJob) {
    // Новая корутина наследует контекст от parentJob
}

2.3 Связь контекста с ThreadLocal

Spring Security обеспечивает большое удобство при разработке защищенных веб-приложений. Однако он в значительной степени опирается на SecurityContext, хранящийся в ThreadLocal (внутри класса SecurityContextHolder). При использовании сопрограмм Kotlin существует дополнительный уровень абстракции, на котором вы на самом деле не знаете (и не хотите знать), в каких потоках будет выполняться ваш код. Корутина может выполняться на разных потоках, и из-за этого может быть потерян Spring Security Context. Главная проблема, что корутина может запуститься на одном Thread, но продолжить свое выполнение на другом, собственно, поэтому и нужен CoroutineContext (справедливо, случай крайне редкий).

Используемый по умолчанию подход в Spring Security, заключается в сохранении контекста безопасности в локальном потоке, хорошо работает в традиционных сервлетных приложениях, где запрос полностью обрабатывается в одном конкретном потоке. Spring также предоставляет дополнительную поддержку при использовании асинхронного сервлета или при создании собственных потоков или исполнителей. Если вы используете Spring WebFlux (реактивный Spring), сопрограммы Kotlin уже работают в сочетании с EnableReactiveMethodSecurity. В других случаях, при использовании сопрограмм в «традиционной» среде Spring MVC, требуется другой подход.

Сопрограммы Kotlin не привязаны к определенному потоку и, следовательно, по умолчанию плохо работают с ThreadLocal переменными.

Подробнее вот тут:

Небольшой пример кода:

/**
 * Необходимо использовать с withContext(IO + SecurityCoroutineContext()) для потенциально блокирующих вызвовов в suspend-функциях,
 * до этого корутины нужно запускать с этим аргументом: runBlocking(SecurityCoroutineContext()) { //вызов suspend-функций }
 * https://blog.jdriven.com/2021/07/propagating-the-spring-securitycontext-to-your-kotlin-coroutines/
 */
class SecurityCoroutineContext(
    private val securityContext: SecurityContext = SecurityContextHolder.getContext()
) : ThreadContextElement {

    companion object Key : CoroutineContext.Key

    override val key: CoroutineContext.Key get() = Key

    override fun updateThreadContext(context: CoroutineContext): SecurityContext? {
        val previousSecurityContext = SecurityContextHolder.getContext()
        SecurityContextHolder.setContext(securityContext)
        return previousSecurityContext.takeIf { it.authentication != null }
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: SecurityContext?) {
        if (oldState == null) {
            SecurityContextHolder.clearContext()
        } else {
            SecurityContextHolder.setContext(oldState)
        }
    }
}

3. Поговорим про Continuation

Теперь настала очередь самого тяжелого (лично для меня). Как мы уже обсуждали в первой части, имеется Continuation, объект такой интересный.

public interface Continuation {
    public val context: CoroutineContext
    public fun resumeWith(result: Result)
}

У него есть имплементации в классе BaseContinuationImpl:

internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation?
) : Continuation, CoroutineStackFrame, Serializable

BaseContinuationImpl — это абстрактный базовый класс для всех реализаций Continuation. Он определяет общее поведение и предоставляет общую функциональность. В нем реализован метод resumeWith, который возобновляет выполнение корутины с указанным результатом. Метод resumeWith всегда вызывает invokeSuspend () — абстрактный метод, который реализован в классе coroutine body, который будет создан во время компиляции.

Следующий класс — это ContinuationImpl, наследуется от BaseContinuationImpl. Его функция заключается в создании объекта DispatchedContinuation с использованием перехватчика, который также является Continuation.

DispatchedContinuation представляет собой объект Continuation из тела корутины и содержит планировщик потоков. Его функция заключается в использовании планировщика потоков для планирования выполнения основной части сопрограммы указанному потоку.

Соединяю воедино все:

  1. Имеется CoroutineDispatcher. Что такое CoroutineDispatcher? CoroutineDispatcher — это как почтальон, который разносит корутины (задачи) в разные потоки.

  2. Что такое Continuation? Continuation — это как почтовый индекс, который указывает, куда должна быть доставлена корутина после завершения.

  3. Как это работает? Когда запускаешь корутину, CoroutineDispatcher получает ее и заворачивает в специальный DispatchedContinuation. Этот конверт содержит как саму корутину, так и ее почтовый индекс (Continuation).

  4. Почему это важно? Это позволяет корутинам выполняться одновременно в разных потоках, что делает твой код более эффективным.

446e66de679812a3f92f9af5ff7570e7.png

Итоги

В этой части пытался доступно рассказать о разном: Coroutine scope, CoroutineContext, Continuation. Признаться, это было довольно тяжелая часть для меня, поскольку изучал весь материал наплывами.

Кстати, хотел бы узнать, какие задачи вы бы решали (решили) с помощью корутинов для части сравнения скорости на практике.

Спасибо, что прочитали, жду ваш фидбек!

Habrahabr.ru прочитано 1766 раз