UDP и C# Reactive Extensions
Недавно прочитал пост о UDP и C# async/await, в котором описание решения несложной задачи по опросу устройств по UDP одним клиентом. Решение задачи с помощью async\await действительно сокращает объем кода, по сравнению с ручной реализацией асинхронных вызовов. С другой стороны создает много проблем с синхронизацией задач, конкурентным доступом к данным и обработкой исключений. Полученное решение очень подвержено ошибкам. Первоначальная версия автора содержала ошибки неосвобождения ресурсов.Можно ли сделать проще и надежнее?
А в чем собственно проблема? Проблема в методе UdpClient.Receive (-Async). Этот метод не реентераблен, то есть если клиент уже ждет прихода датаграммы, то нельзя вызвать этот метод еще раз. Даже если не выпадет ошибка, то вполне можно получить датаграмму, ожидаемую другим «потоком». Поэтому нужно писать дополнительный код, который синхронизирует действия пользователя и состояние UdpClient.async\await и Tasks Parallel Library не имеет готовых средств синхронизации. Нужно или руками писать код, как в исходной статье, или использовать готовые библиотеки, вроде TPL Dataflow. Но, увы, Dataflow очень тяжеловесен.
Reactive Extensions
Вместо TPL Dataflow можно использовать Reactive Extensions (RX). RX описывает асинхронные потоки данных (асинхронные последовательности). В RX много функций, для создания потоков данных и манипуляции ими. RX позволяет работать не только с IO, но и «потоками событий», генерируемыми элементами интерфейса. Это позволяет всю программу описать в виде набора потоков данных.Пример кода
Для решения исходной задачи понадобится в проект добавить библиотеку Rx-Main из NuGet и написать несколько хелперов:
public static IObservable
public static IObservable
public static IObservable
public IObservable
return o.Take (1).Timeout (TimeSpan.FromMilliseconds (timeOut)); } Да-да, RX поддерживает Linq для асинхронных последовательностей.Это Linq выражение довольно тяжело для понимания без знания RX, но суть его очень простая: после получения результата из потока SendObservable подписаться на поток receiveStream и получить только те элементы, которые удовлетворяют предикату в where, вернуть буфер из полученной датаграммы. Далее берется один результат получившейся последовательности и навешивается тайм-аут.Самая важная часть кода — определение receiveStream:
receiveStream = client.ReceiveStream ().Publish ().RefCount (); Горячие, холодные и теплые последовательности Когда вы работаете с последовательностями RX, то важно знать их «температуру».Холодные последовательности — те, которые появляются при появлении подписчика последовательности и исчезают когда подписчик перестает существовать.Метод-расширение ReceiveStream возвращает как раз такую последовательность. Это значит, что у каждого подписчика будет своя последовательность, то есть будут параллельно происходить несколько вызов UdpClient.ReceiveAsync и проблема, описанная в начале, не решается.
Горячие последовательности — которые существуют независимо от подписчиков. Например последовательность движений мыши пользователя. Функция Publish в коде выше позволяет превратить холодную последовательность в горячую. Но это несет другую проблему. Если в конструкторе UdpClient не указать порт и вызывать Receive до вызова Send, то выпадет ошибка.
Поэтому нам нужен промежуточный вариант — последовательность должна быть общей для всех подписчиков и должна существовать, пока есть хотя бы один подписчик. Такая последовательность называется «теплой» и создается вызовом RefCount.
Подписка на события
Для тестирования я написал также функцию «сервера»:
public IDisposable Listen (Func
Заключение Получившийся кож не содержит ни одного цикла, ни одной явной синхронизации, ни одного создания потока или задачи. При этом код полностью асинхронный и безопасный.RX обязательно стоить изучить, даже если вы не будете его использовать. Основная часть Rx была придумана с помощью применения принципа двойственности монад к стандартным интерфейсам IEnumerable и IEnumerator, поэтому RX получился компактный и мощный. Кроме того RX есть и для JavaScript, C++, Java, Scala и Python, Ruby.Исходный код вместе с клиентом и сервером выложил на github — github.com/gandjustas/UdpRxSample.