Синхронизация в Ktor для самых маленьких
Введение
Это был теплый осенний вечер, когда передо мной встала задача «настроить серверное кэширование в сервисе». Казалось бы чего может быть проще, наверняка в Ktor это делается парой строчек кода. Однако реальность оказалась не такой радужной: Ktor предлагает только Cache Headers plugin, который, например, не поддерживает распределенное кэширование. Что ж, вооружившись IDE, я начал ваять свой велосипед — Ktor Simple Cache.
Первые шаги
Сначала я задался вопросом:»А как вообще что-то прикрутить к Ktor? ». Оказалось, что Ktor расширяем через plugin’ы, о чем подробно описано в документации.
Достаточно быстро определились основные части plugin’а:
Первый код
Провайдер кэша:
abstract class SimpleCacheProvider(config: Config) {
val invalidateAt = config.invalidateAt
abstract fun getCache(key: String): Any?
abstract fun setCache(key: String, content: Any, invalidateAt: Duration?)
open class Config protected constructor() {
var invalidateAt: Duration = 5.minutes
}
}
Тело plugin’а с конфигураций:
class SimpleCacheConfig {
internal var provider: SimpleCacheProvider? = null
}
class SimpleCache(internal var config: SimpleCacheConfig) {
companion object : BaseApplicationPlugin {
override val key: AttributeKey = AttributeKey("SimpleCacheHolder")
override fun install(pipeline: Application, configure: SimpleCacheConfig.() -> Unit): SimpleCache {
return SimpleCache(SimpleCacheConfig().apply { configure })
}
}
}
Ну и сам plugin с конфигурацией:
class SimpleCachePluginConfig {
var invalidateAt: Duration? = null
}
val SimpleCachePlugin = createRouteScopedPlugin(name = "SimpleCachePlugin", ::SimpleCachePluginConfig) {
val provider = application.plugin(SimpleCache).config.provider
onCall {
val cache = provider.getCache(it.request.uri)
if (cache != null) {
it.respond(cache)
}
}
onCallRespond { call, body ->
provider.setCache(call.request.uri, body, pluginConfig.invalidateAt)
}
}
Идея донельзя простая:
Перехватываем запрос — используем OnCall
Лезем в кэш по имени метода
Если кэш найден — возвращаем
Если нет — вызываем route метод, а ответ сохраняем в кэш внутри onCallRespond.
Я предполагал, что если внутри OnCall сделать it.respond (cache) — onCallRespond вызываться не будет. Однако первый же unit test показал, что это не так. Итак, мне надо было как-то передать в OnCallRespond информацию о том, что данные взяты из кэша и повторно сохранять их не надо. Благо, разработчики Ktor предусмотрели Call State.
Изначально plugin выглядел так:
val SimpleCachePlugin = createRouteScopedPlugin(name = "SimpleCachePlugin", ::SimpleCachePluginConfig) {
val provider = application.plugin(SimpleCache).config.provider
val isResponseFromCacheKey = AttributeKey("isResponseFromCacheKey")
onCall {
val cache = provider.getCache(it.request.uri)
if (cache != null) {
it.attributes.put(isResponseFromCacheKey, true)
it.respond(cache)
}
}
onCallRespond { call, body ->
if (!call.attributes.contains(isResponseFromCacheKey)) {
provider.setCache(call.request.uri, body, pluginConfig.invalidateAt)
}
}
}
Первая рабочая версия
Решено было сразу сделать и два провайдера: Memory и Redis.
Memory не вызвал особых проблем и в первом приближении выглядел так:
SimpleMemoryCacheProvider.kt
class SimpleMemoryCacheProvider(config: Config) : SimpleCacheProvider(config) {
private val cache = mutableMapOf()
override fun getCache(key: String): Any? {
val `object` = cache[key]
if (`object` == null || `object`.isExpired) {
return null
}
return `object`.content
}
override fun setCache(key: String, content: Any, invalidateAt: Duration?) {
cache[key] = SimpleMemoryCacheObject(content, invalidateAt ?: this.invalidateAt)
}
class Config internal constructor() : SimpleCacheProvider.Config()
}
private data class SimpleMemoryCacheObject(val content: Any, val duration: Duration, val start: LocalDateTime = LocalDateTime.now()) {
val isExpired: Boolean
get() = LocalDateTime.now().isAfter(start.plusSeconds(duration.inWholeSeconds))
}
fun SimpleCacheConfig.memoryCache(
configure : SimpleMemoryCacheProvider.Config.() -> Unit
){
provider = SimpleMemoryCacheProvider(SimpleMemoryCacheProvider.Config().apply(configure))
}
Для удобства работы был объявлен SimpleMemoryCacheObject класс, который бы хранил как данные, так и время жизни кэша.
Ну и, сам кэш — это просто mutableMapOf
С Redis же пришлось немного повозиться. Для работы с ним я выбрал Jedis библиотеку. Так как ответ от сервера может быть не только текстовый, но и некие DTO объекты, пришлось вооружиться Gson'ом. В результате, SimpleRedisCacheObject стал выглядит так:
private class SimpleRedisCacheObject(val type: String, val content: String) {
override fun toString() = "$type%#%$content"
companion object {
fun fromObject(`object`: Any) = SimpleRedisCacheObject(`object`::class.java.name, Gson().toJson(`object`))
fun fromCache(cache: String): Any {
val data = cache.split("%#%")
return Gson().fromJson(data.last(), Class.forName(data.first()))
}
}
}
Ну и, сам провайдер:
SimpleRedisCacheProvider.kt
class SimpleRedisCacheProvider(private val config: Config) : SimpleCacheProvider(config) {
private val jedis: JedisPooled = JedisPooled(config.host, config.port, config.ssl)
override fun getCache(key: String): Any? = if (jedis.exists(key)) SimpleRedisCacheObject.fromCache(jedis.get(key)) else null
override fun setCache(key: String, content: Any, invalidateAt: Duration?) {
val expired = (invalidateAt ?: config.invalidateAt).inWholeMilliseconds
jedis.psetex(key, expired, SimpleRedisCacheObject.fromObject(content).toString())
}
class Config internal constructor(): SimpleCacheProvider.Config() {
var host = "localhost"
var port = 6379
var ssl = false
}
}
Конечно, все это я покрыл тестами (включая использования Redis TestContainer) и начал использовать в работе.
Так как изначально кэшировался только endpoint с одинаковым текстом для всех, то имени метода в качестве ключа хватала на ура. Однако быстро оценив перспективы библиотеки, было решено покрывать ей и другие endpoint'ы. А там уже ключ должен был учитывать query parameters. По итогу конфигурация пополнилась полем queryKeys: List
private fun buildKey(request: ApplicationRequest, queryKeys: List): String {
val keys =
if (queryKeys.isEmpty()) request.queryParameters else request.queryParameters.filter { key, _ -> key in queryKeys }
val key = "${request.path()}?${
keys.entries().sortedBy { it.key }
.joinToString("&") { "${it.key}=${it.value.joinToString(",")}" }
}"
return key.trimEnd('?')
}
Наверно, надо бы и value сортировать (значения параметров тоже должны быть отсортированы), но я пока параметры-списки не использую.
Новые проблемы
В какой-то момент, проверяя телеметрию сервиса, я заметил, что иногда запросы к БД идут не через время протухания кэша, а сразу несколько подряд. Стало понятно, что я забыл о многопоточности. При нескольких одновременных запросах, пока кэш еще не добавлен, несколько потоков подряд делают это. Прежде всего, покрыл это тестом:
Check cache is cocurrency
@Test
fun `check cache is concurrency`() {
testApplication {
val jsonClient = client.config {
install(ContentNegotiation) {
json()
}
}
application(Application::testApplication)
runBlocking {
val totalThreads = 1000
val deferred = (1..totalThreads).map {
async {
jsonClient.get("long")
}
}
val result = deferred.awaitAll().map { it.body() }.groupBy { it.id }
.map { it.key to it.value.size }
result.shouldBeSingleton {
it.second.shouldBe(totalThreads)
}
}
}
}
Запускаем параллельно 1000 запросов. Ждем завершение всех и проверяем, что все они имеют один и тот же результат (TestResponse при создании записывает в id случайное значение).
Новое решение
Сначала пытался обыграть все это через synchronized. Не помогло. Потом мучил ReentrantReadWriteLock — тоже безрезультатно.
Пришел к тому, что надо бы делать методы suspend и играться с coroutines. Вначале пытался использовать Mutex с withLock. Затем перешел на Channel. Все равно все первые запросы писали в кэш.
Решил полностью разобраться, как именно происходят вызовы, и добавил кучу логирования в тест. Благодаря этому, смог проследить весь цикл запроса-ответа и нашел главную проблему: запросы на кэш должны блокироваться с первого запроса, а разблокироваться только после записи в кэш. А благодаря тому, что когда-то получал MS Certificate, где одним из вопросов был потокобезопасный singleton решение всплыло самим собой (Mutex таки подошел идеально):
private val mutex = Mutex()
override suspend fun getCache(key: String): Any? {
var `object` = cache[key]
if (`object`?.isExpired != false) {
mutex.lock()
`object` = cache[key]
if (`object`?.isExpired != false) {
return null
} else {
mutex.unlock()
return `object`.content
}
}
return `object`.content
}
override suspend fun setCache(key: String, content: Any, invalidateAt: Duration?) {
cache[key] = SimpleMemoryCacheObject(content, invalidateAt ?: this.invalidateAt)
mutex.unlock()
}
Логика проста:
Пытаемся получить кэш — если все хорошо, возвращаем.
Нет — блокируем потоки и еще раз проверяем, нет ли кэша. Это проверка нужна, если мы проскочили первую, но какой-то поток уже добавил кэш.
Если кэша все еще нет — значит этот поток первый. Возвращаем null, чтобы plugin дернул метод и затем вызвал setCache, который и отпустит lock'и.
Если есть — разблокируем другие потоки и возвращаем данные.
Что дальше?
Внимательный читатель, наверно, спросит:»А почему любые ответы кэшируются? И даже от 400 и выше! », — и попадет в точку. Сейчас, если вдруг сервис первым вернул какую-то ошибку, она закэшируется. Решается это вроде легко:
if (!call.attributes.contains(isResponseFromCacheKey) && (call.response.status() ?: HttpStatusCode.OK) < HttpStatusCode.BadRequest) {
provider.setCache(buildKey(call.request, pluginConfig.queryKeys), body, pluginConfig.invalidateAt)
}
Вот незадача! Если такое случилось в моём потокобезопасном решении, потоки никогда не разблокируются. Пришлось делать так:
onCallRespond { call, body ->
if ((call.response.status() ?: HttpStatusCode.OK) >= HttpStatusCode.BadRequest) {
provider.badResponse()
}
else if (!call.attributes.contains(isResponseFromCacheKey)) {
provider.setCache(buildKey(call.request, pluginConfig.queryKeys), body, pluginConfig.invalidateAt)
}
}
override suspend fun badResponse() {
mutex.unlock()
}
Но мне такое решение не очень нравиться, так что буду признателен за любые советы, как улучшить concurrency.
В заключение
Библиотеку собираюсь развивать и расширять. Пока не уверен, какие еще провайдеры могут понадобиться, так что если у кого-то есть идеи на этот счет — буду рад. Так же думаю над тем, что сейчас можно обернуть в cacheOutput не только get запросы, и насколько это правильно. В целом, проверить метод и ругнуться warning'ом, вроде дело несложное.
Спасибо, что дочитали!