Connection pool для pqxx

ead72ce4f06c588876420c7c90d7fa6a

Проблема

В современном программировании управление базами данных — это критический компонент многих приложений, особенно в тех случаях, когда дело касается взаимодействия с реляционными базами данных, такими как PostgreSQL. Одной из ключевых задач в таком взаимодействии является эффективное управление соединениями с базой данных, что особенно важно в высоконагруженных приложениях. Именно здесь на сцену выходит понятие «connection pool» или пул соединений.

Пул соединений — очередь, которая содержит активные соединения с базой данных. Когда приложение требует доступа к базе данных, оно забирает соединение из пула, использует его для выполнения необходимых операций, а затем возвращает обратно в пул. Эта концепция позволяет избежать затратного процесса открытия и закрытия соединений при каждом запросе к базе данных, что значительно увеличивает производительность запросов к базе данных.

В процессе изучения бекэнда, как нового для меня направления в программировании, я столкнулся с необходимостью оптимизации управления соединениями. Поискав в интернете существующие решения для библиотеки pqxx (C++ API для PostgreSQL), я обнаружил, что хотя они и выполняют свою задачу, ни одно из них не соответствовало моим требованиям.

Это побудило меня разработать собственную реализацию пула соединений, которая была бы не только эффективной и масштабируемой, но и предоставляла бы удобный API для работы с транзакциями. Моя цель — создать решение, которое могло бы быть легко интегрировано в любой проект, использующий pqxx, обеспечивая при этом более высокую производительность и стабильность.

В данной статье я хочу поделиться процессом разработки и обсудить некоторые моменты реализации.

Необходимые знания и навыки

Перед тем как мы погрузимся в технические детали и код, важно отметить, что понимание содержания этой статьи требует определенного уровня знания:

C++: уверенные знания вплоть до C++17. Мы будем применять шаблоны, умные указатели и примитивы синхронизации (mutex, conditional_variable). Кроме того, советую ознакомиться с RAII, если вы еще не знаете что это такое.

PostgreSQL: минимальный опыт работы с pqxx и представление о том, что такое connection, transaction и query.

Начинаем кодить

В целом, для MVP моя задумка очень простая: существует две функции/метода — begin_transaction и end_transaction. Хоть это, грубо говоря общепринятые названия, я решил сделать borrow_connection и return_connection соответственно для начала и конца транзакции.

struct connection_pool {
    connection_pool(/*...*/) {
        for (int i = 0; i < /*...*/; ++i)
            connections.push(std::make_unique(/*...*/));
    }

    std::unique_ptr borrow_connection() {
        std::unique_lock lock(connections_mutex);
        connections_cond.wait(lock, [this]() { return !connections.empty(); });

        // забираем соединение если очередь не пустая
        auto connection = std::move(connections.front());
        connections.pop();
        return connection;
    }

    void return_connection(std::unique_ptr& connection) {
        // возвращаем соединение
        {
            std::scoped_lock lock(connections_mutex);
            connections.push(std::move(connection));
        }

        connections_cond.notify_one();
    }

private:
    std::mutex connections_mutex{};
    std::condition_variable connections_cond{};
    std::queue> connections{};
};

Получилась достаточно чистая и практически полностью готовая к использованию в ваших проектах реализация. Думаю, что любой человек, который хотел бы написать connection pool для pqxx, его примерно так и представлял.

Стоит подметить, что этот код не exception safe. Давайте разберем на примере, почему это так?

connection_pool pool(/* параметры для создания соединения */);

try {
    auto connection = pool.borrow_connection();
    pqxx::work tx(connection)

    // выполняем какие-то действия с транзакцией
    
    pool.return_connection(connection);
}
catch (std::exception& e) {
    std::cerr << e.what() << std::endl;
}

Думаю опытные разработчики уже заметили, что если выкидывается исключение на любом моменте между borrow_connection и return_connection происходит утечка ресурса, то есть соединение достается из очереди, но не возвращается туда. Чтобы это исправить мы можем написать свою реализацию defer как в Go, либо использовать RAII, что является более предпочтительным способом в этом случае.

В этой статье я бы хотел немного поиграться с кодом и сделать что-то интереснее, чем классический connection pool. Прежде всего я хотел упростить API pqxx лично для себя.

Придумаем API

Обычно, перед тем как писать какую-то абстракцию с неопределенным API, я захожу в блокнот и накидываю API будущей абстракции и параллельно думаю как это можно было бы реализовать. Именно для этой идеи вышел такой код:

void insert_user(cp::connection_pool& pool, int rnd) {
	cp::query add_user("INSERT INTO test_users (username, role) VALUES ($1, $2)");

	try {
		auto tx = cp::tx(pool, add_user);
		add_user(std::format("{:X}", rnd), "user");
		tx.commit();
	} catch (std::exception& e) {
		std::cout << e.what() << std::endl;
	}
}

int main() {
	cp::connection_pool pool{};

	try {
		cp::query create_table("CREATE TABLE IF NOT EXISTS test_users ("
							   "id SERIAL PRIMARY KEY,"
							   "username TEXT,"
							   "role TEXT)");

		auto tx = cp::tx(pool, create_table);
		create_table();
		tx.commit();
	} catch (std::exception& e) {
		std::cout << e.what() << std::endl;
	}
}

Немного распишу происходящее в коде:

—  cp (connection pool) — пространство имён, содержащее в себе весь код, который мы сегодня напишем.

cp::tx(...) создает транзакция. Возвращаемый тип из это функции это RAII объект. Транзакция создается в момент создания этого объекта, а её завершение происходит в деструкторе. Этот объект является прокси (или чем-то подобным) для pqxx::transaction aka pqxx::work.

Также важно подметить, что отладка этого кода может быть усложнена отсутствием имён у запросов. Концептуально это не большая проблема (вроде бы). Но мы держим эту проблему в уме, не забываем её.

Первые шаги

Сразу хотелось бы изменить то, что в нашей очереди хранятся pqxx::connection, а хотелось бы иметь свою обертку для реализации метода prepare, который будет поддерживать многопоточность.

struct connection_manager {
    connection_manager(std::string_view options) : connection(options.data()) {};

    void prepare(const std::string& name, const std::string& definition) {
        std::scoped_lock lock(prepares_mutex);
        if (prepares.contains(name))
            return;

        connection.prepare(name, definition);
        prepares.insert(name);
    }

    connection_manager(const connection_manager&) = delete;
    connection_manager& operator=(const connection_manager&) = delete;
private:
    std::unordered_set prepares{};
    std::mutex prepares_mutex{};
    pqxx::connection connection;
};

Теперь реализуем RAII структуру, которая будет брать и возвращать соединение из очереди.

struct basic_connection final {
	basic_connection(connection_pool& pool) : pool(pool) {
		manager = pool.borrow_connection();
	}

	~basic_connection() {
		pool.return_connection(manager);
	}

	pqxx::connection& get() const { return manager->connection; }

	operator pqxx::connection&() { return get(); }
	operator const pqxx::connection&() const { return get(); }

	pqxx::connection* operator->() { return &manager->connection; }
	const pqxx::connection* operator->() const { return &manager->connection; }

	void prepare(std::string_view name, std::string_view definition) {
		manager->prepare(std::string(name), std::string(definition));
	}

	basic_connection(const basic_connection&) = delete;
	basic_connection& operator=(const basic_connection&) = delete;

private:
	connection_pool& pool;
	std::unique_ptr manager;
};

Здесь я хочу заострить ваше внимание на том, если ваш класс содержит деструктор, при этом вы не собираетесь наследоваться от этого класса то, сделайте его final. Этот простой трюк сэкономит вам много времени при отладке программ.

Реализация «query»

Давайте вспомним, что должно быть в query:

— некий объект, который будет инициализироваться не в конструкторе cp::query, а при создании транзакции, то есть при вызове cp::tx

— методы и операторы для удобного преобразования в std::string и std::string_view

operator(), который будет вызывать этот запрос и возвращать pqxx::result

struct query {
	query(std::string_view str) : str(str) {}

	const char* data() const {
		return str.data();
	}

	operator std::string() const {
		return { str.begin(), str.end() };
	}

	constexpr operator std::string_view() const {
		return { str.data(), str.size() };
	}

	template
	pqxx::result operator()(Args&&... args) {
		return exec(std::forward(args)...);
	}

	template
	pqxx::result exec(Args&&... args) {
		if (!manager.has_value())
			throw std::runtime_error("attempt to execute a query without connection with a transaction");

		return manager->exec_prepared(std::forward(args)...);
	}

protected:
	std::string str;

    // некий объект, который мы проинициализируем в cp::tx
	std::optional manager{};
};

Теперь напишем query_manager.

Реализация «query_manager»

По аналогии распишем будущий функционал этой структуры:

— метод exec_prepared

— конструктор, принимающий ссылку на транзакцию и уникальный идентификатор для запроса

Реализуем транзакцию в виде нашей кастомной структуры basic_transaction.

struct query_manager {
    query_manager(basic_transaction& transaction, std::string_view query_id) 
        : transaction_view(transaction), query_id(query_id) {}

    template
    pqxx::result exec_prepared(Args&&... args);

private:
    std::string query_id{};
    basic_transaction& transaction_view;
};

Пока оставим реализацию exec_prepared на потом, ведь еще не ясно, что за API будет у basic_transaction.

Реализация «basic_transaction»

Функционал и требования:

— конструктор:

 — принимает connection_pool

 — принимает случайное количество запросов

 — вызывает pqxx::transaction::prepare для каждого запроса один раз для thread-safe кода

 — инициализирует manager у каждого запроса

— следует идиоме RAII

— содержит API для прямого доступа к pqxx::transaction

struct basic_transaction {
    void prepare_one(const query& q) {
        // Генерируем уникальное имя для этой транзакции
        const auto query_id = std::format("{:X}", std::hash()(q));
        connection.prepare(query_id, q);

        // Связываем запрос с этой транзакцией
        q.manager.emplace(*this, query_id);
    }

    template
    void prepare(Queries&&... queries) {
        (prepare_one(std::forward(queries)), ...);
    }

    template
    basic_transaction(connection_pool& pool, Queries&&... queries) 
      : connection(pool), transaction(connection.get()) {
        prepare(std::forward(queries)...);
    }

    // Запрещаем копирование
    basic_transaction(const basic_transaction&) = delete;
    basic_transaction& operator=(const basic_transaction&) = delete;
    
    // API для доступа к pqxx::transaction 
    pqxx::work& get() { return transaction; }
    operator pqxx::work&() { return get(); }
  
private:
    basic_connection connection;
    pqxx::work transaction;
};

Финальные штрихи

Теперь допишем всё, что не поместилось внутри классов. Тут собственно пару строк и вышло:

template
basic_transaction tx(connection_pool& pool, Queries&&... queries) {
    return basic_transaction(pool, std::forward(queries)...);
}

template
pqxx::result query_manager::exec_prepared(Args&&... args) {
    return transaction_view.transaction.exec_prepared(
        query_id, std::forward(args)...
    );
}

Вот вроде бы и всё. Но нет.

Дописываем: «named_query»

Помнится мне, что была проблема с тем, что отладка кода может превратиться в ад, из-за отсутствия явных имён у запросов. В целом, с текущим кодом эта проблема решается элементарно. Для начала напишем новую структуру, `named_query`, которую пронаследуем от query:

struct named_query : query {
    named_query(std::string_view name, std::string_view str) 
      : query(str), name(name) {}

protected:
    std::string name;
};

А также добавим немного кода в basic_transaction:

struct basic_transaction {
    // ...
    
	void prepare_one(const named_query& q) {
		connection.prepare(q.name, q);
		q.manager.emplace(*this, q.name);
	}

    // ...
}

Ну вот в общем-то и всё.

Конец

Данный код я выложил на гитхаб как single-header библиотеку. Найти можно тут. В статье я опустил некоторые детали реализации, но всё равно показал более 90% кода.

© Habrahabr.ru