Забываете передавать аргументы в функцию? Вам поможет contextvars

veyydvnoed0iszivpdazjypyets.png

Мы в Яндекс.Такси любим писать логи. Ещё больше мы любим, когда логи помогают нам расследовать проблемы в продакшене. При нагрузке в десятки тысяч RPS просто набора лог-записей мало. Хочется уметь фильтровать логи по пользователю, видеть последовательность вызовов клиентского API, а также углубляться в логи запроса.

Для реализации такого интерфейса каждая лог-запись в обработчике сопровождается метаинформацией: id заказа, пользователя, запроса. Однако иногда разработчики забывают добавить метаинформацию при логировании.

lwoq-cxf1mvzg0vxm4obrr3mkiy.png
Из логов хорошо понятно, что именно пошло не так


Проблема

Представим себе такой обработчик:

@route('/feedback')
async def feedback_handler(request):
    log_extra = {
        'handler': 'some-handler',
        'user_id': request.user_id,
    }
    ...
    order = await db.feedbacks.find_one(order_id=request.order_id)
    if order is None:
        logger.warning(f'user has no active orders in db', extra=log_extra)
        return process_without_order(request)

# output:
# text="user has no active orders in db" handler=some-handler user_id=some-guid

Хендлер запроса POST /feedback?order_id=d34db33f пытается найти указанный в query-параметре заказ, если заказа в БД нет — рапортует об этом в логе. Вызов логера сопровождается явной передачей аргумента extra. В нём содержится метаинформация, которую нужно вывести вместе с текстом лога. У такого подхода есть проблемы:


  • Аргумент extra необязателен для функций-логеров.
  • Отсутствие extra никак не влияет на результат тестов (тесты на логи мы обычно не пишем).
  • Проблемы с extra ловятся на ревью часто, но далеко не всегда.
  • Однажды созданный extra нужно тащить сквозь все сигнатуры, хотя фактически он не влияет на логику функции:
# handler.py
async def handler(request):
    log_extra = derive_extra(request)
    # ...
    await foo.some_logic1(arg, kwarg=kwarg, log_extra=log_extra)
    # ...

# foo.py
async def some_logic1(..., log_extra=None):
    # ...
    await bar.some_logic2(..., log_extra=log_extra)
    # ...

# bar.py
async def some_logic2(..., log_extra=None):
    # ...
    await qux.some_logic3(..., log_extra=log_extra)
    # ...

# qux.py
async def some_logic3(..., log_extra=None):
    # ...
    if something:
        logger.warning('Something has happened', extra=log_extra)
    # ...

На практике разработчики периодически забывают добавлять метаинформацию к логированию, из-за чего поведение некоторых хендлеров тяжело объяснить с точки зрения логов.

4drsnmehfc6ncbn9ivamz_yal5g.png
Из логов не понять причину 404 кода


Решения


Пропатчить логер

Зададимся целью сделать параметр extra обязательным для логирующих методов. Стандартная библиотека предоставляет возможность задать кастомный класс логера для потребителей функции logging.getLogger(). Воспользуемся этим и реализуем класс, для логинг-методов которого параметр extra будет обязательным.


Например, так
import logging

class MyLogger(logging.getLoggerClass()):
    sentinel = object()

    def __init__(self, name):
        logging.Logger.__init__(self, name=name)

    def _log(self, *args, extra=sentinel, **kwargs):
        if extra == self.sentinel:
            raise ValueError('Extra missed in logger call')
        super()._log(*args, **kwargs)

logging.setLoggerClass(MyLogger)
logger = logging.getLogger(__name__)
logger.error('With extra', extra={'meta': 'data'})
logger.error('Explicit no extra', extra=None)
logger.error('Error message without extra')

# output:
# With extra
# Explicit no extra
# Traceback (most recent call last):
# ...
# ValueError: Extra missed in logger call

При запуске тестов приложение будет падать на каждом вызове логера без extra.

Такая реализация выглядит опасно. Если мы не покроем тестами все строчки с логингом, мы рискуем получить исключение при логировании в продакшене. Да, неприятно попадать в непокрытую тестами строчку на бою, однако вряд ли бизнес согласен пятисотить при непереданном extra. Попробуем развить идею форсирования далее.


Статический анализ

Попробуем автоматически отслеживать наличие extra в вызовах логера до запуска приложения, основываясь на исходном коде. Кажется, что такое делается едва ли не грепом, но раз уж мы пишем на питоне, воспользуемся модулем ast. За несколько минут можно сделать наколеночный скрипт, и он в простейшем случае обнаружит все вызовы объекта logger, которым не был передан параметр extra.


Пример простейшей реализации
# linter.py
import ast, sys
code = open(sys.argv[1]).read()
for node in ast.walk(ast.parse(code)):
    # remember: not production-ready, just proof of concept
    if isinstance(node, ast.Call) and not isinstance(node.func, ast.Name):
        is_logger = node.func.value.id == 'logger'
        if is_logger and 'extra' not in [k.arg for k in node.keywords]:
            print(f'Missing extra at line {node.lineno}')

# lint_example.py
def foobar():
    log_extra = {'meta': 'data'}
    bar()
    logger.warning('Has no extra')
    baz()
    logger.warning('Has extra', extra=log_extra)
    logger.warning('No extra again')

# $ python3.7 linter.py lint_example.py
# Missing extra at line 4
# Missing extra at line 7

Вложив в него ещё несколько часов разработки, можно получить приемлемое для внедрения в CI решение. Однако у такого подхода есть два недостатка. Во-первых, существуют лог-записи без метаинформации (например, логи фреймворка до начала обработки запроса). Для таких вызовов разработчику придётся либо явно передавать extra=None, либо добавлять ignore-строку. Во-вторых, разработчики всё ещё вынуждены тащить log_extra сквозь сигнатуры всех методов.

Итог: вариант с линтером приемлем, но не идеален.

Обсудив проблему, мы решили попробовать автоматически добавлять метаинформацию к логированию, сняв эту обязанность с разработчика. Посмотрим, какие у нас есть варианты в зависимости от подхода к написанию приложений. Для простоты заменим обращения к логеру вызовами функции log, обёртки над print.


Синхронное однопоточное приложение

Первое решение, которое приходит в голову, — глобальная переменная, мы сохраняем в неё контекст в начале обработки запроса и достаём его при необходимости что-то залогировать.

LOG_EXTRA = {}

def set_log_extra(extra):
    global LOG_EXTRA
    LOG_EXTRA = extra

def get_log_extra():
    return LOG_EXTRA

def log(text):
    extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items())
    msg = f'text="{text}"\t{extra_tskv}'
    print(msg)

@app.route('/feedback')
def feedback_handler_sync_singlethreaded(request):
    set_log_extra({ ... })
    order = db.feedbacks.find_one(order_id=request.order_id)
    if order is None:
        log('user has no active orders in db')
        return process_without_order(request)

# output:
# text="user has no active orders in db" handler=some-handler user_id=some-guid

Такой подход будет действовать, если каждый воркер веб-сервера работает в отдельном процессе. Однако многие синхронные веб-серверы поддерживают работу в многопоточном режиме, а для них решение с глобальной переменной не подходит из-за гонок.


Представим следующую ситуацию
  1. Начинаем обрабатывать запрос № 1, с помощью set_log_extra сохраняем в глобальную переменную контекст запроса.


  2. Не находим заказ в базе — наступает время логировать ошибку.


  3. В этот момент соседний поток начал выполнять запрос № 2 и вызвал set_log_extra со своим контекстом.


  4. Поскольку память у потоков общая, set_log_extra обновит значение глобальной переменной для всех тредов.


  5. В результате обработчик запроса № 1 залогирует ошибку с контекстом из запроса № 2.


Получается, что «всего лишь» глобальной переменной недостаточно для многопоточных серверов.


Многопоточное приложение

Проблема с гонками имеет тривиальное решение: будем хранить глобальную переменную не в общей для потоков памяти, а в локальной памяти каждого треда. В UNIX для этого есть готовый механизм thread-local storage, а Python предоставляет аналогичную по смыслу функциональность через threading.local:

import threading

LOG_EXTRA = threading.local()
LOG_EXTRA.data = {}

def set_log_extra(extra):
    global LOG_EXTRA
    LOG_EXTRA.data = extra

def get_log_extra():
    return LOG_EXTRA.data

# ...

В таком случае каждый поток будет работать со своим собственным значением поля LOG_EXTRA.data.

Однако в последнее время большинство новых бэкендов пишутся на асинхронных фреймворках вроде aiohttp. В мире event-loop«ов все запросы обрабатываются в разных корутинах, но в рамках одного потока. Это значит, что для асинхронных приложений threading.local не решит проблему: с точки зрения корутин мы продолжаем использовать «единую» глобальную переменную.


Асинхронное приложение

А если бы у нас был асинхронный аналог threading.local, то есть нечто, что может хранить состояние «выполняемой в данном контексте» задачи. В случае с asyncio некоторым аналогом тредов являются таски — инстансы asyncio.Task, исполняющие корутины поверх event-loop«а.

В используемом нами фреймворке aiohttp при выполнении конкретного запроса запускается много корутин, но по умолчанию все они будут выполняться в рамках одной asyncio.Task (если мы, конечно, не породим новые таски самостоятельно).

Кажется, что этим фактом можно воспользоваться: asyncio предоставляет интерфейс для получения текущей таски через asyncio.current_task, и мы можем сохранить контекст выполнения нашего запроса в ней через определение собственного атрибута:

import asyncio
_log_extra_key = '_log_extra'

def set_log_extra(log_extra, task=None):
    if task is None:
        task = asyncio.current_task()
    setattr(task, _log_extra_key, log_extra)

def get_log_extra(task=None):
    if task is None:
        task = asyncio.current_task()
    return getattr(task, _log_extra_key, None)

def log(text):
    extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items())
    msg = f'text=""{text}""\t{extra_tskv}'
    print(msg)

async def feedback_handler_example():
    set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
    log(f'no explicit extra')

asyncio.run(feedback_handler_async())

# output:
# text="no explicit extra" handler=some-handler user_id=some-guid

Однако такой подход не будет работать, если при выполнении запроса мы запустим ещё одну таску. В качестве примера попробуем параллельно сделать запрос в два микросервиса:

# ...

async def call_foo():
    # ...
    log(f'service_foo')

async def call_bar():
    # ...
    log(f'service_bar')

async def feedback_handler_example():
    set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
    await asyncio.gather(
        call_foo(),
        call_bar(),
    )
    log(f'no explicit extra')

asyncio.run(feedback_handler_example())

# output:
# text="service_foo"
# text="service_bar"
# text="no explicit extra" handler=some-handler user_id=some-guid

Дело в том, что вызов asyncio.gather порождает новую таску, для которой никто не вызывал set_log_extra, и, как следствие, у неё нет атрибута, хранящего extra-информацию. К счастью, asyncio позволяет кастомизировать механизм создания тасок для event-loop«а при помощи loop.set_task_factory. Если при создании новой таски мы будем присваивать ей атрибут от родительской задачи, мы решим проблему «теряющегося» extra:

# ...

def log_extra_factory():
    default_task_factory = asyncio.Task

    @functools.wraps(default_task_factory)
    def custom_task_factory(loop, coro):
        child_task = default_task_factory(coro, loop=loop)
        if not loop.is_running():
            return child_task
        current_extra = get_log_extra()
        set_log_extra(current_extra, task=child_task)
        return child_task

    return custom_task_factory

async def feedback_handler_example():
    set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
    await asyncio.gather(
        call_foo(),
        call_bar(),
    )
    log(f'no explicit extra')

async def main():
    asyncio.get_event_loop().set_task_factory(log_extra_factory())
    await feedback_handler_example()

asyncio.run(main())

# output:
# text="fetch_data_from_db" handler=some-handler user_id=some-guid
# text="fetch_data_from_service" handler=some-handler user_id=some-guid
# text="no explicit extra" handler=some-handler user_id=some-guid

В целом мы уже получили достаточно интересное решение. Однако благодаря модулю contextvars, который появился в стандартной библиотеке Python 3.7, больше не нужно вручную задавать атрибуты тасок: желаемого результата можно добиться и без потенциально опасной работы с атрибутами.


Contextvars

В каком-то смысле контекстные переменные похожи на threadlocal-переменные: значение threading.local-переменной зависит от потока, а значение contextvars.ContextVar — от активного сейчас контекста (инстанса contextvars.Context), коих может быть несколько в рамках одного потока:

from contextvars import ContextVar, Context

ctx_var = ContextVar('ctx_var')

def get_name():
    # значение ctx_var будет зависеть от контекста
    return ctx_var.get()

for name in ['Alice', 'Bob']:
    ctx = Context()
    # ctx.run() запускает некоторый callable внутри контекста ctx
    # устанавливаем значение ctx_var внутри контекста ctx
    ctx.run(ctx_var.set, name)
    # запускаем get_name внутри ctx
    print(ctx.run(get_name))

# output:
# Alice
# Bob

У contextvars есть два важных свойства, которые помогут нам решить задачу с log_extra:


Встроенная интеграция с asyncio

Начиная с Python 3.7 методы Loop.call_{at,later,soon} (а также Future.add_done_callback), ответственные за выполнение корутин в рамках event-loop«а, принимают опциональный параметр context, в рамках которого потом будет выполняться корутина:

# asyncio pseudocode
def call_soon(self, callback, *args, context=None):
    if context is None:
        # use copy of current context by default
        context = contextvars.copy_context()
    # ... later
    context.run(callback, *args)

Кроме того, теперь каждая таска по умолчанию выполняет все корутины в «своём» контексте, который по умолчанию равен копии текущего контекста:

# asyncio pseudocode
class Task:
    def __init__(self, coro):
        ...
        # Get the current context snapshot.
        self._context = contextvars.copy_context()
        self._loop.call_soon(self._step, context=self._context)

    def _step(self, exc=None):
        ...
        # Every advance of the wrapped coroutine is done in
        # the task's context.
        self._loop.call_soon(self._step, context=self._context)
        ...

Другими словами:


  • Все корутины в рамках таски выполняются в едином контексте.
  • Дочерние таски выполняются в контексте-копии текущей таски. Дети имеют доступ ко всем контекстным переменным родителя, однако модификация происходит исключительно в рамках локального контекста, без влияния на соседей.

Вот как это выглядит на практике:

import asyncio, contextvars

ctx_var = contextvars.ContextVar('ctx_var')

async def main():
    ctx_var.set('aaa')
    print(f'ctx_var in main: {ctx_var.get()}')
    await same_task()
    print(f'ctx_var in main after same_task call: {ctx_var.get()}')
    await asyncio.Task(other_task())
    print(f'ctx_var in main after asyncio.Task(other_task()) call: {ctx_var.get()}')

async def same_task():
    print(f'ctx_var in same_task: {ctx_var.get()}')
    ctx_var.set('bbb')
    print(f'ctx_var in same_task after set: {ctx_var.get()}')

async def other_task():
    print(f'ctx_var in other_task before set: {ctx_var.get()}')
    ctx_var.set('ccc')
    print(f'ctx_var in other_task after set: {ctx_var.get()}')

asyncio.run(main())

# output:
# ctx_var in main: aaa
# ctx_var in same_task: aaa
# ctx_var in same_task after set: bbb
# ctx_var in main after same_task call: bbb  # (1)
# ctx_var in other_task before set: bbb  # (2)
# ctx_var in other_task after set: ccc
# ctx_var in main after asyncio.Task(other_task()) call: bbb  # (3)

Обратите внимание на три важных аспекта:


  1. Корутина same_task, выполняющаяся в одной таске с main, повлияла на значение ctx_var для них обеих, так как они выполняются в одном контексте.
  2. Корутина other_task, выполняющаяся в дочерней таске от main, унаследовала значение ctx_var, установленное в main.
  3. Однако модификация ctx_var внутри other_task не повлияла на значение внутри main, так как дочерняя таска копирует родительский контекст, что делает их независимыми.


Копирование контекста за O (1)

Поскольку каждое создание таски теперь приводит к копированию контекста, важно сделать копирование легковесным. Для этого модуль contextvars хранит элементы с помощью immutable-структуры данных HAMT (hash array mapped trie), для которой асимптотическая сложность копирования не зависит от размера контекста:

import contextvars, time
class Timer:
    def __enter__(self):
        self.start = time.perf_counter()
    def __exit__(self, *args):
        print(time.perf_counter() - self.start)

LOOP_COUNT = 1000
CTXVAR_COUNT = 42000

print(f'Context with {len(contextvars.copy_context())} variables')
with Timer():
    for _ in range(LOOP_COUNT):
        contextvars.copy_context()

cvars = [contextvars.ContextVar(f'cvar_{i}', default=i) for i in range(CTXVAR_COUNT)]
for v in cvars:
    v.set('somevalue')

print(f'Context with {len(contextvars.copy_context())} variables')
with Timer():
    for _ in range(LOOP_COUNT):
        contextvars.copy_context()
# output
# Context with 0 variables
# 0.00015302500000000108
# Context with 42000 variables
# 0.00015314099999999835

Поскольку нам больше не нужно вручную задавать атрибуты для тасок, мы можем избавиться от кастомной фактори — функции get_log_extra и set_log_extra становятся обёртками над методами контекстной переменной:

import asyncio
import functools
import contextvars

_log_extra_data = contextvars.ContextVar('_log_extra_data')

def set_log_extra(log_extra):
    _log_extra_data.set(log_extra)

def get_log_extra():
    return _log_extra_data.get()

def log(text):
    extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items())
    msg = f'text="{text}"\t{extra_tskv}'
    print(msg)

async def call_foo():
    log(f'service_foo')

async def call_bar():
    log(f'service_bar')

async def feedback_handler_example():
    set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
    await asyncio.gather(
        call_foo(),
        call_bar(),
    )
    log(f'no explicit extra')

async def main():
    await feedback_handler_example()

asyncio.run(main())
# output
# text="service_foo"      handler=some-handler    user_id=some-guid
# text="service_bar"      handler=some-handler    user_id=some-guid
# text="no explicit extra"        handler=some-handler    user_id=some-guid

Выглядит сильно лучше. Однако перед внедрением в продакшен необходима пара финальных штрихов.

Важно помнить, что «копирование» контекста не влечёт за собой глубокого копирования объектов. Это значит, что изменение контекстных переменных мутабельных типов будет влиять на родительский контекст. Представим, что после обращения к стороннему сервису мы хотим дополнительно логировать response_id в качестве extra-информации:

async def external_call():
    # ...
    get_log_extra()['response_id'] = 'some_external_response_id'
    log('log from external call')

async def feedback_handler_example():
    set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
    await asyncio.Task(external_call())
    log('log from parent task ')

async def main():
    await feedback_handler_example()

asyncio.run(main())
# output
# text="log from external call"  handler=some-handler    user_id=some-guid       response_id=some_external_response_id
# text="log from parent task"     handler=some-handler    user_id=some-guid       response_id=some_external_response_id

Видно, что дочерняя таска повлияла на родителя. Зачастую такое поведение нежелательно, лучше модифицировать экстру через глубокое копирование:

import copy

# ...

def get_log_extra(should_copy=False):
    extra = _log_extra_data.get()
    if should_copy:
        return copy.deepcopy(extra)
    return extra

def update_log_extra(update):
    current_extra = get_log_extra(should_copy=True)
    set_log_extra({**current_extra, **update})

async def external_call():
    # ...
    update_log_extra({'response_id': 'some_external_response_id'})
    log('log from external call')

async def feedback_handler_example():
    set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
    await asyncio.Task(external_call())
    log(' log from parent task)

async def main():
    await feedback_handler_example()

asyncio.run(main())
# output
# text="log from external call"  handler=some-handler    user_id=some-guid       response_id=some_external_response_id
# text="log from parent task"     handler=some-handler    user_id=some-guid


Контекстные переменные и тредпул

Периодически при написании асинхронных приложений требуется отдать какую-то функцию на выполнение в тредпул (например, из-за отсутствия асинхронного аналога у библиотеки). Здесь нас ждёт неприятный сюрприз: потоки из пула по умолчанию работают в «собственном» контексте, в котором может и не быть созданных в «основном» потоке переменных:

import asyncio
import functools
import contextvars

_log_extra_data = contextvars.ContextVar('_log_extra_data')

# ...

def log(text):
    extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items())
    msg = f'text="{text}"\t{extra_tskv}'
    print(msg)

async def feedback_handler_example():
    set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
    await asyncio.get_event_loop().run_in_executor(None, log, 'some-info')
    log(f'no explicit extra')

async def main():
    await feedback_handler_example()

asyncio.run(main())
# output:
# Traceback (most recent call last):
# ...
#   File "contextvars_threadpool.py", line 11, in get_log_extra
#     return _log_extra_data.get()
# LookupError: 

В потоке из пула переменной _log_extra_data ни разу не присваивалось значение, из-за чего метод get() поднимает исключение LookupError. Для нашей задачи такое поведение кажется сомнительным: по умолчанию хотелось бы, чтобы потоки в пуле вели себя так же, как и дочерние таски, т. е. наследовали контекст родителя. Определим свой собственный пул, выполняющий target-функцию в копии текущего контекста, и зададим его пулом по умолчанию для event-loop«а:

import asyncio
import concurrent.futures
import contextvars
import functools
import typing

class CtxCopyThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    def submit(
            self, fn: typing.Callable, *args, **kwargs,
    ) -> concurrent.futures.Future:
        ctx = contextvars.copy_context()
        return super().submit(
            ctx.run,
            functools.partial(fn, *args, **kwargs),
        )

# ...
async def feedback_handler_example():
    set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
    await asyncio.get_event_loop().run_in_executor(None, log, 'some-info')
    log(f'no explicit extra')

async def main():
    asyncio.get_event_loop().set_default_executor(CtxCopyThreadPoolExecutor())
    await feedback_handler_example()

asyncio.run(main())
# text="from threadpool"        handler=some-handler    user_id=some-guid
# text="no explicit extra"      handler=some-handler    user_id=some-guid


Итоги

На практике внедрение «автоматического» extra привело к следующему: мы больше не теряем логи при расследовании инцидентов в продакшене, не таскаем ненужный аргумент сквозь сигнатуры методов и не тратим силы на отлов его отсутствия на ревью.

Некоторые сомневаются насчёт контекстных переменных: фактически они представляют собой глобальные переменные на стероидах, ещё и с магией в виде неявного копирования контекста между корутинами.

Мы считаем, что практическая польза этого инструмента перевешивает идеологические недостатки, однако будем рады услышать ваши рассуждения на эту тему в комментариях.

© Habrahabr.ru