Продюсеры и консьюмеры с Apache Kafka в Python
Салют, Хабр!
Apache Kafka — это распределенная платформа потоковой обработки, предназначенная для построения систем обработки данных. Kafka позволяет публиковать, подписываться, хранить и обрабатывать потоки данных в реальном времени. Все это дает нам очень высокую пропускную способность и масштабируемость.
Основные фигуры в кафке это продюсеры и консюмеры. Продюсеры — это компоненты, которые производят и отправляют данные в Kafka. Они могут быть чем угодно: от простых скриптов до сложных систем. Консюмеры — это те, кто подписывается на данные и обрабатывает их. Они могут быть реализованы в различных формах, например, для анализа данных или мониторинга.
В статье мы и поговорим именно про продюсерах и консюмерах в экосистеме Kafka в коннекте с Python.
Запустим кафку на локалке
Скачиваем Java, так как Kafka написана на этом языке.
Переходим на официальный сайт Apache Kafka и скачивем последнюю версию Kafka. Выбираем бинарный файл, соответствующий вашей операционной системе.
Kafka использует Zookeeper для координации, поэтому сначала нужно запустить Zookeeper. Распакаваем скачанный файл и запускаем Zookeeper, используя скрипт zookeeper-server-start.sh
(или .bat
для Windows) из папки Kafka.
Далее запускаем Kafka Server используя kafka-server-start.sh
(или .bat
).
Библиотеки для работы с кафкой в питоне
1. confluent-kafka-python
разработана Confluent, компанией, стоящей за коммерческим развитием Kafka. Библиотека обеспечивает высокую производительность и поддерживает последние функции Kafka, включая управление потреблением сообщений. Она написана на C и Python, что звучит очень круто и хорошо показывает себя на практике
2. kafka-python
полностью написана на питоне, что делает ее легко устанавливаемой и используемой. Идеально подходит для разработчиков, которые предпочитают работать исключительно в питон-экосистеме, не опираясь на внешние зависимости C-библиотек. Следует учесть, что она может быть менее производительной по сравнению с confluent-kafka-python
.
В статье будем использовать confluent-kafka-python
. Потому что она предлагает лучшее сочетание производительности и поддержки новых возможностей Kafka. К тому же, благодаря своей близости к исходному проекту Kafka, confluent-kafka-python
часто обновляется и включает последние улучшения и исправления.
Так что:
pip install confluent-kafka
Producer
Создаем скрипт и начинаем с импорта нужной библиотеки:
from confluent_kafka import Producer
Определяем параметры конфигурации для нашего producer’а. Эти настройки включают в себя адреса серверов Kafka (в нашем случае, это локальный сервер), а также другие опции:
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
}
Теперь инициализируем объект Producer и пишем функцию для отправки сообщений:
producer = Producer(config)
def send_message(topic, message):
producer.produce(topic, value=message)
producer.flush()
Вызываем функцию send_message
, указывая тему и сообщение:
send_message('test_topic', 'Hello Kafka World!')
'test_topic'
— это название темы в Kafka, куда мы отправляем сообщение 'Hello Kafka World!'
.
Можно определить функцию обратного вызова для отслеживания успешной доставки сообщений или ошибок:
def delivery_report(err, msg):
if err is not None:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
# используем эту функцию при отправке
producer.produce('test_topic', 'Hello again!', callback=delivery_report)
producer.flush()
Можно автоматически подтверждать смещения:
config['enable.auto.commit'] = True
config['auto.commit.interval.ms'] = 1000
Для тонкой обработки, можно управлять смещениями вручную:
config['enable.auto.commit'] = False
for msg in consumer:
process_message(msg)
consumer.commit()
Kafka позволяет прямо работать с партициями в теме:
from confluent_kafka import TopicPartition
# подписка на конкретную партицию
topic_partition = TopicPartition('test_topic', 0)
consumer.assign([topic_partition])
# перемещение к определенному смещению в партиции
consumer.seek(TopicPartition('test_topic', 0, 10))
К примеру у нас есть приложение, которое собирает данные с различных источников, сериализует их в JSON и отправляет в Kafka. Создадим функцию для генерации этих данных, функцию для их сериализации и функцию для отправки в Kafka:
import json
import random
import time
from confluent_kafka import Producer
# конфигурация Producer'а
config = {
'bootstrap.servers': 'localhost:9092',
'client.id': 'python-producer'
}
producer = Producer(config)
# функция для генерации случайных данных
def generate_data():
return {
'sensor_id': random.randint(1, 100),
'temperature': random.uniform(20.0, 30.0),
'humidity': random.uniform(30.0, 50.0),
'timestamp': int(time.time())
}
# функция для сериализации данных в JSON
def serialize_data(data):
return json.dumps(data)
# функция для отправки сообщения
def send_message(topic, data):
producer.produce(topic, value=data)
producer.flush()
# основной цикл отправки сообщений
try:
while True:
# генерируем случайные данные
data = generate_data()
# сериализуем данные
serialized_data = serialize_data(data)
# отправляем данные в Kafka
send_message('sensor_data', serialized_data)
# логирование отправленного сообщения
print(f'Sent data: {serialized_data}')
# пауза между отправками
time.sleep(1)
except KeyboardInterrupt:
print('Stopped.')
producer.close()
Симулируем сбор данных от сенсоров. Каждый сенсор имеет уникальный ID, а также измеряет температуру и влажность. Данные генерируются случайным образом, сериализуются в JSON и отправляются в Kafka topic 'sensor_data'
. Используем цикл для непрерывной отправки данных, имитируя реальный поток данных от сенсоров.
Consumer
Перед тем, как начать, нужно настроить параметры нашего consumer’а.
from confluent_kafka import Consumer, KafkaException
config = {
'bootstrap.servers': 'localhost:9092', # Список серверов Kafka
'group.id': 'mygroup', # Идентификатор группы потребителей
'auto.offset.reset': 'earliest' # Начальная точка чтения ('earliest' или 'latest')
}
Создадим объект Consumer и настраиваем его с использованием наших параметров:
consumer = Consumer(config)
Consumer должен подписаться на одну или несколько тем. Предположим, что мы хотим подписаться на тему с именем 'test_topic'.
consumer.subscribe(['test_topic'])
Создадим бесконечный цикл, который будет получать сообщения из Kafka:
try:
while True:
msg = consumer.poll(timeout=1.0) # ожидание сообщения
if msg is None: # если сообщений нет
continue
if msg.error(): # обработка ошибок
raise KafkaException(msg.error())
else:
# действия с полученным сообщением
print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
consumer.close() # не забываем закрыть соединение
Внутри цикла чтения сообщений можно реализовать любую логику обработки этих сообщений. Это может быть запись в базу данных, обработка данных для аналитики, отправка уведомлений и многое другое.
Иногда нужно обрабатывать только определенные сообщения, например, фильтруя их по ключу или содержимому:
for msg in consumer:
if msg.key() == b'specific_key':
# обработка сообщений с определенным ключом
pass
elif b'important' in msg.value():
# обработка сообщений, содержащих 'important'
pass
Напишем пример consumer’а, который подключается к Kafka cluster, подписывается на определённый topic и обрабатывает входящие сообщения:
from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import logging
def create_consumer(config):
consumer = Consumer(config)
def basic_consume_loop(consumer, topics):
try:
# подписываемся на топик
consumer.subscribe(topics)
while True:
msg = consumer.poll(timeout=1.0)
if msg is None: continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
return basic_consume_loop
def main():
kafka_config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
}
consumer_loop = create_consumer(kafka_config)
consumer_loop(['test_topoc'])
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
main()
Сериализация данных на примере consumer
JSON, будучи текстовым форматом, легко читаем и универсален, , но может быть не таким эффективным по сравнению с бинарными форматами, особенно при больших объемах данных:
import json
from confluent_kafka import Consumer
# настройка consumer'а
# ...
consumer.subscribe(['json_topic'])
for msg in consumer:
if msg.value():
data = json.loads(msg.value().decode('utf-8'))
Avro в свою очередь предлагает хорошую производительность благодаря бинарной сериализации и сжатию. Он идеален для больших данных. Для работы с Avro в Kafka, можно использовать confluent-kafka-python
совместно с confluent-kafka-python[avro]
:
from confluent_kafka import DeserializingConsumer
from confluent_kafka.avro import AvroDeserializer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka.schema_registry import SchemaRegistryClient
schema_registry_conf = {'url': 'http://myschemaregistry.com'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_deserializer = AvroDeserializer(schema_registry_client)
consumer_conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'avro_group',
'auto.offset.reset': 'earliest',
'key.deserializer': avro_deserializer,
'value.deserializer': avro_deserializer
}
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(['avro_topic'])
try:
while True:
try:
msg = consumer.poll(1.0)
if msg is None:
continue
data = msg.value()
if data is not None:
except SerializerError as e:
print(f"Error deserializing Avro message: {e}")
continue
finally:
consumer.close()
Однако, чтобы максимально эффективно использовать Kafka, необходимо хорошо понимать его более глубокие концепции и принципы работы, поэтому предлагаю вам обратить внимание на онлайн-курс Apache Kafka от моих друзей из OTUS. А познакомиться с форматом обучения вы сможете на бесплатном уроке. Регистрируйтесь, будет интересно!