[Из песочницы] Асинхронная работа с PostgreSQL в C
Сегодня захотелось написать небольшую заметку об асинхронной работе с PostgreSQL в C. Мотивы просты: для небольшой утилитки встала необходимость реализовать такой функционал, гугл на тему понятных и рабочих примеров предательски молчал (нашелся только пример в pqxx для C++ — там есть метод асинхронного соединения и pipeline-класс для запросов), а официальная документация по этому вопросу хоть и весьма подробная, но не слишком структурированная, да и сам алгоритм работы с библиотекой libpq в асинхронном режиме имеет много подводных камней. Поэтому разобравшись в вопросе хочется поделиться результатами с общественностью, на случай если кому-то это будет полезным.
Итак, будем считать, что рассказывать что такое PostgreSQL никому не нужно, и чем синхронный (блокирующий) режим работы отличается от асинхронного читатели тоже примерно понимают. Кстати, кроме первого и очевидного достоинства асинхронных вызовов (они не блокируют ввод-вывод и выполнение потока, что освобождает от необходимости создавать дополнительные треды, синхронизировать их, и т.д.), в случае с Postgre есть еще один плюс: обычный метод PQexec позволяет за один раз получить результат выполнения только одного SQL-запроса, а асинхронные функции libpq такого ограничения не имеют.
Как я уже говорил, у libpq в асинхронном режиме есть довольно много подводных камней. Бывают библиотеки, где асинхронный режим реализован красиво и завершенно (разработчик вызывает абсолютно любой асинхронный метод, назначив ему callback, а после этого достаточно просто «вращать» event loop библиотеки (бесконечно или по таймеру вызывать метод), а далее уже сама библиотека позаботится об обработке команд в нужной последовательности, отлове событий и вызове колбэков), то у PostgreSQL модель работы другая.
Имеется большое множество команд для асинхронных соединений и запросов, которые должны быть вызываны в строго определенной последовательности в зависимости от текущего состояния и результата предыдущей операции, плюс необходимо вручную проверять готовность сокетов. Достаточно в каком-то месте ошибиться и вызвать функцию не вовремя или наоборот не вызвать, или попробовать обратиться к занятому сокету, и это может привести к блокировке потока (в худшем случае — бесконечной, то есть зависанию). И еще библиотека в асинхронном режиме почти никак не контроллирует таймаут операций — обо всем нужно будет заботиться самим.
В официальной документации большая часть информации по работе в асинхронном режиме приведена в следующих двух разделах: раз и два.
Ну, а мы перейдем сразу к делу.
Чтобы установить соединение с БД в асинхронном режиме, порядок действий должен быть примерно таков:
1. Выделить память под структуру соединения и начать подключение методом PQconnectStart ()
2. Запомнить текущее время, чтобы можно было в дальнейшем контроллировать таймаут операции.
3. Проверить успешность подключения, вызвав PQstatus (). Если результат равен CONNECTION_BAD, значит инициализация была не успешной (Например, ошибка в строке подключения или не удалось аллоцировать память), иначе же можно продолжать
4. Проверить методом PQconnectPoll () текущий статус подключения.
Возможные результаты:
PGRES_POLLING_WRITING - ожидание завершения отправки данных из сокета
PGRES_POLLING_READING - ожидание завершения чтения данных из сокета
PGRES_POLLING_FAILED - произошла ошибка во время обмена данными с сервером
PGRES_POLLING_OK - подключение выполнено успешно
5. В случае статуса PGRES_POLLING_WRITING или PGRES_POLLING_READING необходимо получить используемый сокет подключения методом PQsocket () и системными функциями select () или poll () проверять его доступность для записи или чтения данных до тех пор пока он не освободится, после чего повторить пункт 4 до достижения результата OK или FAILED, либо до истечения таймаута (не забываем, таймаут нужно проверять вручную).
Если следующий вызов PQconnectPoll () будет _до_ освобождения сокета, поток заблокируется, и это надо иметь в виду.
После всего этого, если все прошло успешно, мы получаем установленное соединение с БД. Порядок действий для выполнения SQL-запросов будет выглядить же примерно так:
1. Подготовить запрос к отправке на сервер командой PQsendQuery ().
2. Установить неблокирующий режим для отправки запроса методом PQsetnonblocking (), потому что по умолчанию в libPq асинхронно выполняется только чтение, а не запись в сокет.
3. Выполнять PQflush () до тех пор пока она не выдаст 0 (запрос отправлен успешно) или -1 (ошибка).
4. Получить активный сокет и проверить его на готовность к чтению через select () или poll (), до тех пор пока он не будет готов к операции.
5. Выполнить PQconsumeInput (). Если функция вернула 0, то произошла ошибка.
6. Выполнить PQisBusy (). Если функция вернула 1, значит обработка запроса или чтения ответа сервера еще не завершено и нужно заново повторить алгоритм начиная с пункта 4.
Ну и не забываем контроллировать таймауты, само собой.
После выполнения всех вышеперечисленных операций, работать с результатами запроса можно как обычно — PQgetResult (), PQgetvalue (), и т.д.
А теперь перейдем к практике. Код на C, однако если захочется обернуть его в класс для использования в программе на C++, то as you wish, всё очень просто.
// Компилировать будем как-то так: gcc pgtest4.c -I/usr/include/postgresql -lpq
#include //< Си библиотека для работы PostgreSQL
#include //< setsockopt() и некоторые константы
#include //< select()
#include //< gettimeoftheday()
#include //< usleep() тоже может пригодиться
#define SOCK_POLL_TIMEOUT 100 // таймаут ожидания освобождения сокета (на сколько можно максимально блокировать основной поток?) в мс
typedef enum {
DISCONNECTED = 0,
CONN_POLLING,
CONN_READING,
CONN_WRITING,
READY,
QUERY_SENT,
QUERY_FLUSHING,
QUERY_BUSY,
QUERY_READING,
CLOSING,
ERROR
} pq_state;
typedef enum {
NO_ERROR = 0,
ALLOCATION_FAIL,
POLLING_FAIL,
READING_FAIL,
WRITING_FAIL,
TIMEOUT_FAIL
} pq_error;
struct pqconn_s{
pq_state state; //< текущее действие
PGconn* conn; //< указатель на структуру с данными о соединении
unsigned long start; //< время начала текущей операции (для таймаута)
long timeout; //< таймаут текущей операции
pq_error error; //< если случится что-то не то, сюда прилетит код ошибки
};
/**
* @brief получить текущеем время
* @return время в мс
*/
unsigned long time_ms(void)
{
struct timespec tp;
// gettimeoftheday() тут использовать нельзя, оно может плавать
clock_gettime(CLOCK_MONOTONIC, &tp);
return (tp.tv_sec * 1000 + tp.tv_nsec / 1000000);
}
/**
* @brief проверить готовность (свободность) сокета к записи/чтению
* @param socket_fd - дескриптор интересующего сокета
* @param rw - 0 если проверяем на чтение, 1 если на запись
* @return как и select(): -1 = ошибка, 0 - свободен (готов), 1 - занят
*/
int try_socket(int socket_fd, int rw)
{
fd_set fset;
struct timeval sock_timeout;
sock_timeout.tv_sec = 0;
sock_timeout.tv_usec = SOCK_POLL_TIMEOUT;
FD_ZERO(&fset);
FD_SET(socket_fd, &fset);
setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&sock_timeout, sizeof(struct timeval));
//здесь кстати возможно не помешает еще выставить SO_SNDTIMEO. экспериментируйте.
return select(socket_fd + 1, ((!rw) ? &fset : NULL), ((rw) ? &fset : NULL), NULL, &sock_timeout);
}
/**
* @brief начать процесс подключения к серверу БД
* @param conninfo - строка подключения к БД
* @param s - указатель на структуру pqconn_s с данными о подключении и текущем состоянии
* @param timeout - таймаут операции в мс
* @return 0 - ошибка (можно узнать ее код в s->error), 1 - успех
*/
int pgsql_connection_start(const char* conninfo, struct pqconn_s* s, long timeout)
{
if (!s) return 0;
if (!conninfo)
{
s->error = ALLOCATION_FAIL;
return 0;
}
s->conn = PQconnectStart(conninfo);
s->state = CONN_POLLING;
s->start = time_ms();
s->timeout = timeout;
s->error = NO_ERROR;
ConnStatusType status;
status = PQstatus(s->conn);
if (status == CONNECTION_BAD)
{
s->state = ERROR;
s->error = POLLING_FAIL;
return 0;
}
return 1;
}
/**
* @brief начать отправку запроса на сервер БД и получение ответа
* @param command - SQL-запрос
* @param s - указатель на структуру pqconn_s с данными о подключении и текущем состоянии
* @param timeout - таймаут операции в мс
* @return 0 - ошибка, 1 - успех
*/
int pgsql_send_query(struct pqconn_s* s, const char *command, long timeout)
{
if (s->state != READY)
{
return 0;
}
if (!PQsendQuery(s->conn, command))
{
return 0;
}
PQsetnonblocking(s->conn, 0);
s->state = QUERY_FLUSHING;
s->start = time_ms();
s->timeout = timeout;
s->error = NO_ERROR;
return 1;
}
/**
* @brief основной цикл, метод должен вызываться периодично
* @param s - указатель на структуру pqconn_s с данными о подключении и текущем состоянии
*/
void pgsql_event_loop(struct pqconn_s* s)
{
if ((s->state == DISCONNECTED) || (s->state == READY))
return;
if ((time_ms() - s->start) > s->timeout)
{
s->state = CLOSING;
s->error = TIMEOUT_FAIL;
}
if (s->state == CONN_POLLING)
{
PostgresPollingStatusType poll_result;
poll_result = PQconnectPoll(s->conn);
if (poll_result == PGRES_POLLING_WRITING)
s->state = CONN_WRITING;
if (poll_result == PGRES_POLLING_READING)
s->state = CONN_READING;
if (poll_result == PGRES_POLLING_FAILED)
{
s->state = ERROR;
s->error = POLLING_FAIL;
}
if (poll_result == PGRES_POLLING_OK)
s->state = READY;
}
if (s->state == CONN_READING)
{
int sock_state = try_socket(PQsocket(s->conn), 0);
if (sock_state == -1)
{
s->error = READING_FAIL;
s->state = CLOSING;
}
if (sock_state > 0)
s->state = CONN_POLLING;
}
if (s->state == CONN_WRITING)
{
int sock_state = try_socket(PQsocket(s->conn), 1);
if (sock_state == -1)
{
s->error = WRITING_FAIL;
s->state = CLOSING;
}
if (sock_state > 0)
s->state = CONN_POLLING;
}
if (s->state == CLOSING)
{
PQfinish(s->conn);
s->state = ERROR;
}
if (s->state == QUERY_FLUSHING)
{
int flush_res = PQflush(s->conn);
if (0 == flush_res)
s->state = QUERY_READING;
if (-1 == flush_res)
{
s->error = WRITING_FAIL;
s->state = CLOSING;
}
}
if (s->state == QUERY_READING)
{
int sock_state = try_socket(PQsocket(s->conn), 0);
if (sock_state == -1)
{
s->error = READING_FAIL;
s->state = CLOSING;
}
if (sock_state > 0)
s->state = QUERY_BUSY;
}
if (s->state == QUERY_BUSY)
{
if (!PQconsumeInput(s->conn))
{
s->error = READING_FAIL;
s->state = CLOSING;
}
if (PQisBusy(s->conn))
s->state = QUERY_READING;
else
s->state = READY;
}
}
В начале описываем все необходимые нам состояния и возможные ошибки, и объявляем структуру в которой будут храниться данные подключения и выполняемого действия — указатель на структур PGconn необходимую библиотеке для работы с сервером, состояния автомата, код ошибок (если они будут) и время начала текущей операции (для контроля тайм-аута).
Две маленькие функции time_ms () и try_socket () представляют собой обертки над функциями стандартной библиотеки для получения текущего времени в миллисекундах и проверки сокета на занятость соответственно.
Использовать же все это можно как-то примерно так:
int main(void)
{
struct pqconn_s s;
pgsql_connection_start("dbname=db1 user=user1 password=password1 hostaddr=10.0.0.1 port=5432", &s, 15000);
while ((s.state != ERROR) && (s.state != READY))
{
pgsql_event_loop(&s);
}
if (s.state == ERROR)
{
perror("DB connection failed \n");
return 1;
}
pgsql_send_query(&s, "SELECT * FROM history;", 50000);
while ((s.state != ERROR) && (s.state != READY))
{
pgsql_event_loop(&s);
}
if (s.state == ERROR)
{
perror("DB query failed \n");
return 1;
}
PGresult *res;
int rec_count;
int row;
int col;
res = PQgetResult(s.conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
perror("We did not get any data!\n");
return 1;
}
rec_count = PQntuples(res);
printf("Received %d records.\n", rec_count);
for (row=0; row
Понятно дело, что приведенный пример работает по факту все-таки в блокирующем режиме (т.к. происходит принудительно ожидание установки поля state структуры в состояние ERROR или READY), однако как можно догадаться, дело осталось за малым: нужно вместо этого добавить в pgsql_event_loop () вызов callback’ов в случае успешного соединения, получения данных или возникновения ошибки, а event loop крутить вместе с остальными действиями в основном цикле программы или вызывать его по таймеру, и тогда работа с базой будет идти по-настоящему асинхронно.
Искренне надеюсь, что вышеописанное окажется кому-нибудь полезным.