PostgreSQL libpq connection pool

habralogo.jpg
Для работы с PostgreSQL на языке С++, есть замечательная библиотека libpq.
Библиотека отлично документирована, есть даже полный перевод на русский язык, от компании PostgresPRO.

При написании серверного бекэнда, столкнулся с тем, что в этой библиотеке нет никакого пула коннектов, а работа с БД, предполагалась в довольно интенсивном режиме и одного коннекта было явно мало. Каждый раз устанавливать соединение для отправки полученных данных, было бы просто безумием, т.к. соединение самая долгая операция, решено было написать свой пул коннектов.
Идея в том, что мы при старте программы создаём несколько соединений и храним их в очереди.
Когда приходят данные, мы просто берём свободное соединеие из очереди, а если свободных соединений нет — ждём когда появится, используем его для вставки данных, а затем помещаем соединение обратно. Идея довольна простая, быстро реализуема и самое главное, скорость работы очень высокая.

Создадим в PostgreSQL базу с именем demo, табличкой demo такой

структуры
-- Table: public.demo

-- DROP TABLE public.demo;

CREATE TABLE public.demo
(
  id integer NOT NULL DEFAULT nextval('demo_id_seq'::regclass),
  name character varying(256),
  CONSTRAINT demo_pk PRIMARY KEY (id)
)
WITH (
  OIDS=FALSE
);
ALTER TABLE public.demo
  OWNER TO postgres;

Пишем класс который будет представлять собой соединение к БД, параметры соединения, будут прописаны прямо в коде для его упрощения, в реальности конечно их надо хранить в конфигурационном файле и при запуске считывать оттуда, чтобы при изменении парметров сервера, не приходилось перекомпилировать программу.

pgconnection.h
#ifndef PGCONNECTION_H
#define PGCONNECTION_H

#include 
#include 
#include 

class PGConnection
{
public:
    PGConnection();
    std::shared_ptr connection() const;

private:
    void establish_connection();

    std::string m_dbhost = "localhost";
    int         m_dbport = 5432;
    std::string m_dbname = "demo";
    std::string m_dbuser = "postgres";
    std::string m_dbpass = "postgres";

    std::shared_ptr  m_connection;

};


#endif //PGCONNECTION_H

pgconnection.cpp
#include "pgconnection.h"


PGConnection::PGConnection()
{
    m_connection.reset( PQsetdbLogin(m_dbhost.c_str(), std::to_string(m_dbport).c_str(), nullptr, nullptr, m_dbname.c_str(), m_dbuser.c_str(), m_dbpass.c_str()), &PQfinish );

    if (PQstatus( m_connection.get() ) != CONNECTION_OK && PQsetnonblocking(m_connection.get(), 1) != 0 )
    {
       throw std::runtime_error( PQerrorMessage( m_connection.get() ) );
    }

}


std::shared_ptr PGConnection::connection() const
{
    return m_connection;
}

Чтобы предотвратить возможную утечку ресурсов, соединиение мы будем хранить в умном указателе.
В конструкторе мы вызывам функцию PQsetdbLogin, которая устанавливает соединение к БД, возвращая указатель на соедининие PGconn* и переводим соедининие в асинхронный режим работы.
При завершении работы, соединие должно быть удалено функцией PQfinish, которой передается указатель, возвращённый функцией PQsetdbLogin. Поэтому последним параметром в вызове m_connection.reset () мы передаём адрес функции &PQfinish. Когда умный указатель выйдет из области видимости и счётчик ссылок обнулиться, он вызовет эту функцию, тем самым корректно завершив соединие.
Теперь нам нужен класс, который будет создавать, хранить и управлять работой пула коннектов.

pgbackend.h

#ifndef PGBACKEND_H
#define PGBACKEND_H

#include 
#include 
#include 
#include 
#include 
#include 
#include "pgconnection.h"


class PGBackend
{
public:
    PGBackend();
    std::shared_ptr connection();
    void freeConnection(std::shared_ptr);

private:
    void createPool();
    
    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::queue> m_pool;

    const int POOL = 10;


};

#endif //PGBACKEND_H

pgbackend.cpp

#include 
#include 
#include 
#include 
#include "pgbackend.h"

PGBackend::PGBackend()
{

    createPool();
  
}

void PGBackend::createPool()
{
    std::lock_guard locker_( m_mutex );

    for ( auto i = 0; i< POOL; ++i ){
         m_pool.emplace ( std::make_shared() );
    }
}

std::shared_ptr PGBackend::connection()
{

    std::unique_lock lock_( m_mutex );

    while ( m_pool.empty() ){
            m_condition.wait( lock_ );
    }

    auto conn_ = m_pool.front();
    m_pool.pop();

    return  conn_;
}


void PGBackend::freeConnection(std::shared_ptr conn_)
{
    std::unique_lock lock_( m_mutex );
    m_pool.push( conn_ );
    lock_.unlock();
    m_condition.notify_one();
}

В функции createPool создаём пул соедининий, я установил 10 соединений.
Далее — создаём класс PGBackend, и работаем с ним через функции connection — котороя возвращает свободное соединение к БД, и freeConnection — которая помещяет соединие обратно в очередь.
Всё это работает на основе условных переменных, если очередь пуста, значит свободных соединений нет, и поток засыпает, пока не будет разбужен через условную переменную.

Простейший пример, в котором используется наш бекэнд с пулом коннектов, приведён в файле main.cpp. В «боевых условия» у вас конечно будет какой-то цикл событий, при наступлении которых будет проводиться работа с БД. У меня это boost: asio, которая работает асинхронно и принимая события из сети, пишет всё в БД. Приводить здесь её излишне, чтобы не усложнять идею с пулом коннектов. Здесь мы просто создаём 50 потоков, которые работают с сервером через один экземляр PGBackend.

main.cpp

#include 
#include 
#include "pgbackend.h"

void testConnection(std::shared_ptr pgbackend)
{
//получаем свободное соединение
    auto conn = pgbackend->connection();

    std::string demo = "SELECT max(id) FROM demo; " ;
    PQsendQuery( conn->connection().get(), demo.c_str() );

    while ( auto res_ = PQgetResult( conn->connection().get()) ) {
        if (PQresultStatus(res_) == PGRES_TUPLES_OK && PQntuples(res_)) {
            auto ID = PQgetvalue (res_ ,0, 0);
            std::cout<< ID<freeConnection(conn);

}


int main(int argc, char const *argv[])
{
	
	auto pgbackend = std::make_shared();

    
    std::vector> vec;

    for ( size_t i = 0; i< 50 ; ++i ){

        vec.push_back(std::make_shared(std::thread(testConnection, pgbackend)));
    }

    for(auto &i : vec) {
        i.get()->join();
    }

 
    return 0;
}


Компилируется это всё командой:

g++ main.cpp pgbackend.cpp pgconnection.cpp -o pool -std=c++14 -I/usr/include/postgresql/ -lpq -lpthread

Будьте внимательны с количеством соединений БД — этот параметр задаётся параметром max_connections (integer).

Комментарии (0)

© Habrahabr.ru