Project Loom: Современная маcштабируемая многопоточность для платформы Java

j7qnpzyfew1ltbzchql_9qmcjhq.png

Эффективное использование многочисленных ядер современных процессоров — сложная, но всё более важная задача. Java была одним из первых языков программирования со встроенной поддержкой concurrency. Ее concurrency-модель, основанная на нативных тредах, хорошо масштабируется для тысяч параллельно выполняющихся стримов, но оказывается слишком тяжеловесной для современного реактивного программирования с сотнями тысяч параллельных потоков.

Ответ на эту проблему — Project Loom. Он определяет и реализует в Java новые легковесные параллельные примитивы.

Алан Бейтман, руководитель проекта OpenJDK Core Libraries Project, потратил большую часть последних лет на проектирование Loom таким образом, чтобы он естественно и органично вписывался в богатый набор существующих библиотек Java и парадигм программирования. Об этом он и рассказал на Joker 2020. Под катом — запись с английскими и русскими субтитрами и перевод его доклада.


Меня зовут Алан Бейтман, я работаю в группе Java Platform в Oracle, преимущественно над OpenJDK. Сегодня я буду говорить о Project Loom.

Мы занялись этим проектом в конце 2017 года (точнее, технически в начале 2018-го). Он появился как проект в OpenJDK для того, чтобы упростить написание масштабируемых многопоточных приложений. Цель в том, чтобы позволить разработчикам писать масштабируемые многопоточные приложения в так называемом синхронном стиле. Это достигается путем доведения базовой единицы многопоточности — потока — до такой легковесности, чтобы им можно было представлять любую параллельную задачу. Даже задачи, которые блокируются или выполняются в течение длительного времени.

Кто-то говорит: чем больше несвязанных запросов поступает в серверное приложение, тем большей степени многопоточности можно достичь.

План выступления такой:


  1. Начну с пары слов о мотивации этого проекта.
  2. Поговорю о том, как мы имплементировали эти так называемые легкие потоки.
  3. Переключусь на IDE и покажу несколько демо, напишу немного кода.
  4. Наконец, рассмотрю другие аспекты проекта.

Платформа Java (и язык, и JVM) во многом построена на концепции потоков:


  • Если вы сталкиваетесь с исключением, то получаете трассировку стека определенного потока.
  • Вы можете связать некоторые данные с потоками, используя ThreadLocal.
  • Если вы находитесь в отладчике и выполняете пошаговое выполнение кода, вы шагаете по выполнению потока. Когда вы нажимаете step over, это означает переход к следующей инструкции в потоке, с которым вы работаете.
  • А когда вы находитесь в профайлере, профайлеры обычно группируют данные по потокам, сообщают вам, какие потоки выполняются и что они делают.

В общем, всё, что касается платформы и инструментов, связано с потоками.

В Java API поток означает java.lang.Thread. В реализации JDK есть только одна реализация потока, которая фактически основана на потоке операционной системы. Между java.lang.Thread и потоком ОС существует связь один-к-одному. Те из вас, кто уже давно работает с платформой Java, могут вспомнить зелёные потоки в ранних выпусках JDK. Я немного расскажу об этом позже. Но по меньшей мере последние 20 лет, когда мы говорим о java.lang.Thread, мы говорим о тонкой оболочке вокруг потока ОС.

Cами потоки ОС ничего не знают о Java. Они очень общие. Обычно они должны поддерживать множество разных языков и сред выполнения. Они ничего не знают о том, как эти языки и программы используют стек, и им обычно приходится выделять очень большой фиксированный стек, чтобы поддерживать универсальность
программ, которые должны выполняться.

Другой аспект потоков операционной системы заключается в том, что они проходят через ядро ОС ​​для переключения контекста, что обычно требует
значительное количество времени. В течение многих лет это было около микросекунды или больше.

Еще одна особенность потоков операционной системы заключается в том, что ядро ОС ​​должно иметь некоторую форму планирования. Ему нужно выбрать, какой поток
запускать на каждом ядре процессора. Веб-серверы ведут себя совсем иначе, чем, например, поток с вычислениями для воспроизводения видео. Планировщик ОС должен быть очень общим и в некотором роде компромиссным, чтобы поддерживать все различные варианты использования.

Давайте немного поговорим об использовании потоков. Можно писать многопоточные приложения в стиле, который мы называем синхронным. Простой, императивный блокирующий синхронный код. Задачи выполняются в одном потоке от начала до конца. Код обычно очень легко читать и писать. Именно так большинство из нас, я думаю, учились его писать. Мы научились работать так с одним потоком до того, как узнали, что такое поток.

Этот синхронный стиль очень хорошо сочетается с дизайном языка Java. Он очень хорошо сочетается с инструментами. И в целом, как любит говорить мой коллега Рон Пресслер, гармонично сочетается с платформой. Но поскольку поток по сути представляет собой тонкую оболочку вокруг потока ОС, это ограниченный ресурс.

Если каждая транзакция или запрос использует поток, то максимальное количество потоков — это максимальное количество транзакций, которые система может обрабатывать за раз. По сути, это наш уровень многопоточности.

А с современным сервером теоретически вы можете иметь миллионы сетевых подключений. Я видел, как Хайнц Кабуц делает демо с Project Loom, где он фактически использовал два миллиона соединений. И серверы могут поддерживать подобное, если у них достаточно памяти.

Итак, если у вас миллионы сетевых подключений, но только тысячи активных потоков, потоки становятся вашим ограничивающим фактором, если на каждое соединение приходится один поток. Так что они ограничивают наш уровень многопоточности. И это, безусловно, может сильно повлиять на пропускную способность.

Ладно, что нам с этим делать?

Если потоки — дорогой ресурс, почему бы нам ими не делиться? Это означает пулы потоков. Вместо создания потока для каждого запроса или транзакции мы заимствуем поток из пула, выполняем транзакцию, а затем возвращаем его в пул.

Но у пулов много проблем. Думаю, большинство тех, кто их использовал, знает о проблеме того, как они засоряют ThreadLocals. Эта так называемая утечка памяти с пулами потоков существует долгое время. Также проблематична отмена: если я хочу отменить транзакцию, но она почти завершена, я, возможно, прерываю поток после того, как он был взят для другой транзакции.

И даже если бы мы решили приведенные проблемы, этого все равно недостаточно, потому что мы все еще выполняем транзакцию от начала до конца в одном потоке. Это занимает его на протяжении всей операции, а уровень многопоточности по-прежнему ограничен количеством потоков, которые может обработать ОС.

Мы можем заметить, что для большого количества запросов и транзакций,
которые не упираются в CPU, поток проводит большую часть своего времени в ожидании, в блокировке, ожидая базу данных, IO, что-то еще. А нам, на самом деле, не нужен поток, когда он ожидает.

Предположим, что мы использовали подход, при котором поток во время ожидания возвращается в пул. Это позволяет одновременно выполнять гораздо больше транзакций. Проблема в том, что многие API, блокирующие API, удерживают поток на протяжении какой-либо операции — в ожидании блокировки, ожидании данных в сокете.

Это приводит нас к созданию новых API, по существу, несовместимых со старыми. Или в итоге у нас есть синхронные и асинхронные версии API.

Другой вопрос, что это вынуждает нас разделять транзакцию или запрос на мелкие кусочки, где разные части работают на разных потоках. Большая проблема с этим — мы теряем всякую связь с потоками, контекст становится очень и очень трудно отслеживать. Например, если появилось исключение, которое было брошено для определенной части или определенного этапа транзакции, — у меня нет общего контекста, где оно на самом деле было брошено.

Когда мы отлаживаем, это будет отладка лишь этапа транзакции. Если мы профилируем, то не увидим многого о том, что на самом деле происходит. Потому что многие транзакции выглядят ничего не делающими, они просто ждут IO. Профайлер, который смотрит на незанятый пул потоков, не особо много видит.

Я говорю здесь о том, что мы называем асинхронным стилем. Его преимущество в том, что он очень масштабируемый. Количество транзакций больше не ограничено количеством потоков. Но его трудно читать и иногда трудно поддерживать, потому что мы потеряли контекст.

Таким образом, в целом это позволяет нам лучше использовать аппаратные ресурсы, но наше приложение сложнее писать и поддерживать.

Что приводит нас к дилемме.

nyfyt_xjvzsm4now8_arj9ew3ue.png

Разработчики могут написать простой синхронный код и потратить больше денег на оборудование. Или, если вы хотите эффективно использовать оборудование и управлять асинхронным приложением, тогда мы больше платим разработчикам.

Итак, как нам решить эту дилемму? Что, если бы мы могли снизить стоимость потоков и иметь их неограниченное количество? Тогда мы могли бы написать простой синхронный код, который гармонирует с платформой, полностью использует оборудование и масштабируется как асинхронный код. Project Loom именно об этом.


Давайте пойдем дальше и поговорим немного об API.

Если Project Loom снижает стоимость потоков, то как это будет отражаться на разработчиках и на API? Эта проблема сложнее, чем кажется на первый взгляд, и мы потратили более двух лет на борьбу с ней.

Один из вариантов, с которого мы начали и к которому в итоге вернулись, — это
использование для легких форм потоков java.lang.Thread. Это старый API, который существует с JDK 1.0. Проблема в том, что у него много «багажа». Там есть такие вещи, как группы потоков, загрузчик классов контекстов потоков. Есть множество полей и других API, которые связаны с потоками, которые просто не интересны.

Другой вариант — начать все сначала и ввести совершенно новую
конструкцию или новый API. Если вы с самого начала интересовались Project Loom, возможно, вы видели некоторые из ранних прототипов, где мы представили для дешевых легких потоков совершенно новый API под названием fiber.

Помимо изучения API, мы также много изучали, как люди используют потоки. Оказалось, что одни их части используются очень широко, другие — в меньшей степени. Thread.currentThread () используется и прямо и косвенно везде, например, для блокировки.

Вопрос, который часто возникает в викторинах: «Сколько раз Thread.currentThread () используется при первом использовании популярной библиотеки логирования?» Люди, не знающие ответа на этот вопрос, могут ответить 2 или 5. Правильный ответ — 113.

Другой широко используемый аспект потока — это ThreadLocals. Они используются везде, что иногда не радует. Если сломать Thread.currentThread () или ThreadLocals, то в контексте этих новых более дешевых потоков будет не запустить много уже существующего кода. Поэтому вначале, когда у нас был fiber API, нам пришлось эмулировать Thread API, чтобы существующий код запускался в контексте того, что называлось в то время fiber. Таким образом, мы могли уйти от кода, использующего Thread, не повредив нарыв.

Итак, .currentThread () и Threadlocals очень широко используются. Но в потоках есть и редко используемый «багаж». И здесь нам немного помогает расширенная политика депрекации. Если некоторые из этих старых областей со временем могли бы исчезнуть, подвергувшись депрекации, окончательной депрекации и, в конечном итоге, удалению — тогда, может быть, удастся жить с java.lang.Thread.

Два года исследований, около пяти прототипов — и мы пришли к выводу, что избежать
гравитационного притяжения 25 лет существующего кода невозможно. Эти новые дешевые потоки будут представлены с существующим API java.lang.Thread. То есть java.lang.Thread будет представлять и потоки ОС, и новые дешевые потоки.

Мы также решили дать этим новым потокам имя. Оно появилось благодаря Брайану Гетцу, он придумал название «виртуальный поток» (virtual thread).

Использование привычного Thread — хорошая новость для разработчиков. Нет новой модели программирования, нет новых концепций для изучения, вместо этого вам фактически придется отучиться от некоторых старых привычек. Когда я говорю «отучиться», я имею в виду такие вещи, как пулы потоков, ThreadLocals и так далее.

Как реализованы эти виртуальные потоки?

aool_zgqdmmt0fybdnryk2u7ah8.png

Они мультиплексируются поверх небольшого пула потоков операционной системы. Я сказал «потоки» во множественном числе, и вот тут уместно вспомнить уже упомянутые green threads. Ранние выпуски JDK, особенно 1.0.1.1 с классической виртуальной машиной, поддерживали модель, где потоки мультиплексировались в один-единственный поток ОС. То, что мы делаем теперь, перекликается с этим, но сейчас речь о более чем одном потоке ОС.

Итак, у нас есть набор потоков, на которые эти виртуальные потоки мультиплексируются. Под капотом виртуальная машина HotSpot была обновлена ​​для поддержки новой конструкции: scoped stackful one-shot delimited continuations. Виртуальные потоки объединяют континуации в HotSpot с планировщиками в библиотеке Java. Когда код, выполняющийся в виртуальном потоке, блокируется, скажем, в операции блокировки или в блокирующей IO-операции, соответствующая континуация приостанавливается, стек потока, на концептуальном уровне, вымещается в кучу Java, а планировщик выберет и возобновит другой виртуальный поток в этом же потоке ОС. Исходный виртуальный поток может быть возобновлен в том же потоке ОС или в другом.

Таким образом, код, в котором есть остановка и возобновление, может делать это в разных потоках ОС с течением времени. Есть небольшой набор потоков операционной системы, который обычно как минимум соответствует количеству ядер в системе. Они поддерживают выполнение многих виртуальных потоков, осуществляя прогресс кусочек за кусочком.

Пользовательский код, использующий API Java, не знает о распределении, которое
происходит под капотом, а yield и resume происходит глубоко в библиотеках JDK, поэтому мы говорим, что планирование является вытесняющим и не требует сотрудничества со стороны кода пользователя.

С точки зрения стоимости, поток ОС слева, виртуальный поток справа.

1jginepczgtbfhcr-_l5d8xobqq.png

Обычно операционная система резервирует около мегабайта стека для потока операционной системы. Некоторые ядра выделяют дополнительные данные ядра, и 16КБ не редкость. Это то, что операционная система имеет на поток ОС. Кроме того, виртуальная машина HotSpot добавляет к этому пару КБ метаданных.

Виртуальные потоки намного дешевле, текущий прототип составляет около 256 байт на виртуальный поток. Еще есть стек, он уменьшается и увеличивается по мере необходимости и обычно составляет пару КБ — в этом главное преимущество
виртуальных потоков перед потоками ОС.

Переключение задач также немного лучше, в типичных ОС оно составляет около микросекунды, в некоторых случаях может быть хуже. Лучший вариант с виртуальными потоками на данный момент работает лучше, около пары сотен наносекунд. Меньший размер и лучшее время переключения контекста означают, что мы можем иметь столько потоков, сколько захотим, и уровень многопоточности может
расти без ограничений.

Самое время перейти от слайдов к IDE и показать вам несколько примеров в коде.


У меня открыта IDE с пустым методом, и мы начнем с самого начала.

import ...

public class Demo {
    public static void main(String[] args) throws Exception {...}

    void run() throws Exception {

    }
}

Я упомянул, что мы ввели новый фабричный метод, и начну с использования фабричного метода Thread.startVirtualThread ().

import ...

public class Demo {
    public static void main(String[] args) throws Exception {...}

    void run() throws Exception {

        Thread thread = Thread.startVirtualThread(() -> System.out.println("hello"));
        thread.join();

    }
}

Вывел сообщение «hello», ничего особенного. Это немного отличается от использования конструкторов и метода start (), здесь всего лишь один фабричный метод.

Я изменю тело лямбда-выражения, просто выведу трассировку стека, чтобы вы могли видеть, что на самом деле происходит.

void run() throws Exception {

    Thread thread = Thread.startVirtualThread(Thread::dumpStack);
    thread.join();

}

Этот референс-метод просто вызывает дамп стека в контексте виртуального потока.

ikjf1sp1ckrt42vc8ljizbxlozy.png

Возможно, это выглядит немного иначе, чем то, что вы видели бы с обычным java.lang.Thread, потому что фреймы, которые вы видите здесь, не те, что вы видите в обычном JDK. Это своего рода эквивалент запуска потока, потому что виртуальный поток запускает континуацию. Это дает представление о том, в чем вы можете увидеть разницу.

Давайте рассмотрим еще один из аспектов API. Что делает этот startVirtualThread()?

В дополнение к введению фабричных методов для создания виртуальных потоков, текущий прототип имеет новый билдер для создания потоков. Создадим билдер с помощью метода Thread.builder(). Мы можем при этом вызвать несколько методов, позволяющих настроить поток: является ли он потоком-демоном, какое у него имя и некоторые другие аспекты.

hvre0vuq6hmrpsnsjukmi_oqkw8.jpeg

В числе этих методов есть virtual (). Создание виртуального потока cо startVirtualThread(), было, по сути, тем же самым. Вот длинная форма того, что я сделал минуту назад:

void run() throws Exception {

    Thread thread = Thread.builder().virtual().task(() -> {
        System.out.println("hello");
    }).start();

    thread.join();

    }

}

Мы снова сделали то же самое многословнее, но теперь использовали билдер потоков. А он избавляет нас от того, чтобы сначала использовать конструктор для создания потоков, а затем вызывать setDaemon () или setName (). Это очень полезно.

Это хорошее улучшение API для тех, кто в конечном итоге использует Thread API напрямую. Запускаем — и получаем то же, что и в случае с startVirtualThread ().

Еще мы можем создать ThreadFactory.

void run() throws Exception {
    ThreadFactory factory = Thread.builder().name(prefix:"worker-", start:0).factory();
}

Это создает фабрику потоков — она создает потоки, которые называют себя worker-0, worker-1, worker-2 и так далее. На самом деле worker — это только начальный аффикс, который добавляется к префиксу. Это еще один полезный способ создания фабрик потоков.

Покажу вам простой пример того, что вы можете делать с билдером потоков, используя возможности, которые дают нам дешёвые виртуальные потоки.

Большинство людей фактически не используют Thread API напрямую. Начиная с JDK 5, они перешли на использование ThreadExecutor и других API из java.util.concurrent.

Я хочу показать вам использование одного из этих ThreadExecutor. Мы создадим множество потоков и покажем вам, что на самом деле происходит.

Я собираюсь создать ExecutorService executor:

try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {

}

Этот фабричный метод для Executors создает виртуальные потоки. Обратите внимание, что здесь я использую try-with-resources. Одна из вещей, которые мы сделали в Loom, — мы модернизировали ExecutorService для расширения AutoCloseable, чтобы вы могли использовать их с конструкцией try-with-resources.

Приятная вещь в том, что когда он закрыт, как на примере сверху, он гарантирует, что все задачи, которые являются потоками в этом случае, будет завершены до того, как завершится закрытие. На самом деле закрытие будет ждать, пока все задачи или потоки не будут завершены, что очень полезно. Этот тип Executor’а создает поток для каждой задачи, поэтому он сильно отличается от пулов потоков, которые вы обычно создаете с помощью фабричного класса Executors.

Давайте создадим здесь миллион потоков.

import ...

  public class Demo {
      public static void main(String[] args) throws Exception {...}

      void run() throws Exception {
          try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {
              IntStream.range(0, 1_000_000).forEach(i -> {
                  executor.submit(() -> { });
              });
          }
      }

      String fetch(String url) throws IOException {...}

      void sleep(Duration duration) {...}
  }

Я использую IntStream.range (), вместо цикла for. Это вызовет метод executor.submit () один миллион раз, он создаст миллион потоков, которые ничего не делают. Если это запустить, ничего интересного не произойдет — «Process finished with exit code 0».

Для наглядности давайте добавим счетчик, который будет обновляться каждым из потоков.

import ...

public class Demo {
    public static void main(String[] args) throws Exception {...}

    void run() throws Exception {
        AtomicInteger counter = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {
            IntStream.range(0, 1_000_000).forEach(i -> {
                executor.submit(counter::incrementAndGet);
            });
        }
        System.out.println(counter.get());
    }

    String fetch(String url) throws IOException {...}

    void sleep(Duration duration) {...}
  }

Мы создаем Executor, отправляем миллион задач, каждая из этих задач будет увеличивать счетчик, и когда все закончится, выводится значение счетчика. Если все работает правильно, как у нас, должен быть выведен миллион.

Отрабатывает быстро — как видите, эти потоки очень дешевы в создании.

Давайте покажу вам, что еще мы можем делать с Executor’ами. У меня есть метод, который просто принимает байты из определенного URL-адреса, создает из него строку. Это не очень интересно — разве что то, что это блокирующая операция.

String fetch(String url) throws IOExpection {
    try (InputStream in = URI.create(url).toURL().openStream()) {
        byte[] bytes = in.readAllBytes();
        return new String(bytes, charsetName:"ISO-8859-1");
    }
}

Он создает сетевые подключения, HTTP-соединение будет ожидать результатов от сервера, мы просто воспользуемся этим как частью примера.

Давайте посмотрим вот на что:

void run() throws Exception {

       try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {

           Callable task1 = () -> fetch(url:"https://jokerconf.com/");
           Callable task1 = () -> fetch(url:"https://jokerconf.com/en");

           String first = executor.invokeAny(List.of(task1, task2));
           System.out.println(first.length());
       }     
}

Вот одна задача, которая получает HTML-страницу с jokerconf.com. Я собираюсь создать вторую задачу, которая будет делать то же самое, но будет получать английскую версию страницы. Если кто-то говорит на двух языках, то он сможет читать и русскую и английскую страницу, это не имеет значения.

Мы используем executor.invokeAny() и даем ему две задачи.
ExecutorService имеет несколько комбинаторов, invokeAny(), invokeAll(), они существуют уже давно. Мы можем использовать их с виртуальными потоками.

В данном случае мы подставим в first результат, в зависимости от того, какая из этих задач будет выполнена первой.

Я запущу два виртуальных потока. Один из них получит первую страницу, другой — вторую, в зависимости от того, что вернется первым, я получу результат в String first. Другой будет отменен (прерван). Запускаем — и получаем результат:»200160», то есть одна из страниц размером 200 КБ.

Итак, что произошло: были созданы два потока, один выполнял блокирующую операцию получения данных с первого URL-адреса, другой — со второго URL-адреса, и я получил то, что пришло первое. Если запущу еще пару раз, буду получать разные значения: одна из страниц всего 178 КБ, другая — 200 КБ.

Это один из комбинаторов. На самом деле, я бы мог хотеть обе страницы и что-то с ними сделать, в этом случае я мог бы использовать invokeAll().

void run() throws Exception {

       try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {

           Callable task1 = () -> fetch(url:"https://jokerconf.com/");
           Callable task1 = () -> fetch(url:"https://jokerconf.com/en");

           executor.invokeAll(List.of(task1, task2)); List>Future>String>>
                   .stream() Stream>
                   .map(Future::join) Stream
                   .map(String::length) Stream
                   .forEach(System.out.println);
       }     
}

Как видите, это не слишком интересно — всё, что мы здесь делаем, это invokeAll(). Мы выполним обе задачи, они выполняются в разных потоках. InvokeAll() блокируется до тех пор, пока не будет доступен результат всех задач, потому что вы получаете здесь Future, которые гарантированно будут выполнены. Создаем поток, получаем результат, получаем длины, а затем просто выводим их. Получаем 200 КБ и 178 КБ. Вот что вы можете делать с ExecutorService.

Покажу вам еще кое-что. В рамках Loom мы немного поработали над CompletableFuture, чтобы вы могли делать подобное. Добавить задачу и получить CompletableFuture, а не Future, и тогда я смогу написать такой код:

void run() throws Exception {

       try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {

           Callable task1 = () -> fetch(url:"https://jokerconf.com/");
           Callable task1 = () -> fetch(url:"https://jokerconf.com/en");

           CompletableFuture future1 = executor.submitTask(task1);
           CompletableFuture future2 = executor.submitTask(task2);

           CompletableFuture.completed(future1, future2) Stream>
                   .map(Future::join) Stream
                   .map(String::length) Stream
                   .forEach(System.out.println);
       }     
}

Я вызываю в CompletableFuture-метод под названием completed (). Это возвращает мне стрим, который заполняется Future в ленивом режиме по мере их завершения. Это намного интереснее, чем invokeAll (), который я показал ранее, поскольку метод не блокируется, пока не будут выполнены все задачи. Вместо этого поток заполняется результатом в ленивом режиме. Это похоже на стримо-подобную форму CompletionService, если вы когда-нибудь такое видели.

Я запущу несколько раз, порядок, вероятно, будет случайным. В любом случае, это дает вам представление о других вещах, которые вы можете делать с CompletableFuture, стримами и виртуальными потоками.

Еще одна вещь, которую я хочу сделать, забегая вперед. Мы еще поговорим об этом подробнее после демо. В прототипе есть ограничение. Виртуальные потоки делают то, что мы называем закреплением потока ОС, когда мы пытаемся выполнить IO-операции, удерживая монитор. Я объясню это лучше после демо, но пока у меня открыта IDE, покажу вам это на практике и объясню, на что это влияет.

import ...

public class Demo {
    public static void main(String[] args) throws Exception {...}

    void run() throws Exception {

        Thread.startVirtualThread(() ->

            sleep(Duration.ofSeconds(2));

        }).join();
    }

    String fetch(String url) throws IOException {...}

    void sleep(Duration duration) {...}
  }

Виртуальный выполняет блокирующую операцию, спит две секунды, завершается. Пока не слишком интересно. Теперь предположим, что он должен спать, пока держит монитор.

void run() throws Exception {

        Thread.startVirtualThread(() -> {

            Object lock = new Object();
            synchronized (lock) {
                sleep(Duration.ofSeconds(2));
            }
        }).join();

}

Я запускаю это с диагностическим свойством, которое даст мне трассировку стека, когда поток закреплен.

tz_qgxrkvey3b-1472p5jux2_gw.png

Мы видим трассировку стека, которая говорит мне, что поток был закреплен. Она снабжена примечаниями, которые говорят мне, где удерживается монитор. Это ограничение в нашем текущем прототипе. По сути, потоки могут блокироваться, удерживая монитор. Поток закрепляет соответствующий поток ОС, эта скорее качество сервиса текущей реализации, чем ошибка. Я вернусь к тому, что мы делаем c этим, через несколько минут. Это своего рода базовое демо.

У меня есть более полная демонстрация, переключусь для этого на немного другой проект.

package demo;

import ...

@Path("/")
public class SleepService {

    @GET
    @Path("sleep")
    @Producers(MediaType.APPLICATION_JSON)
    public String sleep(@QueryParam("millis") long millis) throws Exception {
        Thread.sleep(millis);
        return "{ \"millis\": \"" + millis + "\" };
    }
}

Уже существуют несколько серверов, которые работают с виртуальными потоками.
Есть сервер Helidon MP. Я думаю, MP означает MicroProfile. Helidon настроен, они недавно внесли некоторые изменения, теперь вы можете запустить его со свойством, при котором он будет запускать каждый запрос в отдельном виртуальном потоке. Мой код может выполнять операции блокировки, и они не будут закреплять базовый поток ОС. У меня может быть намного больше запросов, чем потоков, работающих одновременно и выполняющих блокирующие операции, это действительно очень полезно.

Первый сервис, который я вам покажу, — что-то вроде эквивалента «hello world» при использовании подобных служб. Запускаем код из примера выше, переходим в окно терминала и вводим curl-команду.

3p8modvrtjuv_pzb4lpufrfbfoe.png

Curl-команда кодирует параметр миллисекунд обратно в JSON, который возвращается.
Не слишком интересно, потому что все, что было сделано, — это сон. Остановлю сервер и вставлю Thread.dumpStack ():

public String sleep(@QueryParam("millis") long millis) throws Exception {
    Thread.dumpStack();
    Thread.sleep(millis);
    return "{ \"millis\": \"" + millis + "\" };
}

Снова запущу сервер. Я снова выполняю команду curl, которая устанавливает HTTP-соединение с сервером, она подключается к эндпоинту сна, параметр — millis=100.

curl http://localhost:8081/sleep?millis=100

Посмотрим на вывод: печатается трассировка стека, созданная Thread.dumpStack() в сервисе.

cowycseie3smsubzutmoeivrkfk.png

Огромная трассировка стека, мы видим здесь кучу всего: код Helidon, код Weld, JAX-RS… Довольно интересно просто увидеть это всё. Это сервер, который создает виртуальный поток для каждого запроса, что довольно интересно.

Теперь посмотрим на более сложный сервис. Я показал вам комбинаторы
invokeAny и involeAll в простом демо в самом начале, когда показывал новый ExecutorService.

import ...

@Path("/")
public class AggregatorServices {

    @GET
    @Path("anyOf")
    @Produces(MediaType.APPLICATION_JSON)
    public String anyOf(@QueryParam("left") String left,
                        @QueryParam("right") String right) throws Exception {
        if (left == null || right == null) {
            throw new WebApplicationException(Response.Status.BAD_REQUEST);
        }

        try (var executor :ExecutorService = Executors.newVirtualThreadExecutor()) {
            Callable task1 = () -> query(left);
            Callable task2 = () -> query(right);

            // return the first to succeed, cancel the other
            return executor.invokeAny(List.of(task1, task2));
        }
    }

    @GET
    @Path("allOf")
    @Produces(MediaType.APPLICATION_JSON)
    public String allOf(@QueryParam("left") String left,
                        @QueryParam("right") String right) throws Exception {

        if (left == null || right == null) {
            throw new WebApplicationException(Response.Status.BAD_REQUEST)
        }

        try (var executor :ExecutorService = Executors.newVirtualThreadExecutor()) {
            Callable task1 = () -> query(left);
            Callable task2 = () -> query(right);

            // if one falls, the other is cancelled
            return executor.invokeAll(List.of(task1, task2), cancelOnException: true) List>
                    .stream() Stream>
                    .map(Future::join) Stream
                    .collect(Collectors.joining(delimiter:", ", prefix:"{", suffix:" }"));
        }
    }

    private String query(String endpoint) {...}
}

Здесь у нас несколько сервисов, они находятся в этом исходном файле под названием AggregatorServices. Здесь есть две службы, два метода я бы сказал: anyOf и allOf. anyOf выполняет левый и правый запросы и выбирает тот, который возвращается первым, а другой отменяет.

Начнем с anyOf. Я вызвал curl-команду:

curl http://localhost:8081/anyOf?left=/greeting\&right=/sleep?millis=200

localhost:8081 — это текущий порт, эндпоинт — anyOf, и я дал ей два параметра — left и right. Я выполняю это и получаю «hello world»:

{"message":"Hello World!"}$

Причина в том, что сервис приветствия просто выводит «hello world», а сервис сна спит 200 мс. Я предполагаю, что большую часть времени «hello world» будет быстрее, чем 200 мс, и всегда будет возвращаться «hello world».

Если я уменьшу сон до 1 мс, то, возможно, сервис сна завершится раньше, чем другой сервис.

Теперь давайте изменим запрос на allOf, который объединит два результата:

curl http://localhost:8081/allOf?left=/greeting\&right=/sleep?millis=1

Запускаю и получаю два результата.

{ {"message":"Hello World!"}, { "millis": "1" } }$

Что интересно в allOf, он делает два запроса параллельно.

private String query(String endpoint) {
        URI uri = URI.create("http://localhost:8081").resolve(endpoint);
        return ClientBuilder.newClient() Client
                    .target(uri) WebTarget
                    .request(MediaType.APPLICATION_JSON) Invocation.Builder
                    .get(String.class);
    }

Кстати, это блокирующий код. Он использует клиентский API JAX-RS для подключения к этому эндпойнту. Он использует вызов invokeAll (), а затем .stream (), .map для получения результата, а затем Collectors.joining (), для объединения в JSON.

Это простой пример разветвления. Интересно то, что тут invokeAll () — это вариант, в котором есть параметр cancelOnException. Если вы хотите вызвать несколько задач одновременно, но если одна из них не работает, вы отменяете все остальные. Это важно сделать, чтобы не застрять в ожидании завершения всех остальных задач.

В этих примерах я использую сборку для раннего доступа Loom. Мы очень близки к первому рампдауну JDK 16, поэтому код, над которым я работаю — это JDK 16, каков он сейчас, плюс вся реализация Loom.


Поговорим об ограничениях.

В настоящее время существует два сценария, в которых виртуальные потоки, пытающиеся уступить, не освобождают базовый поток ОС. В примере я показал, как можно получить трассировку стека, когда поток закреплен, что на самом деле очень полезно для диагностики проблем такого типа.

Первый сценарий возникает, когда вы используете нативные фреймы. Если у вас есть код JNI, вы вызываете код JNI, он обращается обратно в код Java, а затем этот код Java пытается заблокироваться или совершить IO-операцию. На континуации есть нативный фрейм, мы мало что можем сделать. Это потенциально постоянное ограничение, но подобное должно происходить очень и очень редко.

Второй случай более проблематичен, и это то, что я показал в демонстрации: попытка
парковки, удерживая монитор. Скорее всего, первая версия будет с этим ограничением, это особенность реализации, влияющая на качество сервиса. В настоящее время ведутся исследования по реимплементации мониторов, чтобы преодолеть это ограничение. Но это серьезный объем работы, на это потребуется время.

На самом деле это не очень критично. По той простой причине, что всё, что сегодня использует мониторы Java, можно механически преобразовать из использования synchronized и wait-notify в использование блокировок из java.util.concurrent. Так что существуют эквиваленты мониторов в блокировках java.util.concurrent и различные формы блокировок, самый простой из которых — ReentrantLock, они очень хорошо работают с виртуальными потоками.

Что вы можете сделать при подготовке к Loom?

Если вам нравится Loom и вы заинтересованы в том, чтобы подготовить код для работы с Loom, есть пара вещей, о которых стоит подумать.

Предположим, у вас миллион потоков и код, который использует много ThreadLocals. Хотя виртуальные потоки поддерживают ThreadLocals, при их большом количестве требуется много памяти. Тут есть над чем подумать. Мы уже довольно давно работаем в JDK над устранением многих из ThreadLocals, которые использовались в различных местах.

Распространенным является кэширование объектов SimpleDateFormat.
SimpleDateFormat может быть дорогостоящим в создании, они не являются потокобезопасными, поэтому люди повсеместно кэшируют их в ThreadLocals.

В JDK мы заменили кэширование SimpleDateFormats на новый неизменяемый формат даты java.date dateformatter. Он неизменяем, вы можете сохранить его
в static final поле, это достаточно хорошо. Мы удалили ThreadLocals и из некоторых других мест.

Другая сложность сводится к масштабированию приложения и обработке десятков тысяч запросов. Если у вас много данных на запрос или на транзакцию, это может занимать много места. Если у вас миллион TCP-соединений, это миллион буферов сокетов. Если вы оборачиваете каждый из них в BufferedOutputStream, PrintStream или что-то в&nb

© Habrahabr.ru