[Перевод] Руководство по межпроцессному взаимодействию (IPC) в Linux — Часть 2

8da4f3f58b8bb550162f761279b5ad39

Представляю вашему вниманию вторую часть перевода статьи A guide to inter-process communication in Linux.

Первая часть перевода была посвящена общему введению в курс дела и механизму разделяемого хранилища (shared storage). В этой части будут рассмотрены механизмы каналов (именованных и неименованных) и очереди сообщений.

Приятного чтения!

Использование каналов и очередей сообщений

В этом разделе описываются каналы, которые используются для соединения и взаимодействия процессов между собой. У канала есть вход (write end) для записи байтов и выход (read end) для чтения этих байтов в порядке FIFO (first in, first out). При обычном использовании один процесс пишет в канал, а другой процесс читает из того же канала. Байты сами по себе могут представлять что угодно: номера, записи о сотрудниках, цифровые фильмы и т.д.

Существует две разновидности каналов: именованные и неименованные. Ими можно пользоваться как через командную строку, так и в программах: это будет показано в примерах. В данном разделе также будут рассмотрены очереди памяти (memory queue), которые (незаслуженно!) вышли из моды.

В примерах кода из первого раздела была освещена проблема состояния гонки (race condition), как при работе с файлами (file-based), так и при работе с памятью (memory-based). Этот вопрос, разумеется, актуален и для канального IPC — в данном разделе это будет рассмотрено. В примерах кода для каналов и очередей памяти используются API, одобренные POSIX, а основной целью стандартов POSIX является потокобезопасность (thread-safety).

Рассмотрим man страницу функции mq_open, которая относится к API очередей памяти. Эта страница включает в себя раздел Атрибуты, в котором есть небольшая таблица:

Interface

Attribute

Value

mq_open ()

Thread safety

MT-Safe

Значение MT-Safe (где MT расшифровывается как multi-threaded) означает, что функция mq_open потокобезопасна (thread-safe), и, в свою очередь, процессобезопасна (process-safe): под выполнением процесса подразумевается выполнение одного из его потоков, и, поскольку состояние гонки не может возникнуть среди потоков одного и того же процесса, оно не произойдет и между потоками разных процессов. Атрибут MT-Safe гарантирует, что при вызове mq_open состояние гонки не возникнет. В общем, канальный IPC является concurrent-safe (хотя и в дальнейших примерах кода будет указываться предостережение).

Неименованные каналы

Начнем с вымышленного примера — команды, показывающей, как работают неименованные каналы. На всех современных системах в командной строке вертикальная черта | представляет собой неименованный канал. Предположим, что % — символ приглашения командной строки (command line prompt), и рассмотрим данную команду:

% sleep 5 | echo "Hello, world!" ## writer to the left of |, reader to the right

Утилиты sleep и echo выполняются как отдельные процессы, а неименованный канал позволяет им «общаться» друг с другом. Однако, «вымышленность» примера заключается в том, что никакой коммуникации не происходит. На экране появляется приветствие Hello, world!; затем, через 5 секунд, появляется символ приглашения командной строки, означающее, что и sleep, и echo завершены. В чём дело?

В соответствии с синтаксисом вертикальной черты, процесс слева (sleep) пишет, а процесс справа (echo) — читает. По умолчанию, читающий процесс (reader) блокируется до тех пор, пока в канале не появятся байты, которые можно прочитать, а пишущий (writer) — после записи всех байтов — завершает работу отправкой маркера остановки потока (end-of-stream marker). (Даже если writer преждевременно завершает работу, маркер остановки потока будет отправлен reader-у). Неименованный канал существует, пока writer и reader не завершат работу.

В примере выше процесс sleep не пишет ни одного байта в канал, но завершает работу через 5 секунд, что вызывает отправку маркера остановки потока в канал. В это время процесс echo немедленно выводит Hello, world! в стандартный вывод (на экран), т.к. процесс не прочитал ни одного байта из канала, поэтому и ждать ему нечего. Как только процессы sleep и echo завершают работу, неименованный канал — так и не использованный для коммуникации — закрывается и выводится символ приглашения командной строки.

Ниже представлен более полезный пример, использующий два неименованных канала. Предположим, содержимое файла test.dat выглядит следующим образом:

this
is
the
way
the
world
ends

Команда

% cat test.dat | sort | uniq

перенаправляет вывод из процесса cat (concatenate) в процесс sort для сортировки данных; затем отсортированные данные направляются в процесс uniq для удаления дубликатов (в данном случае, сокращает два the до одного):

ends
is
the
this
way
world

Рассмотрим программу, состоящую из двух процессов, взаимодействующих друг с другом через неименованный канал (см. Пример 1).

Пример 1. Программа pipeUN: два процесса, взаимодействующих друг с другом через неименованный канал

#include  /* wait */
#include 
#include    /* exit functions */
#include    /* read, write, pipe, _exit */
#include 

#define ReadEnd  0
#define WriteEnd 1

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1);    /** failure **/
}

int main() {
  int pipeFDs[2]; /* two file descriptors */
  char buf;       /* 1-byte buffer */
  const char* msg = "Nature's first green is gold\n"; /* bytes to write */

  if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
  pid_t cpid = fork();                                /* fork a child process */
  if (cpid < 0) report_and_exit("fork");              /* check for failure */

  if (0 == cpid) {    /*** child ***/                 /* child process */
    close(pipeFDs[WriteEnd]);                         /* child reads, doesn't write */

    while (read(pipeFDs[ReadEnd], &buf, 1) > 0)       /* read until end of byte stream */
      write(STDOUT_FILENO, &buf, sizeof(buf));        /* echo to the standard output */

    close(pipeFDs[ReadEnd]);                          /* close the ReadEnd: all done */
    _exit(0);                                         /* exit and notify parent at once  */
  }
  else {              /*** parent ***/
    close(pipeFDs[ReadEnd]);                          /* parent writes, doesn't read */

    write(pipeFDs[WriteEnd], msg, strlen(msg));       /* write the bytes to the pipe */
    close(pipeFDs[WriteEnd]);                         /* done writing: generate eof */

    wait(NULL);                                       /* wait for child to exit */
    exit(0);                                          /* exit normally */
  }
  return 0;
}

Программа выше использует системную функцию fork для создания процесса. Хоть программа представляет собой всего один файл с исходным кодом, в процессе ее выполнения (успешного!) присутствует многопроцессность (multi-processing).

Ниже кратко представлено о том, как работает библиотечная функция fork.

Функция fork, вызванная в родительском процессе, возращает -1 родителю в случае ошибки. В pipeUN вызов выглядит так:

pid_t cpid = fork();

В этом примере возвращаемое значение записывается в переменную cpid целочисленного типа pid_t. (Каждый процесс имеет собственный process ID, положительное целое число, идентифицирующее этот процесс). Создание нового процесса через fork может завершится с ошибкой по нескольким причинам, одна из них — переполнение таблицы процессов — структуры, которую использует система для отслеживания процессов. Процессы-зомби (о них будет рассказано позже), могут вызвать переполнение таблицы процессов, если их не «собирать» (harvest).

Если вызов fork завершается успешно, создаётся новый дочерний (child) процесс и возвращается одно значение родительскому процессу и другое значение дочернему процессу. И родительский, и дочерний процессы выполняют один и тот же код, который следует после вызова fork. (Дочерний процесс наследует копии всех переменных, объявленных к этому моменту родительским). В частности, успешный вызов fork возвращает:

Для отделения кода, предназначенного для родительского процесса, от кода для дочернего процесса, используется if/else или эквивалентная конструкция. В данном примере:

if (0 == cpid) { /*** child ***/
...
}
else { /*** parent ***/
...
}

Если создание форка процесса завершается успешно, pipeUN продолжает выполнение следующим образом. Для хранения двух файловых дескрипторов (один — для записи в канал, второй — для чтения из канала) есть массив целых чисел:

int pipeFDs[2]; /* two file descriptors */

где pipeFDs[0] — дескриптор для чтения, pipeFDs[1] — дексриптор для записи. Успешный вызов функции pipe, выполненный непосредственно перед вызовом fork, заполняет массив этими файловыми дескрипторами.

if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");

У родительского и дочернего процессов есть копии этих двух файловых дескрипторов, однако, в соответствии с паттерном разделения ответственности (separation of concerns), каждому из процессов требуется только один дескриптор. В нашем примере родительский процесс пишет, а дочерний — читает, хотя роли могут быть и обратными. Поэтому первый оператор в коде дочернего if-блока закрывает канальный вход для записи (write end):

close(pipeFDs[WriteEnd]); /* called in child code */

а первый оператор в коде родительского else-блока закрывает канальный выход для чтения (read end):

close(pipeFDs[ReadEnd]); /* called in parent code */

Затем родительский процесс записывает несколько байтов (коды ASCII) в неименованный канал, а дочерний процесс считывает их и перенаправляет их в стандартный вывод.

Необходимо пояснить про вызов функции wait в коде для родительского процесса. Как только дочерний процесс был создан, он становится максимально независим от своего родителя. Дочерний процесс может выполнять различный код, который не имеет ничего общего с родительским. Однако, с помощью механизма сигналов система уведомляет родителя, когда дочерний процесс завершит свою работу.

Что будет, если родительский процесс завершится раньше дочернего? В данном случае, если меры предосторожности не будут приняты, дочерний процесс становится процессом-зомби и о нём остаётся запись в таблице процессов. Меры предосторожности бывают двух типов. Один подход — когда родительский процесс информирует систему, что для него не имеет значения, когда завершится работа дочернего процесса:

signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */

Второй подход — когда родительский процесс выполняет wait вплоть до завершения работы дочернего процесса, тем самым обеспечивая, что он переживёт дочерний процесс. В pipeUN используется второй подход — при вызове

wait(NULL); /* called in parent */

в родительском коде.

Вызов wait подразумевает, что родительский процесс будет ожидать завершения работы дочернего процесса, и в pipeUN будет только один дочерний процесс. (Аргумент NULL может быть заменен адрес целочисленной переменной, в которую запишется статус завершения (exit status) дочернего процесса). Для более гибкого управления существует функция waitpid, используемая, например, для выбора конкретного дочернего процесса среди нескольких.

В pipeUN предприняты еще одни меры предосторожности. Когда в родительском процессе заканчивает ждать, он завершает работу обычным вызовом функции exit. При этом дочерний процесс завершается вызовом __exit для ускорения процесса оповещения о своем завершении. По сути, дочерний процесс сообщает системе — как можно скорее сообщите родительскому процессу, что я завершил работу.

Если два процесса пишут в один и тот же неименованный канал, могут ли байты перемешаться (interleave) между собой? Например, если процесс П1 пишет:

foo bar

в канал, а процесс П2 параллельно пишет:

baz baz

в тот же самый канал, кажется, что в канале в этот момент может быть что угодно, например:

baz foo baz bar

Стандарт POSIX гарантирует, что перемешивания байтов не пройзодет, пока в канал не будет записано больше, чем PIPE_BUF байтов. На системах Linux, размер PIPE_BUF соответствует 4096 байтам. Для того, чтобы обойти эту проблему, я предпочитаю в каналах использовать только один writer и только один reader.

Именованные каналы

У неименованного канала нет файла поддержки (backing file): система поддерживает в памяти буфер, используемый для передачи байтов от writer-а к reader-у. Как только writer и reader завершает работу, буфер освобождается, и неименованный канал закрывается. У именованного канала, наоборот, имеется файл поддержи и отдельный API.

Взглянем на еще один пример с командной строкой, чтобы понять суть работы именованных каналов. Шаги представлены ниже:

1) Открыть два терминала. Рабочий каталог у них должен быть один и тот же.

2) В одном из терминалов введите следующие команды (символ приглашения командной строки — %, мои комментарии начинаются с ##):

% mkfifo tester  ## creates a backing file named tester
% cat tester ## type the pipe's content to stdout

Сначала в терминале ничего не должно отобразиться, поскольку в именованный канал еще ничего не было записано.

3) Во втором терминале введите команду:

% cat > tester ## redirect keyboard input to the pipe
hello, world!  ## then hit Return key
bye, bye       ## ditto
    ## terminate session with a 
			   ## Control-C

Что бы не было введено в этот терминал, оно будет отображаться в первом. Как только нажато Ctrl+C, в обоих терминалах появляется символ приглашения командной строки: канал закрылся.

4) «Прибираемся» — удаляем файл, который реализует именованный канал:

% unlink tester

Как следует из названия утилиты mkfifo, именованный канал еще называют FIFO, поскольку «первый байт на вход = первый байт на выход» (first byte in is the first byte out). Существует библиотечная функция с именем mkfifo, создающая именованный канал в программах, и она будет использована в следующем примере, который состоит из двух процессов: один пишет в именованный канал, а другой из него читает.

Пример 2. Программа fifoWriter

#include 
#include 
#include  
#include 
#include 
#include 
#include 

#define MaxLoops         12000   /* outer loop */
#define ChunkSize           16   /* how many written at a time */
#define IntsPerChunk         4   /* four 4-byte ints per chunk */
#define MaxZs              250   /* max microseconds to sleep */

int main() {
  const char* pipeName = "./fifoChannel";
  mkfifo(pipeName, 0666);                      /* read/write for user/group/others */
  int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */
  if (fd < 0) return -1;                       /** error **/
  
  int i;
  for (i = 0; i < MaxLoops; i++) {          /* write MaxWrites times */
    int j;
    for (j = 0; j < ChunkSize; j++) {       /* each time, write ChunkSize bytes */
      int k;
      int chunk[IntsPerChunk];
      for (k = 0; k < IntsPerChunk; k++) 
	chunk[k] = rand();             
      write(fd, chunk, sizeof(chunk)); 
    }
    usleep((rand() % MaxZs) + 1);           /* pause a bit for realism */
  }

  close(fd);                                /* close pipe: generates an end-of-file */
  unlink(pipeName);                         /* unlink from the implementing file */
  printf("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);

  return 0;
}

Программу выше можно свести к следующему:

1) Программа создаёт именованный канал для записи:

mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY);

где pipeName — имя файла поддержки, передаваемое в mkfifo как первый аргумент. Затем именованный канал открывается с помощью уже известного вызова функции open, возвращающей файловый дескриптор.

2) Для большего реализма fifoWriter пишет не все данные за раз, а пишет фрагментами (chunk), между отправкой которых засыпает на рандомное количество микросекунд. Суммарно, в именованный канал запишется 768000 4-байтных целых чисел.

3) После закрытия именованного канала, fifoWriter отсоединяет (unlink) файл поддержки:

close(fd); /* close pipe: generates end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */

Система освобождает файл поддержки, как только все процессы, подключенный к каналу, выполнят операцию отсоединения. В этом примере два процесса (fifoWriter и fifoReader) отсоединяют файл.

Две программы должны выполняться в разных терминалах с общим рабочим каталогом. Однако, fifoWriter должен стартовать перед fifoReader, поскольку именно первый процесс создаёт канал. Затем fifoReader обращается к уже созданному именованному каналу (см. пример 3).

Пример 3. Программа fifoReader

#include 
#include 
#include 
#include 
#include 

unsigned is_prime(unsigned n) { /* not pretty, but gets the job done efficiently */
  if (n <= 3) return n > 1;
  if (0 == (n % 2) || 0 == (n % 3)) return 0;

  unsigned i;
  for (i = 5; (i * i) <= n; i += 6) 
    if (0 == (n % i) || 0 == (n % (i + 2))) return 0;

  return 1; /* found a prime! */
}

int main() {
  const char* file = "./fifoChannel";
  int fd = open(file, O_RDONLY); 
  if (fd < 0) return -1; /* no point in continuing */
  unsigned count = 0, total = 0, primes_count = 0;

  while (1) {
    int next;
    int i;
    ssize_t count = read(fd, &next, sizeof(int));   

    if (0 == count) break;                  /* end of stream */
    else if (count == sizeof(int)) {        /* read a 4-byte int value */
      total++;
      if (is_prime(next)) primes_count++;
    }   
  }

  close(fd);       /* close pipe from read end */
  unlink(file);    /* unlink from the underlying file */
  printf("Received ints: %u, primes: %u\n", total, primes_count);  

  return 0;
}

Программу выше можно свести к следующему:

1) Поскольку fifoWriter создаёт именованный канал, fifoReader-у требуется только вызвать open для обращения к каналу через файл поддержки:

const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);

Файл открывается только для чтения.

2) Затем программа попадает в (потенциально) бесконечный цикл, в каждой итерации которого она пытается считать 4-байтный фрагмент. Вызов read:

ssize_t count = read(fd, &next, sizeof(int));

возращает 0 в конце потока (end-of-stream); в этом случае fifoReader выходит из цикла, закрывает именованный канал, и отсоединяет файл поддержки перед завершением работы.

3) После чтения 4-байтного целого числа, fifoReader проверяет, является ли число простым (prime).

В пробном запуске программы среди полученых 768,000 целых чисел было насчитано 37,682 простых. На повторных пробных запусках, fifoReader успешно считал все байты, записанные fifoWriter. В этом нет ничего удивительного — два процесса выполняются на одном и том же компьютере, что исключает проблемы с сетью. Именованные каналы — надёжный, эффективный и широко используемый механизм IPC.

Ниже представлен вывод двух программ; каждая запущена из разных терминалов, но в одном и том же рабочем каталоге:

% ./fifoWriter
768000 ints sent to the pipe.

% ./fifoReader
Received ints: 768000, primes: 37682

Очереди сообщений

Каналы строго соответствуют поведению FIFO: первый записанный байт — первый считанный байт, второй записанный байт — второй считанный байт, и так далее. Очереди сообщений могут как вести себя таким же образом, так и более гибко — фрагменты данных могут быть получены и не в порядке FIFO.

Очередеь сообщений — последовательность сообщений, состоящих из двух частей:

  • полезная нагрузка (payload) — массив байтов (`char` в С);

  • тип, представленный положительным целым числом; тип категоризирует сообщения для возможности более гибкого получения.

Рассмотрим очередь сообщений, где каждое из сообщений отмечено целым числом:

          +-+    +-+    +-+    +-+
sender--->|3|--->|2|--->|2|--->|1|--->receiver
          +-+    +-+    +-+    +-+

Из этих четырёх сообщений, то, что обозначено 1, находится в передней части, т.е. ближе всего к получателю. Следом идут два сообщения с отметкой 2, и сообщение с отметкой 3 в конце. Если строгое FIFO поведение соблюдается, тогда сообщения должны быть получены в порядке 1–2–2–3. Однако, очередь сообщений поддерживает и другой порядок получения. К примеру, сообщения могут быть получены в порядке 3–2–1–2.

Пример mqueue состоит из двух программ: sender — пишущая в очередь сообщений и receiver — читающая из очереди сообщений. Обе программы включают заголовочный файл queue.h, показанный в Примере 4 ниже.

Пример 4. Заголовочный файл queue.h

#define ProjectId 123
#define PathName  "queue.h" /* any existing, accessible file would do */
#define MsgLen    4
#define MsgCount  6

typedef struct { 
  long type;                 /* must be of type long */ 
  char payload[MsgLen + 1];  /* bytes in the message */  
} queuedMessage;

В заголовочном файле определена структура queuedMessage с полями payload (массив байтов) и type (целое число). В файле также определяются символьные константы (директивы #define), первые две из которых используются для генерации ключа, который, в свою очередь, используется для получения ID очереди сообщений. ProjectID может быть любым положительным целым числом, а PathName должен быть существующим и доступным файлом — в нашем случае это файл queue.h.

Операторы, осуществляющие настройку sender и receiver:

key_t key = ftok(PathName, ProjectId); /* generate key */
int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */

ID qid по факту является аналогом файлового дескриптора для очереди сообщений.

Пример 5. Программа sender

#include  
#include  
#include 
#include 
#include 
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key = ftok(PathName, ProjectId); 
  if (key < 0) report_and_exit("couldn't get key...");
  
  int qid = msgget(key, 0666 | IPC_CREAT); 
  if (qid < 0) report_and_exit("couldn't get queue id...");

  char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
  int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
  int i;
  for (i = 0; i < MsgCount; i++) {
    /* build the message */
    queuedMessage msg;
    msg.type = types[i];
    strcpy(msg.payload, payloads[i]);

    /* send the message */
    msgsnd(qid, &msg, MsgLen + 1, IPC_NOWAIT); /* don't block */
    printf("%s sent as type %i\n", msg.payload, (int) msg.type);
  }
  return 0;
}

Программа выше отправляет 6 сообщений — по два сообщения каждого типа: первые сообщения с типом 1, следующие два — с типом 2, два последних — с типом 3. Отправка сообщений:

msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);

выполняется как неблокирующая (флаг IPC_NOWAIT), поскольку размер сообщений небольшой. В этом случае единственно возможная опасность — переполненная очередь (что в случае нашего примера — маловероятно) — может привести к ошибке отправки. Программа ниже тоже получает сообщения с использованием флага IPC_NOWAIT.

Пример 6. Программа receiver

#include  
#include  
#include 
#include 
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}
  
int main() { 
  key_t key= ftok(PathName, ProjectId); /* key to identify the queue */
  if (key < 0) report_and_exit("key not gotten...");
  
  int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
  if (qid < 0) report_and_exit("no access to queue...");

  int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
  int i;
  for (i = 0; i < MsgCount; i++) {
    queuedMessage msg; /* defined in queue.h */
    if (msgrcv(qid, &msg, MsgLen + 1, types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
      puts("msgrcv trouble...");
    printf("%s received as type %i\n", msg.payload, (int) msg.type);
  }

  /** remove the queue **/
  if (msgctl(qid, IPC_RMID, NULL) < 0)  /* NULL = 'no flags' */
    report_and_exit("trouble removing queue...");
  
  return 0; 
} 

Программа receiver не создаёт очередь сообщений, хотя API предоставляет такую возможность. В receiver-е вызов:

int qid = msgget(key, 0666 | IPC_CREAT);

вводит в заблуждение флагом IPC_CREAT; однако этот флаг подразумевает «создать при необходимости, иначе — получить доступ». Программа sender вызывает msgsnd для отправки сообщений, тогда как receiver вызывает msgrcv для их получения. В нашем примере sender отправляет сообщения в порядке 1–1–2–2–3–3, однако receiver будет их получать в порядке 3–1–2–3–2 для демонстрации, что очереди сообщений не ограничены только порядком FIFO.

% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3

% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2

Вывод выше показывает, что sender и receiver могут быть запущены в одном и том же терминале. Также в выводе показано, что очередь сообщений сохраняется даже после того, как процесс sender завершает работу. Очередь удаляется только после того, как процесс receiver явным образом удалит её вызовом msgctl:

if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */

Подведение итогов

API каналов и очередей сообщений являются однонаправленными: один процесс пишет, другой — читает. Существует реализация двунаправленных именованных каналов, но, по моему мнению — чем механизм IPC проще, тем лучше. Как отмечалось ранее, популярность очередей сообщений упала, но без веской причины; очереди являются еще одним инструментов из набора инструментов IPC.

Habrahabr.ru прочитано 6054 раза