[Из песочницы] AsyncCollections: история одного велосипеда

С давних времён я был большим поклонником System.Collections.Concurrent и BlockingCollection в особенности. Сколько раз это чудо инженерной мысли выручало в самых разнообразнейших ситуациях — не счесть.С чуть менее древних времён в обиход прочно вошли async/await. Казалось бы, жизнь прекрасна, но есть одно «но»: асинхронный код миксовать с блокирующим кодом как-то не очень-то хочется. А BlockingCollection, как несложно догадаться (хотя бы из названия), в ряде случаев поток блокирует.

Ложный след: Nito.AsyncExОднажды я наткнулся упоминание библиотеки Nito.AsyncEx за авторством Stephen Cleary, в которой нашёлся класс с интригующим названием AsyncCollection. Однако, взглянув что у него находится под капотом, я остался в некотором недоумении: там оказался AsyncLock из этой же библиотеки, навешенный на все действия над обёрнутой IProducerConsumerCollection. AsyncLock, в свою очередь, активно юзает самые обыкновенные lock-и и тонкий слой магии, распутывать который мне внезапно расхотелось. Даже если эта реализация делает то что заявлено, она выглядит несколько навороченно, монструозно и, возможно, не очень оптимально. Неужели нельзя решить эту задачу более аккуратно? Все мы знаем чем чреваты подобные мысли. Visual Studio, New project…

AsyncQueue Для начала определимся что мы вообще хотим от нашей асинхронной коллекции. В качестве отправной точки можно взять следующий интерфейс: public interface IAsyncCollection: IEnumerable { int Count { get; } void Add (T item); Task TakeAsync (); } Кроме того, для простоты остановимся на том, что наша коллекция — это очередь. Почему именно очередь? Да примерно по той же причине, по которой очередь по дефолту используется в BlockingCollection.Дальше следует напряжённая работа мысли, связанная с попыткой определить возможные состояния нашей коллекции. На первый взгляд их может быть 3 штуки:

1. Элементов в коллекции нет, но были вызовы TakeAsync (), Task-и которых нужно завершить, когда элементы появятся (для простоты и краткости, далее я буду называть их awaiter-ами). В этом случае:

Awaiter-ы явно нужно где-то хранить. Напрашивается очередь, конкретнее — ConcurrentQueue. Если происходит вызов TakeAsync (), у нас появляется новый awaiter, закидываем его в очередь awaiter-ов. Если происходит вызов Add (), у нас появляется новый элемент, с помощью которого можно мгновенно взять один из awaiter-ов и завершить его. 2. Awaiter-ов нет, но были вызовы Add (). Ситуация полностью симметрична предыдущей: Элементы нужно где-то хранить. Где? В ConcurrentQueue, где же ещё. Если происходит вызов Add (), появляется новый элемент, закидываем его в очередь элементов. Если происходит вызов TakeAsync (), появляется новый awaiter, который можно мгновенно завершить, забрав верхний элемент из очереди. 3. Обе очереди — и очередь awaiter-ов, и очередь элементов — пустые. В зависимости от следующего действия переходим либо в состояние 1, либо в состояние 2: Если происходит вызов Add (), появляется новый элемент, пытаемся взять для него awaiter из очереди, там пусто, пытаемся добавить его в очередь элементов… В этот самый момент происходит вызов TakeAsync (), появляется новый awaiter, пытаемся взять для него элемент из очереди, там пока что пусто, пытаемся добавить его в очередь awaiter-ов… Упс. Мы всё сломали: awaiter и элемент сидят в разных очередях и ждут друг друга. Что делать? Развешивать локи не хочется, мы не для того ушли от напичканной ими реализации из Nito.AsyncEx. Что в таких случаях делают всякие ConcurrentQueue? Понимают, что прямо сейчас в соседнем потоке происходит операция, которая вот-вот завершится и после которой мы сможем сделать что-нибудь полезное, создают SpinWait и крутятся в ожидании. Попробуем воспроизвести эту идею у нас. Нужно: понять в каком состоянии мы находимся (1 или 2) одновременно отрапортовать, что мы начали свою операцию, будь то добавление awaiter-а или добавление элемента в зависимости от состояния либо добавить awaiter/элемент в очередь, либо покрутиться, пока в противоположную очередь не добавят элемент/awaiter, который мы тут же заберём Первые два требования очень уж сильно напоминают работу класса Interlocked; для хранения состояния можно использовать что-то типа баланса очередей: TakeAsync () атомарно уменьшает баланс на единицу, Add () атомарно же его увеличивает. И по значению баланса, которое вернёт Interlocked.Increment/Interlocked.Decrement, можно узнать о том, что грядёт новый элемент/awaiter, ещё до того, как он появится в соответствующей очереди. Довольно болтовни, попробуем закодить всё вышеперечисленное: public class AsyncQueue: IAsyncCollection { private ConcurrentQueue _itemQueue = new ConcurrentQueue(); private ConcurrentQueue> _awaiterQueue = new ConcurrentQueue>();

// _queueBalance < 0 means there are free awaiters and not enough items. // _queueBalance > 0 means the opposite is true. private long _queueBalance = 0; public void Add (T item) { long balanceAfterCurrentItem = Interlocked.Increment (ref _queueBalance);

if (balanceAfterCurrentItem > 0) { // Items are dominating, so we can safely add a new item to the queue. _itemQueue.Enqueue (item); } else { // There’s at least one awaiter available or being added as we’re speaking, so we’re giving the item to it.

TaskCompletionSource awaiter; SpinWait spin = new SpinWait ();

while (!_awaiterQueue.TryDequeue (out awaiter)) spin.SpinOnce ();

awaiter.SetResult (item); } }

public Task TakeAsync () { long balanceAfterCurrentAwaiter = Interlocked.Decrement (ref _queueBalance);

if (balanceAfterCurrentAwaiter < 0 ) { // Awaiters are dominating, so we can safely add a new awaiter to the queue.

var taskSource = new TaskCompletionSource(); _awaiterQueue.Enqueue (taskSource); return taskSource.Task; } else { // There’s at least one item available or being added, so we’re returning it directly.

T item; SpinWait spin = new SpinWait ();

while (!_itemQueue.TryTake (out item)) spin.SpinOnce ();

return Task.FromResult (item); } } } Тестируем, с удивлением обнаруживаем, что оно вроде бы даже работает. Победа? С одной стороны, да, с другой, разогнавшийся творческий порыв так просто не остановить…Полезные (и не очень) плюшки Посмотрим внимательно на то что у нас получилось. Синхронный Add (), асинхронный TakeAsync ()… Стоп, асинхронный метод без возможности его отмены? Непорядок. Исправляем.Во-первых, при отмене CancellationToken-а нужно немедленно отменить соответствующий таск:

public Task TakeAsync (CancellationToken cancellationToken) { // …

if (balanceAfterCurrentAwaiter < 0 ) { var taskSource = new TaskCompletionSource(); _awaiterQueue.Enqueue (taskSource);

cancellationToken.Register ( state => { TaskCompletionSource awaiter = state as TaskCompletionSource; awaiter.TrySetCanceled (); }, taskSource, useSynchronizationContext: false);

return taskSource.Task; } else { // … } } Во-вторых, выковырять отменённый awaiter откуда-то из середины очереди мы явно не сможем, поэтому нужно научить Add () пропускать оный отменённый awaiter. Баланс при этом волшебным образом поддерживается автоматически: private bool TryAdd (TItem item) { long balanceAfterCurrentItem = Interlocked.Increment (ref _queueBalance);

if (balanceAfterCurrentItem > 0) { _itemQueue.Enqueue (item); return true; } else { TaskCompletionSource awaiter; SpinWait spin = new SpinWait ();

while (!_awaiterQueue.TryDequeue (out awaiter)) spin.SpinOnce ();

// Returns false if the cancellation occurred earlier. return awaiter.TrySetResult (item); } }

public void Add (TItem item) { while (! TryAdd (item)) ; } В-третьих, старый метод TakeAsync () (который без CancellationToken) вообще можно вынести в extension к интерфейсу IAsyncCollection: public interface IAsyncCollection: IEnumerable { int Count { get; } void Add (T item); Task TakeAsync (CancellationToken cancellationToken); }

public static class AsyncCollectionExtensions { public static Task TakeAsync(this IAsyncCollection collection) { return collection.TakeAsync (CancellationToken.None); } } Кстати, про IAsyncCollection. Если присмотреться, то наша реализация AsyncQueue не обязана быть прибита гвоздями к ConcurrentQueue, для хранения элементов подойдёт любая потокобезопасная IProducerConsumerCollection. Например, ConcurrentStack. Поэтому можно сделать вот так: public class AsyncCollection: IAsyncCollection where TItemQueue: IProducerConsumerCollection, new () { private TItemQueue _itemQueue = new TItemQueue (); private ConcurrentQueue> _awaiterQueue = new ConcurrentQueue>();

// … }

public class AsyncQueue: AsyncCollection> { }

public class AsyncStack: AsyncCollection> { } С одной стороны, хотелось бы не плодить type parameter-ы, а просто принимать IProducerConsumerCollection в конструкторе, но вот беда: нам могут подсунуть коллекцию, на которую уже ссылаются снаружи и в которую могут снаружи же понапихать элементов (или, что ещё хуже, забрать часть наших элементов), там самым разрушив синхронизацию между реальным состоянием коллекции и запомненным балансом. С factory method та же проблема, так что коллекцию придётся создавать самим.Бенчмарки! Настало время померить скорость работы нашего велосипеда. Для прогона бенчмарков есть пакет BenchmarkDotNet, реализующий кучу мелких деталей, которые желательно учитывать при прогоне бенчмарков, так что его и заюзаем. Общая идея бенчмарка заключается в следующем: class AsyncQueueBenchmark { private const int _consumerThreadCount = 3; private const int _producerThreadCount = 3; private const int _itemsAddedPerThread = 10000; private const int _itemsAddedTotal = _producerThreadCount * _itemsAddedPerThread; private IAsyncCollection _currentQueue; private CancellationTokenSource _cancelSource; private int _itemsTaken;

// Выполнение этого метода будет измеряться private void DdosCurrentQueue () { _consumerTasks = Enumerable.Range (0, _consumerThreadCount) .Select (_ => Task.Run (() => RunConsumerAsync ())) .ToArray ();

_producerTasks = Enumerable.Range (0, _producerThreadCount) .Select (_ => Task.Run (() => RunProducer ())) .ToArray ();

Task.WaitAll (_producerTasks); Task.WaitAll (_consumerTasks); }

private async Task RunConsumerAsync () { try { CancellationToken cancelToken = _cancelSource.Token;

while (_itemsTaken < _itemsAddedTotal && !cancelToken.IsCancellationRequested ) { int item = await _currentQueue.TakeAsync( cancelToken ); int itemsTakenLocal = Interlocked.Increment( ref _itemsTaken );

if (itemsTakenLocal >= _itemsAddedTotal) { _cancelSource.Cancel (); break; } } } catch (OperationCanceledException) { } }

private void RunProducer () { for (int i = 0; i < _itemsAddedPerThread; i++ ) { int item = 42; _currentQueue.Add( item ); } } Т.е. просто берём фиксированную пачку элементов, фигачим их в очередь в несколько потоков, параллельно в несколько же потоков эту очередь разгребаем, засекаем сколько времени на это уйдёт. Подсовываем разные реализации IAsyncCollection, сравниваем. В забеге участвуют:1. Свеженавелосипеденный AsyncQueue2. Nito.AsyncEx.AsyncCollection в следующем виде:

class NitoAsyncCollectionAdapter: IAsyncCollection { private Nito.AsyncEx.AsyncCollection _collection;

public NitoAsyncCollectionAdapter () { _collection = new Nito.AsyncEx.AsyncCollection(); }

#region IAsyncCollection Members

public void Add (T item) { _collection.Add (item); }

public Task TakeAsync (System.Threading.CancellationToken cancellationToken) { return _collection.TakeAsync (cancellationToken); }

#endregion } 3. BlockingCollection (ну как же не сравнить с ней) в виде: class BlockingCollectionAdapter: IAsyncCollection { private BlockingCollection _collection;

public BlockingCollectionAdapter () { _collection = new BlockingCollection(); }

#region IAsyncCollection Members

public void Add (T item) { _collection.Add (item); }

public Task TakeAsync (System.Threading.CancellationToken cancellationToken) { T item = _collection.Take (cancellationToken); return Task.FromResult (item); }

#endregion } Результаты: HellBrick.AsyncCollections.AsyncQueue: 1ms | Stats: MedianTicks= 3368, MedianMs= 1, Error=06.34% Nito.AsyncEx.AsyncCollection: 12ms | Stats: MedianTicks=40503, MedianMs=12, Error=31.36% System.Concurrent.BlockingCollection: 2ms | Stats: MedianTicks= 7222, MedianMs= 2, Error=38.82% Интуитивная оценка Nito.AsyncEx.AsyncCollection не подвела: это действительно монструозная тормознутая хрень. Но самое интересное: нам удалось обогнать BlockingCollection по производительности и при этом обойтись без блокирования потоков. Win! Открываем тортик или любую другую бонусную вкусняшку и едем дальше.AsyncBatchQueue Мне периодически приходилось использовать небольшой враппер над BlockingCollection, который принимал на вход одиночные элементы и отдавал их пачками определённого размера. При этом, если за определённое время нужное количество элементов так и не набралось, срабатывал таймер и делал принудительный flush того что мы успели набрать. Кто хочет асинхронную версию подобной штуки? Я хочу.Для начала обойдёмся без таймера и ручного flush-а. Собранные пачки элементов логично хранить и отдавать средствами нашей новой AsyncQueue:

public class AsyncBatchQueue { private int _batchSize; private Batch _currentBatch; private AsyncQueue> _batchQueue = new AsyncQueue>();

public AsyncBatchQueue (int batchSize) { _batchSize = batchSize; _currentBatch = new Batch (this); }

public void Add (T item) { SpinWait spin = new SpinWait ();

while (!_currentBatch.TryAdd (item)) spin.SpinOnce (); }

public Task> TakeAsync (CancellationToken cancellationToken) { return _batchQueue.TakeAsync (cancellationToken); }

private class Batch: IReadOnlyList { private AsyncBatchQueue _queue; // ?

public Batch (AsyncBatchQueue queue) { _queue = queue; }

public bool TryAdd (T item) { // ? } } } Что здесь происходит: в методе Add нужно попытаться додбавить элемент в текущий batch и, если мы его заполнили, за-flush-ить его в _batchQueue. При этом вполне возможна ситуация, когда другой поток нас опередил, в данный момент занимается добавлением/flush-ем, но при этом ещё не успел записать в _currentBatch ссылку на новый (пустой) batch. Отсюда старый добрый SpinWait.Основная магия будет в nested классе Batch, идея которого самым наглым образом позаимствована из реализации ConcurrentQueue (кстати, если кто не читал исходники, рекомендую ознакомиться: там есть много интересного). Идея эта заключается в следующем:

Элементы храним в обычном массиве, благо размер мы знаем заранее Проблемы с concurrency решаются с помощью Interlocked.Increment поля, где хранится индекс последнеего вставленного элемента Если поток захватил последний слот массива, то на него [поток, не слот] возлагается ответственность по выполнению flush-а текущего batch-а Если поток захватил слот, выходящий за границы массива, то нам не повезло: этот batch уже заполнен и потоку нужно крутиться в ожидании нового Выглядит это как-то так. (Осторожно, код пока нежизнеспособен! Чуть позже расскажу почему.) private class Batch: IReadOnlyList { private AsyncBatchQueue _queue; private T[] _items; private int _lastReservationIndex = -1; private int _count = -1;

public Batch (AsyncBatchQueue queue) { _queue = queue; _items = new T[ _queue._batchSize ]; }

public bool TryAdd (T item) { int index = Interlocked.Increment (ref _lastReservationIndex);

// The following is true if someone has beaten us to the last slot and we have to wait until the next batch comes along. if (index >= _queue._batchSize) return false;

// The following is true if we’ve taken the last slot, which means we’re obligated to flush the current batch and create a new one. if (index == _queue._batchSize — 1) FlushInternal (_queue._batchSize);

_items[ index ] = item; return true; }

private void FlushInternal (int count) { _count = count; _queue._currentBatch = new Batch (_queue); _queue._batchQueue.Add (this); } } Дальше было бы неплохо всё же реализовать IReadOnlyList. Тут всплывает один нюанс: никто не гарантирует, что когда мы за-flush-или batch, все элементы массива заполнены реальными данными. Поток, схвативший последний элемент, мог просто оказаться быстрее. Напрашивается решение: для каждого слота массива хранить флаг, определяющий, можно ли читать соответствующее значение. private class Batch: IReadOnlyList { // … private bool[] _finalizationFlags;

public Batch (AsyncBatchQueue queue) { // … _finalizationFlags = new bool[ _queue._batchSize ]; }

public bool TryAdd (T item) { // … _items[ index ] = item; _finalizationFlags[ index ] = true;

return true; }

public T this[ int index ] { get { if (index >= _count) throw new IndexOutOfRangeException ();

return GetItemWithoutValidation (index); } }

private T GetItemWithoutValidation (int index) { SpinWait spin = new SpinWait (); while (!_finalizationFlags[ index ]) spin.SpinOnce ();

return _items[ index ]; }

// … остальные методы реализуется через GetItemWithoutValidation } А теперь начинается настоящая магия. Проблема в том, что в коде есть куча мест, где компилятор с процессором могут всё испортить, переставляя инструкции местами и кэшируя то, что кэшировать категорически нельзя.

1. В AsyncBatchCollection.Add () значение _currentBatch может быть прочитано однажды и закэшировано, в результате чего, если batch заполнился, поток будет крутиться вечно. volatile спешит на помощь:

public class AsyncBatchQueue { // … private volatile Batch _currentBatch; // … } 2. В методе FlushInternal () batch может добавиться в выходную очередь до того, как будет заполнено поле _count. Втыкаем full fence: private void FlushInternal (int count) { _count = count; _queue._currentBatch = new Batch (_queue);

// The full fence ensures that the current batch will never be added to the queue before _count is set. Thread.MemoryBarrier ();

_queue._batchQueue.Add (this); } 3. В методе TryAdd инструкции записи в _items[ index ] и _finalizationFlags[ index ] могут быть переставлены местами. Опять втыкаем full fence: public bool TryAdd (T item) { // …

// The full fence prevents setting finalization flag before the actual item value is written. _items[ index ] = item; Thread.MemoryBarrier (); _finalizationFlags[ index ] = true;

return true; } 4. Обратная проблема (чтение элемента перед флагом) может произойти в GetItemWithoutValidation. Втыкаем сами-знаете-что: private T GetItemWithoutValidation (int index) { SpinWait spin = new SpinWait (); while (!_finalizationFlags[ index ]) spin.SpinOnce ();

// The full fence prevents reading item value before finalization flag is set. Thread.MemoryBarrier (); return _items[ index ]; } 5. Всё в том же методе значение _finalizationFlags[ index ] может быть закэшировано, из-за чего поток будет крутиться вечно. Обычно подобное решается навешиванием на поле модификатора volatile, но сделать это с элементом массива не представляется возможным, поэтому ну вы поняли: private T GetItemWithoutValidation (int index) { SpinWait spin = new SpinWait (); while (!_finalizationFlags[ index ]) { spin.SpinOnce ();

// The full fence prevents caching any part of _finalizationFlags[ index ] expression. Thread.MemoryBarrier (); }

// … } Здесь, кстати, стоит сделать небольшое отступление В ConcurrentQueue аналогичная проблема решается весьма необычным образом: internal volatile VolatileBool[] m_state;

struct VolatileBool { public VolatileBool (bool value) { m_value = value; } public volatile bool m_value; } Если бы VolatileBool был классом вместо структуры, всё было бы предельно просто: даже если ссылку на экземпляр VolatileBool где-нибудь закэшируют, чтение volatile m_value гарантированно будет возвращать реальное значение поля. Почему этот финт работает со структурой, которой по идее положено быть скопированной в момент вызова m_state[ index ], я так и не понял. Вроде опасные места на этом кончились и базовый функционал должен работать (по крайней мере, мне искренне хотелось бы в это верить).

А теперь вкорячиваем таймер Всё вроде бы здорово, но есть один (уже не связанный с многопоточностью) нюанс: если в коллекцию добавят количество элементов, не кратное batchSize, то остаток мы никогда не увидим. Нужна возможность делать flush вручную, а лучше — по таймеру. Самый простой способ — сделать так, чтобы вызов метода Flush () пытался сразу схватить последний слот в массиве, таким образом пометив batch как заполненный. При этом нужно в обязательном порядке запомнить последнее реальное значение _lastReservationIndex, иначе мы не сможем узнать сколько реально слотов занято (спойлер: тут на помощь приходит Interlocked.CompareExchange ()). Всего возможны 5 вариантов развития событий:_lastReservationIndex < 0. flush-ить нечего. _lastReservationIndex >= _queue._batchSize. FlushInternal () выполнит поток, схвативший последний слот, делать ничего не надо. _lastReservationIndex годный и у нас получилось атомарно установить его в _queue._batchSize. Мы знаем реальное кол-во элементов в массиве, можно делать FlushInternal (). Между чтением прошлого значения _lastReservationIndex и записью туда же нового значения пролез другой поток и схватил последний элемент. По сути, ситуация повторяет вариант №2: ничего не делаем. То же что в №4, но batch не заполнен. Крутимся, пробуем ещё раз. public class AsyncBatchQueue: IEnumerable> { // …

public void Flush () { SpinWait spin = new SpinWait (); while (!_currentBatch.TryFlush ()) spin.SpinOnce (); }

// …

private class Batch: IReadOnlyList { // …

public bool TryFlush () { int expectedPreviousReservation = Volatile.Read (ref _lastReservationIndex);

// We don’t flush if the batch doesn’t have any items or if another thread is about to flush it // However, we report success to avoid unnecessary spinning. if (expectedPreviousReservation < 0 || expectedPreviousReservation >= _queue._batchSize) return true;

int previousReservation = Interlocked.CompareExchange (ref _lastReservationIndex, _queue._batchSize, expectedPreviousReservation);

// Flush reservation has succeeded. if (expectedPreviousReservation == previousReservation) { FlushInternal (previousReservation + 1); return true; }

// The following is true if someone has completed the batch by the time we tried to flush it. // Therefore the batch will be flushed anyway even if we don’t do anything. // The opposite means someone has slipped in an update and we have to spin. return previousReservation >= _queue._batchSize; }

// … } } Готово! Осталось навесить сверху таймер — это настолько лишённый магии процесс, что я попробую обойтись без копипейста связанного с ним кода. Бенчмарков тоже не будет, т.к. я не знаю с кем можно было бы сравнить производительность.Что дальше? Во-первых, обе рассмотренные коллекции страдают от одного тонкого недостатка. Если кто-нибудь сделает Thread.Abort (), то в самый неожиданный момент может вылететь ThreadAbortException и разрушить столь тщательно поддерживаемую консистентность состояния коллекций. В вышеупомянутой ConcurrentQueue (да и в куче других мест) эта проблема решается весьма экстравагантным образом: try { } finally { // Insert Thread.Abort ()-safe code here } Кейс довольно редкий, но на всякий случай было бы неплохо от него всё же защититься. Возможно, когда-нибудь я это всё же сделаю.Во-вторых, для счастья не хватает ещё как минимум одной асинхронной коллекции: приоритезированной очереди. И, в отличие от BlockingCollection, тривиальной реализации с использованием TakeFromAny () на горизонте не видно. Продолжение следует?…

P.S. Для тех, кто героически дочитал до конца: Nuget package: www.nuget.org/packages/AsyncCollections/Source code: github.com/HellBrick/AsyncCollections

Если есть критика, баги, пожелания или просто здравые мысли — пишите, буду рад обсудить.

© Habrahabr.ru