Пишем свой Http Kafka Sink Connector
В данной статье приведу реализацию своего kafka http sink connector. Он не претендует на универсальность, но возможно поможет разобраться как разработать свой connector.
Confluent Http Sink Connector — платный, другие варианты с github мне не подошли. Про Kafka Connect можно почитать здесь. Статья предполагает, что есть понимание того зачем нужен Kafka Connect Framework и как его использовать. Представленный код написан на Kotlin.
Для начала зададим Schema для нашего коннектора:
val HTTP_REQUEST_SCHEMA: Schema = SchemaBuilder.struct()
.name(HTTP_REQUEST_SCHEMA_NAME)
.field(FIELD_HTTP_METHOD, Schema.STRING_SCHEMA)
.field(
FIELD_HTTP_HEADERS,
SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()
)
.field(FIELD_HTTP_BODY, Schema.OPTIONAL_STRING_SCHEMA)
.field(FIELD_HTTP_URL, Schema.STRING_SCHEMA)
.build()
На вход коннектора должна обязательно поступать Struct со Schema = HTTP_REQUEST_SCHEMA, т.е. сообщения из Kafka с помощью converter и transforms должны быть приведены к Struct со схемой HTTP_REQUEST_SCHEMA.
Для реализации коннектора будем использовать стандартный java.net.http.HttpClient. Класс конфигурации для коннектора будет выглядеть так:
class HttpSinkConfig(private val props: Map) : AbstractConfig(configDef, props) {
companion object {
private const val RESPONSE_VALIDATOR_CLASS_NAME = "response.validator"
private const val CONNECTION_TIMEOUT_MS = "connectionTimeoutMs"
val configDef = ConfigDef()
.define(
RESPONSE_VALIDATOR_CLASS_NAME,
ConfigDef.Type.STRING,
HttpSuccessStatusResponseValidator::class.java.name,
ConfigDef.Importance.HIGH,
"Class name of validator"
)
.define(
CONNECTION_TIMEOUT_MS,
ConfigDef.Type.LONG,
2000,
ConfigDef.Importance.HIGH,
"Http connection timeout in ms"
)
}
fun responseValidator(): HttpResponseValidator = (Class.forName(getString(RESPONSE_VALIDATOR_CLASS_NAME)).getDeclaredConstructor().newInstance() as HttpResponseValidator)
.apply { init(props) }
fun httpClient(): HttpClient =
HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(connectionTimeoutMs()))
.build()
private fun connectionTimeoutMs(): Long = getLong(CONNECTION_TIMEOUT_MS)
}
В данной реализации создается дефолтный HttpClient с одной лишь настройкой, но реализацию конфигурации можно расширить, добавив ssl свойства и другие специфичные свойства, которые необходимо добавить для настройки HttpClient. Для этого по аналогии с connectionTimeoutMs нужно объявить их в ConfigDef.
responseValidator будет использоваться для валидации ответов, т.е. будет возможность задать валидатор, определяет какие ответы считать успешными, а какие ошибочными. Определим интерфейс для этих целей:
interface HttpResponseValidator : (HttpResponse) -> Unit {
@Throws(RetriableException::class)
override fun invoke(response: HttpResponse)
fun init(props: Map)
}
Приведем реализацию, которая классифицирует ответы относительно http response code. Задается 3 типа http response code:
successCodes — трактуем ответ, как успешный
retryCodes — трактуем ответ как временно ошибочный, при котором выполняется переотправка
errorCodes — трактуем ответ как ошибочный, дальнейшее поведение зависит от настройки таски «error.tolerance»: all — обработка продолжается, none — обработка сообщений завершается
class StatusResponseValidator : HttpResponseValidator {
private lateinit var retryCodes: List
private lateinit var successCodes: List
private lateinit var errorCodes: List
override fun invoke(response: HttpResponse) {
if (response.statusCode() !in successCodes) {
if (response.statusCode() in retryCodes || retryCodes.isEmpty() && response.statusCode() !in errorCodes)
throw RetriableException("Status $response.statusCode() is not success $successCodes")
else throw IllegalArgumentException("Status $response.statusCode() is not success $successCodes")
}
}
override fun init(props: Map) {
val config = AbstractConfig(
ConfigDef()
.define(
RETRY_CODES,
ConfigDef.Type.LIST,
listOf(),
ConfigDef.Importance.LOW,
"Http response codes for retry"
)
.define(
SUCCESS_CODES,
ConfigDef.Type.LIST,
listOf("200"),
ConfigDef.Importance.HIGH,
"Success http response codes"
)
.define(
ERROR_CODES,
ConfigDef.Type.LIST,
listOf(),
ConfigDef.Importance.HIGH,
"Error http response codes"
),
props
)
retryCodes = config.getList(RETRY_CODES).map { it.toInt() }
successCodes = config.getList(SUCCESS_CODES).map { it.toInt() }
errorCodes = config.getList(ERROR_CODES).map { it.toInt() }
}
companion object {
private const val RETRY_CODES = "response.validator.codes.retry"
private const val SUCCESS_CODES = "response.validator.codes.success"
private const val ERROR_CODES = "response.validator.codes.error"
}
}
Пример того, сконфигурировать response validator в настройках таски:
{
....
"response.validator": "ru.typik.kafka.connect.task.StatusResponseValidator",
"response.validator.codes.success": "200",
"response.validator.codes.error": "400",
....
}
Самая простая реализация таски с HttpClient будет выглядеть так:
class HttpSinkTask : SinkTask() {
companion object {
private val log = LoggerFactory.getLogger(this::class.java)
}
protected lateinit var config: HttpSinkConfig
protected lateinit var httpClient: HttpClient
protected lateinit var responseValidator: HttpResponseValidator
override fun version(): String = "1.0"
override fun stop() {}
override fun flush(currentOffsets: MutableMap) { }
override fun start(props: Map) {
log.info("Starting http sink task...")
config = HttpSinkConfig(props)
httpClient = config.httpClient()
responseValidator = config.responseValidator()
}
override fun put(records: Collection) {
if (records.isEmpty()) return
log.debug(
"Received {} records. First record kafka coordinates:({}-{}-{}).",
records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset()
)
records.forEach { record ->
record.toHttpRequestSafe()
?.let { request ->
request.send()
.also { response ->
log.trace("Http request: {}, Http response: {}", request, response)
response.validate(record)
}
}
}
}
private fun HttpRequest.send() = try {
httpClient.send(this, HttpResponse.BodyHandlers.ofString())
} catch (ex: Exception) {
// Сообщения будут переобработаны через backoffTimeoutMs
log.error("Error during sending http request: $this", ex)
log.info("Context timeout before retry")
context.timeout(config.backoffTimeoutMs())
log.info("Throw exception after context timeout")
throw RetriableException(ex)
}
protected fun HttpResponse.validate(record: SinkRecord) {
try {
responseValidator(this)
} catch (ex: RetriableException) {
// Сообщения будут переобработаны через backoffTimeoutMs
log.info("Context timeout before retry")
context.timeout(config.backoffTimeoutMs())
log.info("Throw exception after timeout")
throw ex
} catch (ex: Exception) {
log.error("Matching response failed", ex)
// Поведение зависит от настройки таски errors.tolerance:
// * Обработка завершается, если "errors.tolerance": "none"
// * Обработка продолжается, а сообщения отправляются в dead letter,
// если "errors.tolerance": "all"
context.errantRecordReporter().report(record, ex)
}
}
protected fun SinkRecord.toHttpRequestSafe() = try {
toHttpRequest()
} catch (ex: Exception) {
log.error("Invalid record", ex)
// Поведение зависит от настройки таски errors.tolerance:
// * Обработка завершается, если "errors.tolerance": "none"
// * Обработка продолжается, а сообщения отправляются в dead letter,
// если "errors.tolerance": "all"
context.errantRecordReporter().report(this, ex)
null
}
protected fun SinkRecord.toHttpRequest() =
(value() as Struct)
.let { httpStruct ->
HttpRequest.newBuilder()
.uri(URI.create(httpStruct.getHttpUrl()))
.method(
httpStruct.getHttpMethod(),
httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody()
)
.apply {
httpStruct.getHttpHeaders()?.forEach { (k, v) -> header(k, v) }
}
.build()
}
}
Реализация класса коннектора:
class HttpSinkConnector : SinkConnector() {
private val log = LoggerFactory.getLogger(HttpSinkConnector::class.java)
private lateinit var settings: Map
override fun version(): String = "1.0"
override fun start(props: MutableMap) {
log.info("Starting HttpSyncSinkConnector...")
settings = props
}
override fun taskClass(): Class = HabrHttpSinkTask::class.java
override fun taskConfigs(maxTasks: Int): List
Была приведена простая реализация коннектора, где сообщения будут обрабатываться строго одно за одним. Попробуем немного оптимизировать данную реализацию за счет использования асинхронного метода HttpClient.sendAsync
вместо синхронного HttpClient.send
. Идея в том, чтобы посылать несколько запросов параллельно, обрабатывая сообщения пачками. Такой подход в некоторых случаях может быть более оптимальным, если это предусмотрено реализацией Http Server.
Чтобы иметь возможность конфигурировать максимальное количество параллельных запросов приведем интерфейс и реализацию SinkRecordGrouper
, который делит список входящих сообщений на входе метода SinkTask.put
на подсписки, которые в свою очередь обрабатываются строго последовательно, при этом элементы подсписков обрабатываются параллельно:
interface SinkRecordGrouper : (List) -> List> {
override fun invoke(records: List): List>
fun init(props: Map)
}
Приведем реализацию, которая кроме собственно разбивки на подсписки по количеству элементов, еще и группирует их по ключу, т.е. элементы с одинаковыми ключами помещаются в разные подсписки, чтобы они не обрабатывались параллельно, а строго последовательно, в некоторых случаях такая логика необходима.
class KeyGrouper : SinkRecordGrouper {
private var parallelCount by Delegates.notNull()
override fun invoke(records: List): List> {
val result = mutableListOf>()
val batch = mutableListOf(*records.toTypedArray())
while (batch.isNotEmpty()) {
val keySet = mutableSetOf()
val subResult = mutableListOf()
for (r in batch) {
if (r.key() !in keySet) {
subResult.add(r)
keySet.add(r.key())
}
if (subResult.size >= parallelCount)
break
}
batch.removeAll(subResult)
result.add(subResult)
}
return result
}
override fun init(props: Map) {
val config = AbstractConfig(
ConfigDef()
.define(
PARALLEL_COUNT,
ConfigDef.Type.LONG,
5,
ConfigDef.Importance.LOW,
"How many requests to send in parallel"
),
props
)
parallelCount = config.getLong(PARALLEL_COUNT)
}
companion object {
private const val PARALLEL_COUNT = "grouper.parallelCount"
}
}
Приведем обновленную реализацию HttpSinkTask
с использованием HttpClient.sendAsync
и KeyGrouper
:
class HttpSinkTask : SinkTask() {
companion object {
private val log = LoggerFactory.getLogger(this::class.java)
}
private lateinit var config: HttpSinkConfig
private lateinit var httpClient: HttpClient
private lateinit var responseValidator: HttpResponseValidator
private lateinit var grouper: SinkRecordGrouper
override fun version(): String = "1.0"
override fun stop() {}
override fun flush(currentOffsets: MutableMap) {
}
override fun start(props: Map) {
log.info("Starting http sink task...")
config = HttpSinkConfig(props)
httpClient = config.httpClient()
responseValidator = config.responseValidator()
grouper = KeyGrouper().apply { init(props) }
}
override fun put(records: Collection) {
if (records.isEmpty()) return
log.debug(
"Received {} records. First record kafka coordinates:({}-{}-{}).",
records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset()
)
grouper(records.toList()).forEach { subRecords ->
subRecords.map { record ->
record.toHttpRequestSafe()
?.let { request ->
request.sendAsync()
?.exceptionally { ex ->
// Батч будет переобработан через backoffTimeoutMs
log.error("Error handling response, http request: $this", ex)
log.info("Context timeout before retry")
context.timeout(config.backoffTimeoutMs())
log.info("Throw exception after context timeout")
throw RetriableException(ex)
}
?.thenApply { response -> response.validate(record) }
}
}
.forEach {
try {
it?.join()
} catch (ex: CompletionException) {
ex.cause?.let { throw it }
}
}
}
}
private fun HttpRequest.sendAsync() = try {
httpClient.sendAsync(this, HttpResponse.BodyHandlers.ofString())
} catch (ex: Exception) {
// Батч будет переработа после backoffTimeoutMs
log.error("Error during sending http request: $this", ex)
log.info("Context timeout before retry")
context.timeout(config.backoffTimeoutMs())
log.info("Throw exception after context timeout")
throw RetriableException(ex)
}
protected fun HttpResponse.validate(record: SinkRecord) {
try {
responseValidator(this)
} catch (ex: RetriableException) {
// Сообщения будут переобработаны через backoffTimeoutMs
log.info("Context timeout before retry")
context.timeout(config.backoffTimeoutMs())
log.info("Throw exception after timeout")
throw ex
} catch (ex: Exception) {
log.error("Matching response failed", ex)
// Поведение зависит от настройки таски errors.tolerance:
// * Обработка завершается, если "errors.tolerance": "none"
// * Обработка продолжается, а сообщения отправляются в dead letter,
// если "errors.tolerance": "all" )
context.errantRecordReporter().report(record, ex)
}
}
protected fun SinkRecord.toHttpRequestSafe() = try {
toHttpRequest()
} catch (ex: Exception) {
log.error("Invalid record", ex)
// report a record processing error to the context
// depending on the error handling strategy settings:
// * processing is stopped ( "errors.tolerance": "none" )
// * processing is continued and this record is sent to the dead letter ( "errors.tolerance": "all" )
context.errantRecordReporter().report(this, ex)
null
}
protected fun SinkRecord.toHttpRequest() =
(value() as Struct)
.let { httpStruct ->
HttpRequest.newBuilder()
.uri(URI.create(httpStruct.getHttpUrl()))
.method(
httpStruct.getHttpMethod(),
httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody()
)
.apply {
httpStruct.getHttpHeaders()?.forEach { (k, v) -> header(k, v) }
}
.build()
}
}
Как может выглядеть конфигурация таски:
{
"connector.class": "ru.typik.kafka.connect.HttpAsyncSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "ru.typik.kafka.connect.converter.ProtobufConverter",
"value.converter.protoClassName": "ru.typik.debt.proto.NotificationModel$NotificationData",
"consumer.override.group.id": "${tpp:consumer-group}",
"auto.create": "false",
"tasks.max": "1",
"topics": "${tpp:topic}",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"errors.deadletterqueue.topic.name": "${tpp:deadLetter}",
"errors.deadletterqueue.topic.replication.factor": "${tpp:replication-factor}",
"errors.deadletterqueue.context.headers.enable": true,
"transforms": "http",
"transforms.http.type": "ru.typik.HttpTransform",
"grouper.parallelCount": "50",
"backoffTimeoutMs": "${tpp:backoffTimeoutMs}",
"response.validator": "ru.typik.kafka.connect.task.StatusResponseValidator",
"response.validator.codes.success": "200",
"response.validator.codes.error": "400"
}
Как может выглядеть реализация HttpTransform
:
class HttpTransform> : Transformation {
override fun close() {}
protected fun Struct.getMethod(): String = "GET”
protected fun Struct.getUrl(): String = "http://localhost:8080”
protected fun Struct.getBody(): String? = "{}”
protected fun Struct.getHeaders(): Map? = mapOf(
"Content-Type" to "application/json",
"Accept" to "application/json"
)
override fun apply(record: R): R =
record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
HTTP_REQUEST_SCHEMA,
(record.value() as Struct).let { struct ->
Struct(HTTP_REQUEST_SCHEMA)
.put(FIELD_HTTP_METHOD, struct.getMethod())
.put(FIELD_HTTP_URL, struct.getUrl())
.put(FIELD_HTTP_BODY, struct.getBody())
.put(FIELD_HTTP_HEADERS, struct.getHeaders())
},
record.timestamp()
)
}
Я потестировал данную реализацию коннектора с различными значениями parallelCount
и построил графики в Graphana с использованием стандартных метрик Kafka Connect. В качестве Http Server взял wire mock, который отвечает с небольшой задержкой.
Конфигурация wiremock:
{
"mappings": [
{
"priority": 1,
"request": {
"method": "GET",
"urlPattern": "/"
},
"response": {
"status": 200,
"fixedDelayMilliseconds": 200,
"headers": {
"Content-Type": "application/json"
}
}
}
]
}
Получившиеся графики:
Нагрузочное тестирование
Это достаточно условный сценарий тестирования, все зависит от реализации Http Server, будет ли выхлоп от оптимизации с параллельными запросами. Возможно для кого-то подойдет вариант с оптимизацией, которая позволяет группировать http запросы в батчи и слать их одним запросом, но в этом случае Http Server тоже должен быть готов к такой специфичной логике обработки.
Habrahabr.ru прочитано 21133 раза