Продюсеры и консьюмеры с Apache Kafka в Python

4c2a5a374baa3f5d2f9fc93b1fb9813d.png

Салют, Хабр!

Apache Kafka — это распределенная платформа потоковой обработки, предназначенная для построения систем обработки данных. Kafka позволяет публиковать, подписываться, хранить и обрабатывать потоки данных в реальном времени. Все это дает нам очень высокую пропускную способность и масштабируемость.

Основные фигуры в кафке это продюсеры и консюмеры. Продюсеры — это компоненты, которые производят и отправляют данные в Kafka. Они могут быть чем угодно: от простых скриптов до сложных систем. Консюмеры — это те, кто подписывается на данные и обрабатывает их. Они могут быть реализованы в различных формах, например, для анализа данных или мониторинга.

В статье мы и поговорим именно про продюсерах и консюмерах в экосистеме Kafka в коннекте с Python.

Запустим кафку на локалке

Скачиваем Java, так как Kafka написана на этом языке.

Переходим на официальный сайт Apache Kafka и скачивем последнюю версию Kafka. Выбираем бинарный файл, соответствующий вашей операционной системе.

Kafka использует Zookeeper для координации, поэтому сначала нужно запустить Zookeeper. Распакаваем скачанный файл и запускаем Zookeeper, используя скрипт zookeeper-server-start.sh (или .bat для Windows) из папки Kafka.

Далее запускаем Kafka Server используя kafka-server-start.sh (или .bat).

Библиотеки для работы с кафкой в питоне

1. confluent-kafka-pythonразработана Confluent, компанией, стоящей за коммерческим развитием Kafka. Библиотека обеспечивает высокую производительность и поддерживает последние функции Kafka, включая управление потреблением сообщений. Она написана на C и Python, что звучит очень круто и хорошо показывает себя на практике

2. kafka-python полностью написана на питоне, что делает ее легко устанавливаемой и используемой. Идеально подходит для разработчиков, которые предпочитают работать исключительно в питон-экосистеме, не опираясь на внешние зависимости C-библиотек. Следует учесть, что она может быть менее производительной по сравнению с confluent-kafka-python.

В статье будем использовать confluent-kafka-python. Потому что она предлагает лучшее сочетание производительности и поддержки новых возможностей Kafka. К тому же, благодаря своей близости к исходному проекту Kafka, confluent-kafka-python часто обновляется и включает последние улучшения и исправления.

Так что:

pip install confluent-kafka

Producer

Создаем скрипт и начинаем с импорта нужной библиотеки:

from confluent_kafka import Producer

Определяем параметры конфигурации для нашего producer’а. Эти настройки включают в себя адреса серверов Kafka (в нашем случае, это локальный сервер), а также другие опции:

config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
}

Теперь инициализируем объект Producer и пишем функцию для отправки сообщений:

producer = Producer(config)

def send_message(topic, message):
    producer.produce(topic, value=message)
    producer.flush()

Вызываем функцию send_message, указывая тему и сообщение:

send_message('test_topic', 'Hello Kafka World!')

'test_topic' — это название темы в Kafka, куда мы отправляем сообщение 'Hello Kafka World!'.

Можно определить функцию обратного вызова для отслеживания успешной доставки сообщений или ошибок:

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# используем эту функцию при отправке
producer.produce('test_topic', 'Hello again!', callback=delivery_report)
producer.flush()

Можно автоматически подтверждать смещения:

config['enable.auto.commit'] = True
config['auto.commit.interval.ms'] = 1000

Для тонкой обработки, можно управлять смещениями вручную:

config['enable.auto.commit'] = False

for msg in consumer:
    process_message(msg)
    consumer.commit()

Kafka позволяет прямо работать с партициями в теме:

from confluent_kafka import TopicPartition

# подписка на конкретную партицию
topic_partition = TopicPartition('test_topic', 0)
consumer.assign([topic_partition])

# перемещение к определенному смещению в партиции
consumer.seek(TopicPartition('test_topic', 0, 10))

К примеру у нас есть приложение, которое собирает данные с различных источников, сериализует их в JSON и отправляет в Kafka. Создадим функцию для генерации этих данных, функцию для их сериализации и функцию для отправки в Kafka:

import json
import random
import time
from confluent_kafka import Producer

# конфигурация Producer'а
config = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'python-producer'
}
producer = Producer(config)

# функция для генерации случайных данных
def generate_data():
    return {
        'sensor_id': random.randint(1, 100),
        'temperature': random.uniform(20.0, 30.0),
        'humidity': random.uniform(30.0, 50.0),
        'timestamp': int(time.time())
    }

# функция для сериализации данных в JSON
def serialize_data(data):
    return json.dumps(data)

# функция для отправки сообщения
def send_message(topic, data):
    producer.produce(topic, value=data)
    producer.flush()

# основной цикл отправки сообщений
try:
    while True:
        # генерируем случайные данные
        data = generate_data()
        
        # сериализуем данные
        serialized_data = serialize_data(data)

        # отправляем данные в Kafka
        send_message('sensor_data', serialized_data)

        # логирование отправленного сообщения
        print(f'Sent data: {serialized_data}')

        # пауза между отправками
        time.sleep(1)
except KeyboardInterrupt:
    print('Stopped.')

producer.close()

Симулируем сбор данных от сенсоров. Каждый сенсор имеет уникальный ID, а также измеряет температуру и влажность. Данные генерируются случайным образом, сериализуются в JSON и отправляются в Kafka topic 'sensor_data'. Используем цикл для непрерывной отправки данных, имитируя реальный поток данных от сенсоров.

Consumer

Перед тем, как начать, нужно настроить параметры нашего consumer’а.

from confluent_kafka import Consumer, KafkaException

config = {
    'bootstrap.servers': 'localhost:9092',  # Список серверов Kafka
    'group.id': 'mygroup',                  # Идентификатор группы потребителей
    'auto.offset.reset': 'earliest'         # Начальная точка чтения ('earliest' или 'latest')
}

Создадим объект Consumer и настраиваем его с использованием наших параметров:

consumer = Consumer(config)

Consumer должен подписаться на одну или несколько тем. Предположим, что мы хотим подписаться на тему с именем 'test_topic'.

consumer.subscribe(['test_topic'])

Создадим бесконечный цикл, который будет получать сообщения из Kafka:

try:
    while True:
        msg = consumer.poll(timeout=1.0)  # ожидание сообщения
        if msg is None:                   # если сообщений нет
            continue
        if msg.error():                   # обработка ошибок
            raise KafkaException(msg.error())
        else:
            # действия с полученным сообщением
            print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()  # не забываем закрыть соединение

Внутри цикла чтения сообщений можно реализовать любую логику обработки этих сообщений. Это может быть запись в базу данных, обработка данных для аналитики, отправка уведомлений и многое другое.

Иногда нужно обрабатывать только определенные сообщения, например, фильтруя их по ключу или содержимому:

for msg in consumer:
    if msg.key() == b'specific_key':
        # обработка сообщений с определенным ключом
        pass
    elif b'important' in msg.value():
        # обработка сообщений, содержащих 'important'
        pass

Напишем пример consumer’а, который подключается к Kafka cluster, подписывается на определённый topic и обрабатывает входящие сообщения:

from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import logging

def create_consumer(config):
    consumer = Consumer(config)

    def basic_consume_loop(consumer, topics):
        try:
            # подписываемся на топик
            consumer.subscribe(topics)

            while True:
                msg = consumer.poll(timeout=1.0)
                if msg is None: continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                        (msg.topic(), msg.partition(), msg.offset()))
                    elif msg.error():
                        raise KafkaException(msg.error())
                else:
                    print(f"Received message: {msg.value().decode('utf-8')}")
        except KeyboardInterrupt:
            pass
        finally:
            consumer.close()

    return basic_consume_loop

def main():
    kafka_config = {
        'bootstrap.servers': 'localhost:9092', 
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'
    }

    consumer_loop = create_consumer(kafka_config)
    consumer_loop(['test_topoc'])

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()

Сериализация данных на примере consumer

JSON, будучи текстовым форматом, легко читаем и универсален, , но может быть не таким эффективным по сравнению с бинарными форматами, особенно при больших объемах данных:

import json
from confluent_kafka import Consumer

# настройка consumer'а
# ...

consumer.subscribe(['json_topic'])

for msg in consumer:
    if msg.value():
        data = json.loads(msg.value().decode('utf-8'))

Avro в свою очередь предлагает хорошую производительность благодаря бинарной сериализации и сжатию. Он идеален для больших данных. Для работы с Avro в Kafka, можно использовать confluent-kafka-python совместно с confluent-kafka-python[avro]:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.avro import AvroDeserializer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka.schema_registry import SchemaRegistryClient

schema_registry_conf = {'url': 'http://myschemaregistry.com'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

avro_deserializer = AvroDeserializer(schema_registry_client)

consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'avro_group',
    'auto.offset.reset': 'earliest',
    'key.deserializer': avro_deserializer,
    'value.deserializer': avro_deserializer
}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(['avro_topic'])

try:
    while True:
        try:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            data = msg.value()
            if data is not None:
        except SerializerError as e:
            print(f"Error deserializing Avro message: {e}")
            continue
finally:
    consumer.close()

Однако, чтобы максимально эффективно использовать Kafka, необходимо хорошо понимать его более глубокие концепции и принципы работы, поэтому предлагаю вам обратить внимание на онлайн-курс Apache Kafka от моих друзей из OTUS. А познакомиться с форматом обучения вы сможете на бесплатном уроке. Регистрируйтесь, будет интересно!

© Habrahabr.ru