CancellationTokenSource и «утечки памяти»

77fbf829b7583edb1ea1b45991fc0a1f.png

Всем привет!

Я работаю в Тинькофф, в проекте «Тинькофф Телефония». Наша основная задача — организация IP-телефонии внутри компании и за ее пределами. В день через нас проходит несколько миллионов звонков. Всей этой кухней занимаются около 10 служб. 

В какой-то момент мы столкнулись с проблемой: во время релиза сервисы останавливаются неприлично долго. Нас это печалило, поэтому мы приняли решение прокидывать везде и всюду CancellationToken. 

Чуть позже столкнулись с другой проблемой: все наши сервисы начали потихоньку пожирать доступную память. Не так чтобы слишком сильно и быстро, но в целом в перспективе нескольких недель — заметно. 

Медленное и уверенное пожирание памяти на протяжении недели.Медленное и уверенное пожирание памяти на протяжении недели.

В один прекрасный день терпение лопнуло, и мы с Женей Мишустиным и Кириллом Марковым накинулись на эту проблему: Женя с Кириллом ловили и анализировали дампы, а я занимался поиском причин такого поведения.

Особенно отчетливо выделялся один сервис, который запускался по расписанию и выполнял обработку записанных звонков. Так как таких записей довольно много, то их анализ и обработка затягивались на длительное время. А после завершения память освобождалась. При этом нельзя сказать, что внутри создавалось какое-то большое количество объектов для работы.

Потребление памяти при выполнении одной итерации обработки записанных звонков. GC высвобождает около 6 Гб памяти.Потребление памяти при выполнении одной итерации обработки записанных звонков. GC высвобождает около 6 Гб памяти.

А пару месяцев назад началась беда: один из сервисов, который запускает по расписанию некоторые задачи, внезапно стал неприлично прожорлив! Он мог за довольно короткий промежуток времени захватить 2—5 ГБ. И тут стало ясно, что пришло время стряхнуть пыль с dotMemory и WinDbg!  

Спойлер для тех, кому лень читать и разбираться: в CancellationTokenSource нет утечек — есть особенности его использования.

Начали снимать дампы, смотреть. Во многих была примерно одинаковая картина: дамп весит около 5 ГБ, а dotMemory упорно показывает размер куч до 1—2 ГБ.

e74bd727c53024df050c879635dd6093.png6fb6352920d94076107c900a3bae75b0.pngРазмер этого дампа около 3,5 Гб.Размер этого дампа около 3,5 Гб.

Ну, вроде все нормально, ничего критичного. Но дамп-то огромный! Чем занята остальная память? Думали, что это серверный режим работы GC (описание режимов, настройка). Меняли, ждали — не помогает. 

Женя уже долгое время наблюдал за неуправляемой памятью сервисов, и ее размер не давали ему покоя: он мог достигать 8—10 ГБ. Почему она так растет?

Размер управляемой и «не очень» памяти.Размер управляемой и «не очень» памяти.

Узнав о проблеме, наш руководитель поделился опытом: в смежном проекте было подобное — там неуправляемая память была забита строками от сторонней библиотеки. А что же может быть у нас? Мы пользуемся неуправляемым клиентом Oracle и еще одной утилитой, которая тоже может лезть в unmanged-код. Но с ними никаких проблем не было. Мы их не меняли, не обновляли… А вдобавок еще и dotMemory вот так просто не хочет нам показывать содержимое неуправляемой памяти.Настало время другого героя WinDbg. Немного поболтав с Google, я нашел статейку: Investigating issues with Unmanaged Memory. First steps, где по шагам расписывается, как заглянуть в зазеркалье — в неуправляемую память. Почитали и сделали. Картина удивила:

871c8af089e63eada76344b326f59524.png3911df6f80a7d93ca7e6abe7a3873639.png

Как понимаете, внутри ошметки SQL-запросов, ORA-ошибок и так далее. Напрашивается только один вывод:

4730b7e7836b57179369966cbe3e11a7.jpg

Каждый раз смотреть таким образом неуправляемую память — ну очень долго и неудобно. Тут Женя вспомнил про книгу Windows Internals, которую не так давно прочел, где рассказывается про VMMap. VMMap позволяет заглянуть в неуправляемую память на живом процессе и посмотреть, что за строки там хранятся.

Это пример с исправленной версии. Но поверьте: в большой куче было много строк, связанных с проблемным кодом.Это пример с исправленной версии. Но поверьте: в большой куче было много строк, связанных с проблемным кодом.

Как я уже говорил, во всех снятых исследуемых снимках размер управляемых куч был невелик. Но ответа на вопрос, почему неуправляемая память не освобождается, мы так и не нашли. Однажды удалось снять дамп, где по куче еще не успел пройтись GC.

2d8c8118fa78c87c04401bf6ce202c4d.png

Сразу внимание привлекли две области памяти: список объектов на 600 МБ и 17 экземпляров CancellationTokenSource на 3,5 ГБ.

93d09aa57e889df8ab4f37af6c794cec.pngНевероятных размеров токен отмены.Невероятных размеров токен отмены.

Что ж, есть два вопроса:

  1. Почему столько объектов в списке?

  2. Почему такой огромный CancellationTokenSource?

С первой областью памяти разобрались довольно быстро и просто, но это будет неинтересно читателю, так как касалось бизнеса.

А вот что касается второй — тут начинается самое интересное. Давайте попробуем разобраться, что же хранит CancellationTokenSource и как это туда попадает? И начнем, как и полагается, с теории.

CancellationTokenSource — это

Signals to a CancellationToken that it should be canceled.

Если говорить простым языком, то эта штука создает CancellationToken, который можно использовать для отмены выполнения операции. Обычно мы проверяем состояние токена отмены, и если он отменен, то прерываем выполнение или кидаем исключение. Сам токен перевести в отменённое состояние может его родитель (Source). Все просто. 

Но как работает эта связка? Почему в итоге у нас один CancellationTokenSource занял более 3 ГБ? Давайте заглянем внутрь, что же там такое хранится?

05f10e1ac4936eef2e022b47a268fe62.png

В самом Source хранится массив CancellationCallbackInfo с информацией о каком-то действии, которое необходимо вызвать при срабатывании отмены. Основной объект, для которого нужно вызвать метод, — это… OracleCommand. Если посмотреть все остальные объекты массива, то все они выглядят одинаково. А что лежит во втором по величине CancellationTokenSource?

5a09d6308b4e5b708dc126240cbc386a.png

Все то же самое, только запрос другой. Теперь осталось понять, как какой-то OracleCommand, с которым напрямую никто не работает, попадает в 

m_registeredCallbacksLists -> CancellationCallbackInfo -> StateForCallback –> _target

Вряд ли есть какая-то магия, но пока непонятно. Я решил начать с исследования исходников CancellationTokenSource, а конкретно — найти место, где создается этот самый CancellationCallbackInfo и StateForCallback. Кстати, StateForCallback — это делегат, как видно из двух предыдущих изображений, а значит, _target — это какое-то внутреннее поле, где хранится объект, метод которого и «сохранен» в делегате. Получается, что где-то какой-то метод из OracleCommand передается в качестве делегата. Ищем, где создается CancellationCallbackInfo и StateForCallback. Оказалось — в CancellationTokenSource.InternalRegister.

Сам метод InternalRegister вызывается из нескольких перегрузок CancellationToken.Register. Согласно документации, этот метод предназначен для:

Registers a delegate that will be called when this CancellationToken is canceled.

С этим все ясно, но где происходит регистрация? Ответ на этот вопрос решил искать в лоб — прямыми переходами по реализациям вызываемых методов, куда передается CancellationToken. Мой поиск начался с первого метода, который выходит за пределы нашего кода:

System.Data.Entity.QueryableExtensions.ToListAsync(query, cancellationToken.Value).

Далее следует скучное «протыкивание» по вызовам. Сразу перейду к интересному. В какой-то момент IDE выдала список реализаций метода, но Oracle внутри не было.

d4badfd733dd8404f4dafda28c4c6526.png

Напомню: у нас используется неуправляемый клиент с управляемой оберткой. В списке ничего похожего на слово Oracle нет, ну да ладно. Посмотрим, как сделано у других — например, в Postgre-клиенте:

public override Task ReadAsync(CancellationToken cancellationToken)
{
    CheckClosed();

    if (cancellationToken.IsCancellationRequested)
        return Task.FromCanceled(cancellationToken);

    var fastRead = TryFastRead();
    if (fastRead.HasValue)
        return fastRead.Value ? PGUtil.TrueTask : PGUtil.FalseTask;

    using (NoSynchronizationContextScope.Enter())
        return Read(true);
}

Что, неужели конец? Но ведь ответа на вопрос, как происходит регистрация, я не нашел. Решил поискать вызов метода Register во всех доступных символах и исходниках в проекте. И… Кажется, получилось.

Результаты поиска по проекту вызова метода регистрации.Результаты поиска по проекту вызова метода регистрации.

Oracle-клиента я все равно не нашел, но нашел очень похожее в Postgre. Скорее же смотреть, что там происходит:

async ValueTask ExecuteReaderAsync(CommandBehavior behavior, bool async, CancellationToken cancellationToken)
{
    var connector = CheckReadyAndGetConnector();
    connector.StartUserAction(this);
    try
    {
        using (cancellationToken.Register(cmd => ((NpgsqlCommand)cmd!).Cancel(), this))
        {
            ValidateParameters(connector.TypeMapper);

            switch (IsExplicitlyPrepared)
            {
            case true:
                Debug.Assert(_connectorPreparedOn != null);
                if (_connectorPreparedOn != connector)
                {
                    // The command was prepared, but since then the connect
.......

Метод довольно большой, но токен отмены используется только в одном месте — в самом начале. То есть мы регистрируем метод отмены «команды». Значит, нечто подобное должно быть и в Oracle-клиенте. Но поиск не давал никакой информации. 

Выход один — декомпиляция. После нескольких кликов получил «исходный» код на C#. Но вот опять проблема: внутри — ни слова ни про Task, ни про Async, ни про CancellationToken. То есть клиент оказался абсолютно синхронным (вот это открытие). Однако, если внимательнее поискать, слово Cancel все-таки встречается:

public override void Cancel()
{
    int errCode = 0;
  
    if (ConfigBaseClass.m_TraceLevel != 0)
        OraTrace.Trace(1U, " (ENTRY) OracleCommand::Cancel()\n");
    
    if (this.m_connection == null)
        throw new InvalidOperationException();
  
    this.CheckConStatus();
    try
    {
.......

Еще пара кликов приводит нас в базовый класс DbCommand и в метод ExecuteDbDataReaderAsync:

/// Большой интересный комментарий
protected virtual Task ExecuteDbDataReaderAsync(
    CommandBehavior behavior,
    CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
        return ADP.CreatedTaskWithCancellation();

    CancellationTokenRegistration tokenRegistration = new CancellationTokenRegistration();

    if (cancellationToken.CanBeCanceled)
        tokenRegistration = cancellationToken.Register(new Action(this.CancelIgnoreFailure));

    try
    {
        return Task.FromResult(this.ExecuteReader(behavior));
    }
    catch (Exception ex)
    {
        tokenRegistration.Dispose();
        return ADP.CreatedTaskWithException(ex);
    }
}

И вот она — регистрация!

tokenRegistration = cancellationToken.Register(new Action(this.CancelIgnoreFailure));

Но почему я не попал сюда в самом начале? Какой же все-таки стек вызова? Точки останова и IDE нам в помощь. Выглядит это примерно так:

Длинный stack trace
IDbAsyncEnumerableExtensions.ToListAsync()
async IDbAsyncEnumerableExtensions.ForEachAsync()
LazyAsyncEnumerator.MoveNextAsync()
LazyAsyncEnumerator.FirstMoveNextAsync()
AsyncTaskMethodBuilder.Start.d__9>()
LazyAsyncEnumerator.d__9.MoveNext()
ObjectQuery..GetAsyncEnumerator>b__32_0()
ObjectQuery.GetResultsAsync()
ObjectQuery.GetResultsAsync()
AsyncTaskMethodBuilder>.Start.d__43>()
ObjectQuery.d__43.MoveNext()
DefaultExecutionStrategy.ExecuteAsync>()
ObjectQuery.<>c__DisplayClass43_0.b__0()
ObjectContext.ExecuteInTransactionAsync>()
AsyncTaskMethodBuilder>.Startd__156>>()
ObjectContext.d__156>.MoveNext()
ObjectQuery.<>c__DisplayClass43_0.b__1()
ObjectQueryExecutionPlan.ExecuteAsync()
AsyncTaskMethodBuilder>.Startd__10>()
ObjectQueryExecutionPlan.d__10.MoveNext()
EntityCommandDefinition.ExecuteStoreCommandsAsync()
AsyncTaskMethodBuilder.Startd__26>()
EntityCommandDefinition.d__26.MoveNext()
DbCommand.ExecuteReaderAsync()
InterceptableDbCommand.ExecuteDbDataReaderAsync()
DbCommandDispatcher.ReaderAsync()
InternalDispatcher.DispatchAsync, System.Data.Common.DbDataReader>()
DbCommandDispatcher.<>c.b__9_0()
DbCommand.ExecuteReaderAsync()
DbCommand.ExecuteDbDataReaderAsync()
CancellationToken.Register()

Довольно далеко и неявно. Главное, что мы разобрались!

Выводы

  1. Проверять, отменен ли CancellationToken, и кидать исключение или каким-то образом завершать цикл.

  2. Регистрировать свой метод отмены или завершения работы в CancellationToken. Регистрация происходит через передачу делегата, который содержит в себе ссылку на объект.

В нашем случае использовался второй способ. В самом объекте OracleCommand хранится SQL-запрос и ответ. Детально работу неуправляемого клиента я не изучал, но наверняка выделяется некоторый участок памяти под ответ сервера БД. Схема работы такая:

  1. Создаем CancellationTokenSource.

  2. Начинаем цикл обработки записей звонков.

    1. Создаем OracleCommand с новым запросом.

    2. Передаем CancellationToken OracleCommand при выполнении очередного запроса.

    3. OracleCommand регистрирует свой метод Cancel () в CancellationToken, сохраняя ссылку на себя в делегате.

    4. Выполняется запрос, выделяется память под ответ.

    5. Выполняется полезная нагрузка для очередной записи.

  3. Через N итераций завершаем цикл.

  4. Завершаем процесс обработки.

  5. Вызываем завершение CancellationTokenSource.

  6. Вызывается отмена для КАЖДОЙ регистрации OracleCommand.

Или вот еще пример «бесконечного» цикла, где каждые 15 секунд из БД достаются записи и проверяются на актуальность:

private async Task UpdateRuleAsync(CancellationToken ct)
{
    do
    {
    	if (ct.IsCancellationRequested)
    	{
    		return;
    	}
    	//Выбираем из БД все правила по названию сервиса (5-10 записей).
    	var daos = await _repository.FindAllAsync(x => x.ServiceName.Equals(_serviceName, StringComparison.OrdinalIgnoreCase), innerCts.Token).ConfigureAwait(false);
    	var rules = daos.Select(x => new ManualRuleDto(x)).ToArray();

    	//Если правила изменились, то применяем их
    	if (!oldRules.IsEqual(rules))
    	{
    		oldRules = rules;
    		Update(rules);
    	}
    
    	await Task.Delay(15000, ct).ConfigureAwait(false);
    } while (!ct.IsCancellationRequested);
}

То есть в CancellationTokenSource будут храниться все объекты, которые зарегистрировали свой собственный метод отмены/завершения, до окончания работы с этим CancellationTokenSource.

Рекомендации

  1. Не используйте ОДИН CancellationTokenSource, если необходимо совершить гарантированно короткие операции большое количество раз, например в цикле.

  2. Используйте CancellationTokenSource для детерминированных операций, которые должны закончиться в какое-то определенное время.

  3. А если очень нужно отказаться от п. 1, то используйте для каждой итерации свой CancellationTokenSource и связывайте его с основным через CreateLinkedTokenSource.

  4. Неуправляемый Oracle Client — полностью синхронный внутри.

Надеюсь, было не очень скучно ;)

2677dc522bf20b4e9941145dbd0a1fa1.png

© Habrahabr.ru