UDP и C# async/await

Недавно возникла необходимость решить следующую несложную задачку: есть несколько десятков устройств (учебных комплексов), у которых нужно регулярно запрашивать их текущее состояние. Комплексы общаются по протоколу UDP, и хотелось сделать так, чтобы не задумываться о цикле опроса и определении, от какого же устройства пришел ответ, а просто посылать запрос — и когда пришел результат — записывать его. Задачу эту я решал и раньше, но захотелось посмотреть, насколько концепция async/await упростит и сократит код. Оказалось, что финальный результат занимает меньше странички.Вся логика опроса состоит всего лишь из двух методов — цикла чтения сокета UDP и метода посылки команды на устройство.

Когда посылаем команду, есть две вещи, которые надо принять во внимание — это 1) после посылки команды нам надо ждать ответа от устройства и 2) ответ может не прийти — тогда необходимо вернуть исключение, которое скажет нам о таймауте.

Асинхронный метод посылки команды выглядит следующим образом (*см. Update 1):

public async Task SendReceiveUdpAsync (byte[] msg, string ip, int port, int timeOut) { var tokenSource = new CancellationTokenSource (timeOut); var token = tokenSource.Token; var tcs = new TaskCompletionSource(); var key = new Tuple(ip, port); _tcsDictionary.Add (key, tcs);

await _client.SendAsync (msg, msg.Length, ip, port); var result = await tcs.Task.WithCancellation (token);

if (_tcsDictionary.ContainsKey (key)) _tcsDictionary.Remove (key); return result; }

Здесь _client — это стандартный UdpClient.Мы посылаем команду и по await ждем результата, который нам должен вернуть Task, сохраненный в словарике с ключом нашего соединения (именно от него мы и ждем ответ). Когда чтение начинается — мы заносим TaskCompletionSource в словарик, когда мы получаем ответ и соединение больше не нужно — удаляем из словарика.Сам словарик (ConcurrentDictionary используем вместо Dictionary для того, чтобы избежать проблем с кросспоточными вызовами):

private ConcurrentDictionary, TaskCompletionSource> _tcsDictionary; Тут есть момент, который заслуживает внимания — это метод-расширение WithCancellation (token). Он нужен для того, чтобы поддержать отмену операции при помощи CancellationToken, и отменяет задачу, возвращая исключение при превышении заданного таймаута.

static class TaskExtension { public static async Task WithCancellation(this Task task, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource();

using (cancellationToken.Register ( s => ((TaskCompletionSource)s).TrySetResult (true), tcs)) if (task!= await Task.WhenAny (task, tcs.Task)) throw new OperationCanceledException (cancellationToken); return await task; } }

А вот и сам цикл чтения: читаем, пока хватит сил, и если пришедшая датаграмма имеет адресом соединение, ключ с параметрами которого мы уже занесли в словарик, то результат помещается в TaskCompletionSource по этому ключу, и мы переходим обратно в метод посылки сообщения на await tcs.Task, только уже имея на руках нужный результат от устройства, этот результат и вернем в место вызова. Task.Run (() => { IPEndPoint ipEndPoint = null; while (true) { var receivedBytes = _client.Receive (ref ipEndPoint); var ip = ipEndPoint.Address.ToString (); var port = ipEndPoint.Port; var key = new Tuple(ip, port); TaskCompletionSource tcs;

if (_tcsDictionary.TryGetValue (key, out tcs)) tcs.SetResult (receivedBytes); } }); Итог радует. Вот так async-await упростил задачу опроса множества устройств по протоколу UDP.Update 1Как было справедливо отмечено в комментариях, метод SendReceiveUdpAsync необходимо переписать таким образом, чтобы в случае отмены задачи и выброса исключения удалялось значение из словарика:

public async Task SendReceiveUdpAsync (byte[] msg, string ip, int port, int timeOut) { try{ …

return result; } finally{ if (_tcsDictionary.ContainsKey (key)) _tcs.Dictionary.Remove (key); } }

© Habrahabr.ru