Освоение gRPC на Python: Продвинутые техники. Часть III
После рассмотрения прошлых тем gRPC на Python:
Перейдем к третьей (завершающей) части, в которой разберем следующие разделы:
Interceptor;
Трассировка с использованием Jaeger и OpenTelemetry;
Reflection;
Потоковая передача данных;
Health Checking и цепочка вызовов (Deadlines).
Перед тем, как приступить к изучению новых технологий, ознакомимся с архитектурой микросервиса, реализованного в нашем демонстрационном проекте.
Архитектура демонстрационного проекта
Архитектура демонстрационного микросервиса
Postman — клиент-приложение для удобной демонстрации работы streaming запросов и ответов, а также для показа выполнения функционала reflection на стороне сервера.
Browser — клиент-приложение, используем любой браузер.
API gateway FastAPI — сервер, предназначенный для запуска gPRC сервисов и также для удобной демонстрации работы.
Interceptor — перехватчики в микросервисах gRPC, реализующие логику валидации запроса (аутентификацию).
Order — микросервис gRPC, реализующий CRUD операции заказов.
Check — микросервис gRPC, реализующий проверку статуса заказа. Служит для показа выполнения цепочки вызовов в микросервисной архитектуре.
Echo — микросервис gRPC, являющийся эхо-сервером. Служит для показа работы потоковой передачи данных.
Health — микросервис gRPC, реализующий проверку работоспособности сервиса.
Jaeger — платформа для трассировки, используемая для мониторинга микросервисной архитектуры.
Tracing (OpenTelemetry) — набор стандартов и инструментов для сбора данных о производительности и работы микросервисов.
DB — база данных (SQLite), используемая для хранения информации о заказах.
Interceptor
Interceptor — это компонент, который позволяет перехватывать и изменять запросы и ответы на стороне клиента или сервера до их обработки или отправки. Interceptor позволяет добавлять дополнительную функциональность, такую как логирование, аутентификацию, авторизацию, кэширование и другие функции, не изменяя основной код приложения. В нашем случае, мы будем использовать данную технологию, в качестве аутентификации.
Для демонстрации аутентификации с помощью Interceptor, была добавлена конечная точка get_token FastApi для получения jwt-токена с временем жизни 1 день, с помощью которого будем получать доступ к сервисам gRPC.
@router.get("/get_token")
async def get_token() -> JSONResponse:
payload = {
"username": "0xN1ck",
"exp": datetime.utcnow() + timedelta(days=1)
}
token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')
return JSONResponse({'rpc-auth': f'{token}'}, status_code=status.HTTP_200_OK)
Итак, как же нам реализовать саму логику Interceptor? Для этого мы выполним следующие шаги:
Первым этапом создадим файл interceptor.py, в котором реализуем два класса AuthInterceptor — для сервера и KeyAuthClientInterceptor — для клиента.
from functools import partial
import grpc
import jwt
from grpc.aio import ClientCallDetails
class AuthInterceptor(grpc.aio.ServerInterceptor):
def __init__(self, key):
# При инициализации создаем атрибут _valid_metadata,
# который будет хранить секретный ключ для валидации токена пользователя
self._valid_metadata = key
@staticmethod
async def deny(_, context, details):
# Функция, предназначенная для отправки сообщений пользователю при отработках ошибок в функции intercept_service
await context.abort(grpc.StatusCode.UNAUTHENTICATED, details)
async def intercept_service(self, continuation, handler_call_details):
# Получаем кортеж, содержащий метаданные
metadatums = handler_call_details.invocation_metadata
try:
# Получаем токен из метаданных
resault = next(filter(lambda x: x.key == 'rpc-auth', metadatums))
if jwt.decode(resault.value, self._valid_metadata, algorithms=['HS256']):
return await continuation(handler_call_details)
except StopIteration:
return grpc.unary_unary_rpc_method_handler(partial(self.deny, details="Токен не найден"))
except jwt.ExpiredSignatureError:
return grpc.unary_unary_rpc_method_handler(partial(self.deny, details="Время жизни токена истекло"))
except jwt.InvalidTokenError:
return grpc.unary_unary_rpc_method_handler(partial(self.deny, details="Токен не валиден"))
class KeyAuthClientInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
def __init__(self, user_token):
# Получаем токен пользователя
self.user_token: str = user_token
async def intercept_unary_unary(self, continuation, client_call_details, request):
# Добавляем токен в метаданные с ключом rpc-auth и отправляем запрос на сервер
metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.append(("rpc-auth", self.user_token))
new_details = ClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
)
response = await continuation(new_details, request)
return response
В классе KeyAuthClientInterceptor добавляем токен в метаданные с ключом rpc-auth и отправляем запрос на сервер.
В классе AuthInterceptor добавляем проверку на валидность токена. Если токен валиден, то пропускаем запрос далее, в противном случае возвращаем ошибку.
Теперь нам нужно применить данные классы в наших клиентах и сервере gPRC.
Для сервиса Order реализуем клиента следующим образом в файле clients/order.py:
import grpc
from fastapi import Request
from grpc_core.protos.order import order_pb2_grpc
from grpc_core.servers.interceptors import KeyAuthClientInterceptor
from settings import settings
async def grpc_order_client(request: Request):
# Так как мы используем FastApi для демонстрации, прокинем токен в заголовок запроса
auth = request.headers.get("rpc-auth")
# При создании канала используем параметр interceptors для добавления нашего перехватчика, в который передаем токен
channel = grpc.aio.insecure_channel(
f'{settings.GRPC_HOST_LOCAL}:{settings.GRPC_PORT}',
interceptors=[
KeyAuthClientInterceptor(auth),
],
)
client = order_pb2_grpc.OrderServiceStub(channel)
return client
Таким образом мы создаем клиента для сервиса Order, в который передаем наш перехватчик с полученным токеном для валидации. Для сервиса Check реализация клиента будет аналогичная.
И теперь регистрируем перехватчики на сервере похожим способом, как и на клиенте в файле servers/manager.py:
self.server = aio.server(
ThreadPoolExecutor(max_workers=10),
interceptors=[
AuthInterceptor(settings.SECRET_KEY),
]
)
Используем секретный ключ в классе AuthInterceptor для валидации токена.
P.S. В классе KeyAuthClientInterceptor, унаследованного от grpc.aio.UnaryUnaryClientInterceptor, мы создали interceptor для клиента, переопределив метод intercept_unary_unary, который служит для унарных (UnaryUnary) вызовов. Хотя в нашей архитектуре проекта видно, что мы будем рассматривать не только унарные вызовы, но и UnaryStream, StreamUnary, StreamStream в сервисе Echo, мы не определяли необходимые перехватчики для клиента сервиса Echo, потому что будем использовать Postman для демонстрации, который реализует запросы к сервису собственными методами «из-под капота». Создание данных перехватчиков можете рассмотреть самостоятельно. Логика будет несильно отличаться.
Результат проделанной работы:
Невалидный токен
Получаем в случае не валидного токена ошибку. Наблюдаем, что перехватчик исправно отработал.
Валидный токен
В данном запросе видим, что с данным токеном запрос прошел успешно.
Трассировка с использованием Jaeger и OpenTelemetry
При разработке микросервисов часто возникают проблемы с пониманием, что происходит внутри приложения и как запросы проходят через все эти сервисы. И для мониторинга таких процессов нам может помочь трассировка. Она может отслеживать, как запросы проходят через разные части системы, и видеть, сколько времени занимает каждая операция.
Это важно, потому что в микросервисной архитектуре запрос может проходить через десятки сервисов, и любая задержка или ошибка в одном из них может повлиять на работу всего приложения.
Jaeger и OpenTelemetry — это два инструмента, которые помогают с трассировкой. Jaeger помогает следить за запросами в распределённых системах. OpenTelemetry — это открытый стандарт, который объединяет практики для сбора трассировок, метрик и логов.
Приступим к процессу применения данных технологий на практике.
Первым делом, при установке необходимых библиотек в демонстрационном проекте, пришлось понизить версию gRPC из-за конфликта зависимостей protobuf. Плагин в OpenTelemetry from opentelemetry.exporter.jaeger.proto.grpc import JaegerExporter
требовал protobuf версии 3.20.0, а последняя версия gRPC использовала protobuf пятой версии. В файле pyproject.toml настроены все зависимости.
Следующим шагом необходимо развернуть сам сервис Jaeger. Для этого используем Docker Compose со следующим конфигурационным файлом.
version: '3.9'
services:
jaeger:
image: jaegertracing/all-in-one:${JAEGER_VERSION:-latest}
ports:
- "16686:16686"
- "4318:4318"
- "14250:14250"
environment:
- LOG_LEVEL=debug
networks:
- jaeger-example
networks:
jaeger-example:
Теперь можно приступить к написанию трассировочного кода. Для этого мы при запуске сервера gRPC (servers/manager.py) реализуем настройку трассировки.
# Создаем экспортёр для отправки трассировочных данных в Jaeger.
# Указываем адрес коллектора и разрешаем небезопасное соединение (без шифрования).
jaeger_exporter = JaegerExporter(
collector_endpoint=f'{settings.JAEGER_HOST}:{settings.JAEGER_PORT}',
insecure=True
)
# Создаем процессор для пакетной обработки трассировочных данных (спанов).
# Он будет собирать спаны и отправлять их в Jaeger с использованием созданного экспортёра.
span_processor = BatchSpanProcessor(jaeger_exporter)
# Устанавливаем глобальный провайдер трассировки.
# Указываем, что ресурс трассировки будет иметь имя "Order".
# Это имя будет использоваться для идентификации службы в системе трассировки.
trace.set_tracer_provider(
TracerProvider(resource=Resource.create({SERVICE_NAME: "Order"}))
)
# Добавляем созданный процессор спанов в провайдер трассировки.
# Это необходимо для того, чтобы спаны обрабатывались и отправлялись в Jaeger.
trace.get_tracer_provider().add_span_processor(span_processor)
# Создаем инструмент для автоматической трассировки gRPC сервера.
grpc_server_instrumentor = GrpcAioInstrumentorServer()
# Включаем автоматическую трассировку.
# Это позволит автоматически отслеживать все вызовы gRPC на сервере.
grpc_server_instrumentor.instrument()
# Такая же настройка клиента, как и для сервера
grpc_client_instrumentor = GrpcAioInstrumentorClient()
grpc_client_instrumentor.instrument()
Спан представляет собой единицу выполнения операции. Он отслеживает конкретные события, связанные с запросом, создавая картину того, что произошло во время выполнения этой операции.
Далее рассмотрим использование трассировочного кода в одном из наших сервисов. Возьмем для примера OrderService и реализуем трассировку в методе ListOrders.
async def ListOrders(self, request, context) -> order_pb2.ListOrdersResponse:
# Получаем трассировщик для текущего модуля
tracer = trace.get_tracer(__name__)
# Создаем новый span для текущего метода ListOrders
with tracer.start_as_current_span("ListOrders") as span:
try:
logger.info(f'Получен запрос на получение списка заказов')
result = await OrderHandler.list_orders()
response = self.message.dict_to_rpc(
data=result.dict(),
request_message=order_pb2.ListOrdersResponse(),
)
# Устанавливаем атрибут статус-кода RPC в span
span.set_attribute("rpc.grpc.status_code", "OK")
# Добавляем событие успешного ответа в span
span.add_event("Successful response", {"response": str(response)})
return response
except Exception as e:
# Устанавливаем статус span как ошибочный и добавляем сообщение об ошибке
span.set_status(Status(StatusCode.ERROR, str(e)))
# Добавляем событие ошибки в span с деталями об ошибке
span.add_event("Error response", {"error": str(e)})
Проверим, как будет отрабатывать наша трассировка.
Jaeger
Request info
Мы видим, что после отправки нескольких запросов, трассировка успешно произвела записи в Jaeger.
Reflection
Reflection — это механизм, который позволяет клиентам динамически исследовать возможности сервиса gRPC. Он предоставляет информацию о доступных методах и структурах данных сервиса, что упрощает разработку, отладку и тестирование, особенно в случаях, когда отсутствуют заранее подготовленные протобаф-файлы (protobuf). В нашем случае это упростит работу в приложение Postman.
На практике настройка Reflection проста и настраивается в инициализации сервера gRPC (servers/manager.py).
# Определение кортежа SERVICE_NAMES, содержащего полные имена сервисов, зарегистрированных на сервере.
SERVICE_NAMES = (
# Получение полных имени сервисов (OrderService, ...) из дескрипторов (order_pb2, ...).
order_pb2.DESCRIPTOR.services_by_name["OrderService"].full_name,
echo_pb2.DESCRIPTOR.services_by_name["EchoService"].full_name,
health_pb2.DESCRIPTOR.services_by_name["Health"].full_name,
check_pb2.DESCRIPTOR.services_by_name["CheckStatusOrderService"].full_name,
# Добавление стандартного имени сервиса reflection (reflection service).
reflection.SERVICE_NAME,
)
# Включение reflection для перечисленных в SERVICE_NAMES сервисов.
reflection.enable_server_reflection(SERVICE_NAMES, self.server)
Так как на нашем сервере настроен Interceptor, то в Postman также потребуется добавить токен в метаданные.
Reflection token
Reflection success
По скриншотам видно, что Postman увидел наши сервисы без применения protobuf-файлов.
Потоковая передача данных
gRPC поддерживает четыре режима взаимодействия между сервером и клиентом:
Unary gRPC (Однонаправленное gRPC): клиент отправляет запрос и ожидает ответа от сервера.
Server Streaming gRPC (Потоковая передача от сервера): сервер отвечает на запрос клиента потоком сообщений, завершая передачу сообщением о состоянии.
Client Streaming gRPC (Потоковая передача от клиента): клиент отправляет поток сообщений серверу, который в ответ посылает одно подтверждающее сообщение.
Bi Directional Streaming gRPC (Двунаправленная потоковая передача): клиент и сервер обмениваются потоками сообщений одновременно, при этом каждый поток передается независимо в обоих направлениях.
Streaming
Так как до этого мы уже использовали обычные Unary gRPC (Однонаправленное gRPC) запросы, то пропустим данный пункт и рассмотрим остальные три режима взаимодействия в Echo сервисе.
Первым делом, когда начинаем писать сервис gRPC, наша работа начинается с protobuf-файла. Для эхо-сервера он будет выглядеть следующим образом:
syntax = "proto3";
package echo;
message EchoMessage {
string username = 1;
string message = 2;
}
message DelayedReply {
repeated EchoMessage response = 1;
}
service EchoService {
// Client Streaming
rpc ClientStream (stream EchoMessage) returns (DelayedReply);
// Server Streaming
rpc ServerStream (EchoMessage) returns (stream EchoMessage);
// Both Streaming
rpc BothStream (stream EchoMessage) returns (stream EchoMessage);
}
После генерации python-файлов c помощью grpc-tools, как было рассмотрено в предыдущей нашей статье приступим к написанию Echo сервиса (servers/services/echo.py).
import asyncio
from loguru import logger
from grpc_core.protos.echo import echo_pb2
from grpc_core.protos.echo import echo_pb2_grpc
from grpc_core.servers.utils import GrpcParseMessage
class EchoService(echo_pb2_grpc.EchoServiceServicer):
def __init__(self):
self.message = GrpcParseMessage()
# Асинхронный метод для обработки клиентского стрима.
async def ClientStream(self, request_iterator, context) -> echo_pb2.DelayedReply:
# Создание ответа с отложенным ответом.
response = echo_pb2.DelayedReply()
# Асинхронный цикл для обработки каждого запроса из стрима.
async for request in request_iterator:
logger.info(f'Приняли запрос от стрим клиента: {self.message.rpc_to_dict(request)}')
response.response.append(request)
return response
# Асинхронный метод для обработки серверного стрима.
async def ServerStream(self, request, context):
logger.info(f'Приняли запрос от клиента: {self.message.rpc_to_dict(request)}')
# Цикл для отправки нескольких ответов клиенту.
for _ in range(3):
# Отправка текущего запроса обратно в качестве ответа.
yield request
logger.info(f'Ответил стрим сервер: {self.message.rpc_to_dict(request)}')
await asyncio.sleep(1)
# Асинхронный метод для обработки двунаправленного стрима.
async def BothStream(self, request_iterator, context):
# Асинхронный цикл для обработки каждого запроса из стрима.
async for request in request_iterator:
logger.info(f'Приняли запрос от стрима клиента: {self.message.rpc_to_dict(request)}')
# Цикл для отправки нескольких ответов клиенту на каждый запрос.
for i in range(3):
# Отправка текущего запроса обратно в качестве ответа.
yield request
logger.info(f'Ответил стрим сервер: {self.message.rpc_to_dict(request)}')
await asyncio.sleep(1)
Как видим, для эхо-сервера настройка streaming-методов ClientStream, ServerStream и BothStream была совсем не сложной. Давайте посмотрим, как это все будет выглядеть в приложении Postman на примере BothStream.
BothStream
Streaming both
Health Checking и цепочка вызовов (Deadlines).
Health Checking
Health Checking — это механизм, используемый для мониторинга и контроля состояния сервисов, чтобы убедиться, что они работают должным образом и могут обрабатывать запросы. Данная технология осуществляется с помощью функций Check и Watch и возвращает статусы: UNKNOWN, SERVING и NOT_SERVING. Для проверки состояния используется функция Check с помощью отправки Unary запроса и для мониторинга — функция Watch с помощью ServerStream запроса.
Для рассмотрения Health Checking на практике реализуем сервис Health (servers/services/health.py). Данный подход можно использовать и для других сервисов, которые выполняют свою логику, переопределив методы Check и Watch. Также
можно ознакомится с примером, который указан на официальном сайте gRPC Health Checking example.
import asyncio
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
class HealthService(health_pb2_grpc.HealthServicer):
# Асинхронный метод для проверки состояния.
async def Check(self, request, context):
# Возвращаем объект HealthCheckResponse со статусом SERVING, указывая, что сервис работает нормально
return health_pb2.HealthCheckResponse(
status=health_pb2.HealthCheckResponse.SERVING
)
# Асинхронный метод для подписки на обновления (мониторинга) состояния.
async def Watch(self, request, context):
while True:
current_status = health_pb2.HealthCheckResponse.SERVING
response = health_pb2.HealthCheckResponse(status=current_status)
yield response
await asyncio.sleep(1)
Цепочка вызовов (Deadlines).
Для рассмотрения данного вопроса был создан сервис Check, который принимает запрос от Order. Он выполняет логику проверки состояния заказа и возвращает статус его выполнения, и при получении ответа сервис Order обновляет статус заказа в БД. По итогу у нас получается следующая цепочка вызовов, в которой рассмотрим технологию Deadlines.
Chain of calls
# (api/oreder.py)
@router.post("/check")
async def check_order_status(
uuid: str,
client: t.Any = Depends(grpc_order_client),
key: str = Security(api_key_header),
) -> JSONResponse:
try:
# устанавливаем тайм-аут в 2 секунды, определяя за сколько времени должна будет отработать вся цепочка вызовов
order = await client.CheckStatusOrder(check_pb2.CheckStatusOrderRequest(uuid=uuid), timeout=2)
except AioRpcError as e:
raise HTTPException(status_code=404, detail=e.details())
return JSONResponse(MessageToDict(order))
Далее на сервисе Order поставим ожидание в 1 секунду и выполним логгирование, чтобы убедиться, сколько времени еще осталось, перед отправлением запроса на сервис Check.
await asyncio.sleep(1)
logger.info(f'Осталось времени в цепочке вызовов: {context.time_remaining()}') # 0.98
client = await grpc_check_client(auth=auth)
response = await client.CheckStatusOrder(
check_pb2.CheckStatusOrderRequest(uuid=request.uuid),
# передаем в тайм-аут, оставшееся время на выполнение
timeout=context.time_remaining()
)
await OrderHandler.update_after_check_order(response)
Тут наблюдаем, что осталось всего 0.98 секунд. Если сервис Check не успеет за это время, то отработает исключение, и можем получить следующий результат.
Deadline
В случае, если мы поставим ожидание более 2 секунд на сервисе Order, то и запрос не пройдет далее и цепочка вызовов будет завершена.
Из всего этого можно выделить следующие преимущества Deadlines:
Управление временем выполнения запросов. Установка сроков позволяет ограничить время, в течение которого запрос может выполняться. Это предотвращает зависание клиента в ожидании ответа и позволяет более эффективно использовать ресурсы.
Повышение надежности системы. Сроки помогают предотвратить ситуации, когда один медленный или зависший запрос блокирует выполнение других запросов. Это особенно важно в высоконагруженных системах.
Заключение
Мы завершили серию статей, посвящённых изучению gRPC на Python, охватив широкий спектр тем и аспектов, которые пригодятся при проектировании gRPC-сервисов. В третьей заключительной части мы подробно рассмотрели:
Interceptor: Разобрались, как реализовать перехватчики для аутентификации запросов с использованием JWT-токенов. Технология позволяет добавлять дополнительную функциональность без изменения основного кода приложения.
Трассировка с использованием Jaeger и OpenTelemetry: Настроили и применили инструменты для трассировки, что позволяет отслеживать прохождение запросов через микросервисы и выявлять возможные задержки или ошибки.
Reflection: Реализовали механизм Reflection, который позволяет клиентам динамически исследовать возможности сервиса gRPC. Это упростило работу с Postman без необходимости заранее подготовленных protobuf-файлов.
Потоковая передача данных: Рассмотрели и реализовали различные типы взаимодействия (Client Streaming, Server Streaming, Bi-Directional Streaming) в gRPC на примере Echo-сервиса. Это позволило продемонстрировать обмен данными в реальном времени между клиентом и сервером.
Health Checking и цепочка вызовов (Deadlines): Рассмотрели механизм Health Checking для мониторинга состояния сервисов, а также реализовали цепочку вызовов между микросервисами, применив использование Deadlines для контроля времени выполнения запросов.
В данной серии статей были охвачены ключевые аспекты и предоставлены практические примеры, которые помогут Вам в ваших проектах. Теперь вы знаете, как создавать и интегрировать различные компоненты gRPC, что позволит строить эффективные и надёжные микросервисные архитектуры.
P.S. Для ознакомления с лабораторными проектом можно перейти по ссылке grpc_example