Document: https://anarthal.github.io/mysql/index.html
GitHub: https://github.com/anarthal/mysql

db_query.h

#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "boost/mysql.hpp"

namespace flower {
class db_query final : public std::enable_shared_from_this<db_query> {
public:
    using ptr = std::shared_ptr<db_query>;

    db_query() = delete;

    explicit db_query(std::vector<boost::mysql::row>&& rows);

    ~db_query() = default;

    int8_t read_int8() {
        return static_cast<int8_t>(read_int64());
    }

    uint8_t read_uint8() {
        return static_cast<uint8_t>(read_uint64());
    }

    int16_t read_int16() {
        return static_cast<int16_t>(read_int64());
    }

    uint16_t read_uint16() {
        return static_cast<uint16_t>(read_uint64());
    }

    int32_t read_int32() {
        return static_cast<int32_t>(read_int64());
    }

    uint32_t read_uint32() {
        return static_cast<uint32_t>(read_uint64());
    }

    int64_t read_int64();

    uint64_t read_uint64();

    double read_double();

    float read_float();

    std::string read_string();

    std::size_t get_row_count() const {
        return row_count_;
    }

    bool is_read_all() const {
        return cur_field_index_ == field_count_ && cur_row_ == row_count_;
    }

private:
    void next_field_index();

private:
    std::vector<boost::mysql::row> result_;
    std::size_t cur_row_;
    std::size_t row_count_;
    std::size_t cur_field_index_;
    std::size_t field_count_;
};
}  // namespace flower
db_query.cpp
#include "db_query.h"

namespace flower {
db_query::db_query(std::vector<boost::mysql::row>&& rows)
    : result_(std::move(rows))
    , cur_row_(0)
    , row_count_(result_.size())
    , cur_field_index_(0)
    , field_count_(result_.empty() ? 0 : result_[cur_row_].values().size()) {
}

int64_t db_query::read_int64() {
    auto data = result_[cur_row_].values()[cur_field_index_].get_optional<int64_t>();
    next_field_index();
    return data.get_value_or({});
}

uint64_t db_query::read_uint64() {
    auto data = result_[cur_row_].values()[cur_field_index_].get_optional<uint64_t>();
    next_field_index();
    return data.get_value_or({});
}

double db_query::read_double() {
    auto data = result_[cur_row_].values()[cur_field_index_].get_optional<double>();
    next_field_index();

    return data.get_value_or({});
}

float db_query::read_float() {
    auto data = result_[cur_row_].values()[cur_field_index_].get_optional<float>();
    next_field_index();
    return data.get_value_or({});
}

std::string db_query::read_string() {
    auto data = result_[cur_row_].values()[cur_field_index_].get_optional<boost::string_view>();
    next_field_index();
    return data.get_value_or({}).to_string();
}

void db_query::next_field_index() {
    if (++cur_field_index_ == field_count_ && ++cur_row_ < row_count_) {
        cur_field_index_ = 0;
        field_count_ = result_[cur_row_].values().size();
    }
}

}  // namespace flower
db_connect.h
#pragma once

#include <functional>
#include <memory>

#include "boost/mysql.hpp"
#include "boost/utility/string_view.hpp"
#include "db_query.h"

namespace flower {
class db_connect final : public std::enable_shared_from_this<db_connect> {
public:
    using ptr = std::shared_ptr<db_connect>;

    db_connect() = delete;

    db_connect(boost::asio::io_context& io, boost::asio::ip::tcp::endpoint& ep, boost::string_view user,
               boost::string_view passwd, boost::string_view db);

    ~db_connect();

    void async_query(boost::string_view sql, std::function<void(db_query::ptr)>&& cb);

    void execute(boost::string_view sql, std::function<void()>&& cb);

    bool async_connect();

    bool is_valid() {
        return connection_.valid();
    }

private:
    boost::asio::ip::tcp::endpoint ep_;
    boost::mysql::tcp_connection connection_;
    boost::mysql::connection_params params_;
    boost::mysql::error_info additional_info_;
};
}  // namespace flower
db_connect.cpp
#include "db_connect.h"

#include "boost/asio/spawn.hpp"
#include "logger.h"

namespace flower {
db_connect::db_connect(boost::asio::io_context &io, boost::asio::ip::tcp::endpoint &ep, boost::string_view user,
                       boost::string_view passwd, boost::string_view db)
    : ep_(ep)
    , connection_(io)
    , params_(user, passwd, db)
    , additional_info_{} {
}

db_connect::~db_connect() {
    if (connection_.valid()) {
        connection_.async_close(additional_info_, [](boost::system::error_code ec) {
            LOG_INFO("db_connect::disconnect success");
        });
    }
}

void db_connect::async_query(boost::string_view sql, std::function<void(db_query::ptr)> &&cb) {
    std::string sql_str = sql.to_string();
    LOG_INFO("db_connect::async_query sql:{}", sql_str);
    if (!connection_.valid()) {
        LOG_ERROR("db_connect::async_query connect valid fail sql:{}", sql_str);
        cb(nullptr);
        return;
    }

    boost::asio::spawn(connection_.get_executor(), [this, sql_str, cb](boost::asio::yield_context &&yield) {
        boost::mysql::error_code ec;
        auto result = connection_.async_query(sql_str, additional_info_, yield[ec]);
        if (ec) {
            LOG_ERROR("db_connect::async_query what:{} message:{}", ec.what(), additional_info_.message());
            cb(nullptr);
            return;
        }

        auto rows = result.async_read_all(additional_info_, yield[ec]);
        if (ec) {
            LOG_ERROR("db_connect::async_query what:{} message:{}", ec.what(), additional_info_.message());
            cb(nullptr);
            return;
        }
        cb(std::make_shared<db_query>(std::move(rows)));
    });
}

void db_connect::execute(boost::string_view sql, std::function<void()> &&cb) {
    std::string sql_str = sql.to_string();
    LOG_INFO("db_connect::execute sql:{}", sql_str);
    if (!connection_.valid()) {
        LOG_ERROR("db_connect::execute connect valid fail sql:{}", sql_str);
        return;
    }

    boost::asio::spawn(connection_.get_executor(), [this, sql_str, cb](boost::asio::yield_context &&yield) {
        boost::mysql::error_code ec;
        connection_.async_query(sql_str, additional_info_, yield[ec]);
        if (ec) {
            LOG_ERROR("db_connect::execute what:{} message:{}", ec.what(), additional_info_.message());
        }
        cb();
    });
}

bool db_connect::async_connect() {
    if (!connection_.valid()) {
        LOG_ERROR("db_connect::connect valid fail");
        return false;
    }

    boost::asio::spawn(connection_.get_executor(),
                       [this, self = shared_from_this()](boost::asio::yield_context &&yield) {
                           boost::system::error_code ec;
                           connection_.async_connect(ep_, params_, yield[ec]);
                           if (ec) {
                               LOG_ERROR("db_connect::async_connect what:{}", ec.what());
                               connection_.close();
                               return;
                           }
                       });

    return true;
}
}  // namespace flower
db_pool.h
#pragma once

#include <memory>
#include <string>

#include "boost/utility/string_view.hpp"
#include "concurrentqueue/blockingconcurrentqueue.h"
#include "db_connect.h"
#include "io_context_pool.h"
#include "singleton.h"
#include "boost/utility/string_view.hpp"

namespace flower {
class db_pool final : public singleton<db_pool> {
    friend class singleton<db_pool>;

public:
    ~db_pool() override = default;

private:
    db_pool() = default;

public:
    bool start(boost::string_view db_name, std::size_t pool_count);

    db_connect::ptr take();

    void post(db_connect::ptr connect);

private:
    moodycamel::BlockingConcurrentQueue<db_connect::ptr> conn_queue_{};
    std::string passwd_;
    std::string user_;
    std::string db_;
};
}  // namespace flower
db_pool.cpp
#include "db_pool.h"

#include "config.h"

namespace flower {
bool db_pool::start(boost::string_view db_name, std::size_t pool_count) {
    auto ip = config::instance().get<std::string>(db_name.to_string() + "_ip");
    auto port = config::instance().get<uint16_t>(db_name.to_string() + "_port");
    auto ep = boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4::from_string(ip), port);
    user_ = config::instance().get<std::string>(db_name.to_string() + "_user");
    passwd_ = config::instance().get<std::string>(db_name.to_string() + "_passwd");
    db_ = config::instance().get<std::string>(db_name.to_string() + "_name");

    db_connect::ptr conn;
    for (std::size_t i = 0; i < pool_count; ++i) {
        conn = std::make_shared<db_connect>(io_context_pool::instance().get_io_context(), ep, user_, passwd_, db_);
        if (conn->async_connect()) {
            post(conn);
        }
    }

    return conn_queue_.size_approx() > 0;
}

db_connect::ptr db_pool::take() {
    db_connect::ptr db_connect;
    conn_queue_.wait_dequeue(db_connect);
    return db_connect;
}

void db_pool::post(db_connect::ptr connect) {
    if (connect && connect->is_valid()) {
        conn_queue_.enqueue(connect);
    }
}
}  // namespace flower