MySQL 库的使用
2018-12-22 11:08:26 # cpp

db_connect.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#pragma once

#include <memory>
#include <string_view>

#include "db_query.h"
#include "mysql/errmsg.h"
#include "mysql/mysql.h"

namespace Sky {
/// Owns one MySQL client connection (a raw `MYSQL*` handle) and wraps the
/// query / execute / transaction calls made on it.
///
/// The handle is closed by the destructor, so the type is non-copyable: a copy
/// would share the same raw handle and close it twice.
class DBConnect final {
public:
    using Ptr = std::shared_ptr<DBConnect>;

    DBConnect() = default;

    ~DBConnect();

    // Copying would duplicate the owning MYSQL* and end in a double mysql_close().
    DBConnect(const DBConnect&) = delete;
    DBConnect& operator=(const DBConnect&) = delete;

    /// Opens a connection with the given parameters. On success the previously
    /// held connection (if any) is closed and the parameters are remembered so
    /// that query()/execute() can reconnect later.
    /// @return true when the connection was established and the charset applied.
    bool connect(const std::string& host, const std::string& user, const std::string& passwd, const std::string& name,
                 const std::string& charset, uint16_t port);

    /// Runs `sql` and hands the stored result set to `query` via DBQuery::setRes().
    /// On failure `query` is left untouched and the error is logged.
    void query(std::string_view sql, DBQuery& query);

    /// Runs `sql` without collecting a result set.
    /// @return true when the statement was sent and executed without error.
    bool execute(std::string_view sql);

    /// Returns mysql_insert_id() for this connection, narrowed to `int`.
    int getInsertId();

    /// Switches autocommit off; returns true if it is (or already was) off.
    bool beginTransaction();

    /// Commits and switches autocommit back on; a no-op while autocommit is on.
    void commit();

    /// Rolls back and switches autocommit back on; a no-op while autocommit is on.
    void rollBack();

private:
    /// Closes the current handle, if any, and nulls it.
    void clear();

private:
    MYSQL* m_conn{};          // owning handle; nullptr until connect() succeeds
    std::string m_host{};     // parameters of the last successful connect(),
    std::string m_user{};     // reused for reconnect attempts
    std::string m_passwd{};
    std::string m_name{};
    std::string m_charset{};
    uint16_t m_port{};
    bool m_ping{};            // true when MYSQL_OPT_RECONNECT could not be set; query()/execute() then ping first
    bool m_autoCommit{};      // mirrors the autocommit mode last requested on m_conn
};
} // namespace Sky

db_connect.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#include "db_connect.h"

#include <thread>

#include "comm_func.h"
#include "define.h"
#include "logger.h"

namespace Sky {
// Delay between two reconnect attempts, in milliseconds: query()/execute() pass
// this value to std::chrono::milliseconds before sleeping. (Despite the name it
// is a duration, not a count.)
constexpr uint32_t DB_MYSQL_RECONNECT_TIMES = 200;
// Maximum number of reconnect attempts query()/execute() make after a
// connection-level error before giving up.
constexpr uint32_t DB_MYSQL_RECONNECT_COUNT = 10;

// Closes the underlying MySQL handle, if one is still open.
DBConnect::~DBConnect() {
    clear();
}

// Opens a new connection and, only if that fully succeeds, swaps it in for the
// one currently held (which is then closed).
//
// host/user/passwd/name/port are forwarded to mysql_real_connect(); charset is
// applied with mysql_set_character_set(). All connect/read/write timeouts are
// set to 60 seconds.
//
// Returns true on success. On any failure the freshly created handle is
// released, the previously held connection is left untouched, and false is
// returned.
bool DBConnect::connect(const std::string& host, const std::string& user, const std::string& passwd,
                        const std::string& name, const std::string& charset, uint16_t port) {
    MYSQL* conn = mysql_init(nullptr);
    if (conn == nullptr) {
        LOG_ERROR("DBConnect::connect mysql init fail");
        return false;
    }

    // mysql_options() returns 0 on success, so m_ping ends up true only when
    // the auto-reconnect option could NOT be enabled.
    // NOTE(review): MYSQL_OPT_RECONNECT is documented as taking a bool-sized
    // argument; an int32_t only reads as "true" on little-endian hosts — confirm.
    int32_t reconnect = 1;
    m_ping = (mysql_options(conn, MYSQL_OPT_RECONNECT, &reconnect) != 0);
    uint32_t timeout = 60;
    mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
    mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout);
    mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout);

    if (mysql_real_connect(conn, host.c_str(), user.c_str(), passwd.c_str(), name.c_str(), port, nullptr, 0) ==
        nullptr) {
        // Read the error text before closing: it lives inside the handle.
        LOG_ERROR("DBConnect::connect error:{}", mysql_error(conn));
        // The handle allocated by mysql_init() must be released on this path
        // too, otherwise every failed (re)connect attempt leaks one handle.
        mysql_close(conn);
        return false;
    }

    if (mysql_set_character_set(conn, charset.c_str()) != 0) {
        LOG_ERROR("DBConnect::connect set character error:{}", mysql_error(conn));
        mysql_close(conn);
        return false;
    }

    // Drop the old connection only now that the replacement is fully usable.
    this->clear();

    m_conn = conn;
    m_host = host;
    m_user = user;
    m_passwd = passwd;
    m_name = name;
    m_charset = charset;
    m_port = port;
    m_autoCommit = true;
    return true;
}

// Closes the current MySQL handle and forgets it; does nothing when no handle
// is held.
void DBConnect::clear() {
    if (m_conn == nullptr) {
        return;
    }

    mysql_close(m_conn);
    m_conn = nullptr;
}

void DBConnect::query(std::string_view sql, DBQuery& query) {
if (m_ping) {
mysql_ping(m_conn);
}

if (mysql_real_query(m_conn, sql.data(), sql.length()) != 0) {
uint32_t error = mysql_errno(m_conn);
if (error == CR_COMMANDS_OUT_OF_SYNC || error == CR_SERVER_GONE_ERROR || error == CR_SERVER_LOST) {
int count = DB_MYSQL_RECONNECT_COUNT;
while (count-- > 0) {
if (this->connect(m_host, m_user, m_passwd, m_name, m_charset, m_port)) {
if (mysql_real_query(m_conn, sql.data(), sql.length()) == 0) {
LOG_INFO("DBConnect::query sql:{}", sql);
return query.setRes(mysql_store_result(m_conn));
}
LOG_ERROR("DBConnect::query still failed after sql:{} code:{} error:{}", sql, mysql_errno(m_conn),
mysql_error(m_conn));
return;
}

std::this_thread::sleep_for(std::chrono::milliseconds(DB_MYSQL_RECONNECT_TIMES));
}
LOG_ERROR("DBConnect::query reopen faild sql:{} code:{} error:{}", sql, mysql_errno(m_conn),
mysql_error(m_conn));
return;
} else {
LOG_ERROR("DBConnect::query sql:{} code:{} error:{}", sql, mysql_errno(m_conn), mysql_error(m_conn));
return;
}
}

LOG_INFO("DBConnect::query sql:{}", sql);
query.setRes(mysql_store_result(m_conn));
}

bool DBConnect::execute(std::string_view sql) {
if (m_ping) {
mysql_ping(m_conn);
}

if (mysql_real_query(m_conn, sql.data(), sql.length()) != 0) {
uint32_t error = mysql_errno(m_conn);
if (error == CR_COMMANDS_OUT_OF_SYNC || error == CR_SERVER_GONE_ERROR || error == CR_SERVER_LOST) {
int count = DB_MYSQL_RECONNECT_COUNT;
while (count-- > 0) {
if (this->connect(m_host, m_user, m_passwd, m_name, m_charset, m_port)) {
if (mysql_real_query(m_conn, sql.data(), sql.length()) == 0) {
LOG_INFO("DBConnect::execute sql:{}", sql);
return true;
}
LOG_ERROR("DBConnect::execute still failed after sql:{} \n\tcode:{} error:{}", sql,
mysql_errno(m_conn), mysql_error(m_conn));
return false;
}

std::this_thread::sleep_for(std::chrono::milliseconds(DB_MYSQL_RECONNECT_TIMES));
}
LOG_ERROR("DBConnect::execute reopen faild sql:{} \n\tcode:{} error:{}", sql, mysql_errno(m_conn),
mysql_error(m_conn));
return false;
} else {
LOG_ERROR("DBConnect::execute sql:{} \n\tcode:{} error:{}", sql, mysql_errno(m_conn), mysql_error(m_conn));
return false;
}
}

LOG_INFO("DBConnect::execute sql:{}", sql);
return true;
}

bool DBConnect::beginTransaction() {
if (!m_autoCommit) {
return true;
}

if (mysql_autocommit(m_conn, 0) != 0) {
return false;
}

m_autoCommit = false;
return true;
}

void DBConnect::commit() {
if (m_autoCommit) {
return;
}

mysql_commit(m_conn);

if (mysql_autocommit(m_conn, 1) == 0) {
m_autoCommit = true;
}
}

void DBConnect::rollBack() {
if (m_autoCommit)
return;

mysql_rollback(m_conn);

if (mysql_autocommit(m_conn, 1) == 0) {
m_autoCommit = true;
}
}

// Returns the value of mysql_insert_id() for this connection, narrowed to
// `int` by the cast below.
int DBConnect::getInsertId() {
    const auto insertId = mysql_insert_id(m_conn);
    return static_cast<int>(insertId);
}
} // namespace Sky

db_pool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#pragma once

#include <atomic>
#include <list>
#include <memory>
#include <string>

#include "concurrentqueue/blockingconcurrentqueue.h"
#include "db_connect.h"

namespace Sky {
/// Pool of DBConnect objects kept in a blocking concurrent queue.
///
/// Connections are handed out as shared_ptrs whose deleter is recovery(); that
/// deleter captures `this`, so the pool object must stay at a fixed address.
class DBPool final {
public:
    DBPool() = default;
    ~DBPool();

    // Already non-copyable through the std::atomic_bool member; stated
    // explicitly because the connection deleters hold a pointer to this pool.
    DBPool(const DBPool&) = delete;
    DBPool& operator=(const DBPool&) = delete;

    /// Reads the settings stored under the `dbName` prefix in Config and opens
    /// the configured number of connections.
    /// @return true when the pool holds at least one connection afterwards.
    bool start(const std::string& dbName);

    /// Marks the pool as stopped and drops every idle connection.
    void stop();

    /// Blocks until an idle connection is available and returns it; returns an
    /// empty pointer when the pool is not running.
    DBConnect::Ptr take();

private:
    /// shared_ptr deleter: re-queues `dc` while running, destroys it otherwise.
    void recovery(DBConnect* dc);

private:
    moodycamel::BlockingConcurrentQueue<DBConnect::Ptr> m_queue{};  // idle connections
    std::atomic_bool m_running{};                                   // false until start() succeeds / after stop()
};
} // namespace Sky

db_pool.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include "db_pool.h"

#include "config.h"

namespace Sky {
// Shuts the pool down and then finalises the MySQL client library.
DBPool::~DBPool() {
    // Members are destroyed only after this body has run. Without draining the
    // queue here, the pooled connections would be released after
    // mysql_library_end(), and — if stop() was never called — their deleter
    // would try to re-enqueue them into the queue that is being destroyed.
    stop();
    mysql_library_end();
}

bool DBPool::start(const std::string& dbName) {
auto address = Config::instance().get<std::string>(dbName + "Ip");
auto port = Config::instance().get<int32_t>(dbName + "Port");
auto user = Config::instance().get<std::string>(dbName + "User");
auto password = Config::instance().get<std::string>(dbName + "Passwd");
auto name = Config::instance().get<std::string>(dbName + "Name");
auto charset = Config::instance().get<std::string>(dbName + "Charset");
auto poolCount = Config::instance().get<int32_t>(dbName + "ConnCount");

mysql_library_init(-1, nullptr, nullptr);

assert(mysql_thread_safe() == 1);

DBConnect::Ptr conn;
m_running = true;
for (std::size_t i = 0; i < poolCount; ++i) {
conn = std::shared_ptr<DBConnect>{new DBConnect, [this](DBConnect* dc) {
recovery(dc);
}};

if (conn && conn->connect(address, user, password, name, charset, port)) {
m_queue.enqueue(conn);
}
}

m_running = m_queue.size_approx() > 0;
LOG_DEBUG("MYSQL SERVER VERSION:{}", MYSQL_SERVER_VERSION);
return m_running;
}

// Deleter installed on every pooled DBConnect::Ptr.
//
// While the pool is running, the connection is wrapped in a fresh shared_ptr
// (with this same deleter) and put back into the queue instead of being
// destroyed. Otherwise — or when the enqueue fails — the connection is
// destroyed for good.
void DBPool::recovery(DBConnect* dc) {
    if (m_running) {
        if (m_queue.enqueue({dc, [this](DBConnect* conn) {
                                 recovery(conn);
                             }})) {
            return;
        }
    }

    // The object was allocated with new, so it has to be deleted; calling only
    // the destructor would close the connection but leak the storage.
    delete dc;
}

void DBPool::stop() {
if (!m_running) {
return;
}

m_running = false;
DBConnect::Ptr dc;
while (m_queue.try_dequeue(dc)) {
}
}

// Hands out an idle connection, waiting for one to be returned if necessary.
//
// Returns an empty pointer when the pool is not running, including when it is
// stopped while this call is waiting.
DBConnect::Ptr DBPool::take() {
    // Wait in bounded slices rather than indefinitely: after stop() nothing is
    // ever enqueued again, so an unbounded wait on an empty queue would never
    // return.
    constexpr int kPollIntervalUsecs = 100 * 1000;

    DBConnect::Ptr connect;
    while (m_running) {
        if (m_queue.wait_dequeue_timed(connect, kPollIntervalUsecs)) {
            return connect;
        }
    }
    return {};
}
} // namespace Sky

db_query.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#pragma once

#include <cstdint>
#include <memory>
#include <string_view>

#include "comm_func.h"
#include "define.h"
#include "mysql/errmsg.h"
#include "mysql/mysql.h"

namespace Sky {

// Generates a DBQuery member `ty read<name>()` that converts the column under
// the cursor with CommFunc::stringTo<ty>() and advances the cursor by one
// column (moving on to the next row after the last column).
//
// Returns a value-initialised `ty` when no row is available or when the column
// is SQL NULL. A NULL column is still consumed, so sequential readers cannot
// get stuck on it.
// (No `//` comments inside the macro body: the line continuations would
// swallow the following line.)
#define REGISTER_DB_READ_FUNC(ty, name)                                   \
    ty read##name() {                                                     \
        if (checkData() || m_fieldCount <= 0) {                           \
            return {};                                                    \
        }                                                                 \
        ty cur_data{};                                                    \
        if (!checkFieldIndex(m_curFieldIndex)) {                          \
            cur_data = CommFunc::stringTo<ty>(m_curRow[m_curFieldIndex]); \
        }                                                                 \
        nextFieldIndex();                                                 \
        return cur_data;                                                  \
    }

/// Result set of one SQL query plus a forward-only (row, column) read cursor.
///
/// Owns the `MYSQL_RES*` it is given and frees it in freeRes() / the
/// destructor, hence copy construction is disabled. Ownership can be moved
/// with operator=, which empties the source object.
class DBQuery final : public UniversalPacket {
public:
    using Ptr = std::shared_ptr<DBQuery>;

    DBQuery() = default;

    DBQuery(uint16_t msgIdx, uint16_t sn)
        : m_sn(sn) {
        m_msgIdx = msgIdx;
    }

    // A member-wise copy would share m_result and free it twice.
    DBQuery(const DBQuery &) = delete;

    ~DBQuery() {
        freeRes();
    }

    /// Frees the held result set (if any) and resets the cursor and metadata.
    void freeRes();

    /// Transfers the result set and metadata out of `query`, leaving it empty.
    DBQuery &operator=(DBQuery &query);

    bool getState() const {
        return m_state;
    }

    void setState(bool state) {
        m_state = state;
    }

    uint16_t getSn() const {
        return m_sn;
    }

public:
    /// Takes ownership of `res` (may be null) and positions the cursor on the
    /// first column of the first row.
    void setRes(MYSQL_RES *res);

    int getRowCount() const {
        return m_rowCount;
    }

    int getCurFieldIndex() const {
        return m_curFieldIndex;
    }

    /// True when there is no current row (empty result or all rows consumed).
    bool eof() const {
        return m_curRow == nullptr;
    }

    /// Reads the column under the cursor as text and advances the cursor.
    ///
    /// Returns an empty view when no row is available or the column is SQL
    /// NULL; a NULL column is still consumed. The view points into memory of
    /// the result set and must not be used after freeRes().
    /// NOTE(review): the length is taken from the terminating NUL, so values
    /// with embedded zero bytes would be cut short — confirm no binary columns
    /// are read this way.
    std::string_view readString() {
        if (checkData() || m_fieldCount <= 0) {
            return {};
        }

        std::string_view cur_data{};
        if (!checkFieldIndex(m_curFieldIndex)) {
            cur_data = m_curRow[m_curFieldIndex];
        }
        nextFieldIndex();
        return cur_data;
    }

    REGISTER_DB_READ_FUNC(int8_t, Int8)

    REGISTER_DB_READ_FUNC(uint8_t, UInt8)

    REGISTER_DB_READ_FUNC(int16_t, Int16)

    REGISTER_DB_READ_FUNC(uint16_t, UInt16)

    REGISTER_DB_READ_FUNC(int32_t, Int32)

    REGISTER_DB_READ_FUNC(uint32_t, UInt32)

    REGISTER_DB_READ_FUNC(int64_t, Int64)

    REGISTER_DB_READ_FUNC(uint64_t, UInt64)

    REGISTER_DB_READ_FUNC(float, Float)

    REGISTER_DB_READ_FUNC(double, Double)

private:
    /// True when there is nothing to read (no result set or no current row).
    bool checkData() const;

    /// True when the given column of the current row cannot be read.
    bool checkFieldIndex(int field_index) const;

    /// Moves the cursor one column forward, fetching the next row after the
    /// last column.
    void nextFieldIndex();

public:
    MYSQL_RES *m_result{};   // owned result set; freed in freeRes()
    int m_fieldCount{};      // columns per row of m_result
    int m_rowCount{};        // rows in m_result
    int m_curFieldIndex{};   // column the next read*() call consumes
    MYSQL_ROW m_curRow{};    // current row; nullptr once all rows are consumed
    uint16_t m_sn{};
    bool m_state{};
};
} // namespace Sky

db_query.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include "db_query.h"

namespace Sky {
// Move-like assignment: takes over the result set and metadata of `query` and
// leaves `query` empty. Any result set previously held by *this is freed.
//
// The transferred result set is rewound, so afterwards the cursor stands on
// the first column of the first row.
DBQuery &DBQuery::operator=(DBQuery &query) {
    if (this == &query) {
        return *this;
    }

    freeRes();

    m_result = query.m_result;
    m_msgIdx = query.m_msgIdx;
    m_state = query.m_state;
    m_sn = query.m_sn;

    m_fieldCount = 0;
    m_rowCount = 0;
    m_curRow = nullptr;
    // The row cursor is reset to row 0 below, so the column cursor must start
    // at 0 as well; carrying over the source's column index would make the
    // first read begin in the middle of the first row.
    m_curFieldIndex = 0;

    if (m_result != nullptr) {
        mysql_data_seek(m_result, 0);
        m_fieldCount = mysql_num_fields(m_result);
        m_rowCount = static_cast<int>(mysql_num_rows(m_result));
        m_curRow = mysql_fetch_row(m_result);
    }

    // Ownership has moved: the source must no longer reference (or free) the
    // result set.
    query.m_result = nullptr;
    query.m_fieldCount = 0;
    query.m_rowCount = 0;
    query.m_curRow = nullptr;
    query.m_curFieldIndex = 0;
    query.m_msgIdx = 0;
    query.m_state = false;
    query.m_sn = 0;
    return *this;
}

// Replaces the held result set with `res`, freeing the previous one first.
// When `res` is non-null the field/row counts are cached and the cursor is
// placed on the first row; a null `res` leaves the object without a result.
void DBQuery::setRes(MYSQL_RES *res) {
    freeRes();

    m_result = res;
    if (res == nullptr) {
        return;
    }

    m_fieldCount = mysql_num_fields(res);
    m_rowCount = static_cast<int>(mysql_num_rows(res));

    mysql_data_seek(res, 0);
    m_curRow = mysql_fetch_row(res);
}

// Returns true when there is nothing to read: either no result set is held or
// the row cursor has run past the last row.
bool DBQuery::checkData() const {
    const bool hasRow = (m_result != nullptr) && (m_curRow != nullptr);
    return !hasRow;
};

// Returns true when column `field_index` of the current row cannot be read:
// the index lies outside [0, m_fieldCount), there is no current row, or the
// column value is SQL NULL (a null pointer in the row array).
bool DBQuery::checkFieldIndex(int field_index) const {
    // Range check first so the row array is never indexed out of bounds; this
    // also covers the m_fieldCount <= 0 case.
    if (field_index < 0 || field_index >= m_fieldCount) {
        return true;
    }
    return m_curRow == nullptr || m_curRow[field_index] == nullptr;
}

// Advances the cursor by one column; after the last column it wraps to column
// 0 and fetches the next row (m_curRow becomes nullptr once rows run out).
void DBQuery::nextFieldIndex() {
    ++m_curFieldIndex;
    if (m_curFieldIndex != m_fieldCount) {
        return;
    }

    m_curFieldIndex = 0;
    m_curRow = mysql_fetch_row(m_result);
}

// Frees the held result set and resets cursor, counts, message index, sequence
// number and state. Does nothing at all when no result set is held.
void DBQuery::freeRes() {
    if (m_result == nullptr) {
        return;
    }

    mysql_free_result(m_result);
    m_result = nullptr;

    m_curRow = nullptr;
    m_curFieldIndex = 0;
    m_fieldCount = 0;
    m_rowCount = 0;

    m_msgIdx = 0;
    m_sn = 0;
    m_state = false;
}
} // namespace Sky