JCoro — асинхронность на сопрограммах в Java
К исследованиям в этой сфере меня вдохновила статья Асинхронность: назад в будущее. В ней автор описывает идею о том, как, используя сопрограммы, можно упростить асинхронный код так, чтобы выглядел он так же, как обычный синхронный, но сохранял плюшки, которые нам даёт применение асинхронных операций. Вкратце, суть подхода такова: если у нас есть механизм, позволяющий сохранять и восстанавливать контекст выполнения (поддержка сопрограмм), то код на цепочках callback'ов
startReadSocket((data) -> {
startWriteFile(data, (result) -> {
if (result == ok) ...
});
});
мы можем переписать так:
data = readSocket();
result = writeFile(data);
if (result == ok) ...
Здесь readSocket() и writeFile() — сопрограммы, в которых асинхронные операции вызываются следующим образом:
byte[] readSocket() {
byte[] result = null;
startReadSocket((data) -> {
result = data;
resume();
});
yield();
return result;
}
Методы yield() и resume() сохраняют и восстанавливают контекст выполнения, со всеми фреймами и локальными переменными. Происходит следующее: при вызове readSocket() мы планируем асинхронную операцию вызовом startReadSocket() и выполняем yield(). Yield() сохраняет контекст выполнения и поток завершается (возвращается в пул). Когда асинхронная операция будет выполнена, мы вызовем resume() перед выходом из callback'a, и тем самым возобновим выполнение кода. Управление снова получит основная функция, которая вызовет writeFile(). writeFile() устроен аналогично, и всё повторится.
Сделав единожды такое преобразование для всех используемых асинхронных операций и поместив полученные функции в библиотеку, мы получаем инструмент, позволяющий нам писать асинхронный код так, как будто это обычный синхронный код. Мы получаем возможность сочетать плюсы синхронного кода (читабельность, удобная обработка ошибок) и асинхронного (производительность). Плата за это удобство — необходимость как-то сохранять и восстанавливать контекст выполнения. В статье автор описывает реализацию на С++, мне же захотелось заиметь что-то такое в Java. Об этом и пойдёт речь.
javaflow
В первую очередь надо было найти реализацию сопрограмм для JVM. Среди нескольких вариантов самой подходящей оказалась библиотека javaflow. Она бы вполне подошла для эксперимента, но, к сожалению, проект давно заброшен. Потыкав палочкой (декомпилятором) в генерируемый ей код, я выяснил, что в javaflow есть несколько серьезных проблем:
- Совсем не поддерживаются лямбды. Это не удивительно, с учётом того, что последний релиз библиотеки был в 2008 году.
- Код инструментируется крайне не оптимально — инструментируются все вызовы внутри метода, хотя большинство из них никогда не приведёт к вызову suspend(). В результате байт-код сильно распухает, и в реальной жизни такой подход будет неприемлемо медленно работать.
- Нет поддержки reflection. Если в процессе исполнения кода какой-то метод может быть вызван через reflection, javaflow не сможет в этом месте сохранить и восстановить контекст выполнения. А это критично при повседневном программировании, и дело даже не в принципиальной возможности, а в том, что почти все используют сейчас DI-контейнеры, которые работают через reflection. Поэтому запрещать reflection нельзя, это слишком сильное ограничение для программистов.
Несмотря на всё это, javaflow помог разобраться в том, как можно реализовать сохранение и восстановление состояния. Далее было 2 варианта: пытаться поддерживать javaflow или написать свою реализацию. По очевидным соображениям (фатальный недостаток) был избран второй способ.
jcoro
Сопрограммы, добавляемые в язык, где их не было, расширяют его. Чтобы писать приложения, которые полностью используют преимущества предлагаемого подхода, и не материться при этом, нужно сделать их удобными. При чтении кода мы должны сразу видеть, что вот эта функция является сопрограммой и выполняет асинхронную операцию, и поэтому её надо запускать в рамках контекста, поддерживающего сохранение и восстановление стека. В языке C# для этого есть ключевые слова async и await. В Java, к сожалению, добавить свои ключевые слова не представляется реальным, но можно воспользоваться аннотациями! Выглядит всё это, конечно, громоздко, но что поделать. Может быть, придумается ещё что-нибудь. А пока так:
Coro coro = Coro.initSuspended(new ICoroRunnable() {
@Override
@Async({@Await("foo")})
public void run() {
int i = 5;
double f = 10;
final String argStr = foo(i, f, "argStr");
}
@Async(@Await("yield"))
private String foo(int x, double y, String m) {
Coro c = Coro.get();
c.yield();
return "returnedStr";
}
});
coro.start();
coro.resume();
Наличие аннотации @Async говорит jcoro о том, что нужно инструментировать байткод этого метода, сделав его сопрограммой. Сигнатуры точек восстановления задаются аннотациями @Await. Все вызовы внутри сопрограммы, сигнатуры которых есть в списке @Await-аннотаций, становятся точками восстановления. Сопрограмма в jcoro — это метод, помеченный аннотацией @Async и имеющий хотя бы одну точку восстановления. Если в методе нет ни одной точки восстановления, он не будет инструментирован. Точка восстановления — это вызов Coro.yield() или любой вызов (сопрограммы), который может в конечном счёте привести к вызову Coro.yield().
Сначала создаётся экземпляр Coro — это объект, который хранит в себе сохранённое состояние сопрограммы и может её запускать, сохранять и восстанавливать. Изначально сопрограмма только инициализируется, но не запускается. При вызове start() управление получает метод run(), который первым делом проверяет, не нужно ли восстанавливать состояние. Пока мы только запустили сопрограмму, и run() просто начинает выполнять свой код. Метод выполняет код, вызывает foo(). Внутри foo() выполняется такая же проверка — не нужно ли восстанавливать состояние? Ответ отрицательный, и, аналогично, код метода начинает выполняться с начала. А вот при вызове yield() происходит следующее. Сам вызов yield() только устанавливает флаг «isYielding» и больше ничего не делает, но код после вызова, увидев этот флаг, не продолжает выполнение, а сохраняет своё состояние и сразу же завершается, возвращая null. То же самое происходит уровнем выше. И далее метод start() возвращает управление. Что мы имеем на этот момент? Код до вызова yield() выполнен, состояние выполнения сохранено в экземпляре Coro. Далее мы зовём resume(). Это приводит к повторному вызову метода run(). И, как и в первый раз, метод проверяет, не нужно ли восстановить состояние. На этот раз это действительно нужно сделать, и метод, вспомнив, что остановился он на вызове foo(), восстанавливает свои локальные переменные и стек и переходит прямо к вызову foo(), не выполняя кода, который был перед ним. В методе foo() происходит то же самое — он восстанавливает стек и локальные переменные, а потом переходит сразу к вызову yield(). Вызов yield() сам по себе ничего не делает, кроме сброса внутреннего флага. После него метод foo() завершает выполнение, возвращая строку «returnedStr». Остаётся метод run(), который так же благополучно завершается, возвращая управление коду, вызывающему resume(). На выходе мы имеем полностью отработавшую сопрограмму, выполнение которой мы разбили на две части.
Как это поможет нам в написании асинхронных приложений?
Предположим, что нам нужно написать серверное приложение, которое в ответ на запрос обращается к базе данных, потом что-то делает с данными, далее применяет их к шаблону, и возвращает кусочек разметки. Классическое серверное веб-приложение. Почти на всех этапах мы можем использовать асинхронные операции. Установка соединения, чтение данных из сокета при получении запроса, все сетевые операции с базой данных, чтение файла при загрузке шаблона, отправка результата в сокет. CPU в таком сценарии должен быть занят только планированием асинхронных операций, логикой препроцессинга данных и шаблонизацией. В остальное время процессор может отдохнуть. Давайте попробуем прикинуть, как это можно было бы организовать в коде. Набросаем сервер:
public static void main(String[] args) {
Coro.initSuspended(new ICoroRunnable() {
@Async({@Await("accept")})
public void run() {
final AsynchronousServerSocketChannel listener = bind(new InetSocketAddress(5000));
// Чтобы использовать процессор по максимуму, создаём пул потоков по количеству ядер
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
while (true) {
AsynchronousSocketChannel channel = accept(listener); // Асинхронная операция
executorService.submit(new Runnable() {
@Override
public void run() {
Coro.initSuspended(new ICoroRunnable() {
@Async({@Await("handle")})
public void run() {
// Код обработки запроса - в нём тоже можно вызывать асинхронные операции,
// так как он работает в сохраняемом контексте
handle(channel);
}
}).start();
}
});
}
}
}).start();
}
@Async({@Await("read"), @Await("write")})
public static void handle(AsynchronousSocketChannel channel) {
ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
Integer read = read(channel, buffer); // Асинхронная операция
write(channel, outBuffer); // Асинхронная операция
channel.close();
}
В коде опущены некоторые блоки try-catch, необходимые для корректной компиляции, это сделано чтобы было легче читать код.
Внутри handle можно добавить любую логику. Например, определение «контроллера» и вызов его через reflection, внедрение зависимостей. Но нужно быть аккуратным с вызовами кода, содержащего точки восстановления, через reflection или неинструментированные библиотеки. Об этом чуть ниже.
С точки зрения утилизации потоков это работает следующим образом. Есть пул рабочих потоков, а есть системный пул потоков, который JVM резервирует для выполнения callback-ов асинхронных операций. Когда какая-то асинхронная операция завершается, один из потоков начинает выполнять callback. В нём сначала восстанавливается состояние сопрограммы, потом сопрограмма продолжает выполнение, либо доходя до завершения, либо до следующей асинхронной операции. После того как сопрограмма завершается (или приостанавливает выполнение после планирования очередной асинхронной операции), поток возвращается в пул. Таким образом, один запрос по очереди может обрабатываться разными потоками, и это накладывает некоторые ограничения на наш код. Так, например, мы не можем пользоваться thread local-переменными, если нет уверенности, что между put и get выполнение сопрограммы не будет прервано. С другой стороны, схема выглядит близкой к оптимальной, и обещает хорошую производительность.
Реализация
В отличие от javaflow, jcoro не инструментирует все методы и все вызовы внутри них. Инструментированию подлежат только сопрограммы — те методы, в которых есть хотя бы одна точка восстановления. Точка восстановления — это вызов, который при выполнении может привести в конечном итоге к вызову yield(). То есть это не обязательно должно происходить при каждом вызове, достаточно теоретической возможности. Как вообще инструментируется код? Как можно сохранить и восстановить состояние выполнения целого потока? Оказывается, это совсем не трудно. Достаточно каждый метод, который претендует на гордое звание сопрограммы, превратить в маленькую state-машину. Для этого в начале метода дописывается байт-код, который ничего не делает, если восстанавливаться не нужно, а если нужно, то выполняет switch(state) и по значению состояния переходит на вызов точки восстановления, на которой выполнение было приостановлено. Этого достаточно, потому что сохранение состояния может произойти только в момент вызова точки восстановления (и сам вызов yield() тоже является точкой восстановления). Ну и плюс к этому нужно не забыть восстановить локальные переменные и стек фрейма. Так как в JVM состояние фрейма однозначно идентифицируется этим набором (состояние стека, локальных переменных и текущая инструкция), то после этого можно утверждать, что всё у нас работает правильно. Аналогичным образом отрабатывает сохранение-восстановление на всём стеке выполнения.
Возвращаясь к нашему примеру, давайте посмотрим на то, во что он превратится:
@Async(@Await("yield"))
private String foo(int a, double b, String c) {
Coro c = Coro.get();
c.yield();
return "returnedStr";
}
Эта сопрограмма не делает ничего полезного, а только приостанавливает свою работу, а после возвращает значение. В байт-коде это выглядит так:
private java.lang.String foo(int, double, java.lang.String);
descriptor: (IDLjava/lang/String;)Ljava/lang/String;
flags: ACC_PRIVATE
Code:
stack=1, locals=6, args_size=4
0000: invokestatic org/jcoro/Coro.get:()Lorg/jcoro/Coro;
0003: astore 5
0005: aload 5
0007: invokevirtual org/jcoro/Coro.yield:()V
0010: ldc "returnedStr"
0012: areturn
После инструментирования мы увидим вот что (результат приведён в виде unified diff, к сожалению, хабр не поддерживает подсвечивания строк):
private java.lang.String foo(int, double, java.lang.String);
descriptor: (IDLjava/lang/String;)Ljava/lang/String;
flags: ACC_PRIVATE
Code:
- stack=1, locals=6, args_size=4
+ stack=2, locals=6, args_size=4
+ 0: invokestatic org/jcoro/Coro.getSafe:()Lorg/jcoro/Coro; // Получаем текущую сопрограмму
+ 3: ifnull 0000 // Если её нет - переходим к началу метода
+ 6: invokestatic org/jcoro/Coro.popState:()Ljava/lang/Integer; // popState() вернёт нам не null, если есть сохранённое состояние
+ 9: dup
+ 10: ifnull 32 // Восстанавливать ничего не надо - переходим на начало метода
+ 13: invokestatic org/jcoro/Coro.isUnpatchableCall:()Z // Это для поддержки неинструментируемых вызовов
+ 16: ifeq 23
+ 19: invokestatic org/jcoro/Coro.popRef:()Ljava/lang/Object;
+ 22: pop
+ 23: ldc 0
+ 25: invokestatic org/jcoro/Coro.setUnpatchableCall:(Z)V
+ 28: pop
+ 29: goto 43
+ 32: pop
0000: invokestatic org/jcoro/Coro.get:()Lorg/jcoro/Coro; // Тело метода до первой точки восстановления
0003: astore 5
0005: aload 5
+ 40: goto 0007 // Далее - код восстановления состояния перед первой точкой восстановления
+ 43: invokestatic org/jcoro/Coro.popRef:()Ljava/lang/Object;
+ 46: checkcast "Lorg/jcoro/Coro;"
+ 49: astore 5
+ 51: invokestatic org/jcoro/Coro.popRef:()Ljava/lang/Object;
+ 54: checkcast "Ljava/lang/String;"
+ 57: astore 4
+ 59: invokestatic org/jcoro/Coro.popDouble:()D
+ 62: dstore_2
+ 63: invokestatic org/jcoro/Coro.popInt:()I
+ 66: istore_1
+ 67: invokestatic org/jcoro/Coro.popRef:()Ljava/lang/Object;
+ 70: checkcast "Lorg/jcoro/tests/SimpleTest$1;"
+ 73: astore_0
+ 74: invokestatic org/jcoro/Coro.popRef:()Ljava/lang/Object;
+ 77: checkcast "Lorg/jcoro/Coro;"
0007: invokevirtual org/jcoro/Coro.yield:()V // Точка восстановления
+ 83: invokestatic org/jcoro/Coro.isYielding:()Z // Далее - код сохранения состояния
+ 86: ifeq 0010
+ 89: aload_0
+ 90: invokestatic org/jcoro/Coro.pushRef:(Ljava/lang/Object;)V
+ 93: iload_1
+ 94: invokestatic org/jcoro/Coro.pushInt:(I)V
+ 97: dload_2
+ 98: invokestatic org/jcoro/Coro.pushDouble:(D)V
+ 101: aload 4
+ 103: invokestatic org/jcoro/Coro.pushRef:(Ljava/lang/Object;)V
+ 106: aload 5
+ 108: invokestatic org/jcoro/Coro.pushRef:(Ljava/lang/Object;)V
+ 111: aload_0
+ 112: invokestatic org/jcoro/Coro.pushRef:(Ljava/lang/Object;)V
+ 115: ldc 0
+ 117: invokestatic org/jcoro/Coro.pushState:(I)V
+ 120: aconst_null // Возвращаем null, если сопрограмма не завершена
+ 121: areturn
0010: ldc "returnedStr" // Код завершения метода
0012: areturn
В начале метода добавился код, определяющий точку восстановления, а перед и после точки восстановления — код для восстановления и сохранения. Если бы точек восстановление было больше, то в начале вместо простого перехода мы бы увидели switch. Есть ещё один нюанс. Раз уж мы пользуемся параллельными стеками для сохранения-восстановления фреймов, то мы должны соблюдать порядок добавления и получения объектов. Если мы сначала кладём на стек объект А, а потом Б, то получать мы их должны в обратном порядке. Поэтому если мы сохраняем сначала локальные переменные, а потом стек фрейма, то восстановление мы должны выполнять наоборот. И плюс сюда отлично вписывается обработка ссылки на объект вызова (this). При сохранении он кладётся на стек крайним, а при восстановлении забирается первым (если, конечно, точка восстановления — нестатический метод). В приведённом примере локальных переменных нет, но с ними код был бы почти такой же.
Unpatchable-код
К сожалению, описанная стратегия сохранения и восстановления стека работает только если есть возможность инструментировать все сопрограммы. Если какой-то метод, который содержит в себе точку восстановления, мы не можем инструментировать, эта стратегия не сработает. Такое возможно, если мы зовём код посредством рефлекшена или же библиотеки, которую невозможно инструментировать. И если с библиотеками ещё можно что-то придумать, то без рефлекшена ну никак нельзя. Все программисты хотят использовать DI-контейнеры, прокси и AOP. Однако, можно заметить, что чаще всего такого рода вызовы — полностью stateless, то есть сколько их не вызывай, они по сути ничего не делают, кроме передачи управления дальше. И при возобновлении сопрограммы можно вызвать такой метод вторично, просто передав в него те же самые аргументы. А уже в коде, который позовёт он, продолжить восстанавливать состояние. И для поддержки этого механизма нужна лишь вторая стратегия сохранения состояния, при которой аргументы сохраняются перед вызовом, а не после. Эта стратегия сейчас поддерживается в jcoro, а для использования нужно всего лишь помечать точки восстановления как @Await(patchable = false).
Информацию о том, во что превращается вызов метода с использованием каждой из стратегий, можно найти на вики.
Поддержка лямбд
Лямбды поддерживаются, но кривовато. Есть две проблемы. Одна из них заключается в том, что в java сложно повесить аннотации на лямбду, и ещё сложнее их прочитать. Единственное найденное мной решение основано на появившихся недавно Type Annotations и выглядит следующим образом:
Coro coro = Coro.initSuspended((@Async({@Await(value = "yield")}) ICoroRunnable) () -> {
Coro.get().yield();
});
Компилятор, когда видит такое, добавляет в class-файл аннотацию и связывает её с инструкцией invokedynamic. И это работает, но, к сожалению, не всегда. Иногда компилятор связывает такую аннотацию не с этой инструкцией, а с предыдущей (скорее всего, это баг), а иногда — вообще не записывает аннотации в class-файл. Например, это происходит при компиляции такого кода:
public static void main(String[] args) {
Runnable one = (@TypeAnn("1") Runnable) () -> {
Runnable two = (@TypeAnn("2") Runnable) () -> {
Runnable three = (@TypeAnn("3") Runnable) () -> {
Runnable four = (@TypeAnn("4") Runnable) () -> {
};
};
};
};
}
В class-файле окажутся аннотированными только инструкции invokedynamic для внешних двух лямбд. А аннотации для внутренних двух лямбд компилятор проигнорирует. Это тоже скорее всего баг, я отправил его в Oracle, но подтверждения пока не получил. Буду надеяться, что с этим получится разобраться.
Вторая же проблема связана с тем, что лямбды — довольно странные создания в мире Java. Вызываются они как экземплярные методы, но на самом деле представляют собой методы статические. И этот корпускулярно-волновой дуализм создаёт концептуальную проблему для механизма сохранения-восстановления. Дело в том, что для оптимальной стратегии восстановления мы в теле экземплярного метода должны сохранить this (см схему). Но ссылка на экземпляр функционального интерфейса есть только у вызывающего кода! В конечном счёте, мы приходим к необходимости использовать сохранение аргументов перед выполнением лямбды, то есть, всё тот же вариант с patchable=false (который предназначался для обхода проблем с рефлекшеном). А он работает медленнее. Хотя, быть может, это и не критично по сравнению с неудобствами, которые доставляет необходимость прописывать patchable=false на каждой лямбде-сопрограмме.
Суммируя эти две проблемы, можно сделать неутешительный вывод: лямбды-сопрограммы использовать пока не рекомендуется.
Текущее состояние и планы
Проект доступен по адресу https://github.com/elw00d/jcoro. Сейчас доступен движок, набор тестов к нему и несколько примеров. Для доведения технологии до ума необходимо сделать следующее:
- В рамках доработок движка — оптимизировать генерацию stack map frames и порешать проблемы с лямбдами
- Написать maven и gradle плагины для инструментирования указанных jar-ников или наборов class-файлов
- Провести тестирование производительности, написав 3 сервера с одинаковой функциональностью. Один будет использовать блокирующую модель, второй — асинхронную на коллбеках (обычное nio, без jcoro), и третий — асинхронный с использованием jcoro. Нужно оценить, сколько кушает сохранение-восстановление контекста по сравнению с кодом, который этого не делает. Очень надеюсь, что это будет не слишком много.
- Разработка окружения. Самая важная часть. Нужно сделать врапперы и аналоги для самых важных библиотек. В первую очередь это, конечно, jdbc. Нужно придумать какой-то «стандарт» для асинхронного jdbc, а потом сделать для него реализации на самые популярные базы данных — mysql, postgresql, mssql. И затем — враппер для jcoro, который бы заворачивал асинхронные операции в сопрограммы. Сюда же — реализация какого-то примера каркаса для написания веб-приложений.
- Написать плагин к IntelliJ IDEA, который бы помогал в написании сопрограмм. Сопрограммы и точки восстановления бы как-то выделялись визуально, а ошибки при написании кода (нет аннотации @Await на точке восстановления, нет аннотации @Async) подсказывались при анализе исходника.
- Оформить документацию, написать внятный User Guide итд.
Если у вас появится желание помочь или попробовать jcoro в деле, welcome! Для публичной коммуникации, наверное, проще всего будет использовать Github Issues.