Boost库MySQL 封装使用
Document: https://anarthal.github.io/mysql/index.html
GitHub: https://github.com/anarthal/mysql
db_query.h
#pragma once
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include "boost/mysql.hpp"
namespace flower {
/// Forward-only cursor over a fully fetched MySQL result set.
///
/// Fields are consumed left to right within a row and then row by row; every
/// read_* call returns the field under the cursor and advances by one field.
/// NOTE(review): callers appear to be expected to check get_row_count() /
/// is_read_all() before reading — confirm against call sites.
class db_query final : public std::enable_shared_from_this<db_query> {
public:
    using ptr = std::shared_ptr<db_query>;

    db_query() = delete;

    /// Takes ownership of @p rows and places the cursor on the first field of the first row.
    explicit db_query(std::vector<boost::mysql::row>&& rows);

    ~db_query() = default;

    // 64-bit and non-integer readers (defined in db_query.cpp).
    int64_t read_int64();
    uint64_t read_uint64();
    double read_double();
    float read_float();
    std::string read_string();

    // Narrow signed readers: read the field as int64_t, then truncate with static_cast.
    int8_t read_int8() { return static_cast<int8_t>(read_int64()); }
    int16_t read_int16() { return static_cast<int16_t>(read_int64()); }
    int32_t read_int32() { return static_cast<int32_t>(read_int64()); }

    // Narrow unsigned readers: read the field as uint64_t, then truncate with static_cast.
    uint8_t read_uint8() { return static_cast<uint8_t>(read_uint64()); }
    uint16_t read_uint16() { return static_cast<uint16_t>(read_uint64()); }
    uint32_t read_uint32() { return static_cast<uint32_t>(read_uint64()); }

    /// Number of rows held by this result set.
    std::size_t get_row_count() const { return row_count_; }

    /// True when the row cursor equals the row count and the field cursor
    /// equals the current field count (this includes an empty result set).
    bool is_read_all() const {
        const bool rows_exhausted = (cur_row_ == row_count_);
        const bool fields_exhausted = (cur_field_index_ == field_count_);
        return fields_exhausted && rows_exhausted;
    }

private:
    /// Moves the cursor to the next field, rolling over to the next row when needed.
    void next_field_index();

    // Declaration order matters: the constructor's initializer list reads
    // result_ and cur_row_ while initializing the members that follow them.
    std::vector<boost::mysql::row> result_;  // owned rows
    std::size_t cur_row_;                    // index of the row under the cursor
    std::size_t row_count_;                  // result_.size(), captured at construction
    std::size_t cur_field_index_;            // index of the field under the cursor
    std::size_t field_count_;                // number of fields in the current row
};
} // namespace flower
db_query.cpp
#include "db_query.h"
namespace flower {
// Takes ownership of the fetched rows and parks the cursor on row 0 / field 0.
// The field count comes from the first row, or is 0 when there are no rows.
db_query::db_query(std::vector<boost::mysql::row>&& rows)
    : result_(std::move(rows)),
      cur_row_(0),
      row_count_(result_.size()),
      cur_field_index_(0),
      // cur_row_ is 0 here, so the first row is the one being measured.
      field_count_(result_.empty() ? 0 : result_.front().values().size()) {}
int64_t db_query::read_int64() {
auto data = result_[cur_row_].values()[cur_field_index_].get_optional<int64_t>();
next_field_index();
return data.get_value_or({});
}
uint64_t db_query::read_uint64() {
auto data = result_[cur_row_].values()[cur_field_index_].get_optional<uint64_t>();
next_field_index();
return data.get_value_or({});
}
double db_query::read_double() {
auto data = result_[cur_row_].values()[cur_field_index_].get_optional<double>();
next_field_index();
return data.get_value_or({});
}
float db_query::read_float() {
auto data = result_[cur_row_].values()[cur_field_index_].get_optional<float>();
next_field_index();
return data.get_value_or({});
}
std::string db_query::read_string() {
auto data = result_[cur_row_].values()[cur_field_index_].get_optional<boost::string_view>();
next_field_index();
return data.get_value_or({}).to_string();
}
// Advances the cursor by one field. When the current row is finished, moves to
// the next row and refreshes the per-row field count. After the last field of
// the last row the cursor is left at (row_count_, field_count_), which is the
// state is_read_all() tests for.
void db_query::next_field_index() {
    ++cur_field_index_;
    if (cur_field_index_ != field_count_) {
        return;  // still inside the current row
    }

    ++cur_row_;
    if (cur_row_ >= row_count_) {
        return;  // no further rows; keep the field index at field_count_
    }

    cur_field_index_ = 0;
    field_count_ = result_[cur_row_].values().size();
}
} // namespace flower
db_connect.h
#pragma once
#include <functional>
#include <memory>
#include "boost/mysql.hpp"
#include "boost/utility/string_view.hpp"
#include "db_query.h"
namespace flower {
class db_connect final : public std::enable_shared_from_this<db_connect> {
public:
using ptr = std::shared_ptr<db_connect>;
db_connect() = delete;
db_connect(boost::asio::io_context& io, boost::asio::ip::tcp::endpoint& ep, boost::string_view user,
boost::string_view passwd, boost::string_view db);
~db_connect();
void async_query(boost::string_view sql, std::function<void(db_query::ptr)>&& cb);
void execute(boost::string_view sql, std::function<void()>&& cb);
bool async_connect();
bool is_valid() {
return connection_.valid();
}
private:
boost::asio::ip::tcp::endpoint ep_;
boost::mysql::tcp_connection connection_;
boost::mysql::connection_params params_;
boost::mysql::error_info additional_info_;
};
} // namespace flower
db_connect.cpp
#include "db_connect.h"
#include <utility>
#include "boost/asio/spawn.hpp"
#include "logger.h"
namespace flower {
// Stores a copy of the endpoint and the credentials and constructs the
// connection from `io`. No network activity happens here; see async_connect().
db_connect::db_connect(boost::asio::io_context &io,
                       boost::asio::ip::tcp::endpoint &ep,
                       boost::string_view user,
                       boost::string_view passwd,
                       boost::string_view db)
    : ep_(ep),
      connection_(io),
      params_(user, passwd, db),
      additional_info_{} {}
// Closes the connection synchronously before the members are destroyed.
//
// An asynchronous close must not be started here: the connection and
// additional_info_ it would keep using are destroyed as soon as this
// destructor returns. The error_code overload is used so the destructor
// cannot throw, and the outcome is logged according to the actual result.
db_connect::~db_connect() {
    if (!connection_.valid()) {
        return;
    }

    boost::mysql::error_code ec;
    connection_.close(ec, additional_info_);
    if (ec) {
        LOG_ERROR("db_connect::disconnect what:{} message:{}", ec.what(), additional_info_.message());
        return;
    }
    LOG_INFO("db_connect::disconnect success");
}
// Runs `sql` in a stackful coroutine on the connection's executor and reports
// the outcome through `cb`.
//
// @param sql  statement text; copied, so the caller's buffer may go away.
// @param cb   receives a db_query holding all rows on success, or nullptr when
//             the connection is not valid, the query fails, or reading the
//             rows fails. Ownership of the callback moves into the coroutine.
//
// The coroutine holds a shared_ptr to this object (as async_connect() does) so
// the connection cannot be destroyed while the query is in flight. The object
// must therefore be owned by a std::shared_ptr.
void db_connect::async_query(boost::string_view sql, std::function<void(db_query::ptr)> &&cb) {
    std::string sql_str = sql.to_string();
    LOG_INFO("db_connect::async_query sql:{}", sql_str);
    if (!connection_.valid()) {
        LOG_ERROR("db_connect::async_query connect valid fail sql:{}", sql_str);
        cb(nullptr);
        return;
    }

    boost::asio::spawn(
        connection_.get_executor(),
        [this, self = shared_from_this(), sql_str = std::move(sql_str),
         cb = std::move(cb)](boost::asio::yield_context &&yield) {
            boost::mysql::error_code ec;

            // Step 1: send the query.
            auto result = connection_.async_query(sql_str, additional_info_, yield[ec]);
            if (ec) {
                LOG_ERROR("db_connect::async_query what:{} message:{}", ec.what(), additional_info_.message());
                cb(nullptr);
                return;
            }

            // Step 2: read every row before handing the result to the caller.
            auto rows = result.async_read_all(additional_info_, yield[ec]);
            if (ec) {
                LOG_ERROR("db_connect::async_query what:{} message:{}", ec.what(), additional_info_.message());
                cb(nullptr);
                return;
            }

            cb(std::make_shared<db_query>(std::move(rows)));
        });
}
// Runs `sql` in a stackful coroutine on the connection's executor without
// reading any rows, then invokes `cb`.
//
// @param sql  statement text; copied, so the caller's buffer may go away.
// @param cb   completion callback. It carries no status: it is invoked after
//             the statement finishes whether or not it failed, and also when
//             the connection is not valid, so a caller waiting on it is never
//             left hanging. Ownership of the callback moves into the coroutine.
//
// The coroutine holds a shared_ptr to this object (as async_connect() does) so
// the connection cannot be destroyed while the statement is in flight. The
// object must therefore be owned by a std::shared_ptr.
//
// NOTE(review): the resultset returned by async_query is discarded unread;
// this looks intended for statements that produce no rows — confirm.
void db_connect::execute(boost::string_view sql, std::function<void()> &&cb) {
    std::string sql_str = sql.to_string();
    LOG_INFO("db_connect::execute sql:{}", sql_str);
    if (!connection_.valid()) {
        LOG_ERROR("db_connect::execute connect valid fail sql:{}", sql_str);
        cb();
        return;
    }

    boost::asio::spawn(
        connection_.get_executor(),
        [this, self = shared_from_this(), sql_str = std::move(sql_str),
         cb = std::move(cb)](boost::asio::yield_context &&yield) {
            boost::mysql::error_code ec;
            connection_.async_query(sql_str, additional_info_, yield[ec]);
            if (ec) {
                LOG_ERROR("db_connect::execute what:{} message:{}", ec.what(), additional_info_.message());
            }
            cb();
        });
}
bool db_connect::async_connect() {
if (!connection_.valid()) {
LOG_ERROR("db_connect::connect valid fail");
return false;
}
boost::asio::spawn(connection_.get_executor(),
[this, self = shared_from_this()](boost::asio::yield_context &&yield) {
boost::system::error_code ec;
connection_.async_connect(ep_, params_, yield[ec]);
if (ec) {
LOG_ERROR("db_connect::async_connect what:{}", ec.what());
connection_.close();
return;
}
});
return true;
}
} // namespace flower
db_pool.h
#pragma once
#include <memory>
#include <string>
#include "boost/utility/string_view.hpp"
#include "concurrentqueue/blockingconcurrentqueue.h"
#include "db_connect.h"
#include "io_context_pool.h"
#include "singleton.h"
#include "boost/utility/string_view.hpp"
namespace flower {
/// Process-wide pool of db_connect objects, handed out through a blocking queue.
class db_pool final : public singleton<db_pool> {
    friend class singleton<db_pool>;

private:
    // Construction is reserved for singleton<db_pool>.
    db_pool() = default;

public:
    ~db_pool() override = default;

    /// Reads the "<db_name>_*" settings from config and creates up to
    /// @p pool_count connections.
    /// @return true if at least one connection ended up in the queue.
    bool start(boost::string_view db_name, std::size_t pool_count);

    /// Removes a connection from the queue and returns it (uses the queue's wait_dequeue).
    db_connect::ptr take();

    /// Puts a connection (back) into the queue; null or invalid connections are dropped.
    void post(db_connect::ptr connect);

private:
    moodycamel::BlockingConcurrentQueue<db_connect::ptr> conn_queue_{};  // available connections
    std::string passwd_;  // login password read by start()
    std::string user_;    // login name read by start()
    std::string db_;      // database name read by start()
};
} // namespace flower
db_pool.cpp
#include "db_pool.h"
#include "config.h"
namespace flower {
// Builds the pool for the database configured under the `db_name` prefix.
//
// Reads "<db_name>_ip", "<db_name>_port", "<db_name>_user", "<db_name>_passwd"
// and "<db_name>_name" from config, then creates `pool_count` connections and
// queues each one whose async_connect() reported that the attempt was started.
//
// Returns true if the queue reports at least one connection afterwards.
bool db_pool::start(boost::string_view db_name, std::size_t pool_count) {
    // All configuration keys share the database name as their prefix.
    const std::string key_prefix = db_name.to_string();

    auto host = config::instance().get<std::string>(key_prefix + "_ip");
    auto port = config::instance().get<uint16_t>(key_prefix + "_port");
    auto endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4::from_string(host), port);

    user_ = config::instance().get<std::string>(key_prefix + "_user");
    passwd_ = config::instance().get<std::string>(key_prefix + "_passwd");
    db_ = config::instance().get<std::string>(key_prefix + "_name");

    for (std::size_t remaining = pool_count; remaining > 0; --remaining) {
        auto conn = std::make_shared<db_connect>(io_context_pool::instance().get_io_context(), endpoint, user_,
                                                 passwd_, db_);
        if (conn->async_connect()) {
            post(conn);
        }
    }

    return conn_queue_.size_approx() != 0;
}
// Takes one connection out of the queue via wait_dequeue and returns it.
db_connect::ptr db_pool::take() {
    db_connect::ptr conn;
    conn_queue_.wait_dequeue(conn);
    return conn;
}
// Queues `connect` for reuse. Null pointers and connections whose is_valid()
// is false are silently dropped.
void db_pool::post(db_connect::ptr connect) {
    if (!connect || !connect->is_valid()) {
        return;
    }
    conn_queue_.enqueue(connect);
}
} // namespace flower