Книга «Паттерны разработки на Python: TDD, DDD и событийно-ориентированная архитектура»
Привет, Хаброжители! Популярность Python продолжает расти, а значит, проекты, созданные на этом языке программирования, становятся все масштабнее и сложнее. Многие разработчики проявляют интерес к высокоуровневым паттернам проектирования, таким как чистая и событийно-управляемая архитектура и паттерны предметно-ориентированного проектирования (DDD). Но их адаптация под Python не всегда очевидна.
Гарри Персиваль и Боб Грегори познакомят вас с проверенными паттернами, чтобы каждый питонист мог управлять сложностью приложений и получать максимальную отдачу от тестов. Теория подкреплена примерами на чистом Python, лишенном синтаксической избыточности Java и C#.
В этой книге:
- «Инверсия зависимостей» и ее связи с портами и адаптерами (гексагональная/чистая архитектура).
- Различия между паттернами «Сущность», «Объект-значение» и «Агрегат» в рамках DDD.
- Паттерны «Репозиторий» и UoW, обеспечивающие постоянство хранения данных.
- Паттерны «Событие», «Команда» и «Шина сообщений».
- Разделение ответственности на команды и запросы (CQRS).
- Событийно-управляемая архитектура и реактивные расширения.
Команды и обработчик команд
В предыдущей главе мы говорили об использовании событий как способа представления данных на входе в систему. Мы превратили наше приложение в машину для обработки сообщений.
Ради этого мы преобразовали все функции варианта использования в обработчики событий. Когда API получает POST для создания новой партии товара, он создает новое событие BatchCreated и обрабатывает его так, как если бы это было внутреннее событие. Это может показаться нелогичным. В конце концов, партия товара еще не создана, именно поэтому мы и вызывали API. Мы собираемся исправить этот концептуальный недостаток с помощью ввода команд и покажем, как они могут обрабатываться одной и той же шиной сообщений по другим правилами.
Код для этой главы находится в ветке chapter_10_commands на GitHub:
git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_10_commands
# или, если пишете код по ходу чтения, возьмите за основу
# материал из предыдущей главы:
git checkout chapter_09_all_messagebus
Команды и события
Подобно событиям, команды представляют собой тип сообщений — инструкции, посылаемые одной частью системы другой. Мы обычно представляем команды немыми структурами данных и можем обрабатывать их почти так же, как события.
Но различия между командами и событиями очень важны.
Команды посылаются одним актором другому конкретному актору в надежде, что в результате произойдет то или иное событие. Когда мы отправляем форму в обработчик API, то посылаем команду. Командам даются имена в форме глаголов повелительного наклонения вроде «allocate stock» («разместить товарный запас») или «delay shipment» («задержать поставку»).
Команды улавливают намерение (intent). Они выражают то, чего мы хотим от системы. Когда команды не выполняются, отправитель в результате должен получить информацию об ошибке.
Акторы транслируют события всем заинтересованным слушателям (listeners). Когда мы публикуем событие BatchQuantityChanged, то не знаем, кто его подхватит. Событиям даются имена в форме глаголов прошедшего времени или причастных оборотов вроде «order allocated to stock» («заказ размещен в товарном запасе») или «shipment delayed» («отгрузка задержана»).
Мы часто используем события, чтобы сообщить об успешных командах.
События улавливают факты о том, что происходило в прошлом. Поскольку мы не знаем, кто будет обрабатывать событие, отправителей не должно волновать, удалось получателям выполнить команду или нет. В табл. 10.1 сравниваются события и команды.
Какие команды сейчас есть в нашей системе?
Вытаскиваем несколько команд (src/allocation/domain/commands.py)
class Command:
pass
@dataclass
class Allocate(Command): 1
orderid: str
sku: str
qty: int
@dataclass
class CreateBatch(Command): 2
ref: str
sku: str
qty: int
eta: Optional[date] = None
@dataclass
class ChangeBatchQuantity(Command): 3
ref: str
qty: int
1 commands.Allocate заменит events.AllocationRequired.
2 commands.CreateBatch заменит events.BatchCreated.
3 commands.ChangeBatchQuantity заменит events.BatchQuantityChanged.
Различия в обработке исключений
Замена имен и глаголов — это прекрасно, но жонглирование словами не поменяет поведение системы. Мы хотим обращаться с событиями почти так же, как с командами, но не одинаково. Давайте посмотрим, как изменяется шина сообщений.
Направляем события и команды по-разному (src/allocation/service_layer/messagebus.py)
Message = Union[commands.Command, events.Event]
def handle(message: Message, uow: unit_of_work.AbstractUnitOfWork): 1
results = []
queue = [message]
while queue:
message = queue.pop(0)
if isinstance(message, events.Event):
handle_event(message, queue, uow) 2
elif isinstance(message, commands.Command):
cmd_result = handle_command(message, queue, uow) 2
results.append(cmd_result)
else:
raise Exception(f'{message} was not an Event or Command')
return results
1 У нее все еще есть основная точка входа handle (), принимающая message — команду либо событие.
2 Мы отправляем события и команды двум разным вспомогательным функциям, которые показаны ниже.
Вот как происходит работа с событиями:
События не могут прервать поток (src/allocation/service_layer/messagebus.py)
def handle_event(
event: events.Event,
queue: List[Message],
uow: unit_of_work.AbstractUnitOfWork
):
for handler in EVENT_HANDLERS[type(event)]: 1
try:
logger.debug('handling event %s with handler %s', event,
handler)
handler(event, uow=uow)
queue.extend(uow.collect_new_events())
except Exception:
logger.exception('Exception handling event %s', event)
continue 2
1 События передаются диспетчеру, который может делегировать их многочисленным обработчикам для каждого события.
2 Он отлавливает и логирует ошибки, но не позволяет им прерывать обработку сообщений.
И вот как выполняются команды:
Команды заново инициируют исключения (src/allocation/service_layer/messagebus.py)
def handle_command(
command: commands.Command,
queue: List[Message],
uow: unit_of_work.AbstractUnitOfWork
):
logger.debug('handling command %s', command)
try:
handler = COMMAND_HANDLERS[type(command)] 1
result = handler(command, uow=uow)
queue.extend(uow.collect_new_events())
return result 3
except Exception:
logger.exception('Exception handling command %s', command)
raise 2
1 Диспетчер команд ожидает, что для каждой команды будет лишь один обработчик.
2 Если инициируются какие-либо ошибки, то они быстро останавливают работу и потом всплывают.
3 Инструкция return result с нами ненадолго; как уже упоминалось в подразделе «Уродливый костыль: шине сообщений приходится возвращать результаты» на с. 189, это костыль, позволяющий шине сообщений возвращать ссылку на партию товара, чтобы API мог его использовать. Мы исправим это в главе 12.
Мы также заменяем единый словарь HANDLERS разными словарями для команд и событий. По соглашению команды могут иметь только один обработчик.
Новые словари с обработчиками (src/allocation/service_layer/messagebus.py)
EVENT_HANDLERS = {
events.OutOfStock: [handlers.send_out_of_stock_notification],
} # тип: Dict[Type[events.Event], List[Callable]]
COMMAND_HANDLERS = {
commands.Allocate: handlers.allocate,
commands.CreateBatch: handlers.add_batch,
commands.ChangeBatchQuantity: handlers.change_batch_quantity,
} # тип: Dict[Type[commands.Command], Callable]
События, команды и обработка ошибок
Многие разработчики чувствуют себя некомфортно на этом этапе и спрашивают: «Что происходит, когда событие не обрабатывается? Что я должен сделать, чтобы система находилась в согласованном состоянии?» Если удается обрабатывать половину событий в messagebus.handle, прежде чем процесс остановится из-за нехватки памяти, то как быть с проблемами из-за потерянных сообщений?
Начнем с худшего случая: событие обработать не получается и система остается в несогласованном состоянии. Какая ошибка могла бы к этому привести? Нередко системы оказываются в таком состоянии, когда завершена только половина операции.
Представьте: мы размещаем заказ клиента на три штуки артикула КРЕСЛО_ЖЕЛАННОЕ, DESIRABLE_BEANBAG, но каким-то образом у нас не получается уменьшить количество оставшегося товара. Это привело бы к несогласованному состоянию: три штуки товара одновременно и размещены в заказе, и имеются в наличии, в зависимости от того, как вы на это посмотрите. И если позже мы разместим еще один заказ на те же самые кресла, то у службы поддержки клиентов начнутся проблемы.
Но мы уже позаботились о таких ситуациях с помощью настройки службы размещения заказов. Мы тщательно определили агрегаты, которые действуют как границы согласованности, и ввели паттерн UoW, который управляет атомарным успехом или неудачей обновления в агрегате.
Например, когда мы размещаем заказ на товар, границей согласованности является агрегат Product. Это означает, что мы не можем нечаянно превысить лимит размещений: конкретная товарная позиция заказа либо размещена, либо нет — тут нет места для несогласованных состояний.
По определению мы не требуем, чтобы два агрегата согласовывались сразу, поэтому если мы не сможем обработать событие и обновить только один агрегат, то система все равно может впоследствии стать согласованной. Мы не должны нарушать никаких системных ограничений.
С помощью этого примера мы можем лучше понять причину разбивки сообщений на команды и события. Когда пользователь хочет, чтобы система что-то сделала, мы представляем его запрос как команду. Эта команда должна изменить один агрегат и либо справиться с работой, либо нет. Все остальные мелкие детали вроде очистки и рассылки уведомлений можно организовать с помощью события. Успешность команды не зависит от успешности обработчиков событий.
Давайте рассмотрим еще один пример (из другого, воображаемого проекта), чтобы понять почему.
Представьте, что мы проектируем онлайн-магазин, который продает предметы роскоши. Отдел маркетинга хочет вознаграждать клиентов за повторные заказы. Мы будем отмечать клиентов как VIP-персон после того, как они совершат свою третью покупку, и это даст им право на приоритетное обслуживание и специальные предложения. Критерии принятия решения о клиенте по этой схеме имеют следующее содержание:
При наличии клиента с двумя заказами в своей истории,
когда клиент делает третий заказ,
он должен быть помечен как VIP-персона.
Когда клиент становится VIP-персоной впервые,
мы должны отправить ему имейл с поздравлениями
Мы применяем технические приемы, уже рассмотренные в этой книге, и создаем новый агрегат History, который регистрирует заказы и может инициировать события предметной области, когда эти правила удовлетворяются. Мы структурируем код следующим образом:
VIP-клиент (пример кода для другого проекта)
class History: # Агрегат
def __init__(self, customer_id: int):
self.orders = set() # Set[HistoryEntry]
self.customer_id = customer_id
def record_order(self, order_id: str, order_amount: int): 1
entry = HistoryEntry(order_id, order_amount)
if entry in self.orders:
return
self.orders.add(entry)
if len(self.orders) == 3:
self.events.append(
CustomerBecameVIP(self.customer_id)
)
def create_order_from_basket(uow, cmd: CreateOrder): 2
with uow:
order = Order.from_basket(cmd.customer_id, cmd.basket_items)
uow.orders.add(order)
uow.commit() # инициирует OrderCreated
def update_customer_history(uow, event: OrderCreated): 3
with uow:
history = uow.order_history.get(event.customer_id)
history.record_order(event.order_id, event.order_amount)
uow.commit() # инициирует CustomerBecameVIP
def congratulate_vip_customer(uow, event: CustomerBecameVip): 4
with uow:
customer = uow.customers.get(event.customer_id)
email.send(
customer.email_address,
f'Congratulations {customer.first_name}!'
)
1 Агрегат History улавливает правила перевода клиента в VIP-категорию. Отличное решение на случай, если в будущем правила станут более сложными.
2 Первый обработчик создает заказ для клиента и инициирует событие предметной области OrderCreated.
3 Второй обработчик обновляет объект History, регистрируя заказ.
4 Наконец, мы отправляем имейл клиенту, когда он становится VIP-персоной.
По этому коду можно получить некоторое представление об обработке ошибок в событийно-управляемой системе.
В текущей реализации мы инициируем события об агрегате после того, как сохранили состояние в базе данных. Что, если мы инициировали эти события до операции сохранения и зафиксировали все изменения одновременно? Ведь, казалось бы, так можно быть уверенными в том, что вся работа была завершена. Разве это не безопаснее?
Но что произойдет, если почтовый сервер будет загружен? Если вся работа должна быть завершена в одно и то же время, то занятый почтовый сервер не даст получить деньги за заказы.
Что произойдет, если в реализации агрегата History есть баг? Неужели не получится забрать деньги только потому, что в клиенте не удается распознать VIP-персону?
Благодаря разделению обязанностей получилось изолировать неудачи отдельных частей системы, что повысило ее общую надежность. Единственная часть, которая действительно должна завершиться — это обработчик команд, которой создает заказ. Это единственная часть, которая интересует клиента, и ставится в приоритет у стейкхолдеров.
Заметьте, как ловко мы совместили транзакционные границы с началом и концом бизнес-процессов. Используемые в коде имена соответствуют жаргону компании, а обработчики — этапам наших критериев на естественном языке. Это соответствие имен и структуры помогает лучше понимать системы по мере их разрастания.
Синхронное восстановление после ошибок
Надеемся, мы убедили вас в том, что если события отказывают независимо от команд, которые их инициировали, то это вполне нормально. Что же делать, чтобы оправиться от неизбежных ошибок?
Первое — знать, когда произошла ошибка, и для этого мы обычно полагаемся на логи.
Давайте еще раз посмотрим на метод handle_event из шины сообщений.
Текущая функция-обработчик (src/allocation/service_layer/messagebus.py)
def handle_event(
event: events.Event,
queue: List[Message],
uow: unit_of_work.AbstractUnitOfWork
):
for handler in EVENT_HANDLERS[type(event)]:
try:
logger.debug('Обработка события %s обработчиком %s',
event, handler)
handler(event, uow=uow)
queue.extend(uow.collect_new_events())
except Exception:
logger.exception('Исключение при обработке события %s',
event)
continue
Когда мы обрабатываем сообщение в системе, то сначала записываем в лог, что собираемся сделать. Для нашего варианта использования CustomerBecameVIP (клиент становится VIP) логи могут выглядеть следующим образом:
Обработка события CustomerBecameVIP(customer_id=12345)
обработчиком <функция congratulate_vip_customer at 0x10ebc9a60>
Поскольку для типов сообщений мы решили использовать dataclasses, то получаем аккуратно выведенную сводку входящих данных, которую можно скопировать и вставить в оболочку Python, чтобы воссоздать объект.
При возникновении ошибки можно использовать записанные данные, чтобы воспроизвести проблему в юнит-тесте или сообщение в системе.
Ручное воспроизведение хорошо работает в тех случаях, когда нужно исправить баг, перед тем как обработать событие повторно. Однако системы всегда будут испытывать некоторый фоновый уровень самоустраняющегося отказа. Сюда входят, например, сбои в сети, взаимоблокировки таблиц и кратковременные простои, вызванные развертыванием.
В большинстве случаев можно элегантно восстановиться, попробовав еще раз. Народная мудрость гласит: «Если с первого раза не получилось, повторите операцию с экспоненциальным откатом».
Обработчик с повторной попыткой (src/allocation/service_layer/messagebus.py)
from tenacity import Retrying, RetryError, stop_after_attempt, wait_
exponential 1
...
def handle_event(
event: events.Event,
queue: List[Message],
uow: unit_of_work.AbstractUnitOfWork
):
for handler in EVENT_HANDLERS[type(event)]:
try:
for attempt in Retrying( 2
stop=stop_after_attempt(3),
wait=wait_exponential()
):
with attempt:
logger.debug('Обработка события %s обработчиком
%s', event, handler)
handler(event, uow=uow)
queue.extend(uow.collect_new_events())
except RetryError as retry_failure:
logger.error(
'Не получилсоь обработать событие %s раз, отказ!,
retry_failure.last_attempt.attempt_number
)
continue
1 Tenacity — это библиотека, в которой реализованы часто встречающиеся паттерны для повторных попыток.
2 Здесь мы настраиваем шину сообщений на повторение операций до трех раз с экспоненциально увеличивающимся ожиданием между попытками.
Повторная попытка выполнения операций, которые могут завершиться неудачно, — это, вероятно, единственный лучший способ повысить отказоустойчивость ПО. Опять же из паттернов UoW и «Обработчик команд» следует, что каждая попытка начинается с согласованного состояния и не оставляет дела наполовину законченными.
В какой-то момент придется отказаться от попыток обработать сообщение. Создавать надежные системы с распределенными сообщениями сложно, и здесь мы опустим хитрости. В эпилоге есть ссылки на дополнительные источники.
Выводы
В этой книге мы решили рассказать о событиях, и только потом о командах, хотя в других книгах обычно делается наоборот. Сделать явными запросы, на которые наша система может ответить, дав им имя и собственную структуру данных, — довольно фундаментальная вещь. Вероятно, вы столкнетесь с тем, что люди называют паттерном «Обработчик команд» то, что мы делаем с паттернами «Событие», «Команда» и «Шина сообщений».
В табл. 10.2 описаны некоторые моменты, о которых следует подумать, прежде чем приступать к работе.
В главе 11 поговорим об использовании событий в качестве паттерна интеграции.
Более подробно с книгой можно ознакомиться на сайте издательства
» Оглавление
» Отрывок
Для Хаброжителей скидка 25% по купону — Python
По факту оплаты бумажной версии книги на e-mail высылается электронная книга.