PostgreSQL libpq connection pool
Библиотека отлично документирована, есть даже полный перевод на русский язык, от компании PostgresPRO.
При написании серверного бекэнда, столкнулся с тем, что в этой библиотеке нет никакого пула коннектов, а работа с БД, предполагалась в довольно интенсивном режиме и одного коннекта было явно мало. Каждый раз устанавливать соединение для отправки полученных данных, было бы просто безумием, т.к. соединение самая долгая операция, решено было написать свой пул коннектов.
Идея в том, что мы при старте программы создаём несколько соединений и храним их в очереди.
Когда приходят данные, мы просто берём свободное соединеие из очереди, а если свободных соединений нет — ждём когда появится, используем его для вставки данных, а затем помещаем соединение обратно. Идея довольна простая, быстро реализуема и самое главное, скорость работы очень высокая.
Создадим в PostgreSQL базу с именем demo, табличкой demo такой
-- Table: public.demo
-- DROP TABLE public.demo;
CREATE TABLE public.demo
(
id integer NOT NULL DEFAULT nextval('demo_id_seq'::regclass),
name character varying(256),
CONSTRAINT demo_pk PRIMARY KEY (id)
)
WITH (
OIDS=FALSE
);
ALTER TABLE public.demo
OWNER TO postgres;
Пишем класс который будет представлять собой соединение к БД, параметры соединения, будут прописаны прямо в коде для его упрощения, в реальности конечно их надо хранить в конфигурационном файле и при запуске считывать оттуда, чтобы при изменении парметров сервера, не приходилось перекомпилировать программу.
#ifndef PGCONNECTION_H
#define PGCONNECTION_H
#include
#include
#include
class PGConnection
{
public:
PGConnection();
std::shared_ptr connection() const;
private:
void establish_connection();
std::string m_dbhost = "localhost";
int m_dbport = 5432;
std::string m_dbname = "demo";
std::string m_dbuser = "postgres";
std::string m_dbpass = "postgres";
std::shared_ptr m_connection;
};
#endif //PGCONNECTION_H
#include "pgconnection.h"
PGConnection::PGConnection()
{
m_connection.reset( PQsetdbLogin(m_dbhost.c_str(), std::to_string(m_dbport).c_str(), nullptr, nullptr, m_dbname.c_str(), m_dbuser.c_str(), m_dbpass.c_str()), &PQfinish );
if (PQstatus( m_connection.get() ) != CONNECTION_OK && PQsetnonblocking(m_connection.get(), 1) != 0 )
{
throw std::runtime_error( PQerrorMessage( m_connection.get() ) );
}
}
std::shared_ptr PGConnection::connection() const
{
return m_connection;
}
Чтобы предотвратить возможную утечку ресурсов, соединиение мы будем хранить в умном указателе.
В конструкторе мы вызывам функцию PQsetdbLogin, которая устанавливает соединение к БД, возвращая указатель на соедининие PGconn* и переводим соедининие в асинхронный режим работы.
При завершении работы, соединие должно быть удалено функцией PQfinish, которой передается указатель, возвращённый функцией PQsetdbLogin. Поэтому последним параметром в вызове m_connection.reset () мы передаём адрес функции &PQfinish. Когда умный указатель выйдет из области видимости и счётчик ссылок обнулиться, он вызовет эту функцию, тем самым корректно завершив соединие.
Теперь нам нужен класс, который будет создавать, хранить и управлять работой пула коннектов.
#ifndef PGBACKEND_H
#define PGBACKEND_H
#include
#include
#include
#include
#include
#include
#include "pgconnection.h"
class PGBackend
{
public:
PGBackend();
std::shared_ptr connection();
void freeConnection(std::shared_ptr);
private:
void createPool();
std::mutex m_mutex;
std::condition_variable m_condition;
std::queue> m_pool;
const int POOL = 10;
};
#endif //PGBACKEND_H
#include
#include
#include
#include
#include "pgbackend.h"
PGBackend::PGBackend()
{
createPool();
}
void PGBackend::createPool()
{
std::lock_guard locker_( m_mutex );
for ( auto i = 0; i< POOL; ++i ){
m_pool.emplace ( std::make_shared() );
}
}
std::shared_ptr PGBackend::connection()
{
std::unique_lock lock_( m_mutex );
while ( m_pool.empty() ){
m_condition.wait( lock_ );
}
auto conn_ = m_pool.front();
m_pool.pop();
return conn_;
}
void PGBackend::freeConnection(std::shared_ptr conn_)
{
std::unique_lock lock_( m_mutex );
m_pool.push( conn_ );
lock_.unlock();
m_condition.notify_one();
}
В функции createPool создаём пул соедининий, я установил 10 соединений.
Далее — создаём класс PGBackend, и работаем с ним через функции connection — котороя возвращает свободное соединение к БД, и freeConnection — которая помещяет соединие обратно в очередь.
Всё это работает на основе условных переменных, если очередь пуста, значит свободных соединений нет, и поток засыпает, пока не будет разбужен через условную переменную.
Простейший пример, в котором используется наш бекэнд с пулом коннектов, приведён в файле main.cpp. В «боевых условия» у вас конечно будет какой-то цикл событий, при наступлении которых будет проводиться работа с БД. У меня это boost: asio, которая работает асинхронно и принимая события из сети, пишет всё в БД. Приводить здесь её излишне, чтобы не усложнять идею с пулом коннектов. Здесь мы просто создаём 50 потоков, которые работают с сервером через один экземляр PGBackend.
#include
#include
#include "pgbackend.h"
void testConnection(std::shared_ptr pgbackend)
{
//получаем свободное соединение
auto conn = pgbackend->connection();
std::string demo = "SELECT max(id) FROM demo; " ;
PQsendQuery( conn->connection().get(), demo.c_str() );
while ( auto res_ = PQgetResult( conn->connection().get()) ) {
if (PQresultStatus(res_) == PGRES_TUPLES_OK && PQntuples(res_)) {
auto ID = PQgetvalue (res_ ,0, 0);
std::cout<< ID<freeConnection(conn);
}
int main(int argc, char const *argv[])
{
auto pgbackend = std::make_shared();
std::vector> vec;
for ( size_t i = 0; i< 50 ; ++i ){
vec.push_back(std::make_shared(std::thread(testConnection, pgbackend)));
}
for(auto &i : vec) {
i.get()->join();
}
return 0;
}
Компилируется это всё командой:
g++ main.cpp pgbackend.cpp pgconnection.cpp -o pool -std=c++14 -I/usr/include/postgresql/ -lpq -lpthread
Будьте внимательны с количеством соединений БД — этот параметр задаётся параметром max_connections (integer).