Concurrency структуры в .net. ConcurrentQueue изнутри
ConcurrentQueue можно отнести к lock-free конкурентным структурам данных. В ее реализации нет блокировок (lock, Mutex…) и реализована она с использованием: — классической функции CompareExchange; — SpinWait— volatile (используется как memory-barrier)В основу ConcurrentQueue заложена структура ring-buffer (кольцевой буфер).Ring-buffer (кольцевой буфер)Кольцевой буфер идеально подходит для реализации структуры данных «очередь» (FIFO).В его основе лежит массив данных и 2 указателя — начало (start) и конец (end).Предусмотрено две основные операции: Push — добавление в конец. При добавлении нового элементов в буфер, счетчик end увеличивается на 1 и на его место записывается новый элемент. Если мы «уперлись» в верхнюю границу массива, то значение end обнуляется («переходит» на начало массива) и элементы начинают записываться в начало массива. Запись возможна пока индекс end не достиг индекса start.
Pop — выборка элементов сначала. Выборка элементов происходит с элемента start, последовательно увеличивая его значение, до тех пока не достигнет end. Выборка возможна, пока индекс start не достиг индекса end.
Блочный кольцевой буфер
Устройство ConcurrentQueue немного сложнее, чем классический кольцевой буфер. В его реализации используется понятие сегмента (Segment). ConcurrentQueue состоит из связанного списка (однонаправленного) сегментов. Размер сегмента равен 32.
private class Segment {
volatile VolatileBool[] m_state;
volatile T[] m_array;
volatile int m_low;
volatile int m_high;
volatile Segment m_next;
}
Первоначально в ConcurrentQueue создается 1 сегментПо мере необходимости к нему справа добавляется новые сегментыВ результате получается однонаправленный связанный список. Начало связанного списка задает m_head, конец — m_tail. Ограничения: m_head сегмент может иметь пустые ячейки только слева
m_tail сегмент может иметь пустые ячейки только справа
если m_head = m_tail то пустые ячейки могут быть как слева, так и справа.
В сегментах, между m_head и m_tail пустых ячеек быть не может.
Добавление элемента (Enqueue)
Ниже представлен примерный алгоритм добавление элементов в сегмент.Увеличивается m_high на 1
В массив m_array с индексом m_high записывается новое значение.
index = Interlocked.Increment (ref this.m_high);
if (index <= 31)
{
m_array[index] = value;
m_state[index].m_value = true;
}
m_state – массив состояния ячеек, если значение true – элемент записан в ячейку, если false — еще нет. По сути, это некий «Commit» записи. Нужен он для того, чтобы между операциями увеличения индекса Interlocked.Increment и записью значения m_array[index] = value не произошло чтение элемента другим потоком. Тогда чтение данных будет осуществляться после выполнения:
while (!this.m_state[index].m_value)
spinWait2.SpinOnce();
Добавление нового сегмента (Segment.Grow)
Как только m_high текущего сегмента становится равным 31, запись в текущий сегмент прекращается и создается новый сегмент (текущие сегменты продолжают жить своей жизнью).
m_next = new ConcurrentQueue
ConcurrentQueue