Многопоточность в Unity средствами реактивных расширений
В данной статье будут рассмотрены основные проблемы, возникающие при разработке многопоточных мобильных игр средствами Unity, а также способы их решения с помощью UniRx (реактивные расширения для Unity).
Статья состоит из двух частей. Первая посвящена многопоточности для «самых маленьких», в ней доступным языком рассказывается о потоках и способах их создания, о синхронизации потоков. Вторая часть посвящена реактивным расширениям, их устройству, принципу работы и способам применения.
Поскольку одним из языков для написания скриптов в Unity является C#, на котором мы и разрабатываем приложения, весь код будет написан только на нем. Для углубленного понимания принципов многопоточности и реактивных расширений советуем прочесть основы многопоточности и что такое реактивные расширения. Если же читатель знаком с данной темой, то первый раздел можно пропустить.
Многопоточность для самых маленьких
Многопоточными называют приложения, которые выполняют несколько задач одновременно в отдельных потоках. Приложения, использующие многопоточность, более оперативно реагируют на действия пользователя, поскольку пользовательский интерфейс остается активным, в то время как задачи, требующие интенсивной работы процессора, выполняются в других потоках. Многопоточные приложения на языке C# при использовании Mono разрабатываются с помощью ключевых слов: Thread, ThreadPool и асинхронных делегатов.
Давайте рассмотрим многопоточное приложение на примере стройки. Предположим, что каждый рабочий выполняет свои обязанности одновременно с другими рабочими. К примеру, один моет полы, другой моет окна и т.д. (и все это происходит одновременно). Это и есть наши потоки.
Thread — класс, который позволяет создавать новые потоки внутри существующего приложения.
Асинхронные делегаты — асинхронный вызов метода с помощью делегата, который определен с такой же сигнатурой, что и вызываемый метод. Для асинхронного вызова метода необходимо использовать метод BeginInvoke. При таком подходе делегат берет из пула поток и в нем выполняет некий код.
ThreadPool — реализация паттерна «пул объектов». Его смысл в эффективном управлении потоками:: создании, удалении, назначении им какой-то работы. Возвращаясь к строительной аналогии, ThreadPool — это прораб, который контролирует количество строителей на стройке и назначает каждому из них задачу.
Инструменты для синхронизации потоков
Язык C# предоставляет инструменты для синхронизации потоков. Эти инструменты представлены в виде lock и Monitor. Они используются для того, чтобы выполнение блока кода не осуществлялось одновременно несколькими потоками. Но есть один нюанс. Использование этих инструментов может привести к deadlock’у (взаимоблокировке потоков). Это происходит так: поток А ожидает, когда поток В вернет управление, а поток В, в свою очередь, ожидает, когда поток А выполнит заблокированный код. Поэтому многопоточность и синхронизацию потоков необходимо использовать с осторожностью.
Проблемы встроенных механизмов многопоточности в Unity
Основной проблемой, с которой мы сталкиваемся при разработке однопоточных приложений — это UI-фризы, вызванные выполнением сложных операций в основном потоке. В Unity имеется механизм распараллеливания задач, представленный в виде coroutine (корутинов), но он работает в одном потоке, и если запустить в корутине что-либо «тяжеловесное» — привет, фриз. Если нас устраивает параллельное выполнение функций в основном потоке, то можно использовать корутины. Ничего сложного в этом нет, в документации Unity эта тема очень хорошо освещена. Однако, хотелось бы напомнить, что корутины — это итераторы, которые в Unity работают следующим образом:
- первым делом идет регистрация корутина,
- далее, после каждого вызова Update и перед вызовом LateUpdate, Unity опрашивает все зарегистрированные корутины и обрабатывает код, который описан внутри метода, имеющий возвращаемый тип IEnumerator.
Помимо плюсов, корутины также имеют и минусы:
- Невозможно получить возвращаемое значение
private IEnumerator LoadGoogle() { var www = new WWW("http://google.com"); yield return www; //Хочу получить www.text и с ним работать. }
- Обработка ошибок
private IEnumerator LoadGoogle() { try { var www = new WWW("http://google.com"); yield return www; } catch { yield return null; } }
- Костыли с callback’ами
private IEnumerator LoadGoogle(Action<string> callback) { var www = new WWW("http://google.com"); yield return www; if (callback != null) { callback(www.text); } }
- Нельзя обрабатывать тяжеловесные методы в корутинах
void Start() { Debug.Log(string.Format("Thread id in start method = {0}", Thread.CurrentThread.ManagedThreadId)); StartCoroutine(this.HardMethod()); } private IEnumerator HardMethod() { while (true) { Thread.Sleep(1001); Debug.Log(string.Format("Thread id in HardMethod method = {0}", Thread.CurrentThread.ManagedThreadId)); yield return new WaitForEndOfFrame(); } } //Output: //Thread id in start method = 1 //Thread id in HardMethod method = 1 //Thread id in HardMethod method = 1 //Thread id in HardMethod method = 1
Как упоминалось ранее, корутины работают в основном потоке. По этой причине мы получаем фризы, запуская в них тяжеловесные методы.
Ряд этих недостатков легко устраняется с помощью реактивных расширений, которые в дальнейшем принесут еще много различных фич и облегчат разработку.
Что такое реактивные расширения?
Реактивные расширения — это набор библиотек, которые позволяют работать с событиями и асинхронными вызовами в стиле Linq. Задача подобных расширений — упростить написание кода, в котором фигурирует асинхронное взаимодействие. В Unity используется библиотека UniRx, которая предоставляет базовый функционал реактивных расширений. UniRx — реализация реактивных расширений для Unity на базе .NET Reactive Extensions. Почему же нельзя использовать эту родную реализацию? Потому что стандартные RX в Unity не работают. Библиотека является кроссплатформенной и поддерживается на платформах PC/Mac/Android/iOS/WP8/WindowsStore.
Что же предоставляет нам UniRx?
- Многопоточность
- LINQ-подобные методы
- Упрощенный синтаксис асинхронного взаимодействия
- Кроссплатформенность
Как это работает?
Основой реактивных расширений являются интерфейсы IObserver<T>
и IObservable<T>
. Они предоставляют обобщенный механизм для push-уведомления, также известный как шаблон проектирования «Наблюдатель».
- Интерфейс IObservable представляет класс, который отправляет уведомления (поставщик).
Интерфейс IObserver представляет класс, который их получает (наблюдатель).
T представляет класс, предоставляющий информацию для уведомлений.Реализация IObserver подготавливает к получению уведомлений от поставщика (реализация IObservable), передавая свой экземпляр методу поставщика
IObservable<T>.Subscribe
. Этот метод возвращает объект IDisposable, который может использоваться для отказа от подписки наблюдателя до того, как поставщик завершит отправку уведомлений.Интерфейс IObserver определяет три следующих метода, которые должен реализовать наблюдатель:
- Метод OnNext, который обычно вызывается поставщиком для предоставления наблюдателю новых данных или сведений о состоянии.
- Метод OnError, который обычно вызывается поставщиком для указания того, что данные являются недоступными, поврежденными, или у поставщика возникли другие ошибки.
- Метод OnCompleted, который обычно вызывается поставщиком для подтверждения завершения отправки уведомлений наблюдателю.
Также в UniRx реализован Scheduler — основной компонент, с помощью которого реализована многопоточность. Базовые временные операции (Interval, Timer) в UniRx реализованы с помощью MainThread. Это означает, что большинство операций (кромеObservable.Start
) работают в основном потоке и потокобезопасностью, в данном случае, можно пренебречь.Observable.Start
по умолчанию использует ThreadPool Scheduler, это означает, что будет создан поток.С основными понятиями и теоретическими знаниями мы ознакомились, теперь рассмотрим примеры использования UniRx библиотеки.
Пример создания наблюдателя
В данном примере мы попытаемся получить данные из какого-либо интернет-ресурса с помощью библиотеки UniRx. Для скачивания данных с помощью реактивных расширений нам необходимо создать наблюдателя и воспользоваться классомObservableWWW
, который является оберткой над стандартным классомWWW
Unity. МетодGet
использует корутины и возвращает IObservable, к которому мы подпишем наблюдателя. Данный подход позволяет избежать костылей, описанных в разделе «Проблемы встроенных механизмов многопоточности в Unity».private void Start() { var observer = Observer.Create<string>( x => { Debug.Log("OnNext: " + x); }, ex => Debug.Log("OnError: " + ex.Message), () => Debug.Log("OnCompleted")); ObservableWWW.Get("http://qweqweqwe.qwer.qwer/").Subscribe(observer); } //Output: //OnError: Exception of type 'UniRx.WWWErrorException' was thrown.
Если изменить ссылку на адекватную, допустим, на http://www.nixsolutions.com/, то получим следующий результат://Output: //OnNext: ”html код страницы” //OnCompleted
Пример создания последовательности subject
Здесь мы подписались на два Debug.Log’а, первый выполняется всегда, когда срабатывает методOnNext
, а второй срабатывает лишь при условии.void Start() { this.subject = new Subject<int>(); this.subject.Subscribe(x => Debug.Log(x)); this.subject.Where(x => x % 2 == 0).Subscribe(x => Debug.Log(string.Format("Hello from {0}", x))); } // Update is called once per frame void Update() { this.sub.OnNext(this.i++); } //Output: //0 //Hello from 0 //1 //2 //Hello from 2
Пример использования EveryUpdate
Важной фичей в данных расширениях является методEveryUpdate
. Он позволяет вынести код из методов Update и классов-наследниковMonoBehaviour
. Здесь мы проверяем клики мышкой и выводим какой-то текст.Observable.EveryUpdate() .Where(x => Input.GetMouseButton(buttonIndex)) .Subscribe(x => Debug.Log(outputString)); //Output: //Left button pressed //Right button pressed
Пример работы с массивами
Также интересной особенностью данных расширений является работа с массивами. Код, который представлен ниже, при выполнении в однопоточном приложении будет фризить поток отображения.var arr = Enumerable.Range(0, 5); foreach (var i in arr) { Thread.Sleep(1000); Debug.Log(string.Format("Result = {0}, UtcNow = {1}, ThreadId = {2}", i, DateTime.UtcNow, Thread.CurrentThread.ManagedThreadId)); } //Output: //Result = 0, UtcNow = 8/25/2015 1:23:14 PM, ThreadId = 1 //Result = 1, UtcNow = 8/25/2015 1:23:16 PM, ThreadId = 1 //Result = 2, UtcNow = 8/25/2015 1:23:17 PM, ThreadId = 1 //Result = 3, UtcNow = 8/25/2015 1:23:18 PM, ThreadId = 1 //Result = 4, UtcNow = 8/25/2015 1:23:19 PM, ThreadId = 1
Для решения этой проблемы можно использовать UniRx, с явным указанием ThreadPool scheduler, который будет сам распределять нагрузку между потоками.var arr2 = Enumerable.Range(0, 5).ToObservable(Scheduler.ThreadPool); arr2.Subscribe( x => { Thread.Sleep(1000); Debug.Log(string.Format("Result = {0}, UtcNow = {1}, ThreadId = {2}", x, DateTime.UtcNow, Thread.CurrentThread.ManagedThreadId)); }); //Output: //Result = 0, UtcNow = 8/25/2015 1:23:20 PM, ThreadId = 2 //Result = 1, UtcNow = 8/25/2015 1:23:21 PM, ThreadId = 3 //Result = 2, UtcNow = 8/25/2015 1:23:22 PM, ThreadId = 4 //Result = 3, UtcNow = 8/25/2015 1:23:23 PM, ThreadId = 5 //Result = 4, UtcNow = 8/25/2015 1:23:24 PM, ThreadId = 5
Интересной особенностью данного подхода является то, что для каждой итерации будет назначен свободный поток.Пример обработки сложных методов
В этом примере у нас пара сложных методов, которые, при исполнении в основном потоке, зафризят наше приложение. При использовании Rx все будет отлично исполняться, мы получим возвращаемые значения из методов и обработаем их.private void Awake() { var heavyMethod = Observable.Start(() => { var timeToSleep = 1000; var returnedValue = 10; Debug.Log(string.Format("Thread = {0} UtcNow = {1}", Thread.CurrentThread.ManagedThreadId, DateTime.UtcNow)); Thread.Sleep(timeToSleep); return returnedValue; }); var heavyMethod2 = Observable.Start(() => { var timeToSleep = 2000; var returnedValue = 20; Debug.Log(string.Format("Thread = {0} UtcNow = {1}", Thread.CurrentThread.ManagedThreadId, DateTime.UtcNow)); Thread.Sleep(timeToSleep); return returnedValue; }); Observable.WhenAll(heavyMethod, heavyMethod2) .ObserveOnMainThread() .Subscribe(result => { Debug.Log(string.Format("Thread = {0}, first result = {1}, second result = {2} UtcNow = {3}", Thread.CurrentThread.ManagedThreadId, result[0], result[1], DateTime.UtcNow)); }); } //Output: //Thread = 5 UtcNow = 8/25/2015 2:06:55 PM //Thread = 3 UtcNow = 8/25/2015 2:06:55 PM //Thread = 1, first result = 10, second result = 20 UtcNow = 8/25/2015 2:06:57 PM
Пример использование биндингов
Еще одним замечательным механизмом является биндинг. С его помощью можно с легкостью реализовать паттерн MVP. В этом примере моделью является класс Enemy, в котором мы описываем реактивные свойства. СвойствоIsDead
зависит непосредственно отCurrentHp
: когда оно меньше нуля,IsDead
становится = true.public class Enemy { public Enemy(int initialHp) { this.CurrentHp = new ReactiveProperty<long>(initialHp); this.IsDead = this.CurrentHp.Select(x => x <= 0).ToReactiveProperty(); } public ReactiveProperty<long> CurrentHp { get; private set; } public ReactiveProperty<bool> IsDead { get; private set; } }
Presenter отвечает за связь модели и отображения, с его помощью мы можем биндить реактивные свойства модели к частям отображения. КлассMvpExample
является presenter’ом и имеет ссылку как на модель (классEnemy
), так и на отображение (Button
иToggle
). Также благодаря реактивным расширениям у нас есть возможность с помощью кода задавать поведение различным UI-элементам. С помощью методовOnClickAsObservable
иOnValueChangedAsObservable
мы описали поведение Button и Toggle.public class MvpExample : MonoBehaviour { private const int EnemyHp = 1000; [SerializeField] private Button myButton; [SerializeField] private Toggle myToggle; [SerializeField] private Text myText; private void Start() { var enemy = new Enemy(EnemyHp); this.myButton.OnClickAsObservable().Subscribe(x => enemy.CurrentHp.Value -= 99); //При клике на кнопку мы изменяет CurrentHp у экземпляра Enemy this.myToggle.OnValueChangedAsObservable().SubscribeToInteractable(this.myButton); //При изменении состояние Toggle изменятся и состояние кнопки enemy.CurrentHp.SubscribeToText(this.myText); enemy.IsDead.Where(isDead => isDead) .Subscribe(_ => { this.myToggle.interactable = this.myButton.interactable = false; }); } }
Далее мы прибиндили реактивные свойства к UI-элементам. При измененииCurrentHp
уEnemy
, у нас автоматически будет изменяться и текст. КогдаIsDead
изменит свое состояние наtrue
, тогда у нас отключатся и кнопка, иToggle
.Выводы
Использование реактивных расширений при разработке приложений на Unity имеет множество преимуществ. Основным из них является упрощение синтаксиса для построения многопоточных приложений. Количество костылей с корутинами значительно уменьшается, приложение становится более гибким и быстрым. Также при построении многопоточного приложения с помощью UniRx необходимо помнить, что любая часть данных должна быть защищена от изменения их значений множеством потоков.Полезные ссылки: