Пишем свой Http Kafka Sink Connector

В данной статье приведу реализацию своего kafka http sink connector. Он не претендует на универсальность, но возможно поможет разобраться как разработать свой connector.

Confluent Http Sink Connector — платный, другие варианты с github мне не подошли. Про Kafka Connect можно почитать здесь. Статья предполагает, что есть понимание того зачем нужен Kafka Connect Framework и как его использовать. Представленный код написан на Kotlin.

Для начала зададим Schema для нашего коннектора:

val HTTP_REQUEST_SCHEMA: Schema = SchemaBuilder.struct()
   .name(HTTP_REQUEST_SCHEMA_NAME)
   .field(FIELD_HTTP_METHOD, Schema.STRING_SCHEMA)
   .field(
       FIELD_HTTP_HEADERS,
       SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()
   )
   .field(FIELD_HTTP_BODY, Schema.OPTIONAL_STRING_SCHEMA)
   .field(FIELD_HTTP_URL, Schema.STRING_SCHEMA)
   .build()

На вход коннектора должна обязательно поступать Struct со Schema = HTTP_REQUEST_SCHEMA, т.е. сообщения из Kafka с помощью converter и transforms должны быть приведены к Struct со схемой HTTP_REQUEST_SCHEMA.

Для реализации коннектора будем использовать стандартный java.net.http.HttpClient. Класс конфигурации для коннектора будет выглядеть так:

class HttpSinkConfig(private val props: Map) : AbstractConfig(configDef, props) {

   companion object {
       private const val RESPONSE_VALIDATOR_CLASS_NAME = "response.validator"
       private const val CONNECTION_TIMEOUT_MS = "connectionTimeoutMs"

       val configDef = ConfigDef()
           .define(
               RESPONSE_VALIDATOR_CLASS_NAME,
               ConfigDef.Type.STRING,
               HttpSuccessStatusResponseValidator::class.java.name,
               ConfigDef.Importance.HIGH,
               "Class name of validator"
           )
           .define(
               CONNECTION_TIMEOUT_MS,
               ConfigDef.Type.LONG,
               2000,
               ConfigDef.Importance.HIGH,
               "Http connection timeout in ms"
           )
   }

   fun responseValidator(): HttpResponseValidator =       (Class.forName(getString(RESPONSE_VALIDATOR_CLASS_NAME)).getDeclaredConstructor().newInstance() as HttpResponseValidator)
           .apply { init(props) }
  
   fun httpClient(): HttpClient =
       HttpClient.newBuilder()
           .connectTimeout(Duration.ofMillis(connectionTimeoutMs()))
           .build()


   private fun connectionTimeoutMs(): Long = getLong(CONNECTION_TIMEOUT_MS)
}

В данной реализации создается дефолтный HttpClient с одной лишь настройкой, но реализацию конфигурации можно расширить, добавив ssl свойства и другие специфичные свойства, которые необходимо добавить для настройки HttpClient. Для этого по аналогии с connectionTimeoutMs нужно объявить их в ConfigDef. 

responseValidator будет использоваться для валидации ответов, т.е. будет возможность задать валидатор, определяет какие ответы считать успешными, а какие ошибочными. Определим интерфейс для этих целей:

interface HttpResponseValidator : (HttpResponse) -> Unit {
   @Throws(RetriableException::class)
   override fun invoke(response: HttpResponse)
   fun init(props: Map)
}

Приведем реализацию, которая классифицирует ответы относительно http response code. Задается 3 типа http response code:

  1. successCodes — трактуем ответ, как успешный

  2. retryCodes — трактуем ответ как временно ошибочный, при котором выполняется переотправка

  3. errorCodes — трактуем ответ как ошибочный, дальнейшее поведение зависит от настройки таски «error.tolerance»: all — обработка продолжается, none — обработка сообщений завершается

class StatusResponseValidator : HttpResponseValidator {
   private lateinit var retryCodes: List
   private lateinit var successCodes: List
   private lateinit var errorCodes: List


   override fun invoke(response: HttpResponse) {
       if (response.statusCode() !in successCodes) {
           if (response.statusCode() in retryCodes || retryCodes.isEmpty() && response.statusCode() !in errorCodes)
               throw RetriableException("Status $response.statusCode() is not success $successCodes")
           else throw IllegalArgumentException("Status $response.statusCode() is not success $successCodes")
       }
   }

   override fun init(props: Map) {
       val config = AbstractConfig(
           ConfigDef()
               .define(
                   RETRY_CODES,
                   ConfigDef.Type.LIST,
                   listOf(),
                   ConfigDef.Importance.LOW,
                   "Http response codes for retry"
               )
               .define(
                   SUCCESS_CODES,
                   ConfigDef.Type.LIST,
                   listOf("200"),
                   ConfigDef.Importance.HIGH,
                   "Success http response codes"
               )
               .define(
                   ERROR_CODES,
                   ConfigDef.Type.LIST,
                   listOf(),
                   ConfigDef.Importance.HIGH,
                   "Error http response codes"
               ),
           props
       )
       retryCodes = config.getList(RETRY_CODES).map { it.toInt() }
       successCodes = config.getList(SUCCESS_CODES).map { it.toInt() }
       errorCodes = config.getList(ERROR_CODES).map { it.toInt() }
   }

   companion object {
       private const val RETRY_CODES = "response.validator.codes.retry"
       private const val SUCCESS_CODES = "response.validator.codes.success"
       private const val ERROR_CODES = "response.validator.codes.error"
   }
}

Пример того, сконфигурировать response validator в настройках таски:

{
  ....
"response.validator": "ru.typik.kafka.connect.task.StatusResponseValidator",
"response.validator.codes.success": "200",
"response.validator.codes.error": "400",
  ....
}

Самая простая реализация таски с HttpClient будет выглядеть так:

class HttpSinkTask : SinkTask() {

   companion object {
       private val log = LoggerFactory.getLogger(this::class.java)
   }

   protected lateinit var config: HttpSinkConfig
   protected lateinit var httpClient: HttpClient
   protected lateinit var responseValidator: HttpResponseValidator

   override fun version(): String = "1.0"
   override fun stop() {}
   override fun flush(currentOffsets: MutableMap) { }

   override fun start(props: Map) {
       log.info("Starting http sink task...")
       config = HttpSinkConfig(props)
       httpClient = config.httpClient()
       responseValidator = config.responseValidator()
   }

   override fun put(records: Collection) {
       if (records.isEmpty()) return

       log.debug(
           "Received {} records. First record kafka coordinates:({}-{}-{}).",
           records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset()
       )

       records.forEach { record ->
               record.toHttpRequestSafe()
                   ?.let { request ->
                       request.send()
                           .also { response ->
                               log.trace("Http request: {}, Http response: {}", request, response)
                               response.validate(record)
                           }
                   }
       }
   }

   private fun HttpRequest.send() = try {
       httpClient.send(this, HttpResponse.BodyHandlers.ofString())
   } catch (ex: Exception) {
       // Сообщения будут переобработаны через backoffTimeoutMs
       log.error("Error during sending http request: $this", ex)
       log.info("Context timeout before retry")
       context.timeout(config.backoffTimeoutMs())
       log.info("Throw exception after context timeout")
       throw RetriableException(ex)
   }

   protected fun HttpResponse.validate(record: SinkRecord) {
       try {
           responseValidator(this)
       } catch (ex: RetriableException) {
           // Сообщения будут переобработаны через backoffTimeoutMs
           log.info("Context timeout before retry")
           context.timeout(config.backoffTimeoutMs())
           log.info("Throw exception after timeout")
           throw ex
       } catch (ex: Exception) {
           log.error("Matching response failed", ex)
           // Поведение зависит от настройки таски errors.tolerance:
           // * Обработка завершается, если "errors.tolerance": "none"
           // * Обработка продолжается, а сообщения отправляются в dead letter,
           //    если "errors.tolerance": "all" 
           context.errantRecordReporter().report(record, ex)
       }
   }

   protected fun SinkRecord.toHttpRequestSafe() = try {
       toHttpRequest()
   } catch (ex: Exception) {
       log.error("Invalid record", ex)
       // Поведение зависит от настройки таски errors.tolerance:
           // * Обработка завершается, если "errors.tolerance": "none"
           // * Обработка продолжается, а сообщения отправляются в dead letter,
           //    если "errors.tolerance": "all" 
       context.errantRecordReporter().report(this, ex)
       null
   }

   protected fun SinkRecord.toHttpRequest() =
       (value() as Struct)
           .let { httpStruct ->
               HttpRequest.newBuilder()
                   .uri(URI.create(httpStruct.getHttpUrl()))
                   .method(
                       httpStruct.getHttpMethod(),
                       httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody()
                   )
                   .apply {
                       httpStruct.getHttpHeaders()?.forEach { (k, v) -> header(k, v) }
                   }
                   .build()
           }
}

Реализация класса коннектора:

class HttpSinkConnector : SinkConnector() {

   private val log = LoggerFactory.getLogger(HttpSinkConnector::class.java)
   private lateinit var settings: Map

   override fun version(): String = "1.0"

   override fun start(props: MutableMap) {
       log.info("Starting HttpSyncSinkConnector...")
       settings = props
   }

   override fun taskClass(): Class = HabrHttpSinkTask::class.java
   override fun taskConfigs(maxTasks: Int): List> = 
      List(maxTasks) { settings }
   override fun stop() {}
   override fun config(): ConfigDef = HttpSinkConfig.configDef

   override fun validate(connectorConfigs: Map): Config {
       return super.validate(connectorConfigs)
   }
}

Была приведена простая реализация коннектора, где сообщения будут обрабатываться строго одно за одним. Попробуем немного оптимизировать данную реализацию за счет использования асинхронного метода HttpClient.sendAsync вместо синхронного HttpClient.send. Идея в том, чтобы посылать несколько запросов параллельно, обрабатывая сообщения пачками. Такой подход в некоторых случаях может быть более оптимальным, если это предусмотрено реализацией Http Server.

Чтобы иметь возможность конфигурировать максимальное количество параллельных запросов приведем интерфейс и реализацию SinkRecordGrouper, который делит список входящих сообщений на входе метода SinkTask.put на подсписки, которые в свою очередь обрабатываются строго последовательно, при этом элементы подсписков обрабатываются параллельно:

interface SinkRecordGrouper : (List) -> List> {
   override fun invoke(records: List): List>
   fun init(props: Map)
}

Приведем реализацию, которая кроме собственно разбивки на подсписки по количеству элементов, еще и группирует их по ключу, т.е. элементы с одинаковыми ключами помещаются в разные подсписки, чтобы они не обрабатывались параллельно, а строго последовательно, в некоторых случаях такая логика необходима.

class KeyGrouper : SinkRecordGrouper {

   private var parallelCount by Delegates.notNull()

   override fun invoke(records: List): List> {
       val result = mutableListOf>()
       val batch = mutableListOf(*records.toTypedArray())

       while (batch.isNotEmpty()) {
           val keySet = mutableSetOf()
           val subResult = mutableListOf()
           for (r in batch) {
               if (r.key() !in keySet) {
                   subResult.add(r)
                   keySet.add(r.key())
               }

               if (subResult.size >= parallelCount)
                   break
           }
           batch.removeAll(subResult)
           result.add(subResult)
       }
       return result
   }


   override fun init(props: Map) {
       val config = AbstractConfig(
           ConfigDef()
               .define(
                   PARALLEL_COUNT,
                   ConfigDef.Type.LONG,
                   5,
                   ConfigDef.Importance.LOW,
                   "How many requests to send in parallel"
               ),
           props
       )
       parallelCount = config.getLong(PARALLEL_COUNT)
   }

   companion object {
       private const val PARALLEL_COUNT = "grouper.parallelCount"
   }
}

Приведем обновленную реализацию HttpSinkTask с использованием HttpClient.sendAsync и KeyGrouper:

class HttpSinkTask : SinkTask() {

   companion object {
       private val log = LoggerFactory.getLogger(this::class.java)
   }

   private lateinit var config: HttpSinkConfig
   private lateinit var httpClient: HttpClient
   private lateinit var responseValidator: HttpResponseValidator
   private lateinit var grouper: SinkRecordGrouper
  
   override fun version(): String = "1.0"
   override fun stop() {}
   override fun flush(currentOffsets: MutableMap) {
   }

   override fun start(props: Map) {
       log.info("Starting http sink task...")
       config = HttpSinkConfig(props)
       httpClient = config.httpClient()
       responseValidator = config.responseValidator()
       grouper = KeyGrouper().apply { init(props) }
   }


   override fun put(records: Collection) {
       if (records.isEmpty()) return


       log.debug(
           "Received {} records. First record kafka coordinates:({}-{}-{}).",
           records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset()
       )
      
       grouper(records.toList()).forEach { subRecords ->
           subRecords.map { record ->
               record.toHttpRequestSafe()
                   ?.let { request ->
                       request.sendAsync()
                           ?.exceptionally { ex ->
                               // Батч будет переобработан через backoffTimeoutMs
                               log.error("Error handling response, http request: $this", ex)
                               log.info("Context timeout before retry")
                               context.timeout(config.backoffTimeoutMs())
                               log.info("Throw exception after context timeout")
                               throw RetriableException(ex)


                           }
                           ?.thenApply { response -> response.validate(record) }
                   }
           }
               .forEach {
                   try {
                       it?.join()
                   } catch (ex: CompletionException) {
                       ex.cause?.let { throw it }
                   }
               }
       }
   }


   private fun HttpRequest.sendAsync() = try {
       httpClient.sendAsync(this, HttpResponse.BodyHandlers.ofString())
   } catch (ex: Exception) {
       // Батч будет переработа после backoffTimeoutMs
       log.error("Error during sending http request: $this", ex)
       log.info("Context timeout before retry")
       context.timeout(config.backoffTimeoutMs())
       log.info("Throw exception after context timeout")
       throw RetriableException(ex)
   }

   protected fun HttpResponse.validate(record: SinkRecord) {
       try {
           responseValidator(this)
       } catch (ex: RetriableException) {
           // Сообщения будут переобработаны через backoffTimeoutMs
           log.info("Context timeout before retry")
           context.timeout(config.backoffTimeoutMs())
           log.info("Throw exception after timeout")
           throw ex
       } catch (ex: Exception) {
           log.error("Matching response failed", ex)
           // Поведение зависит от настройки таски errors.tolerance:
           // * Обработка завершается, если "errors.tolerance": "none"
           // * Обработка продолжается, а сообщения отправляются в dead letter,
           //    если "errors.tolerance": "all" )
           context.errantRecordReporter().report(record, ex)
       }
   }

   protected fun SinkRecord.toHttpRequestSafe() = try {
       toHttpRequest()
   } catch (ex: Exception) {
       log.error("Invalid record", ex)
       // report a record processing error to the context
       // depending on the error handling strategy settings:
       // * processing is stopped ( "errors.tolerance": "none" )
       // * processing is continued and this record is sent to the dead letter ( "errors.tolerance": "all" )
       context.errantRecordReporter().report(this, ex)
       null
   }

   protected fun SinkRecord.toHttpRequest() =
       (value() as Struct)
           .let { httpStruct ->
               HttpRequest.newBuilder()
                   .uri(URI.create(httpStruct.getHttpUrl()))
                   .method(
                       httpStruct.getHttpMethod(),
                       httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody()
                   )
                   .apply {
                       httpStruct.getHttpHeaders()?.forEach { (k, v) -> header(k, v) }
                   }
                   .build()
           }
}

Как может выглядеть конфигурация таски:

{
 "connector.class": "ru.typik.kafka.connect.HttpAsyncSinkConnector",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "ru.typik.kafka.connect.converter.ProtobufConverter",
 "value.converter.protoClassName": "ru.typik.debt.proto.NotificationModel$NotificationData",
 "consumer.override.group.id": "${tpp:consumer-group}",
 "auto.create": "false",
 "tasks.max": "1",
 "topics": "${tpp:topic}",
 "errors.tolerance": "all",
 "errors.log.enable": true,
 "errors.log.include.messages": true,
 "errors.deadletterqueue.topic.name": "${tpp:deadLetter}",
 "errors.deadletterqueue.topic.replication.factor": "${tpp:replication-factor}",
 "errors.deadletterqueue.context.headers.enable": true,
 "transforms": "http",
 "transforms.http.type": "ru.typik.HttpTransform",
 "grouper.parallelCount": "50",
 "backoffTimeoutMs": "${tpp:backoffTimeoutMs}",
 "response.validator": "ru.typik.kafka.connect.task.StatusResponseValidator",
 "response.validator.codes.success": "200",
 "response.validator.codes.error": "400"
}

Как может выглядеть реализация HttpTransform:

class HttpTransform> : Transformation {

   override fun close() {}

   protected fun Struct.getMethod(): String = "GET”
   protected fun Struct.getUrl(): String = "http://localhost:8080”
   protected fun Struct.getBody(): String? = "{}”
   protected fun Struct.getHeaders(): Map? = mapOf(
       "Content-Type" to "application/json",
       "Accept" to "application/json"
   )

   override fun apply(record: R): R =
       record.newRecord(
           record.topic(),
           record.kafkaPartition(),
           record.keySchema(),
           record.key(),
           HTTP_REQUEST_SCHEMA,
           (record.value() as Struct).let { struct ->
               Struct(HTTP_REQUEST_SCHEMA)
                   .put(FIELD_HTTP_METHOD, struct.getMethod())
                   .put(FIELD_HTTP_URL, struct.getUrl())
                   .put(FIELD_HTTP_BODY, struct.getBody())
                   .put(FIELD_HTTP_HEADERS, struct.getHeaders())
           },
           record.timestamp()
       )
}

Я потестировал данную реализацию коннектора с различными значениями parallelCount и построил графики в Graphana с использованием стандартных метрик Kafka Connect. В качестве Http Server взял wire mock, который отвечает с небольшой задержкой. 

Конфигурация wiremock:

{
 "mappings": [
   {
     "priority": 1,
     "request": {
       "method": "GET",
       "urlPattern": "/"
     },
     "response": {
       "status": 200,
       "fixedDelayMilliseconds": 200,
       "headers": {
         "Content-Type": "application/json"
       }
     }
   }
 ]
}

Получившиеся графики:

Нагрузочное тестирование

Нагрузочное тестирование

Это достаточно условный сценарий тестирования, все зависит от реализации Http Server, будет ли выхлоп от оптимизации с параллельными запросами. Возможно для кого-то подойдет вариант с оптимизацией, которая позволяет группировать http запросы в батчи и слать их одним запросом, но в этом случае Http Server тоже должен быть готов к такой специфичной логике обработки.

© Habrahabr.ru