Многопоточность в Unity средствами реактивных расширений

de38c03330344b7eb6fb78aa98e7d468.jpg

В данной статье будут рассмотрены основные проблемы, возникающие при разработке многопоточных мобильных игр средствами Unity, а также способы их решения с помощью UniRx (реактивные расширения для Unity).

Статья состоит из двух частей. Первая посвящена многопоточности для «самых маленьких», в ней доступным языком рассказывается о потоках и способах их создания, о синхронизации потоков. Вторая часть посвящена реактивным расширениям, их устройству, принципу работы и способам применения.
Поскольку одним из языков для написания скриптов в Unity является C#, на котором мы и разрабатываем приложения, весь код будет написан только на нем. Для углубленного понимания принципов многопоточности и реактивных расширений советуем прочесть основы многопоточности и что такое реактивные расширения. Если же читатель знаком с данной темой, то первый раздел можно пропустить.

Многопоточность для самых маленьких


Многопоточными называют приложения, которые выполняют несколько задач одновременно в отдельных потоках. Приложения, использующие многопоточность, более оперативно реагируют на действия пользователя, поскольку пользовательский интерфейс остается активным, в то время как задачи, требующие интенсивной работы процессора, выполняются в других потоках. Многопоточные приложения на языке C# при использовании Mono разрабатываются с помощью ключевых слов: Thread, ThreadPool и асинхронных делегатов.

Давайте рассмотрим многопоточное приложение на примере стройки. Предположим, что каждый рабочий выполняет свои обязанности одновременно с другими рабочими. К примеру, один моет полы, другой моет окна и т.д. (и все это происходит одновременно). Это и есть наши потоки.

c383baa179a94202a32c4aa270e3ba65.jpg

Thread — класс, который позволяет создавать новые потоки внутри существующего приложения.
Асинхронные делегаты — асинхронный вызов метода с помощью делегата, который определен с такой же сигнатурой, что и вызываемый метод. Для асинхронного вызова метода необходимо использовать метод BeginInvoke. При таком подходе делегат берет из пула поток и в нем выполняет некий код.

ThreadPool — реализация паттерна «пул объектов». Его смысл в эффективном управлении потоками:: создании, удалении, назначении им какой-то работы. Возвращаясь к строительной аналогии, ThreadPool — это прораб, который контролирует количество строителей на стройке и назначает каждому из них задачу.

42446463b557437f9a876c22a4489755.jpg

Инструменты для синхронизации потоков


Язык C# предоставляет инструменты для синхронизации потоков. Эти инструменты представлены в виде lock и Monitor. Они используются для того, чтобы выполнение блока кода не осуществлялось одновременно несколькими потоками. Но есть один нюанс. Использование этих инструментов может привести к deadlock’у (взаимоблокировке потоков). Это происходит так: поток А ожидает, когда поток В вернет управление, а поток В, в свою очередь, ожидает, когда поток А выполнит заблокированный код. Поэтому многопоточность и синхронизацию потоков необходимо использовать с осторожностью.

Проблемы встроенных механизмов многопоточности в Unity


Основной проблемой, с которой мы сталкиваемся при разработке однопоточных приложений — это UI-фризы, вызванные выполнением сложных операций в основном потоке. В Unity имеется механизм распараллеливания задач, представленный в виде coroutine (корутинов), но он работает в одном потоке, и если запустить в корутине что-либо «тяжеловесное» — привет, фриз. Если нас устраивает параллельное выполнение функций в основном потоке, то можно использовать корутины. Ничего сложного в этом нет, в документации Unity эта тема очень хорошо освещена. Однако, хотелось бы напомнить, что корутины — это итераторы, которые в Unity работают следующим образом:

  • первым делом идет регистрация корутина,
  • далее, после каждого вызова Update и перед вызовом LateUpdate, Unity опрашивает все зарегистрированные корутины и обрабатывает код, который описан внутри метода, имеющий возвращаемый тип IEnumerator.


Помимо плюсов, корутины также имеют и минусы:

  1. Невозможно получить возвращаемое значение
    private IEnumerator LoadGoogle()
    {
       var www = new WWW("http://google.com");
       yield return www;
    
       //Хочу получить www.text и с ним работать.
    }
    
  2. Обработка ошибок
    private IEnumerator LoadGoogle()
    {
       try
       {
           var www = new WWW("http://google.com");
           yield return www;
       }
       catch
       {
           yield return null;
       }
    }
    
  3. Костыли с callback’ами
    private IEnumerator LoadGoogle(Action<string> callback)
    {
       var www = new WWW("http://google.com");
       yield return www;
       if (callback != null)
       {
           callback(www.text);
       }
    }
    
  4. Нельзя обрабатывать тяжеловесные методы в корутинах
       void Start()
       {
           Debug.Log(string.Format("Thread id in start method = {0}", Thread.CurrentThread.ManagedThreadId));
           StartCoroutine(this.HardMethod());
       }
    
       private IEnumerator HardMethod()
       {
           while (true)
           {
               Thread.Sleep(1001);
               Debug.Log(string.Format("Thread id in HardMethod method = {0}", Thread.CurrentThread.ManagedThreadId));
               yield return new WaitForEndOfFrame();
           }
       }
    //Output:
    //Thread id in start method = 1
    //Thread id in HardMethod method = 1
    //Thread id in HardMethod method = 1
    //Thread id in HardMethod method = 1
    


Как упоминалось ранее, корутины работают в основном потоке. По этой причине мы получаем фризы, запуская в них тяжеловесные методы.

Ряд этих недостатков легко устраняется с помощью реактивных расширений, которые в дальнейшем принесут еще много различных фич и облегчат разработку.

Что такое реактивные расширения?


Реактивные расширения — это набор библиотек, которые позволяют работать с событиями и асинхронными вызовами в стиле Linq. Задача подобных расширений — упростить написание кода, в котором фигурирует асинхронное взаимодействие. В Unity используется библиотека UniRx, которая предоставляет базовый функционал реактивных расширений. UniRx — реализация реактивных расширений для Unity на базе .NET Reactive Extensions. Почему же нельзя использовать эту родную реализацию? Потому что стандартные RX в Unity не работают. Библиотека является кроссплатформенной и поддерживается на платформах PC/Mac/Android/iOS/WP8/WindowsStore.

Что же предоставляет нам UniRx?

  • Многопоточность
  • LINQ-подобные методы
  • Упрощенный синтаксис асинхронного взаимодействия
  • Кроссплатформенность


Как это работает?


Основой реактивных расширений являются интерфейсы IObserver<T> и IObservable<T>. Они предоставляют обобщенный механизм для push-уведомления, также известный как шаблон проектирования «Наблюдатель».

  • Интерфейс IObservable представляет класс, который отправляет уведомления (поставщик).
    Интерфейс IObserver представляет класс, который их получает (наблюдатель).
    T представляет класс, предоставляющий информацию для уведомлений.

    Реализация IObserver подготавливает к получению уведомлений от поставщика (реализация IObservable), передавая свой экземпляр методу поставщика IObservable<T>.Subscribe. Этот метод возвращает объект IDisposable, который может использоваться для отказа от подписки наблюдателя до того, как поставщик завершит отправку уведомлений.

    Интерфейс IObserver определяет три следующих метода, которые должен реализовать наблюдатель:

    • Метод OnNext, который обычно вызывается поставщиком для предоставления наблюдателю новых данных или сведений о состоянии.
    • Метод OnError, который обычно вызывается поставщиком для указания того, что данные являются недоступными, поврежденными, или у поставщика возникли другие ошибки.
    • Метод OnCompleted, который обычно вызывается поставщиком для подтверждения завершения отправки уведомлений наблюдателю.

    Также в UniRx реализован Scheduler — основной компонент, с помощью которого реализована многопоточность. Базовые временные операции (Interval, Timer) в UniRx реализованы с помощью MainThread. Это означает, что большинство операций (кроме Observable.Start) работают в основном потоке и потокобезопасностью, в данном случае, можно пренебречь. Observable.Start по умолчанию использует ThreadPool Scheduler, это означает, что будет создан поток.

    С основными понятиями и теоретическими знаниями мы ознакомились, теперь рассмотрим примеры использования UniRx библиотеки.

    Пример создания наблюдателя


    В данном примере мы попытаемся получить данные из какого-либо интернет-ресурса с помощью библиотеки UniRx. Для скачивания данных с помощью реактивных расширений нам необходимо создать наблюдателя и воспользоваться классом ObservableWWW, который является оберткой над стандартным классом WWW Unity. Метод Get использует корутины и возвращает IObservable, к которому мы подпишем наблюдателя. Данный подход позволяет избежать костылей, описанных в разделе «Проблемы встроенных механизмов многопоточности в Unity».
    private void Start()
       {
           var observer = Observer.Create<string>(
               x =>
               {
                   Debug.Log("OnNext: " + x);
               },
               ex => Debug.Log("OnError: " + ex.Message),
               () => Debug.Log("OnCompleted"));
    
           ObservableWWW.Get("http://qweqweqwe.qwer.qwer/").Subscribe(observer);
       }
    
    //Output:
    //OnError: Exception of type 'UniRx.WWWErrorException' was thrown.
    
    

    Если изменить ссылку на адекватную, допустим, на http://www.nixsolutions.com/, то получим следующий результат:
    //Output:
    //OnNext: ”html код страницы”
    //OnCompleted
    
    

    Пример создания последовательности subject


    Здесь мы подписались на два Debug.Log’а, первый выполняется всегда, когда срабатывает метод OnNext, а второй срабатывает лишь при условии.
    void Start()
    {
       this.subject = new Subject<int>();
       this.subject.Subscribe(x => Debug.Log(x));
       this.subject.Where(x => x % 2 == 0).Subscribe(x => Debug.Log(string.Format("Hello from {0}", x)));
    }
    
    // Update is called once per frame
    void Update()
    {
       this.sub.OnNext(this.i++);
    }
    
    //Output:
    //0
    //Hello from 0
    //1
    //2
    //Hello from 2
    
    

    Пример использования EveryUpdate


    Важной фичей в данных расширениях является метод EveryUpdate. Он позволяет вынести код из методов Update и классов-наследников MonoBehaviour. Здесь мы проверяем клики мышкой и выводим какой-то текст.
    Observable.EveryUpdate()
        .Where(x => Input.GetMouseButton(buttonIndex))
        .Subscribe(x => Debug.Log(outputString));
    //Output:
    //Left button pressed
    //Right button pressed
    
    

    Пример работы с массивами


    Также интересной особенностью данных расширений является работа с массивами. Код, который представлен ниже, при выполнении в однопоточном приложении будет фризить поток отображения.
    var arr = Enumerable.Range(0, 5);
          foreach (var i in arr)
               {
                   Thread.Sleep(1000);
                   Debug.Log(string.Format("Result = {0}, UtcNow = {1}, ThreadId = {2}",
                             i, DateTime.UtcNow, Thread.CurrentThread.ManagedThreadId));
               }
    //Output:
    //Result = 0, UtcNow = 8/25/2015 1:23:14 PM, ThreadId = 1
    //Result = 1, UtcNow = 8/25/2015 1:23:16 PM, ThreadId = 1
    //Result = 2, UtcNow = 8/25/2015 1:23:17 PM, ThreadId = 1
    //Result = 3, UtcNow = 8/25/2015 1:23:18 PM, ThreadId = 1
    //Result = 4, UtcNow = 8/25/2015 1:23:19 PM, ThreadId = 1
    
    

    Для решения этой проблемы можно использовать UniRx, с явным указанием ThreadPool scheduler, который будет сам распределять нагрузку между потоками.
               var arr2 = Enumerable.Range(0, 5).ToObservable(Scheduler.ThreadPool);
               arr2.Subscribe(
                   x =>
                   {
                       Thread.Sleep(1000);
                       Debug.Log(string.Format("Result = {0}, UtcNow = {1}, ThreadId =
                       {2}", x, DateTime.UtcNow, Thread.CurrentThread.ManagedThreadId));
                   });
    //Output:
    //Result = 0, UtcNow = 8/25/2015 1:23:20 PM, ThreadId = 2
    //Result = 1, UtcNow = 8/25/2015 1:23:21 PM, ThreadId = 3
    //Result = 2, UtcNow = 8/25/2015 1:23:22 PM, ThreadId = 4
    //Result = 3, UtcNow = 8/25/2015 1:23:23 PM, ThreadId = 5
    //Result = 4, UtcNow = 8/25/2015 1:23:24 PM, ThreadId = 5
    
    

    Интересной особенностью данного подхода является то, что для каждой итерации будет назначен свободный поток.

    Пример обработки сложных методов


    В этом примере у нас пара сложных методов, которые, при исполнении в основном потоке, зафризят наше приложение. При использовании Rx все будет отлично исполняться, мы получим возвращаемые значения из методов и обработаем их.
    private void Awake()
       {
           var heavyMethod = Observable.Start(() =>
               {
                   var timeToSleep = 1000;
                   var returnedValue = 10;
                   Debug.Log(string.Format("Thread = {0} UtcNow = {1}",                            Thread.CurrentThread.ManagedThreadId, DateTime.UtcNow));
                   Thread.Sleep(timeToSleep);
                   return returnedValue;
               });
    
           var heavyMethod2 = Observable.Start(() =>
           {
               var timeToSleep = 2000;
               var returnedValue = 20;
               Debug.Log(string.Format("Thread = {0} UtcNow = {1}", Thread.CurrentThread.ManagedThreadId, DateTime.UtcNow));
               Thread.Sleep(timeToSleep);
               return returnedValue;
           });
    
           Observable.WhenAll(heavyMethod, heavyMethod2)
               .ObserveOnMainThread()
               .Subscribe(result =>
               {
                   Debug.Log(string.Format("Thread = {0}, first result = {1}, second result = {2} UtcNow = {3}", Thread.CurrentThread.ManagedThreadId, result[0], result[1], DateTime.UtcNow));
               });
       }
    //Output:
    //Thread = 5 UtcNow = 8/25/2015 2:06:55 PM
    //Thread = 3 UtcNow = 8/25/2015 2:06:55 PM
    //Thread = 1, first result = 10, second result = 20 UtcNow = 8/25/2015 2:06:57 PM
    
    

    Пример использование биндингов


    Еще одним замечательным механизмом является биндинг. С его помощью можно с легкостью реализовать паттерн MVP. В этом примере моделью является класс Enemy, в котором мы описываем реактивные свойства. Свойство IsDead зависит непосредственно от CurrentHp: когда оно меньше нуля, IsDead становится = true.
    public class Enemy
       {
           public Enemy(int initialHp)
           {
               this.CurrentHp = new ReactiveProperty<long>(initialHp);
               this.IsDead = this.CurrentHp.Select(x => x <= 0).ToReactiveProperty();
           }
    
           public ReactiveProperty<long> CurrentHp { get; private set; }
    
           public ReactiveProperty<bool> IsDead { get; private set; }
       }
    
    

    Presenter отвечает за связь модели и отображения, с его помощью мы можем биндить реактивные свойства модели к частям отображения. Класс MvpExample является presenter’ом и имеет ссылку как на модель (класс Enemy), так и на отображение (Button и Toggle). Также благодаря реактивным расширениям у нас есть возможность с помощью кода задавать поведение различным UI-элементам. С помощью методов OnClickAsObservable и OnValueChangedAsObservable мы описали поведение Button и Toggle.
    public class MvpExample : MonoBehaviour
    {
       private const int EnemyHp = 1000;
    
       [SerializeField]
       private Button myButton;
    
       [SerializeField]
       private Toggle myToggle;
    
       [SerializeField]
       private Text myText;
    
       private void Start()
       {
           var enemy = new Enemy(EnemyHp);
    
           this.myButton.OnClickAsObservable().Subscribe(x => enemy.CurrentHp.Value -= 99); //При клике на кнопку мы изменяет CurrentHp у экземпляра Enemy
           this.myToggle.OnValueChangedAsObservable().SubscribeToInteractable(this.myButton); //При изменении состояние Toggle изменятся и состояние кнопки
    
           enemy.CurrentHp.SubscribeToText(this.myText);
           enemy.IsDead.Where(isDead => isDead)
               .Subscribe(_ =>
                   {
                       this.myToggle.interactable = this.myButton.interactable = false;
                   });
       }
    }
    
    

    Далее мы прибиндили реактивные свойства к UI-элементам. При изменении CurrentHp у Enemy, у нас автоматически будет изменяться и текст. Когда IsDead изменит свое состояние на true, тогда у нас отключатся и кнопка, и Toggle.

    Выводы


    Использование реактивных расширений при разработке приложений на Unity имеет множество преимуществ. Основным из них является упрощение синтаксиса для построения многопоточных приложений. Количество костылей с корутинами значительно уменьшается, приложение становится более гибким и быстрым. Также при построении многопоточного приложения с помощью UniRx необходимо помнить, что любая часть данных должна быть защищена от изменения их значений множеством потоков.

    Полезные ссылки:

© Habrahabr.ru