работа с Kafka в Go: практическое применение

Автор статьи Якушков Федор.
Apache Kafka — это мощная распределённая платформа для обработки потоков данных, которая завоевала популярность благодаря своей способности эффективно управлять большими объёмами информации в реальном времени. В этой статье мы подробно разберём, как использовать Kafka в языке программирования Go с помощью библиотеки kafka-go. Мы рассмотрим все ключевые аспекты: от event-driven архитектуры до топиков и партиций, от создания продюсеров и консьюмеров до управления оффсетами и обработки ошибок. Разберем гарантии доставки, а также обсудим, где и как применять Kafka в проектах.
Что такое Apache Kafka?
Apache Kafka — это распределённый брокер сообщений, разработанный для обработки потоков данных с высокой пропускной способностью, масштабируемостью и отказоустойчивостью. Kafka позволяет приложениям обмениваться данными через топики — логические категории или потоки сообщений. В топики данные отправляются продюсерами (producers), а читаются консьюмерами (consumers). Основные преимущества Kafka: — Высокая производительность: Kafka может обрабатывать миллионы сообщений в секунду. — Масштабируемость: данные распределяются по кластерам брокеров. — Надёжность: благодаря репликации данных и механизмам отказоустойчивости. Kafka идеально подходит для систем, где требуется асинхронная обработка данных, таких как микросервисы, аналитика в реальном времени или системы обработки логов.
Event-Driven Архитектура и её связь с Kafka

Event-driven архитектура — это подход к проектированию систем, при котором компоненты взаимодействуют через события. Событие — это запись о том, что что-то произошло (например, «пользователь зарегистрировался» или «заказ создан»). В Kafka события представлены сообщениями, которые публикуются в топики. Преимущества event-driven архитектуры:
Асинхронность: компоненты системы не ждут ответа друг от друга, что снижает задержки.
Гибкость: новые потребители событий могут быть добавлены без изменения продюсеров.
Масштабируемость: обработка событий легко распараллеливается. Kafka играет ключевую роль в таких системах, выступая в качестве посредника между продюсерами (генерирующими события) и консьюмерами (обрабатывающими их). Это делает её идеальной основой для микросервисов, потоковой аналитики и других современных архитектур.
Топики и Партиции: основы работы Kafka

Партиции в Kafka Партиция — это основная единица хранения данных внутри топика. Каждый топик может содержать несколько партиций, что позволяет распределять данные между разными брокерами в кластере Kafka и обеспечивать параллельную обработку: разные консьюмеры могут читать данные из отдельных партиций одновременно.
Структура партиции Партиция представляет собой упорядоченный лог сообщений, где каждое сообщение получает уникальный оффсет — порядковый номер, определяющий его позицию в партиции. Оффсеты помогают консьюмерам отслеживать, какие сообщения уже были обработаны. Более подробно управление оффсетами рассмотрено далее в статье. Например, если у топика orders три партиции, то сообщения внутри них будут распределены следующим образом:
• Партиция 0: оффсеты 0, 1, 2…
• Партиция 1: оффсеты 0, 1, 2…
• Партиция 2: оффсеты 0, 1, 2…
Это позволяет Kafka масштабировать обработку данных и эффективно управлять нагрузкой.
Работа с kafka
Для работы с Kafka в Go мы будем использовать библиотеку kafka-go от Segmentio. Это одна из самых популярных и удобных библиотек, предоставляющая высокоуровневый API для создания продюсеров и консьюмеров, а также поддержку дополнительных функций, таких как управление оффсетами и обработка ошибок.
Для начала добавим библиотеку в ваш проект с помощью Go modules:
go get github.com/segmentio/kafka-go
Убедитесь, что у вас запущен Kafka-брокер (например, на стандартном порту localhost:9092
), чтобы протестировать примеры.
Создание продюсера
Продюсер в Kafka — это сервис, который отправляет сообщения в Kafka.
Он может быть использован для передачи данных из различных источников, по типу баз данных, файлов или другие приложений.
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
ctx := context.Background()
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
})
defer writer.Close()
err := writer.WriteMessages(ctx, kafka.Message{
Value: []byte("Hello, Kafka!"),
})
if err != nil {
log.Fatal("Ошибка при отправке:", err)
}
}
Brokers: список адресов брокеров Kafka.
Topic: имя топика, куда отправляются сообщени
WriteMessages: метод для отправки одного или нескольких сообщений.
Вы можете указать дополнительные параметры, такие как Key (ключ для определения партиции) или настроить BatchSize для пакетной отправки. Ниже есть пример с ключом.
Создание консьюмера
Консьюмер читает сообщения из топика Kafka. В kafka-go консьюмеры поддерживают как индивидуальное чтение из партиций, так и работу в составе группы консьюмеров.
пример:
package main
import (
"context"
"fmt"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
GroupID: "my-groupID",
})
defer reader.Close()
msg, err := reader.ReadMessage(context.Background())
if err != nil {
log.Fatal("Ошибка при отправке:", err)
}
fmt.Println([]byte(msg.Value))
}
GroupID: позволяет объединять консьюмеров в группу для распределения партиций между ними.
ReadMessage: возвращает сообщение из топика.
Управление оффсетами
Как уже упоминалось, оффсет — это числовой идентификатор, который указывает позицию каждого сообщения внутри партиции топика. Kafka позволяет консьюмерам управлять оффсетами автоматически или вручную.
Автоматическое управление
По умолчанию kafka-go оффсеты коммитятся автоматически с заданным интервалом, который определяется параметром CommitInterval
. Это удобно для простых случаев, но может привести к потере сообщений при сбоях.
Ручное управление
Для точного контроля можно отключить автоматический коммит и коммитить оффсеты вручную после чтения:
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
GroupID: "my-group",
CommitInterval: 0, // Отключаем автоматический коммит
})
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Received: %s\n", string(msg.Value))
// Коммитим оффсет вручную после обработки
err = reader.CommitMessages(context.Background(), msg)
if err != nil {
panic(err)
}
}
Ручное управление полезно, если вы хотите гарантировать, что сообщение обработано перед коммитом оффсета.
Гарантии доставки в Kafka
Apache Kafka предоставляет три основные гарантии доставки сообщений, которые влияют на надёжность и производительность системы:
At most once
Это означает, что сообщение будет доставлено не более одного раза. При этом возможна потеря сообщений, так как продюсер может не успеть отправить их на брокер, но повторная отправка не будет происходить.At least once
Гарантируется, что сообщение будет доставлено как минимум один раз. Это означает, что если продюсер не получит подтверждения о доставке, то он повторно отправит сообщение. Это стандартное поведение Kafka.Exactly once
Это гарантирует, что сообщение будет доставлено ровно один раз. Для этого включаются дополнительные механизмы, такие как идемпотентность и транзакции, чтобы избежать дублирования сообщений, как на стороне продюсера, так и на стороне консьюмера.
At least once (По умолчанию)
По умолчанию Kafka использует гарантию At least once для доставки сообщений. Это означает, что если продюсер не получает подтверждения от брокера, он повторно отправит сообщение. В этом случае возможны дубли, но сообщение будет доставлено.
Пример использования At least once:
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
ctx := context.Background()
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
RequiredAcks: -1, // Подтверждение от всех реплик
MaxAttempts: 10, //кол-во попыток доставки(по умолчанию всегда 10)
BatchSize: 100, // Ограничение на количество сообщений(по дефолту 100)
WriteTimeout: 10 * time.Second, //время ожидания для записи(по умолчанию 10сек)
Balancer: &kafka.RoundRobin{}, //балансировщик.
})
defer writer.Close()
err := writer.WriteMessages(ctx, kafka.Message{
Value: []byte("Hello, Kafka!"),
})
if err != nil {
log.Fatal("Ошибка при отправке:", err)
}
fmt.Println("Сообщение отправлено с гарантией At least once.")
}
Пояснение:
В данном примере мы используем kafka.NewWriter () для отправки сообщения. Это означает, что если Kafka не получит подтверждение, она автоматически повторно отправит сообщение. Это стандартная гарантия в Kafka.
Чтобы использовать иные гарантии, то в конфигурации надо добавить настройку для поля RequiredAcks.
Как эти параметры влияют на доставку сообщений:
RequiredAcks: 0 — минимальная гарантия, сообщения могут быть потеряны (at most once).
RequiredAcks: 1 — сообщения гарантированно доставляются хотя бы на одного лидера, но могут быть потеряны при сбоях в сети или на брокере ((At least once).
RequiredAcks: -1 — самые надежные гарантии, сообщения подтверждаются всеми репликами, что минимизирует риск потери сообщений, но с увеличением времени ожидания. Значение -1 стоит по умолчанию (At least once, но с дополнительными подтверждениями).
Почти всегда используется гарантия At least once, но не упомянуть другие я не мог. Я показал подробные настройки, но почти всегда стоят дефолтные, так как в библиотеке эти значения стоят как базовые из коробки и выставлять их не нужно. можно использовать такую конфигурацию почти всегда:
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
Balancer: &kafka.RoundRobin{},
})
Балансировщик используется для распределения сообщений по партициям.
RoundRobin — равномерное распределение по партициям.
LeastBytes — это реализация балансировщика, который направляет сообщения в партиции, получивший наименьшее количество данных.
Hash — балансировщик по умолчанию для сообщений с ключам.
Вы так же можете создать свой кастомный балансировщик.
Масштабирование с помощью партиций
Партиции в Kafka позволяют распараллеливать обработку данных и масштабировать систему. Чем больше партиций у топика, тем больше консьюмеров могут читать данные одновременно. Например, если у топика 4 партиции, то до 4 консьюмеров в одной группе могут получать данные параллельно.
Как Kafka распределяет сообщения по партициям?
Kafka может отправлять сообщения в партиции двумя способами:
1. Если у сообщения есть ключ (key) → Kafka использует хеширование ключа, чтобы всегда направлять сообщения с одинаковым ключом в одну и ту же партицию. Это гарантирует порядок сообщений для одного ключа.
2. Если ключ не указан → Kafka распределяет сообщения по партициям равномерно (алгоритм round-robin)
Чтение из конкретной партиции Хотя Kafka поддерживает возможность читать сообщения из конкретной партиции, это не рекомендуется делать, т.к. нарушает балансировку нагрузки. Обычно консьюмеры в группе автоматически получают партиции, распределённые Kafka. Если же нужно читать данные из конкретной партиции, это можно сделать так:
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
Partition: 0, // Указываем конкретную партицию
MinBytes: 10e3, // Минимальный объём данных для чтения(установка не обязательна)
MaxBytes: 10e6, // Максимальный объём данных(установка не обязательна)
})
Использование ключей при отправке сообщений Ключ (key) в Kafka — это необязательное поле сообщения, которое управляет его маршрутизацией по партициям. Это нужно чтобы соблюдать строгий порядок обработки (например, все события от одного пользователя должны обрабатываться последовательно конкретным сервисом).
Пример отправки с ключом:
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
Balancer: &kafka.Hash{},
})
defer writer.Close()
err := writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte("user-123"), //устанавливаем ключ
Value: []byte("Hello, Kafka!"),
})
if err != nil {
log.Fatal(err)
}
}
В этом примере все сообщения с ключом «user-123» попадут в одну и ту же партицию, обеспечивая порядок обработки. Это сделано для маршрутизации.
Как Kafka распределяет партиции между консьюмерами?
Kafka сама балансирует нагрузку между партициями, так что вручную управлять этим нет необходимости.
Kafka автоматически распределяет партиции между консьюмерами в одной группе. Когда новый консьюмер подключается или отключается, происходит ребалансировка, и партиции перераспределяются заново.
Важно! Количество активных консьюмеров в группе не может превышать количество партиций. Если партиций меньше, чем консьюмеров, лишние консьюмеры будут простаивать.
Где применять Kafka в Go?
Kafka и библиотека Segmentio отлично подходят для множества задач:
Микросервисная архитектура
Kafka позволяет микросервисам общаться через события. Например, сервис заказов публикует событие «OrderCreated», а сервис уведомлений подхватывает его и отправляет письмо клиенту.Обработка потоков данных
Если у вас есть логи, метрики или данные с IoT-устройств, Kafka поможет передавать и обрабатывать их в реальном времени.Event Sourcing
Kafka хранит историю всех событий, так что в любой момент можно восстановить состояние системы. Это полезно для аудита, отладки и репликации данных.Аналитика в реальном времени
Используя Kafka Streams или консьюмеров в Go, можно строить дашборды и отчёты прямо на лету, без задержек.
Заключение
Kafka и библиотека kafka-go дают разработчикам мощный инструмент для построения масштабируемых и отказоустойчивых систем. Главное — понимать, как все это работает и правильно управлять ошибками. kafka-go упрощает работу с продюсерами и консьюмерами, позволяя сосредоточиться на бизнес-логике, а не на низкоуровневых деталях.
Попробуйте добавить Kafka в свой следующий проект и сообщения от hr’ок вам гарантированы.