[Из песочницы] Пишем свой SynchronizationContext
Началась эта история довольно давно, когда я впервые попытался работать с UI не из UI-потока. И когда я начал ловить различные “глюки”, я понял, что делать это нужно осторожно. Позднее я столкнулся с этим в дотнет мире и именно в тот момент я впервые познакомился с SynchronizationContext. Но тогда, почитав про устройство этого объекта, я посчитал, что этих знаний мне достаточно. Сделать это можно, например, здесь: SynchronizationContext — когда MSDN подводит.
Вспомнил про SynchronizationContext я только с выходом c# 5 и его async/await, т.к. этот механизм взаимодействует как раз с этим самым контекстом синхронизации. Делается это для того, чтобы после асинхронной операции, код мог выполняться в вызывающем асинхронную операцию потоке, что очень удобно при работе с UI. Но запустив этот небольшой код в UI-потоке и любом другом:
Debug.WriteLine(Thread.CurrentThread.ManagedThreadId);
await Task.Run(() => Debug.WriteLine(Thread.CurrentThread.ManagedThreadId));
Debug.WriteLine(Thread.CurrentThread.ManagedThreadId);
Мы увидим, что код возвращается в исходный поток только при запуске в UI-потоке. Все дело в том, что контекст синхронизации задан только в UI-потоке (кроме wcf и т.д.). В голову сразу же приходит мысль, нужно просто задать контекст синхронизации нужному потоку. Но здесь нас ждет проблема, стандартная реализация SynchronizationContext не дает нам нужных возможностей. Она позволяет продолжать исполнять код в текущем потоке или в потоке из пула. После того, как я не нашел реализации, которую можно просто скопировать, запустить и увидеть желаемый результат, я решил попробовать реализовать свою и представить, как бы оно могла выглядеть на деле. Об этом и пойдет речь ниже.
Для выполнения кода в SynchronizationContext предусмотрены два виртуальных метода Send (синхронное выполнение) и Post (асинхронное). Поэтому наследуемся от SynchronizationContext и переопределяем нужные методы.
class CustomSynchronizationContext : SynchronizationContext, IDisposable
{
private readonly AutoResetEvent _eventReset;
private readonly Queue<KeyValuePair<SendOrPostCallback, object>> _workItems;
private readonly Thread _thread;
public CustomSynchronizationContext()
{
_eventReset = new AutoResetEvent(false);
_workItems = new Queue<KeyValuePair<SendOrPostCallback, object>>();
_thread = new Thread(DoWork);
_thread.Start(this);
}
private void DoWork(object obj)
{
SynchronizationContext.SetSynchronizationContext(obj as SynchronizationContext);
while (true)
{
while (_workItems.Count > 0)
{
var item = _workItems.Dequeue();
item.Key(item.Value);
}
_eventReset.Reset();
_eventReset.WaitOne();
}
}
public override void Post(SendOrPostCallback d, object state)
{
_workItems.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state));
_eventReset.Set();
}
public void Dispose()
{
_eventReset.Dispose();
_thread.Abort();
}
}
Запускаем.
static void Main(string[] args)
{
var syncContext = new CustomSynchronizationContext();
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
syncContext.Post(o => Console.WriteLine(Thread.CurrentThread.ManagedThreadId), null);
}
И ожидаемо видим разные потоки. Что здесь происходит? Во-первых, для удобства, создаем и присваиваем поток внутри контекста, а не контекст потоку. Так мы будем уверены, что никто кроме нас не сможет влиять на этот поток. Во-вторых, заводим очередь, в которой будем хранить делегаты для выполнения в созданном потоке. В-третьих, “прикостыливаем” AutoResetEvent, чтобы поток не завершался и не зацикливался без дела. Ну и IDisposable. Обратите внимание, что при удалении контекста, здесь же будет попытка ликвидировать поток. Т.е. такой код:
static void Main(string[] args)
{
using (var syncContext = new CustomSynchronizationContext())
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
syncContext.Post(o => Console.WriteLine(Thread.CurrentThread.ManagedThreadId), null);
}
}
скорее всего выведет информацию только об изначальном потоке. Возможно это не то, что хотелось бы, но для демонстрационного примера, думаю, сойдет. К тому же это легко исправить.
Что с обработкой исключений? Проверим.
static void Main(string[] args)
{
var syncContext = new CustomSynchronizationContext();
try
{
syncContext.Post(o => { throw new Exception("TestException"); }, null);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
Ожидаемо падает в нашем “особенном” потоке. Самое время вспомнить, что у нас есть еще и метод Send, который отвечает за синхронное выполнение. Это должно позволить дождаться завершения выполнения делегата и получить исключение. Попробуем.
class CustomSynchronizationContext : SynchronizationContext, IDisposable
{
private readonly AutoResetEvent _workerResetEvent;
private readonly ConcurrentQueue<WorkItem> _workItems;
private readonly Thread _thread;
public CustomSynchronizationContext()
{
_workerResetEvent = new AutoResetEvent(false);
_workItems = new ConcurrentQueue<WorkItem>();
_thread = new Thread(DoWork);
_thread.Start(this);
}
private void DoWork(object obj)
{
SynchronizationContext.SetSynchronizationContext(obj as SynchronizationContext);
while (true)
{
WorkItem workItem;
while (_workItems.TryDequeue(out workItem))
workItem.Execute();
_workerResetEvent.Reset();
_workerResetEvent.WaitOne();
}
}
public override void Send(SendOrPostCallback d, object state)
{
if (Thread.CurrentThread == _thread)
d(state);
else
{
using (var resetEvent = new AutoResetEvent(false))
{
var wiExecutionInfo = new WorkItemExecutionInfo();
_workItems.Enqueue(new SynchronousWorkItem(d, state, resetEvent, ref wiExecutionInfo));
_workerResetEvent.Set();
resetEvent.WaitOne();
if (wiExecutionInfo.HasException)
throw wiExecutionInfo.Exception;
}
}
}
public override void Post(SendOrPostCallback d, object state)
{
_workItems.Enqueue(new AsynchronousWorkItem(d, state));
_workerResetEvent.Set();
}
public void Dispose()
{
_workerResetEvent.Dispose();
_thread.Abort();
}
private class WorkItemExecutionInfo
{
public bool HasException => Exception != null;
public Exception Exception { get; set; }
}
private abstract class WorkItem
{
protected readonly SendOrPostCallback SendOrPostCallback;
protected readonly object State;
protected WorkItem(SendOrPostCallback sendOrPostCallback, object state)
{
SendOrPostCallback = sendOrPostCallback;
State = state;
}
public abstract void Execute();
}
private class SynchronousWorkItem : WorkItem
{
private readonly AutoResetEvent _syncObject;
private readonly WorkItemExecutionInfo _workItemExecutionInfo;
public SynchronousWorkItem(SendOrPostCallback sendOrPostCallback, object state, AutoResetEvent resetEvent,
ref WorkItemExecutionInfo workItemExecutionInfo) : base(sendOrPostCallback, state)
{
if (workItemExecutionInfo == null)
throw new NullReferenceException(nameof(workItemExecutionInfo));
_syncObject = resetEvent;
_workItemExecutionInfo = workItemExecutionInfo;
}
public override void Execute()
{
try
{
SendOrPostCallback(State);
}
catch (Exception ex)
{
_workItemExecutionInfo.Exception = ex;
}
_syncObject.Set();
}
}
private class AsynchronousWorkItem : WorkItem
{
public AsynchronousWorkItem(SendOrPostCallback sendOrPostCallback, object state)
: base(sendOrPostCallback, state)
{
}
public override void Execute()
{
SendOrPostCallback(State);
}
}
}
Здесь для удобства вводим класс WorkItem, который будет выполнять код (делегат) нужным нам способом. От него наследуем еще два SynchronousWorkItem и AsynchronousWorkItem, по названию понятно в чем их различие. В реализациях отличие только в том, что в синхронной версии реализовано ожидание (AutoResetEvent) и поглощение исключения, которое далее будет брошено в изначальном потоке. Теперь KeyValuePair можно поменять на WorkItem, ну и поменяем простую очередь на конкурентную. Также в методе Send добавляем проверку текущего потока и если он вдруг окажется “нашим”, то просто запустим делегат здесь же.
Снова проверяем.
static void Main(string[] args)
{
var syncContext = new CustomSynchronizationContext();
try
{
syncContext.Send(o => { throw new Exception("TestException"); }, null);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
Теперь исключение успешно обработано. Ну и пришло время запустить самый первый пример кода, который упоминался в статье, на потоке с только что созданным контекстом синхронизации.
static void Main(string[] args)
{
var syncContext = new CustomSynchronizationContext();
syncContext.Post(TestAsyncMethod, null);
}
async static void TestAsyncMethod(object obj)
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
await Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
}
Мой вывод:
9
10
9