Синхронизация в Ktor для самых маленьких

Введение

13ce29f64fb16734f4450b9a67b452a9.jpg

Это был теплый осенний вечер, когда передо мной встала задача «настроить серверное кэширование в сервисе». Казалось бы чего может быть проще, наверняка в Ktor это делается парой строчек кода. Однако реальность оказалась не такой радужной: Ktor предлагает только Cache Headers plugin, который, например, не поддерживает распределенное кэширование. Что ж, вооружившись IDE, я начал ваять свой велосипед — Ktor Simple Cache.

Первые шаги

Сначала я задался вопросом:»А как вообще что-то прикрутить к Ktor? ». Оказалось, что Ktor расширяем через plugin’ы, о чем подробно описано в документации.

Достаточно быстро определились основные части plugin’а:

Первый код

Провайдер кэша:

abstract class SimpleCacheProvider(config: Config) {

    val invalidateAt = config.invalidateAt

    abstract fun getCache(key: String): Any?

    abstract fun setCache(key: String, content: Any, invalidateAt: Duration?)

    open class Config protected constructor() {

         var invalidateAt: Duration = 5.minutes
    }
}

Тело plugin’а с конфигураций:

class SimpleCacheConfig {

    internal var provider: SimpleCacheProvider? = null
}

class SimpleCache(internal var config: SimpleCacheConfig) {

    companion object : BaseApplicationPlugin {
        override val key: AttributeKey = AttributeKey("SimpleCacheHolder")

        override fun install(pipeline: Application, configure: SimpleCacheConfig.() -> Unit): SimpleCache {
            return SimpleCache(SimpleCacheConfig().apply { configure })
        }
    }
}

Ну и сам plugin с конфигурацией:

class SimpleCachePluginConfig {

    var invalidateAt: Duration? = null
}

val SimpleCachePlugin = createRouteScopedPlugin(name = "SimpleCachePlugin", ::SimpleCachePluginConfig) {
    val provider = application.plugin(SimpleCache).config.provider  
    onCall {
        val cache = provider.getCache(it.request.uri)
        if (cache != null) {
            it.respond(cache)
        }
    }
    onCallRespond { call, body ->
        provider.setCache(call.request.uri, body, pluginConfig.invalidateAt)
    }
}

Идея донельзя простая:

  • Перехватываем запрос — используем OnCall

  • Лезем в кэш по имени метода

  • Если кэш найден — возвращаем

  • Если нет — вызываем route метод, а ответ сохраняем в кэш внутри onCallRespond.

Я предполагал, что если внутри OnCall сделать it.respond (cache) — onCallRespond вызываться не будет. Однако первый же unit test показал, что это не так. Итак, мне надо было как-то передать в OnCallRespond информацию о том, что данные взяты из кэша и повторно сохранять их не надо. Благо, разработчики Ktor предусмотрели Call State.

Изначально plugin выглядел так:

val SimpleCachePlugin = createRouteScopedPlugin(name = "SimpleCachePlugin", ::SimpleCachePluginConfig) {
    val provider = application.plugin(SimpleCache).config.provider
    val isResponseFromCacheKey = AttributeKey("isResponseFromCacheKey")
    onCall {
        val cache = provider.getCache(it.request.uri)
        if (cache != null) {
            it.attributes.put(isResponseFromCacheKey, true)
            it.respond(cache)
        }
    }
    onCallRespond { call, body ->
        if (!call.attributes.contains(isResponseFromCacheKey)) {
            provider.setCache(call.request.uri, body, pluginConfig.invalidateAt)
        }
    }
}

Первая рабочая версия

Решено было сразу сделать и два провайдера: Memory и Redis.

Memory не вызвал особых проблем и в первом приближении выглядел так:

SimpleMemoryCacheProvider.kt

class SimpleMemoryCacheProvider(config: Config) : SimpleCacheProvider(config) {

    private val cache = mutableMapOf()

    override fun getCache(key: String): Any? {
        val `object` = cache[key]
        if (`object` == null || `object`.isExpired) {
            return null
        }

        return `object`.content
    }

    override fun setCache(key: String, content: Any, invalidateAt: Duration?) {
        cache[key] = SimpleMemoryCacheObject(content, invalidateAt ?: this.invalidateAt)
    }

    class Config internal constructor() : SimpleCacheProvider.Config()
}

private data class SimpleMemoryCacheObject(val content: Any, val duration: Duration, val start: LocalDateTime = LocalDateTime.now()) {

    val isExpired: Boolean
        get() = LocalDateTime.now().isAfter(start.plusSeconds(duration.inWholeSeconds))
}

fun SimpleCacheConfig.memoryCache(
    configure : SimpleMemoryCacheProvider.Config.() -> Unit
){
    provider = SimpleMemoryCacheProvider(SimpleMemoryCacheProvider.Config().apply(configure))
}

Для удобства работы был объявлен SimpleMemoryCacheObject класс, который бы хранил как данные, так и время жизни кэша.

Ну и, сам кэш — это просто mutableMapOf() в котором данные и лежат.

С Redis же пришлось немного повозиться. Для работы с ним я выбрал Jedis библиотеку. Так как ответ от сервера может быть не только текстовый, но и некие DTO объекты, пришлось вооружиться Gson'ом. В результате, SimpleRedisCacheObject стал выглядит так:

private class SimpleRedisCacheObject(val type: String, val content: String) {

    override fun toString() = "$type%#%$content"

    companion object {

        fun fromObject(`object`: Any) = SimpleRedisCacheObject(`object`::class.java.name, Gson().toJson(`object`))

        fun fromCache(cache: String): Any {
            val data = cache.split("%#%")
            return Gson().fromJson(data.last(), Class.forName(data.first()))
        }
    }
}

Ну и, сам провайдер:

SimpleRedisCacheProvider.kt

class SimpleRedisCacheProvider(private val config: Config) : SimpleCacheProvider(config) {

    private val jedis: JedisPooled = JedisPooled(config.host, config.port, config.ssl)

    override fun getCache(key: String): Any? = if (jedis.exists(key)) SimpleRedisCacheObject.fromCache(jedis.get(key)) else null

    override fun setCache(key: String, content: Any, invalidateAt: Duration?) {
        val expired = (invalidateAt ?: config.invalidateAt).inWholeMilliseconds
        jedis.psetex(key, expired, SimpleRedisCacheObject.fromObject(content).toString())
    }

    class Config internal constructor(): SimpleCacheProvider.Config() {

        var host = "localhost"

        var port = 6379

        var ssl = false
    }
}

Конечно, все это я покрыл тестами (включая использования Redis TestContainer) и начал использовать в работе.

Так как изначально кэшировался только endpoint с одинаковым текстом для всех, то имени метода в качестве ключа хватала на ура. Однако быстро оценив перспективы библиотеки, было решено покрывать ей и другие endpoint'ы. А там уже ключ должен был учитывать query parameters. По итогу конфигурация пополнилась полем queryKeys: List (добавил их для одного особого случая), ну и ключ теперь формировался так:

private fun buildKey(request: ApplicationRequest, queryKeys: List): String {
    val keys =
        if (queryKeys.isEmpty()) request.queryParameters else request.queryParameters.filter { key, _ -> key in queryKeys }
    val key = "${request.path()}?${
        keys.entries().sortedBy { it.key }
            .joinToString("&") { "${it.key}=${it.value.joinToString(",")}" }
    }"
    return key.trimEnd('?')
}

Наверно, надо бы и value сортировать (значения параметров тоже должны быть отсортированы), но я пока параметры-списки не использую.

Новые проблемы

В какой-то момент, проверяя телеметрию сервиса, я заметил, что иногда запросы к БД идут не через время протухания кэша, а сразу несколько подряд. Стало понятно, что я забыл о многопоточности. При нескольких одновременных запросах, пока кэш еще не добавлен, несколько потоков подряд делают это. Прежде всего, покрыл это тестом:

Check cache is cocurrency

    @Test
    fun `check cache is concurrency`() {
        testApplication {
            val jsonClient = client.config {
                install(ContentNegotiation) {
                    json()
                }
            }

            application(Application::testApplication)

            runBlocking {
                val totalThreads = 1000
                val deferred = (1..totalThreads).map {
                    async {
                        jsonClient.get("long")
                    }
                }

                val result = deferred.awaitAll().map { it.body() }.groupBy { it.id }
                    .map { it.key to it.value.size }
                result.shouldBeSingleton {
                    it.second.shouldBe(totalThreads)
                }
            }
        }
    }

Запускаем параллельно 1000 запросов. Ждем завершение всех и проверяем, что все они имеют один и тот же результат (TestResponse при создании записывает в id случайное значение).

Новое решение

Сначала пытался обыграть все это через synchronized. Не помогло. Потом мучил ReentrantReadWriteLock — тоже безрезультатно.

Пришел к тому, что надо бы делать методы suspend и играться с coroutines. Вначале пытался использовать Mutex с withLock. Затем перешел на Channel. Все равно все первые запросы писали в кэш.

Решил полностью разобраться, как именно происходят вызовы, и добавил кучу логирования в тест. Благодаря этому, смог проследить весь цикл запроса-ответа и нашел главную проблему: запросы на кэш должны блокироваться с первого запроса, а разблокироваться только после записи в кэш. А благодаря тому, что когда-то получал MS Certificate, где одним из вопросов был потокобезопасный singleton решение всплыло самим собой (Mutex таки подошел идеально):

    private val mutex = Mutex()

    override suspend fun getCache(key: String): Any? {
        var `object` = cache[key]
        if (`object`?.isExpired != false) {
            mutex.lock()
            `object` = cache[key]
            if (`object`?.isExpired != false) {
                return null
            } else {
                mutex.unlock()
                return `object`.content
            }
        }

        return `object`.content
    }

    override suspend fun setCache(key: String, content: Any, invalidateAt: Duration?) {
        cache[key] = SimpleMemoryCacheObject(content, invalidateAt ?: this.invalidateAt)
        mutex.unlock()
    }

Логика проста:

  • Пытаемся получить кэш — если все хорошо, возвращаем.

  • Нет — блокируем потоки и еще раз проверяем, нет ли кэша. Это проверка нужна, если мы проскочили первую, но какой-то поток уже добавил кэш.

  • Если кэша все еще нет — значит этот поток первый. Возвращаем null, чтобы plugin дернул метод и затем вызвал setCache, который и отпустит lock'и.

  • Если есть — разблокируем другие потоки и возвращаем данные.

Что дальше?

Внимательный читатель, наверно, спросит:»А почему любые ответы кэшируются? И даже от 400 и выше! », — и попадет в точку. Сейчас, если вдруг сервис первым вернул какую-то ошибку, она закэшируется. Решается это вроде легко:

if (!call.attributes.contains(isResponseFromCacheKey) && (call.response.status() ?: HttpStatusCode.OK) < HttpStatusCode.BadRequest) {
    provider.setCache(buildKey(call.request, pluginConfig.queryKeys), body, pluginConfig.invalidateAt)
}

Вот незадача! Если такое случилось в моём потокобезопасном решении, потоки никогда не разблокируются. Пришлось делать так:

    onCallRespond { call, body ->
        if ((call.response.status() ?: HttpStatusCode.OK) >= HttpStatusCode.BadRequest) {
            provider.badResponse()
        }
        else if (!call.attributes.contains(isResponseFromCacheKey)) {
            provider.setCache(buildKey(call.request, pluginConfig.queryKeys), body, pluginConfig.invalidateAt)
        }
    }
    override suspend fun badResponse() {
        mutex.unlock()
    }

Но мне такое решение не очень нравиться, так что буду признателен за любые советы, как улучшить concurrency.

В заключение

Библиотеку собираюсь развивать и расширять. Пока не уверен, какие еще провайдеры могут понадобиться, так что если у кого-то есть идеи на этот счет — буду рад. Так же думаю над тем, что сейчас можно обернуть в cacheOutput не только get запросы, и насколько это правильно. В целом, проверить метод и ругнуться warning'ом, вроде дело несложное.

Спасибо, что дочитали!

© Habrahabr.ru