[Перевод] Реализуем простые кооперативные потоки на C
Правда, функции setjmp()
и longjmp()
не предназначены для поддержки каких угодно перескоков. Они проектировались для весьма специфичного практического случая. Представьте, что вы выполняете какую-либо сложную операцию, например, делаете HTTP-запрос. В данном случае будет задействован сложный набор вызовов функций и, если любой из них закончится неудачно, вы должны будете вернуть специальный код ошибки от каждого из них. Такой код будет выглядеть как в следующем листинге, везде, где вы будете вызывать функцию (возможно, десятки раз):
int rv = do_function_call();
if (rv != SUCCESS) {
return rv;
}
Смысл setjmp()
и longjmp()
заключается в том, что setjmp()
помогает застолбить место перед тем, как приступать к задаче по-настоящему сложной. Затем вы сможете централизовать всю вашу обработку ошибок в одном месте:
int rv;
jmp_buf buf;
if ((rv = setjmp(buf)) != 0) {
/* здесь обрабатываем ошибки */
return;
}
do_complicated_task(buf, args...);
Если откажет любая функция, вовлеченная в do_complicated_task()
, то произойдет просто longjmp(buf, error_code)
. Это означает, что каждая функция в составе do_complicated_task()
может предположить, что любой вызов функции успешен, а, значит, вы сможете не ставить этот код для обработки ошибок в каждом вызове функции (на практике так почти никогда не делается, но это тема для отдельной статьи).
Основная идея в данном случае заключается в том, что longjmp()
позволяет вам лишь выпрыгнуть из глубоко вложенных функций. Вы не можете впрыгнуть в ту глубоко вложенную функцию, из которой ранее уже выпрыгнули. Вот как выглядит стек, когда вы выпрыгиваете из функции. Астериск (*) означает указатель стека, по которому сохранена setjmp()
.
| Stack before longjmp | Stack after longjmp
+-------------------------+----------------------------
stack | main() (*) | main()
grows | do_http_request() |
down | send_a_header() |
| | write_bytes() |
v | write() - fails! |
Как видите, по стеку можно перемещаться только назад, поэтому и отсутствует опасность повреждения данных. С другой стороны, вообразите, что было бы, если бы вы захотели перескакивать между задачами. Если вызвать setjmp()
, а затем вернуться, сделать еще какие-то дела и попытаться возобновить работу, которой вы уже занимались ранее, то возникнет проблема:
| Stack at setjmp() | Stack later | Stack after longjmp()
+-------------------+------------------+----------------------
stack | main() | main() | main()
grows | do_task_one() | do_task_two() | do_stack_two()
down | subtask() | subtask() | subtask()
| | foo() | | ???
v | bar() (*) | (*) | ??? (*)
Указатель стека, сохраненный setjmp()
, будет указывать на фрейм стека, который уже не существует, и ранее в какой-то момент времени мог быть затерт другими данными. Если мы попытаемся при помощи longjmp()
перепрыгнуть обратно в ту функцию, из которой вернулись, то начнутся весьма странные вещи, которые вполне могут привести к обвалу всей программы.
Мораль: если вы собираетесь пользоваться setjmp()
и longjmp()
для перепрыгивания между сложными задачами вроде этих, то должны гарантировать, что у каждой задачи будет свой отдельный стек. В таком случае проблема полностью устраняется, так как, когда longjmp()
сбрасывает указатель стека, программа сама заменит стек на нужный, и никакого затирания стека происходить не будет.
Напишем API планировщика
Отступление получилось немного пространным, но, вооружившись тем, что мы узнали, мы теперь сможем реализовать потоки пользовательского пространства. Для начала отмечу, что очень полезно самому спроектировать API, предназначенный для инициализации, создания и запуска потоков. Если сделать это заранее, мы будем гораздо лучше понимать, что именно пытаемся построить!
void scheduler_init(void);
void scheduler_create_task(void (*func)(void*), void *arg);
void scheduler_run(void);
Эти функции будут использоваться для инициализации планировщика, добавления задач, а затем, наконец, запуска задач в планировщике. После запуска scheduler_run()
будет работать до тех пор, пока не будут завершены все задачи. Выполняемые в данный момент задачи будут иметь следующие API:
void scheduler_exit_current_task(void);
void scheduler_relinquish(void);
Первая функция отвечает за выход из задачи. Выход из задачи также осуществим при возврате из ее функции, поэтому такая конструкция существует только для удобства. Вторая функция описывает, как наши потоки будут сообщать планировщику, что некоторое время должна выполняться другая задача. Когда задача вызывает scheduler_relinquish()
, она может быть ненадолго приостановлена, пока выполняются другие задачи; но, в конечном итоге, функция вернется, и выполнение первой задачи может быть продолжено.
Ради конкретного примера рассмотрим гипотетический вариант использования нашего API, при помощи которого мы протестируем планировщик:
#include
#include
#include "scheduler.h"
struct tester_args {
char *name;
int iters;
};
void tester(void *arg)
{
int i;
struct tester_args *ta = (struct tester_args *)arg;
for (i = 0; i < ta->iters; i++) {
printf("task %s: %d\n", ta->name, i);
scheduler_relinquish();
}
free(ta);
}
void create_test_task(char *name, int iters)
{
struct tester_args *ta = malloc(sizeof(*ta));
ta->name = name;
ta->iters = iters;
scheduler_create_task(tester, ta);
}
int main(int argc, char **argv)
{
scheduler_init();
create_test_task("first", 5);
create_test_task("second", 2);
scheduler_run();
printf("Finished running all tasks!\n");
return EXIT_SUCCESS;
}
В этом примере мы создаем две задачи, выполняющие одну и ту же функцию, но использующие разные аргументы; таким образом, их выполнение можно проследить отдельно. Каждая задача выполняет установленное количество итераций. На каждой итерации она выводит сообщение, а затем дает запуститься другой задаче. В качестве вывода программы мы ожидаем увидеть нечто подобное:
task first: 0
task second: 0
task first: 1
task second: 1
task first: 2
task first: 3
task first: 4
Finished running all tasks!
Давайте реализуем API планировщика
Чтобы реализовать API, нам требуется некоторое внутреннее представление задачи. Итак, приступаем к делу; соберем нужные нам поля:
struct task {
enum {
ST_CREATED,
ST_RUNNING,
ST_WAITING,
} status;
int id;
jmp_buf buf;
void (*func)(void*);
void *arg;
struct sc_list_head task_list;
void *stack_bottom;
void *stack_top;
int stack_size;
};
Давайте обсудим каждое из этих полей в отдельности. Все созданные задачи должны быть в состоянии «created» до начала выполнения. Когда начинается выполнение задачи, она переходит в состояние «running», а если задаче когда-нибудь потребуется дожидаться некой асинхронной операции, ее можно перевести в состояние ожидания («waiting»). Поле id
— это просто уникальный идентификатор задачи. В buf
содержится информация о том, когда будет выполнена longjmp()
для возобновления задачи. Поля func
и arg
передаются к scheduler_create_task()
и необходимы для запуска задачи. Поле task_list
необходимо для реализации двусвязного списка всех задач. Поля stack_bottom
, stack_top
и stack_size
все относятся к отдельному стеку, предназначенному специально для этой задачи. «bottom» — это адрес, возвращаемый malloc()
, но «top» — это указатель на адрес, расположенный прямо над данной областью в памяти. Поскольку стек x86 растет вниз, нужно установить для указателя стека значение stack_top
, а не stack_bottom
.
В таких условиях можно реализовать функцию scheduler_create_task()
:
void scheduler_create_task(void (*func)(void *), void *arg)
{
static int id = 1;
struct task *task = malloc(sizeof(*task));
task->status = ST_CREATED;
task->func = func;
task->arg = arg;
task->id = id++;
task->stack_size = 16 * 1024;
task->stack_bottom = malloc(task->stack_size);
task->stack_top = task->stack_bottom + task->stack_size;
sc_list_insert_end(&priv.task_list, &task->task_list);
}
Используя static int
, мы гарантируем, что, всякий раз при вызове функции происходит инкремент поля id, и там оказывается новое число. Все остальное должно быть понятно без объяснения, за исключением функции sc_list_insert_end()
, которая просто добавляет struct task
в глобальный список. Глобальный список хранится внутри второй структуре, в которой содержатся все приватные данные планировщика. Ниже представлена сама эта структура, а также ее функция инициализации:
struct scheduler_private {
jmp_buf buf;
struct task *current;
struct sc_list_head task_list;
} priv;
void scheduler_init(void)
{
priv.current = NULL;
sc_list_init(&priv.task_list);
}
Поле task_list
используется, чтобы ссылаться на список задач (что неудивительно). В поле current
хранится задача, выполняемая в настоящий момент (или null
, если таких задач сейчас нет). Наиболее важно, что поле buf
будет использоваться, чтобы перепрыгнуть в код scheduler_run()
:
enum {
INIT=0,
SCHEDULE,
EXIT_TASK,
};
void scheduler_run(void)
{
/* Это путь выхода для планировщика! */
switch (setjmp(priv.buf)) {
case EXIT_TASK:
scheduler_free_current_task();
case INIT:
case SCHEDULE:
schedule();
/* в случае возврата делать здесь больше нечего, поэтому выходим */
return;
default:
fprintf(stderr, "Uh oh, scheduler error\n");
return;
}
}
Как только вызвана функция scheduler_run()
, мы устанавливаем буфер setjmp()
, так, что всегда сможем вернуться к этой функции. В первый раз возвращается 0 (INIT), и мы сразу же вызываем schedule()
. Впоследствии мы можем передать константы SCHEDULE или EXIT_TASK в longjmp()
, что спровоцирует разные поведения. Пока давайте проигнорируем случай EXIT_TASK и сразу перейдем к реализации schedule()
:
static void schedule(void)
{
struct task *next = scheduler_choose_task();
if (!next) {
return;
}
priv.current = next;
if (next->status == ST_CREATED) {
/*
* Эта задача пока не запущена. Присваиваем новый указатель
* стека, запускаем задачу и по окончании выходим из нее.
*/
register void *top = next->stack_top;
asm volatile(
"mov %[rs], %%rsp \n"
: [ rs ] "+r" (top) ::
);
/*
* Запускаем функцию задачи
*/
next->status = ST_RUNNING;
next->func(next->arg);
/*
* Указатель стека должен остаться там, где мы его установили. Возвращать его – очень, очень плохая идея
* Давайте лучше выйдем
*/
scheduler_exit_current_task();
} else {
longjmp(next->buf, 1);
}
/* ВОЗВРАТА НЕТ */
}
Сначала вызываем внутреннюю функцию для выбора задачи, которая будет запущена следующей. Этот планировщик будет работать по принципу обычной карусели, поэтому он просто выберет новую задачу из списка. Если эта функция выдаст NULL, то у нас больше не останется задач для выполнения, и мы возвращаемся. Иначе мы должны либо запустить выполнение задачи (если она в состоянии ST_CREATED), либо возобновить ее выполнение.
Чтобы запустить созданную задачу, мы используем ассемблерную инструкцию для x86_64, чтобы присвоить поле stack_top
регистру rsp
(указатель стека). Затем меняем состояние задачи, запускаем функцию и выходим. Обратите внимание: setjmp()
и longjmp()
сохраняют и переставляют указатели стека, поэтому здесь нам единственный раз придется использовать ассемблер для изменения указателя стека.
Если задача уже была запущена, то поле buf
должно содержать контекст, нужный нам для longjmp()
, чтобы возобновить задачу, так что это мы и делаем.
Далее давайте рассмотрим вспомогательную функцию, выбирающую следующую задачу для запуска. Это сердце планировщика и, как я уже говорил ранее, этот планировщик работает по принципу карусели:
static struct task *scheduler_choose_task(void)
{
struct task *task;
sc_list_for_each_entry(task, &priv.task_list, task_list, struct task)
{
if (task->status == ST_RUNNING || task->status == ST_CREATED) {
sc_list_remove(&task->task_list);
sc_list_insert_end(&priv.task_list, &task->task_list);
return task;
}
}
return NULL;
}
Если вам не знакома моя реализация связанного списка (она взята из ядра Linux) — ничего страшного. Функция sc_list_for_each_entry()
— это макрос, позволяющий перебрать все задачи, находящиеся в списке задач. Первая доступная для выбора задача (не находящаяся в состоянии ожидания), которую мы найдем, удаляется с актуальной позиции и перемещается в конец списка задач. Так мы гарантируем, что при следующем запуске планировщика получим иную задачу (если таковая найдется). Мы возвращаем первую задачу, доступную для выбора, либо NULL, если задач нет вообще.
Наконец, перейдем к реализации scheduler_relinquish()
, чтобы посмотреть, как задача может самоустраниться:
void scheduler_relinquish(void)
{
if (setjmp(priv.current->buf)) {
return;
} else {
longjmp(priv.buf, SCHEDULE);
}
}
Это другой вариант использования функции setjmp()
в нашем планировщике. В принципе, этот вариант может показаться немного запутанным. Когда задача вызывает эту функцию, мы применяем setjmp()
для сохранения текущего контекста (включающего, в том числе, актуальный указатель стека). Затем используем longjmp()
, чтобы войти в планировщик (опять в scheduler_run()
) и передаем функцию SCHEDULE; таким образом мы просим назначить новую задачу.
Когда выполнение задачи возобновляется, функция setjmp()
возвращается с ненулевым значением, и мы выходим из любой задачи, которой могли заниматься перед этим!
Наконец, вот что происходит при выходе задачи (это делается либо явно, путем вызова функции выхода, либо путем возвращения из соответствующей функции задачи):
void scheduler_exit_current_task(void)
{
struct task *task = priv.current;
sc_list_remove(&task->task_list);
longjmp(priv.buf, EXIT_TASK);
/* ВОЗВРАТА НЕТ */
}
static void scheduler_free_current_task(void)
{
struct task *task = priv.current;
priv.current = NULL;
free(task->stack_bottom);
free(task);
}
Это двухчастный процесс. Первая функция возвращается непосредственно самой задачей. Мы удаляем из списка задач запись, соответствующую данной, поскольку назначаться она более не будет. Затем при помощи longjmp()
мы возвращаемся в функцию scheduler_run()
. На этот раз используем EXIT_TASK. Так мы указываем планировщику, что, прежде, чем назначать новую задачу, он должен вызвать scheduler_free_current_task()
. Если вы вернетесь к описанию scheduler_run()
, то увидите, что именно это и делает scheduler_run()
.
Мы делали это в два этапа, поскольку, когда вызывается scheduler_exit_current_task()
, он активно использует стек, содержащийся в структуре задачи. Если высвободить стек, продолжая его использовать, то остается вероятность, что функция сможет обратиться все к той же памяти стека, которую мы только что высвободили! Чтобы гарантировать, что такого не произойдет, нам придется при помощи longjmp()
вернуться к планировщику, использующему отдельный стек. После этого мы сможем безопасно высвободить данные, относящиеся к задаче.
Вот мы и разобрали всю реализацию планировщика целиком. Если бы мы попробовали скомпилировать его, присовокупив к нему мою реализацию связанного списка и основную программу, приведенную выше, то получили бы полностью рабочий планировщик! Чтобы нес утруждать вас копипастой, направляю вас в репозиторий на github, где содержится весь код к этой статье.
В чем польза описанного подхода?
Если вы дочитали до сих пор, думаю, незачем убеждать вас в том, что пример интересный. Но на первый взгляд он может показаться не слишком полезным. В конце концов, в C можно использовать «настоящие» потоки, которые могут работать параллельно и не должны дожидаться друг друга, пока какой-то из них не вызовет scheduler_relinquish()
.
Однако, мне в этом видится отправная точка для целой серии захватывающих реализаций полезных возможностей. Речь может идти о тяжеловесных задачах ввода/вывода, о реализации однопоточного асинхронного приложения, в том стиле, как работают новые async-утилиты в Python. При помощи такой системы также можно реализовывать генераторы и корутины. Наконец, как следует потрудившись, эту систему также можно подружить с «реальными» потоками операционной системы, чтобы обеспечить дополнительный параллелизм там, где это необходимо. За каждой из этих идей скрывается интересный проект, и один из них я рекомендую вам попробовать выполнить самостоятельно, дорогой читатель.
Это безопасно?
Думаю, скорее нет, чем да! Нельзя считать безопасным встраиваемый ассемблерный код, влияющий на указатель стека. Не рискуйте использовать таких вещей в продакшене, но обязательно повозитесь с ними и исследуйте!
Более безопасную реализацию такой системы можно построить на основе «неконтекстного» API (см. man getcontext), который позволяет переключаться между такими типами «потоков» пользовательского пространства, обходясь без встраивания ассемблерного кода. К сожалению, такой API не охвачен стандартами (он удален из спецификации POSIX). Но его по-прежнему можно использовать как часть glibc.
Как сделать такой механизм вытесняющим?
Данный планировщик в том виде, как он представлен здесь, работает лишь при условии, что потоки явно передают контроль обратно планировщику. Это нехорошо для программы широкого профиля, например, для операционной системы, поскольку плохо сделанный поток может заблокировать выполнение всех остальных (правда, это не помешало использовать в MS-DOS кооперативную многозадачность!) Не думаю, что из-за этого кооперативная многозадачность явно плоха; все зависит от приложения.
При использовании нестандартного «внеконтекстного» API сигналы POSIX будут хранить контекст кода, выполнявшегося ранее. Устанавливая периодичный сигнал таймера, планировщик пользовательского пространства в самом деле может дать рабочую версию вытесняющей многозадачности! Это еще один крутой проект, заслуживающий отдельной статьи.