Готовим rutracker на spring и kotlin
В преддверии первого релиза языка kotlin, я хотел бы поделиться с вами опытом создания на нем небольшого проекта. Это будет приложение-сервис, для поиска торрентов в базе rutracker-а. Весь код + бонусный браузерный клиент можно найти здесь. Итак, посмотрим, что же получилось.
Задача
База торрентов раздается в виде набора csv-файлов и периодически обновляется путем добавлением новой версии дампа всей базы в директорию с именем, соответствующим дате создания дампа. В связи с этим, наш небольшой проект будет следить за появлением новых версий (уже скаченных, а клиент, который сам будет скачивать базу мы возможно сделаем в другой раз), разбирать, складывать в базу и предоставлять json rest api для поиска по имени.
Средства
Для быстрого старта возьмем spring boot. У spring boot есть много особенностей, которые могут серьезно осложнить жизнь в больших проектах, но для небольших приложений, как наше, boot — это отличное решение, позволяющее создать конфигурацию для типового набора технологий. Основной способ у boot-а понять, для каких именно технологий создавать бины, — это наличие в classpath ключевых для той или иной технологии классов. Мы добавляем их через подключение зависимостей в maven. В нашем случае boot автоматически сконфигурирует нам соединение с базой (h2) + пул (tomcat-jdbc) и провайдер json (gson). Версии библиотек при подключении зависимости мы не указываем, берем из заранее определенного boot-ом набора — для этого мы указываем в мавене родительский проект spring-boot-starter-parent. Так же мы подключаем spring-boot-starter-web и spring-boot-starter-tomcat, чтобы boot сконфигурировал нам web mvc для нашего будущего rest-а и tomcat в качестве контейнера. Теперь давайте посмотрим на main.
// main.kt
fun main(args: Array) {
SpringApplication
.run(MainConfiguration::class.java, *args)
}
И на основную конфигурацию MainConfiguration, которую мы передаем в SpringApplication в качестве источника для бинов.
@Configuration
@Import(JdbcRepositoriesConfiguration::class, ImportConfiguration::class, RestConfiguration::class)
@EnableAutoConfiguration
open class MainConfiguration : SpringBootServletInitializer() {
override fun configure(builder: SpringApplicationBuilder): SpringApplicationBuilder {
return builder.sources(MainConfiguration::class.java)
}
}
Надо заметить, что boot позволяет деплоить получившееся приложение в качестве web-модуля, а не только запускать через main-метод. Для того, чтобы такой подход тоже работал, мы переопределяем метод configure у SpringBootServletInitializer, который будет вызван контейнером при деплое приложения. Так же, обратите внимание, что мы не используем аннотацию @SpringBootApplication на MainConfiguration, но включаем автоконфигурирование напрямую аннотацией @EnableAutoConfiguration. Это я сделал, чтобы не использовать поиск компонентов помеченных аннотацией @Component — все бины, которые мы будем создавать, будут явно создаваться kotlin-конфигурациями. Тут же стоит отметить особенность kotlin-конфигураций — мы вынуждены помечать классы-конфигурации как open (так же как и методы, создающие бины), потому что в kotlin все классы и методы по-умолчанию final, что не позволит spring-у создать для них обертку.
Модель
Модель нашего приложения очень простая и состоит из двух сущностей. Это категория, к которой относится торрент (имеет поле parent, но по факту торрент всегда находится в категории, у которого всего один родитель), и сам торрент.
data class Category(val id:Long, val name:String, val parent:Category?)
data class Torrent(val id:Long,val categoryId:Long, val hash:String, val name:String, val size:Long, val created:Date)
Наши модельные классы, я описал просто как неизменяемые data классы. В этом проекте не используется jpa по этическим причинам и как следствие принципа бритвы Оккама. К тому же orm потребовал бы использования лишних технологий и очевидное проседание производительности. Для мэпинга данных из базы в объекты я буду просто использовать jdbc и jdbctemplate, как вполне достаточный для нашей задачи инструмент.
Итак, мы определили нашу модель, в которой, помимо вполне рядовых полей, стоит обратить внимание на поле hash, которое собственно и является идентификатором торрента в мире общения торрент-клиентов и которого достаточно, чтобы найти (например через dht) счастливых обладателей, раздающих данный торрент и получить у них недостающую информацию (вроде имен файлов), которая и отличает torrent-файл от magnet-ссылки.
Репозитории
Для доступа к данным мы используем небольшую абстракцию, которая позволит нам отделить хранилище данных от его потребителя. Например, из-за специфики данных, мы вполне могли бы использовать просто хранение в памяти и разбор csv-базы при старте, также эта абстракция подошла бы тем, кто особо остро желает использовать jpa, о котором мы говорили чуть выше. Итак, для каждой сущности создаем свой репозиторий, плюс один репозиторий для доступа к актуальной версии базы.
interface CategoryRepository {
fun contains(id:Long):Boolean
fun findById(id:Long): Category?
fun count():Int
fun clear()
fun batcher(size:Int):Batcher
}
interface TorrentRepository {
fun search(name:String):List
fun count(): Int
fun clear()
fun batcher(size:Int):Batcher
}
interface VersionRepository {
fun getCurrentVersion():Long?
fun updateCurrentVersion(version:Long)
fun clear()
}
Хотелось бы напомнить, если кто-то забыл или не знал, что вопрос после имени типа означает, что значения может не быть, т.е. оно может быть null-ом. Если же вопроса нет, то чаще всего попытка пропихнуть null провалится еще на этапе компиляции. От лирического отступления перейдем к нашим баранам интерфейсам. Интерфейсы специально сделаны минималистичными, чтобы не отвлекать от главного. И в целом их смысл ясен, кроме batcher-ов у первых двух. Опять же из-за специфики, нам нужно один раз записать много данных, а потом они не меняются. Из-за этого для изменения есть только один метод, который предоставляет возможность batch-добавления. Давайте посмотрим на него поближе.
Batcher
Очень простой интерфейс, позволяющий добавлять сущности конкретного типа:
interface Batcher : Closeable {
fun add(value:T)
}
также, Batcher наследуется от Closable, чтобы можно было отправить на добавление начатую неполную пачку, когда в источнике больше данных нет. Работают они примерно по следующей логике: при создании batcher-а задается размер пачки, при добавлении сущности накапливаются в буффере пока пачка не разрастется до заданного размера, после этого выполняется групповая операция добавления, которая в общем случае работает быстрее, чем набор единичных добавлений. Причем, у категорий Batcher будет с функциональностью добавления только уникальных значений, для торрентов же простая реализация, использующая JdbcTemplate.updateBatch (). Идеального размера для пачки не существует, поэтому эти параметры я вынес в конфигурацию приложения (см. application.yaml)
clear ()
Когда я говорил об одном методе, изменяющим данные, я немного слукавил, ведь у всех репозиториев есть метод clear (), который просто удаляет все старые данные перед обработкой новой версии дампа. По факту, используем truncate table …, потому что delete from… без where работает сильно медленнее, а для нашей ситуации действие аналогичное, если в база не поддерживает операцию truncate, можно просто пересоздать таблицу, что по скорости тоже будет существенно быстрей, чем удаление всех строк.
Интерфейс чтения
Здесь будут только необходимые методы, такие как search () у торрентов, который мы будем использовать для поиска, или findById () у категорий, чтобы собрать полноценный результат при поиске. count () нам нужен только, чтобы вывести в лог сколько мы отпроцессили данных, для дела он не нужен. В реализации для jdbc просто используется JdbcTemplate для выборки и мэпинга, например:
private val rowMapper = RowMapper { rs: ResultSet, rowNum: Int ->
Torrent(
rs.getLong("id"), rs.getLong("category_id"),
rs.getString("hash"), rs.getString("name"),
rs.getLong("size"), rs.getDate("created")
)
}
override fun search(name: String): List {
if(name.isEmpty())
return emptyList()
val parts = name.split(" ")
val whereSql = parts.map { "UPPER(name) like UPPER(?)" }.joinToString(" AND ")
val parameters = parts.map { it.trim() }.map { "%$it%" }.toTypedArray()
return jdbcTemplate.query("SELECT id, category_id, hash, name, size, created FROM torrent WHERE $whereSql", rowMapper, *parameters)
}
Таким нехитрым способом мы реализуем поиск, который находит название, содержащее все слова запроса. Мы не используем ограничение количества записей, отдаваемых за раз, вроде разбиения по страницам, что безусловно стоило бы сделать в реальном проекте, но для нашего небольшого эксперимента, можно обойтись и без этого. Думаю, здесь же стоит заметить, что такое решение в лоб потребует полного обхода таблицы каждый раз для нахождения всех результатов, что для сравнительно небольшой базы rutracker-а может и так уж много, но для публичного продакшена, конечно, не подошло бы. Для ускорения поиска нужно дополнительное решение в виде индекса, может быть родной полнотекстовый поиск или стороннее решение вроде apache lucene, elasticsearch или многие другие. Создание такого индекса, конечно, увеличит и время создания базы и её размер. Но в нашем приложении мы остановимся на простой выборке с обходом, так как наша система скорее учебная.
Импорт
Большая часть нашей системы — это импорт данных из csv-файлов в наше хранилище. Здесь есть сразу несколько аспектов, на которые стоило бы обратить внимание. Во-первых, наша исходная база хоть и не очень большая, но тем не менее уже такого свойства, когда стоит аккуратно относиться к её размерам — т.е. нужно подумать, как сократить время перенесения данных, вероятно копирование данных в лоб может оказаться долгим. И второе, csv-база денормализована, а мы хотим получить разделение на категорию и торрент. Значит, нужно решить, как мы будем производить это разделение.
Производительность
Начнем с чтения. В моей реализации использован самописный парсер csv на kotlin взятый из другого моего проекта, который немного быстрее и чуть более внимателен к производимому типу исключений, чем существующие на рынке опенсорса, но по сути не меняющий порядок скорости разбора, т.е. можно было бы с тем же успехом взять почти любой парсер, умеющий работать в потоке, например commons-csv.
Теперь запись. Как мы уже видели раньше, я добавил батчеры для того, чтобы сократить накладные расходы на добавление большого числа записей. Для категорий проблема не столько в количестве, сколько в том, что они многократно повторяются. Некоторое количество тестов показало, что проверить наличие перед добавлением в пачку быстрее, чем создавать огромные пачки из запросов типа MERGE INTO. Что и понятно, если учесть, что первым делом проверка идет в уже существующей пачке прямо в памяти, тогда появился специальный batcher проверяющий уникальность.
Ну и конечно тут стоило подумать о том, чтобы распараллелить этот процесс. Убедившись, что в разных файлах содержатся независимые друг от друга данные, я выбрал каждый такой файл объектом работы для рабочего, работающего в своем потоке.
private fun importCategoriesAndTorrents(directory:Path) = withExecutor { executor ->
val topCategories = importTopCategories(directory)
executor
.invokeAll(topCategories.map { createImportFileWorker(directory, it) })
.map { it.get() }
}
private fun createImportFileWorker(directory: Path, topCategory: CategoryAndFile):Callable = Callable {
val categoryBatcher = categoryRepository.batcher(importProperties.categoryBatchSize)
val torrentBatcher = torrentRepository.batcher(importProperties.torrentBatchSize)
(categoryBatcher and torrentBatcher).use {
parser(directory, topCategory.file).use {
it
.map { createCategoryAndTorrent(topCategory.category, it) }
.forEach {
categoryBatcher.add(it.category)
torrentBatcher.add(it.torrent)
}
}
}
}
Для такой работы хорошо подойдет пул с фиксированным количеством потоков. Мы отдаем executor-у сразу все задачи, но выполнять одновременно он будет столько задач, сколько есть потоков в пуле, а по выполнению одной задачи поток будет отдаваться другой. Необходимое количество потоков не угадаешь, но можно подобрать экспериментально. По умолчанию число потоков равняется количеству ядер, что часто бывает не самой плохой стратегией. Так как пул нам нужен только на время импорта, создаем его, отрабатываем и закрываем. Для этого делаем небольшую утилитную inline-функцию withExecutor (), которую мы уже использовали выше:
private inline fun withExecutor(block:(ExecutorService)->R):R {
val executor = createExecutor()
try {
return block(executor)
} finally {
executor.shutdown()
}
}
private fun createExecutor(): ExecutorService = Executors.newFixedThreadPool(importProperties.threads)
Inline-функция хороша тем, что она существует только при компиляции и помогает упорядочить код, привести его в порядок и переиспользовать функции с лямбда-параметрами, при этом не имея никаких накладных расходов. Ведь код, который мы пишем в такой функции, будет встроен компилятором по месту использования. Это удобно, например, для случаев, когда нам нужно что-то закрывать в finally блоке, и мы не хотим, чтобы это отвлекало от общей логики программы.
Разделение
Убедившись, что сущности могут никак друг от друга не зависеть во время импорта, я решил собрать все сущности (категории и торренты) в один проход, заранее создав только категории верхнего уровня (заодно получив информацию о файлах с торрентами), выбрав их за единицу распараллеливания.
Rest
Теперь у нас уже почти всё есть, чтобы добавить контроллер для получения данных поиска по торрентам в виде json. На выходе хотелось бы иметь сгруппированные по категориям торренты. Определим специальный бин, определяющий соотвествующую структуру ответа:
data class CategoryAndTorrents(val category:Category, val torrents:List)
Готово, осталось только запросить торреты, сгруппировать и отсортировать их:
@RequestMapping("/api/torrents")
class TorrentsController(val torrentRepository: TorrentRepository, val categoryRepository: CategoryRepository) {
@ResponseBody
@RequestMapping(method = arrayOf(RequestMethod.GET))
fun find(@RequestParam name:String):List = torrentRepository
.search(name)
.asSequence()
.groupBy { it.categoryId }
.map { CategoryAndTorrents(categoryRepository.findById(it.key)!!, it.value.sortedBy { it.name }) }
.sortedBy { it.category.name }
.toList()
}
Пометив аннотацией @RequestParam параметр name мы ожидаем, что спринг запишет значение request-параметра «name» в параметр нашей функции. Пометив же метод аннотацией @ResponseBody, мы просим спринг преобразовать возвращаемый из метода бин в json.
Немного о DI
Также в предыдущем коде можно заметить, что репозитории приходят конроллеру в конструкторе. Подобным образом сделано и в остальных местах этого приложения: сами бины, создаваемые спрингом не знают о di, а принимают все свои зависимости в конструкторе, даже без всяких аннотаций. Реальная же связь происходит на уровне спринг-конфигурации:
@Configuration
open class RestConfiguration {
@Bean
open fun torrentsController(torrentRepository: TorrentRepository, categoryRepository: CategoryRepository):TorrentsController
= TorrentsController(torrentRepository, categoryRepository)
}
Спринг передает зависимости, созданные другой конфигурацией, в параметры метода, создающего контроллер, — зависимости передаются контроллеру.
Итог
Готово! Запускаем, проверяем (в составе по адресу localhost:8080/ идет javascript-клиент для нашего сервиса, описание которого выходит за рамки этой статьи) — работает! На моей машине импорт идет примерно 80 секунд, вполне неплохо. И запрос на поиск идет еще секунд 5 — не так хорошо, но тоже работает.
О целях
Когда я был начинающим программистом, мне очень хотелось узнать, как же пишут программы другие более опытные разработчики, как они думают и рассуждают, хотел, чтобы они поделились опытом. В этой статье я хотел показать, как я рассуждал во время работы над этой задачей, показать какие-то реальные решения каких-то вполне обыденных и не очень проблем, использование технологий и их аспектов, с которыми мне приходилось столкнуться. Возможно даже кто-то захочет сделать более удачную реализацию репозиториев, или вообще всей этой задачи и рассказать об этом. Или просто предложит его в комментариях, от этого всем мы только увеличим свои знания и опыт.