[Из песочницы] Простейшие Lock-Free объекты для двух потоков
Здесь было много статей об универсальных Lock-free объектах, однако, для некоторых частных случаев они излишне громоздки. Мой случай как раз таким и являлся: требовалось организовать одностороннюю передачу информации от одного потока другому. Главный поток запускает рабочий, после чего он может только запросить его остановку и никак больше управлять он им не может. В свою очередь рабочий поток может уведомлять главный о своем текущем состоянии (прогрессе выполнения), а также отсылать промежуточные результаты выполнения. Получается, что требуется только передача данных от рабочего к главному потоку.Разумеется, возможно, я изобрёл велосипед или, хуже того, велосипед с глюками. Поэтому комментарии и критика очень приветствуются!
Объект состоянияСостояние нашего рабочего потока представлено в виде некоторого класса. При этом главный поток не обязан всегда забирать данные, хранящиеся в объекте состояния (например, не важно, если главный поток пропустит промежуточное значение прогресса выполнения, ему важно получить последнее актуальное на данный момент).Для реализации lock-free передачи состояния нам потребуется три его экземпляра (разных объектов одного класса):
var ReadItem: TLockFreeWorkState; CurrentItem: TLockFreeWorkState; WriteItem: TLockFreeWorkState; Идея такова: рабочий поток имеет свободный доступ к объекту WriteItem. Когда все данные сохранены выполняется операция InterlockedExchange с объектом в CurrentItem, после чего главный поток каким-то образом уведомляется о готовности нового состояния (в моем примере использован обычный PostMessage). Главный поток в обработчике уведомления выполняет операцию InterlockedExchange объекта CurrentItem с объектом ReadItem, после чего может свободно читать данные из ReadItem.Получается такой себе «пузырек»: данные о состоянии появляются во WriteItem и далее «всплывают» через CurrentItem в ReadItem. Кстати, я не придумал нормального названия для базового класса такой структуры, поэтому назвал просто TLockFreeWorkState (возможно у кого-то найдется идея получше).
Тут есть один нюанс: главный поток может обращаться за текущим состоянием в любое время. Если мы всегда будет выполнять InterlockedExchange, то попеременно будем возвращать актуальное и предыдущее состояние.
Предотвратить это нам поможет обычный флажок Newest в классе. При записи состояния рабочий поток всегда выставляет WriteItem.Newest:= True, и после InterlockedExchange этот флажок оказывается в CurrentItem. Главный поток в начале проверяет CurrentItem.Newest и, только если он True, делает InterlockedExchange после чего сразу его сбрасывает ReadItem.Newest в False. Я посчитал, что читать CurrentItem.Newest из главного потока безопасно, но поправьте меня, если не прав.
Теперь все это в виде упрощенного кода (опущено привидение типов для большей наглядности):
type TLockFreeWorkState = class public Newest: Boolean; end;
function Read (var CurrentItem, ReadItem: TLockFreeWorkState): Boolean; begin if CurrentItem.Newest then begin ReadItem:= InterlockedExchangePointer (CurrentItem, ReadItem); ReadItem.Newest:= False; Result:= True; end else Result:= False; end;
procedure Write (var CurrentItem, WriteItem: TLockFreeWorkState); begin WriteItem.Newest:= True; WriteItem:= InterlockedExchangePointer (CurrentItem, WriteItem); end; Объект очереди В чем-то тут подход схожий, но для реализации нам потребуется изначально только один объект, но две ссылки на него: var ReadQueue: TLockFreeWorkQueue; WriteQueue: TLockFreeWorkQueue; Изначально создается один экземпляр TLockFreeWorkQueue и записывается в переменные ReadQueue и WriteQueue. Класс представляет собой кольцевой буфер и имеет следующее описание: TLockFreeWorkQueue = class public Head: Integer; Tail: Integer; Items: array[0…QueueCapacity — 1] of TObject; end; где QueueCapacity является некоторой константой (больше нуля), которая определяет длину кольцевого буфера.При добавлении элемента в очередь рабочий поток выполняет InterlockedExchangeComparePointer элемента WriteQueue.Items[Tail]. При этом элемент сравнивается с Nil и в случае успеха в него записывается добавляемый элемент. Если операция прошла успешно, то значение Tail увеличивается на 1 и сбрасывается в 0, если достигнут QueueCapacity. Мы можем свободно оперировать с Tail, так как доступ к этой переменной имеет только рабочий поток (поток-писатель). Также после этого рабочий поток должен уведомить главный о том, что в очереди появились элементы. Если операция не удалась, то это означает, что очередь заполнена, но об этом позже.
Главный поток по уведомлению от рабочего начинает цикл чтения элементов из очереди (на самом деле чтение можно начинать в любой момент). Для извлечения элемента вызывается InterlockedExchangePointer для элемента ReadQueue.Items[Head] куда записывается значение Nil. Если извлеченный элемент не Nil, то значение Head увеличивается на 1 и сбрасывается в 0, если достигнут QueueCapacity.
Теперь разберемся со случаем переполнения буфера. Для новых элементов мы вполне может создать новый объект очереди и продолжить писать в него, а чтобы этот объект можно было найти потоку-читателю, мы должны передать на него ссылку в текущем заполненном объекте очереди. Для этого добавим дополнительное поле NextQueue в класс:
TLockFreeWorkQueue = class public Head: Integer; Tail: Integer; Items: array[0…QueueCapacity — 1] of TObject; NextQueue: TLockFreeWorkQueue; end; Теперь если при записи элемента InterlockedExchangeComparePointer возвращает не Nil (очередь заполнена), то создаем новый объект очереди NewWriteQueue: TLockFreeWorkQueue, записываем добавляемый элемент в нее, выполняем InterlockedExchangePointer с переменной WriteQueue.NextQueue и в конце сохраняем NewWriteQueue в переменной WriteQueue. Таким образом после этой операции значения в ReadQueue и WriteQueue уже будут ссылаться на разные объекты.В главном потоке нам нужно добавить обработку пустой очереди. Если при чтении InterlockedExchangePointer для элемента ReadQueue.Items[Head] возвращает Nil, то нам необходимо дополнительно проверить поле NextQueue, для чего мы также выполняем InterlockedExchangePointer (ReadQueue.NextQueue, Nil). Если при этом возвращается не Nil, то сохраняем объект в NewReadQueue, удаляем текущий объект ReadQueue, и присваиваем этой переменной значение NewReadQueue.
Вот упрощенный код для операций добавления элемента в очередь:
procedure AddQueueItem (var WriteQueue: TLockFreeWorkQueue; Item: TObject); var NewWriteQueue: TLockFreeWorkQueue; begin if InterlockedCompareExchangePointer (WriteQueue.Items[WriteQueue.Tail]), Item, Nil) = Nil then begin // Added successfully Inc (WriteQueue.Tail); if WriteQueue.Tail = QueueCapacity then WriteQueue.Tail:= 0; end else begin // WriteQueue full. Create new chained queue. NewWriteQueue:= TLockFreeWorkQueue.Create; NewWriteQueue.Items[0] := Item; Inc (NewWriteQueue.Tail); if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue NewWriteQueue.Tail:= 0; InterlockedExchangePointer (WriteQueue.NextQueue, NewWriteQueue); WriteQueue:= NewWriteQueue; end; end; и извлечения элемента из очереди: function ExtractQueueItem (var ReadQueue: TLockFreeWorkQueue): TObject; var NewReadQueue: TLockFreeWorkQueue; begin Result:= Nil; repeat Result:= InterlockedExchangePointer (ReadQueue.Items[ReadQueue.Head], Nil); if Result = Nil then begin // No new items in this queue. Check next queue is available NewReadQueue:= InterlockedExchangePointer (ReadQueue.NextQueue, Nil); if Assigned (NewReadQueue) then begin ReadQueue.Free; ReadQueue:= NewReadQueue; end else // No new item in queue Exit; end; until Result <> Nil; // Item extracted successfully Inc (ReadQueue.Head); if ReadQueue.Head = QueueCapacity then ReadQueue.Head:= 0; end; В этом коде я возможно несколько перестраховался. Не уверен, что для операций с полем NextQueue вообще нужно применять InterlockedExchangePointer, возможно будет безопасным выполнять прямое чтение и запись.Тестовый пример Рабочий и причесанный код вместе с простеньким консольным примером можно посмотреть под спойлером.Тестовый пример program LockFreeTest;
{$APPTYPE CONSOLE}
{$R *.res}
uses SysUtils, Classes, Windows, Messages;
// Lock-free work thread state //////////////////////////////////////////////// type TLockFreeWorkState = class protected FNewest: Boolean; public class function Read (var CurrentItem, ReadItem): Boolean; class procedure Write (var CurrentItem, WriteItem); property Newest: Boolean read FNewest write FNewest; end;
class function TLockFreeWorkState.Read (var CurrentItem, ReadItem): Boolean; begin if TLockFreeWorkState (CurrentItem).Newest then begin pointer (ReadItem) := InterlockedExchangePointer (pointer (CurrentItem), pointer (ReadItem)); TLockFreeWorkState (ReadItem).Newest:= False; Result:= True; end else Result:= False; end;
class procedure TLockFreeWorkState.Write (var CurrentItem, WriteItem); begin TLockFreeWorkState (WriteItem).Newest:= True; pointer (WriteItem) := InterlockedExchangePointer (pointer (CurrentItem), pointer (WriteItem)); end;
// Lock-free work thread queue //////////////////////////////////////////////// type TLockFreeWorkQueue = class public const QueueCapacity = 4; // Small value for test purposes public type TLockFreeWorkQueueItems = array[0…QueueCapacity — 1] of TObject; public Head: Integer; // Access from main thread only Tail: Integer; // Access from work thread only NextQueue: TLockFreeWorkQueue; Items: TLockFreeWorkQueueItems; public destructor Destroy; override; class procedure Add (var WriteQueue: TLockFreeWorkQueue; Item: TObject); static; class function Extract (var ReadQueue: TLockFreeWorkQueue): TObject; static; end;
destructor TLockFreeWorkQueue.Destroy; var i: Integer; begin // Free non-extracted items for i:= 0 to QueueCapacity — 1 do Items[i].Free; // Free NextQueue if exists NextQueue.Free; inherited; end;
class procedure TLockFreeWorkQueue.Add (var WriteQueue: TLockFreeWorkQueue; Item: TObject); var NewWriteQueue: TLockFreeWorkQueue; begin // Check item assigned (can’t add empty items) if not Assigned (Item) or not Assigned (WriteQueue) then Exit; if InterlockedCompareExchangePointer (pointer (WriteQueue.Items[WriteQueue.Tail]), pointer (Item), Nil) = Nil then begin // Added successfully Inc (WriteQueue.Tail); if WriteQueue.Tail = QueueCapacity then WriteQueue.Tail:= 0; end else begin // WriteQueue full. Create new chained queue. NewWriteQueue:= TLockFreeWorkQueue.Create; NewWriteQueue.Items[0] := Item; Inc (NewWriteQueue.Tail); if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue NewWriteQueue.Tail:= 0; InterlockedExchangePointer (pointer (WriteQueue.NextQueue), NewWriteQueue); WriteQueue:= NewWriteQueue; end; end;
class function TLockFreeWorkQueue.Extract (var ReadQueue: TLockFreeWorkQueue): TObject; var NewReadQueue: TLockFreeWorkQueue; begin Result:= Nil; if not Assigned (ReadQueue) then Exit; repeat Result:= InterlockedExchangePointer (pointer (ReadQueue.Items[ReadQueue.Head]), Nil); if Result = Nil then begin // No new items in this queue. Check next queue is available NewReadQueue:= InterlockedExchangePointer (pointer (ReadQueue.NextQueue), Nil); if Assigned (NewReadQueue) then begin ReadQueue.Free; ReadQueue:= NewReadQueue; end else // No new item in queue Exit; end; until Result <> Nil; // Item extracted successfully Inc (ReadQueue.Head); if ReadQueue.Head = QueueCapacity then ReadQueue.Head:= 0; end;
// Test work thread /////////////////////////////////////////////////////////// const WM_MAINNOTIFY = WM_USER + 1;
type TWorkThreadState = class (TLockFreeWorkState) public Progress: Integer; end;
TWorkThreadQueueItem = class public ItemData: Integer; end;
TWorkThread = class (TThread) protected FMainHandle: THandle; FMainNotified: Integer; // State fields FStateRead: TWorkThreadState; FStateCurrent: TWorkThreadState; FStateWrite: TWorkThreadState; // Queue fields FQueueRead: TLockFreeWorkQueue; FQueueWrite: TLockFreeWorkQueue; // Debug (test) fiels FDebugReadQueue: Boolean; procedure Execute; override; procedure SetState; procedure AddQueueItem (Item: TWorkThreadQueueItem); procedure NotifyMain; public constructor Create (CreateSuspended: Boolean); destructor Destroy; override; function GetState: TWorkThreadState; function ExtractQueueItem: TWorkThreadQueueItem; procedure NotificationProcessed; property MainHandle: THandle read FMainHandle; end;
constructor TWorkThread.Create (CreateSuspended: Boolean); begin inherited Create (CreateSuspended); // State objects FStateRead:= TWorkThreadState.Create; FStateCurrent:= TWorkThreadState.Create; FStateWrite:= TWorkThreadState.Create; // Queue objects FQueueRead:= TLockFreeWorkQueue.Create; FQueueWrite:= FQueueRead; end;
destructor TWorkThread.Destroy; begin inherited; FStateRead.Free; FStateCurrent.Free; FStateWrite.Free; // Always destroy read queue only: only read queue may have NextQueue reference FQueueRead.Free; end;
procedure TWorkThread.NotifyMain; begin if InterlockedExchange (FMainNotified, 1) = 0 then PostMessage (FMainHandle, WM_MAINNOTIFY, 0, 0); end;
procedure TWorkThread.NotificationProcessed; begin InterlockedExchange (FMainNotified, 0); end;
function TWorkThread.GetState: TWorkThreadState; begin TLockFreeWorkState.Read (FStateCurrent, FStateRead); Result:= FStateRead; end;
procedure TWorkThread.SetState; begin TLockFreeWorkState.Write (FStateCurrent, FStateWrite); end;
procedure TWorkThread.AddQueueItem (Item: TWorkThreadQueueItem); begin TLockFreeWorkQueue.Add (FQueueWrite, Item); end;
function TWorkThread.ExtractQueueItem: TWorkThreadQueueItem; begin Result:= TWorkThreadQueueItem (TLockFreeWorkQueue.Extract (FQueueRead)); end;
procedure TWorkThread.Execute; const TestQueueCountToFlush = 10; var ProgressIndex: Integer; TestQueueCount: Integer; Item: TWorkThreadQueueItem; begin TestQueueCount:= 0; ProgressIndex:= 0; while not Terminated do begin // Send current progress if FStateWrite.Progress <> ProgressIndex then begin // All state object fields initialization required FStateWrite.Progress:= ProgressIndex; SetState; NotifyMain; end; // Emulate calculation Sleep (500); Inc (ProgressIndex); // Put intermediate result in queue Item:= TWorkThreadQueueItem.Create; Item.ItemData:= ProgressIndex; AddQueueItem (Item); Inc (TestQueueCount); if TestQueueCount = TestQueueCountToFlush then begin TestQueueCount:= 0; // Allow queue reading from main thread FDebugReadQueue:= True; NotifyMain; end; end; end;
// Test application /////////////////////////////////////////////////////////// type TMain = class protected FHandle: THandle; FThread: TWorkThread; procedure WndProc (var Message: TMessage); public constructor Create; destructor Destroy; override; function Run: Boolean; property Handle: THandle read FHandle; end;
var Main: TMain;
constructor TMain.Create; begin FHandle:= AllocateHWnd (WndProc); FThread:= TWorkThread.Create (True); FThread.FMainHandle:= Handle; FThread.Start; writeln ('Work thread started'); end;
destructor TMain.Destroy; begin writeln ('Stopping work thread…'); FThread.Free; writeln ('Work thread stopped'); DeallocateHWnd (FHandle); inherited; end;
procedure TMain.WndProc (var Message: TMessage); var State: TWorkThreadState; Item: TWorkThreadQueueItem; begin if Message.Msg = WM_MAINNOTIFY then begin FThread.NotificationProcessed; State:= FThread.GetState; // Show current progress writeln ('Work progress ', State.Progress); // Check queue reading allowed if FThread.FDebugReadQueue then begin writeln ('Read queue…'); repeat Item:= FThread.ExtractQueueItem; try if Assigned (Item) then writeln ('Queue item: ', Item.ItemData); finally Item.Free; end; until not Assigned (Item); FThread.FDebugReadQueue:= False; end; end else Message.Result:= DefWindowProc (Handle, Message.Msg, Message.wParam, Message.lParam); end;
function TMain.Run: Boolean; var Msg: TMsg; begin writeln ('Start message loop (Ctrl+C to break)'); Result:= True; while Result do case Integer (GetMessage (Msg, Handle, 0, 0)) of 0: Break; -1: Result:= False; else begin TranslateMessage (Msg); DispatchMessage (Msg); end; end; end;
// Console event handler //////////////////////////////////////////////////////
function ConsoleEventProc (CtrlType: DWORD): BOOL; stdcall; begin Result:= False; case CtrlType of CTRL_CLOSE_EVENT, CTRL_C_EVENT, CTRL_BREAK_EVENT: if Assigned (Main) then begin PostMessage (Main.Handle, WM_QUIT, 0, 0); Result:= True; end; end; end;
// Main procedure /////////////////////////////////////////////////////////////
begin {$IFDEF DEBUG} ReportMemoryLeaksOnShutdown:= True; {$ENDIF} try SetConsoleCtrlHandler (@ConsoleEventProc, True); Main:= TMain.Create; try Main.Run; finally FreeAndNil (Main); end; except on E: Exception do Writeln (E.ClassName, ': ', E.Message); end; end. В нормальной ситуации при появлении элемента в очереди он при первой возможности должен извлекаться главным потоком. Однако для тестирования переполнения очереди я добавил поле TWorkThread.FDebugReadQueue, которое при значении False запрещает главному потоку читать из очереди (в методе TWorkThread.Execute введна константа TestQueueCountToFlush = 10, которая разрешает главному потоку чтение только после 10 добавленных элементов).К сожалению тестовый пример слишком прост и не генерирует коллизий чтения/записи между потоками, когда переключение потока происходит внутри служебных функций чтения/записи. Но тут я не уверен, можно ли вообще проверить все узкие места алгоритма и во что нужно превратить код для этого.