Concurrency структуры в .net. ConcurrentQueue изнутри

ConcurrentQueue можно отнести к lock-free конкурентным структурам данных. В ее реализации нет блокировок (lock, Mutex…) и реализована она с использованием: — классической функции CompareExchange; — SpinWait— volatile (используется как memory-barrier)В основу ConcurrentQueue заложена структура ring-buffer (кольцевой буфер).Ring-buffer (кольцевой буфер)Кольцевой буфер идеально подходит для реализации структуры данных «очередь» (FIFO).67827604e0124f3bb8b5e557f484211aВ его основе лежит массив данных и 2 указателя — начало (start) и конец (end).e91c4be7b7094177acff05fbb3f59adaПредусмотрено две основные операции: Push — добавление в конец. При добавлении нового элементов в буфер, счетчик end увеличивается на 1 и на его место записывается новый элемент. Если мы «уперлись» в верхнюю границу массива, то значение end обнуляется («переходит» на начало массива) и элементы начинают записываться в начало массива. Запись возможна пока индекс end не достиг индекса start. Pop — выборка элементов сначала. Выборка элементов происходит с элемента start, последовательно увеличивая его значение, до тех пока не достигнет end. Выборка возможна, пока индекс start не достиг индекса end. Блочный кольцевой буфер Устройство ConcurrentQueue немного сложнее, чем классический кольцевой буфер. В его реализации используется понятие сегмента (Segment). ConcurrentQueue состоит из связанного списка (однонаправленного) сегментов. Размер сегмента равен 32. private class Segment { volatile VolatileBool[] m_state; volatile T[] m_array; volatile int m_low; volatile int m_high; volatile Segment m_next; } Первоначально в ConcurrentQueue создается 1 сегментd3aa102681eb41da812030101c6e3ff0По мере необходимости к нему справа добавляется новые сегментыdc69ae6867d84c4b9e6dc9d5324b3ff8e9947506af23467bb27e87b831ca5039В результате получается однонаправленный связанный список. Начало связанного списка задает m_head, конец — m_tail. Ограничения: m_head сегмент может иметь пустые ячейки только слева m_tail сегмент может иметь пустые ячейки только справа если m_head = m_tail то пустые ячейки могут быть как слева, так и справа. В сегментах, между m_head и m_tail пустых ячеек быть не может. Добавление элемента (Enqueue) Ниже представлен примерный алгоритм добавление элементов в сегмент.Увеличивается m_high на 1 В массив m_array с индексом m_high записывается новое значение. index = Interlocked.Increment (ref this.m_high); if (index <= 31) { m_array[index] = value; m_state[index].m_value = true; } m_state – массив состояния ячеек, если значение true – элемент записан в ячейку, если false — еще нет. По сути, это некий «Commit» записи. Нужен он для того, чтобы между операциями увеличения индекса Interlocked.Increment и записью значения m_array[index] = value не произошло чтение элемента другим потоком. Тогда чтение данных будет осуществляться после выполнения: while (!this.m_state[index].m_value) spinWait2.SpinOnce(); Добавление нового сегмента (Segment.Grow) Как только m_high текущего сегмента становится равным 31, запись в текущий сегмент прекращается и создается новый сегмент (текущие сегменты продолжают жить своей жизнью). m_next = new ConcurrentQueue.Segment (this.m_index + 1L, this.m_source); m_source.m_tail = this.m_next; m_next — ссылка на следующий сегментm_source.m_tail — ссылка последний сегмент списка сегментов.Выборка элемента (TryDequeue) В основе выборки элементов из очереди лежат две базовые функциональности: Примерный алгоритм работы выборки: Получить m_low Увеличить m_low на 1, с использованием CompareExchange Если m_low больше 31 — перейти на следующий сегмент Дождаться коммита (m_state[low].m_value) элемента с индексом m_low. SpinWait spinWait1 = new SpinWait (); int low = this.Low; if (Interlocked.CompareExchange (ref this.m_low, low + 1, low) == low) { SpinWait spinWait2 = new SpinWait (); while (! this.m_state[low].m_value) spinWait2.SpinOnce (); result = this.m_array[low]; Count vs IsEmpty Код IsEmpty: ConcurrentQueue.Segment segment = this.m_head; if (! segment.IsEmpty) return false; if (segment.Next == null) return true; SpinWait spinWait = new SpinWait (); for (; segment.IsEmpty; segment = this.m_head) { if (segment.Next == null) return true; spinWait.SpinOnce (); } return false; Т.е. по сути, это найти первый непустой сегмент. Если он найден — очередь не пуста.Код Count:

ConcurrentQueue.Segment head; ConcurrentQueue.Segment tail; int headLow; int tailHigh; this.GetHeadTailPositions (out head, out tail, out headLow, out tailHigh); if (head == tail) return tailHigh — headLow + 1; return 32 — headLow + 32 * (int) (tail.m_index — head.m_index — 1L) + (tailHigh + 1); По сути, он ищет первый и последний сегмент и на основе этих двух сегментов вычисляет кол-во элементов.Вывод — операция Count будет занимать больше процессорного времени, чем IsEmpty.Снепшот & GetEnumerator Структура ConcurrentQueue поддерживает технологию снепшотов для получения целостного набора элементов.Целостные данные возвращают: ToArray ICollection.CopyTo GetEnumerator Операторы выше так же работаю без блокировок, а целостность достигается за счет введения счетчика volatile int m_numSnapshotTakers в рамках всей очереди — число операций, работающих со снепшотами в текущий момент времени. Т.е. каждая операция, которая хочет получить целостную картину, должна реализовать следующую код: Interlocked.Increment (ref this.m_numSnapshotTakers); try { …//Итератор по всем сегментам } finally { Interlocked.Decrement (ref this.m_numSnapshotTakers); } В дополнении к этому, изменения у нас «пишет» только операция Dequeue, поэтому только в ней проверяется необходимость удалять ссылку на элемент очереди: if (this.m_source.m_numSnapshotTakers <= 0) this.m_array[low] = default (T);

© Habrahabr.ru