Connection pool для pqxx
Проблема
В современном программировании управление базами данных — это критический компонент многих приложений, особенно в тех случаях, когда дело касается взаимодействия с реляционными базами данных, такими как PostgreSQL. Одной из ключевых задач в таком взаимодействии является эффективное управление соединениями с базой данных, что особенно важно в высоконагруженных приложениях. Именно здесь на сцену выходит понятие «connection pool» или пул соединений.
Пул соединений — очередь, которая содержит активные соединения с базой данных. Когда приложение требует доступа к базе данных, оно забирает соединение из пула, использует его для выполнения необходимых операций, а затем возвращает обратно в пул. Эта концепция позволяет избежать затратного процесса открытия и закрытия соединений при каждом запросе к базе данных, что значительно увеличивает производительность запросов к базе данных.
В процессе изучения бекэнда, как нового для меня направления в программировании, я столкнулся с необходимостью оптимизации управления соединениями. Поискав в интернете существующие решения для библиотеки pqxx
(C++ API для PostgreSQL), я обнаружил, что хотя они и выполняют свою задачу, ни одно из них не соответствовало моим требованиям.
Это побудило меня разработать собственную реализацию пула соединений, которая была бы не только эффективной и масштабируемой, но и предоставляла бы удобный API для работы с транзакциями. Моя цель — создать решение, которое могло бы быть легко интегрировано в любой проект, использующий pqxx
, обеспечивая при этом более высокую производительность и стабильность.
В данной статье я хочу поделиться процессом разработки и обсудить некоторые моменты реализации.
Необходимые знания и навыки
Перед тем как мы погрузимся в технические детали и код, важно отметить, что понимание содержания этой статьи требует определенного уровня знания:
C++: уверенные знания вплоть до C++17. Мы будем применять шаблоны, умные указатели и примитивы синхронизации (mutex
, conditional_variable
). Кроме того, советую ознакомиться с RAII, если вы еще не знаете что это такое.
PostgreSQL: минимальный опыт работы с pqxx и представление о том, что такое connection
, transaction
и query
.
Начинаем кодить
В целом, для MVP моя задумка очень простая: существует две функции/метода — begin_transaction
и end_transaction
. Хоть это, грубо говоря общепринятые названия, я решил сделать borrow_connection
и return_connection
соответственно для начала и конца транзакции.
struct connection_pool {
connection_pool(/*...*/) {
for (int i = 0; i < /*...*/; ++i)
connections.push(std::make_unique(/*...*/));
}
std::unique_ptr borrow_connection() {
std::unique_lock lock(connections_mutex);
connections_cond.wait(lock, [this]() { return !connections.empty(); });
// забираем соединение если очередь не пустая
auto connection = std::move(connections.front());
connections.pop();
return connection;
}
void return_connection(std::unique_ptr& connection) {
// возвращаем соединение
{
std::scoped_lock lock(connections_mutex);
connections.push(std::move(connection));
}
connections_cond.notify_one();
}
private:
std::mutex connections_mutex{};
std::condition_variable connections_cond{};
std::queue> connections{};
};
Получилась достаточно чистая и практически полностью готовая к использованию в ваших проектах реализация. Думаю, что любой человек, который хотел бы написать connection pool для pqxx
, его примерно так и представлял.
Стоит подметить, что этот код не exception safe. Давайте разберем на примере, почему это так?
connection_pool pool(/* параметры для создания соединения */);
try {
auto connection = pool.borrow_connection();
pqxx::work tx(connection)
// выполняем какие-то действия с транзакцией
pool.return_connection(connection);
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
}
Думаю опытные разработчики уже заметили, что если выкидывается исключение на любом моменте между borrow_connection
и return_connection
происходит утечка ресурса, то есть соединение достается из очереди, но не возвращается туда. Чтобы это исправить мы можем написать свою реализацию defer как в Go, либо использовать RAII, что является более предпочтительным способом в этом случае.
В этой статье я бы хотел немного поиграться с кодом и сделать что-то интереснее, чем классический connection pool. Прежде всего я хотел упростить API pqxx
лично для себя.
Придумаем API
Обычно, перед тем как писать какую-то абстракцию с неопределенным API, я захожу в блокнот и накидываю API будущей абстракции и параллельно думаю как это можно было бы реализовать. Именно для этой идеи вышел такой код:
void insert_user(cp::connection_pool& pool, int rnd) {
cp::query add_user("INSERT INTO test_users (username, role) VALUES ($1, $2)");
try {
auto tx = cp::tx(pool, add_user);
add_user(std::format("{:X}", rnd), "user");
tx.commit();
} catch (std::exception& e) {
std::cout << e.what() << std::endl;
}
}
int main() {
cp::connection_pool pool{};
try {
cp::query create_table("CREATE TABLE IF NOT EXISTS test_users ("
"id SERIAL PRIMARY KEY,"
"username TEXT,"
"role TEXT)");
auto tx = cp::tx(pool, create_table);
create_table();
tx.commit();
} catch (std::exception& e) {
std::cout << e.what() << std::endl;
}
}
Немного распишу происходящее в коде:
— cp
(connection pool) — пространство имён, содержащее в себе весь код, который мы сегодня напишем.
— cp::tx(...)
создает транзакция. Возвращаемый тип из это функции это RAII объект. Транзакция создается в момент создания этого объекта, а её завершение происходит в деструкторе. Этот объект является прокси (или чем-то подобным) для pqxx::transaction
aka pqxx::work
.
Также важно подметить, что отладка этого кода может быть усложнена отсутствием имён у запросов. Концептуально это не большая проблема (вроде бы). Но мы держим эту проблему в уме, не забываем её.
Первые шаги
Сразу хотелось бы изменить то, что в нашей очереди хранятся pqxx::connection
, а хотелось бы иметь свою обертку для реализации метода prepare
, который будет поддерживать многопоточность.
struct connection_manager {
connection_manager(std::string_view options) : connection(options.data()) {};
void prepare(const std::string& name, const std::string& definition) {
std::scoped_lock lock(prepares_mutex);
if (prepares.contains(name))
return;
connection.prepare(name, definition);
prepares.insert(name);
}
connection_manager(const connection_manager&) = delete;
connection_manager& operator=(const connection_manager&) = delete;
private:
std::unordered_set prepares{};
std::mutex prepares_mutex{};
pqxx::connection connection;
};
Теперь реализуем RAII структуру, которая будет брать и возвращать соединение из очереди.
struct basic_connection final {
basic_connection(connection_pool& pool) : pool(pool) {
manager = pool.borrow_connection();
}
~basic_connection() {
pool.return_connection(manager);
}
pqxx::connection& get() const { return manager->connection; }
operator pqxx::connection&() { return get(); }
operator const pqxx::connection&() const { return get(); }
pqxx::connection* operator->() { return &manager->connection; }
const pqxx::connection* operator->() const { return &manager->connection; }
void prepare(std::string_view name, std::string_view definition) {
manager->prepare(std::string(name), std::string(definition));
}
basic_connection(const basic_connection&) = delete;
basic_connection& operator=(const basic_connection&) = delete;
private:
connection_pool& pool;
std::unique_ptr manager;
};
Здесь я хочу заострить ваше внимание на том, если ваш класс содержит деструктор, при этом вы не собираетесь наследоваться от этого класса то, сделайте его final
. Этот простой трюк сэкономит вам много времени при отладке программ.
Реализация «query»
Давайте вспомним, что должно быть в query:
— некий объект, который будет инициализироваться не в конструкторе cp::query
, а при создании транзакции, то есть при вызове cp::tx
— методы и операторы для удобного преобразования в std::string
и std::string_view
— operator()
, который будет вызывать этот запрос и возвращать pqxx::result
struct query {
query(std::string_view str) : str(str) {}
const char* data() const {
return str.data();
}
operator std::string() const {
return { str.begin(), str.end() };
}
constexpr operator std::string_view() const {
return { str.data(), str.size() };
}
template
pqxx::result operator()(Args&&... args) {
return exec(std::forward(args)...);
}
template
pqxx::result exec(Args&&... args) {
if (!manager.has_value())
throw std::runtime_error("attempt to execute a query without connection with a transaction");
return manager->exec_prepared(std::forward(args)...);
}
protected:
std::string str;
// некий объект, который мы проинициализируем в cp::tx
std::optional manager{};
};
Теперь напишем query_manager
.
Реализация «query_manager»
По аналогии распишем будущий функционал этой структуры:
— метод exec_prepared
— конструктор, принимающий ссылку на транзакцию и уникальный идентификатор для запроса
Реализуем транзакцию в виде нашей кастомной структуры basic_transaction
.
struct query_manager {
query_manager(basic_transaction& transaction, std::string_view query_id)
: transaction_view(transaction), query_id(query_id) {}
template
pqxx::result exec_prepared(Args&&... args);
private:
std::string query_id{};
basic_transaction& transaction_view;
};
Пока оставим реализацию exec_prepared
на потом, ведь еще не ясно, что за API будет у basic_transaction
.
Реализация «basic_transaction»
Функционал и требования:
— конструктор:
— принимает connection_pool
— принимает случайное количество запросов
— вызывает pqxx::transaction::prepare
для каждого запроса один раз для thread-safe кода
— инициализирует manager
у каждого запроса
— следует идиоме RAII
— содержит API для прямого доступа к pqxx::transaction
struct basic_transaction {
void prepare_one(const query& q) {
// Генерируем уникальное имя для этой транзакции
const auto query_id = std::format("{:X}", std::hash()(q));
connection.prepare(query_id, q);
// Связываем запрос с этой транзакцией
q.manager.emplace(*this, query_id);
}
template
void prepare(Queries&&... queries) {
(prepare_one(std::forward(queries)), ...);
}
template
basic_transaction(connection_pool& pool, Queries&&... queries)
: connection(pool), transaction(connection.get()) {
prepare(std::forward(queries)...);
}
// Запрещаем копирование
basic_transaction(const basic_transaction&) = delete;
basic_transaction& operator=(const basic_transaction&) = delete;
// API для доступа к pqxx::transaction
pqxx::work& get() { return transaction; }
operator pqxx::work&() { return get(); }
private:
basic_connection connection;
pqxx::work transaction;
};
Финальные штрихи
Теперь допишем всё, что не поместилось внутри классов. Тут собственно пару строк и вышло:
template
basic_transaction tx(connection_pool& pool, Queries&&... queries) {
return basic_transaction(pool, std::forward(queries)...);
}
template
pqxx::result query_manager::exec_prepared(Args&&... args) {
return transaction_view.transaction.exec_prepared(
query_id, std::forward(args)...
);
}
Вот вроде бы и всё. Но нет.
Дописываем: «named_query»
Помнится мне, что была проблема с тем, что отладка кода может превратиться в ад, из-за отсутствия явных имён у запросов. В целом, с текущим кодом эта проблема решается элементарно. Для начала напишем новую структуру, `named_query`, которую пронаследуем от query
:
struct named_query : query {
named_query(std::string_view name, std::string_view str)
: query(str), name(name) {}
protected:
std::string name;
};
А также добавим немного кода в basic_transaction
:
struct basic_transaction {
// ...
void prepare_one(const named_query& q) {
connection.prepare(q.name, q);
q.manager.emplace(*this, q.name);
}
// ...
}
Ну вот в общем-то и всё.
Конец
Данный код я выложил на гитхаб как single-header библиотеку. Найти можно тут. В статье я опустил некоторые детали реализации, но всё равно показал более 90% кода.