基於c++的資料庫連線池的實現與理解
推薦影片:
高併發技術之資料庫連線池設計與實現
如何最大限度榨乾資料庫效能,全網最細節講解快取方案實現
c/c++ linux伺服器開發學習地址:c/c++ linux後臺伺服器高階架構師
1。專案目的
在高併發的情況,大量的TCP三次握手,MySQL server連線認證,MySQL server連線關閉回收資源,TCP四次揮手會耗費效能。本專案的目的是為了避免頻繁的向資料庫申請資源,釋放資源帶來效能損耗。
2。基本思路
為資料庫的連線建立一個快取池,預先在該快取中放入一定數量的連線。當多個任務需要訪問mysql時,不需要每個任務都去直接透過TCP連線mysql server,而是在該快取池中取對應數量的連線即可。用完之後不需要釋放該連線,只需要歸還到連線池即可。
3關鍵點分析
1。利用mysql提供的api,自定義一個“連線”類。後面把該連線類放入容器中作為連線池。
2。基於上述分析,連線池的設計採用單例模式設計。
3。擬採用生產者-消費者執行緒模型,生產者負責產生連線,消費者負責使用連線。考慮併發情況,使用互斥鎖和條件變數實現執行緒安全和同步,即:生產後再消費的同步。
4。實現連線池的容器考慮佇列實現。在併發情況下,STL的queue不是執行緒安全的,可使用互斥鎖實現執行緒安全。
5。 由於連線用完後是歸還而不是釋放,擬採用智慧指標來管理連線,用lamda表示式來實現連線歸還的功能。(因為智慧指標出作用域自動析構,且申明指標智慧時可以指定刪除器,方便自定義歸還功能)
6。 連線池中連線的數量為多少時效能最佳?
4 程式碼實現
4.1 “連線” 類的功能
分析可知,連線池中的“連線” 使用類實現。利用mysql提供的API可實現。
主要功能包括:
1。“連線”的構造和析構功能
2。連線資料庫
3。對資料庫的操作
4。 返回一個連線的空閒時間(用於釋放多餘產生的連線,後文會說明)
4.2“連線” 類的程式碼如下
注:(標頭檔案是類的定義,原始檔是類中成員方法的實現):
Connection。h
#pragma once
#include
#include
#include
using namespace std;
class Connection
{
public:
// 初始化資料庫連線
Connection();
// 釋放資料庫連線資源
~Connection();
// 連線資料庫
bool connect(string ip,
unsigned short port,
string username,
string password,
string dbname);
// 更新操作 insert、delete、update
bool update(string sql);
// 查詢操作 select
MYSQL_RES* query(string sql);
// 重新整理連線的起始空閒時刻
// 記錄每個佇列的空閒時間,緩解伺服器資源,在入隊時
void refreshAliveTime();
// 返回連線空閒的時長
clock_t getAliveTime();
private:
MYSQL* _conn; // 表示和MySQL Server的一條連線
clock_t _alivetime; // 記錄進入空閒狀態後的起始存活時刻(即在佇列中出現的時刻)
};
Connection。cpp
#include “public。h”
#include “Connection。h”
// 初始化資料庫連線
Connection::Connection()
{
_conn = mysql_init(nullptr);
}
// 釋放資料庫連線資源
Connection::~Connection()
{
if (_conn != nullptr)
mysql_close(_conn);
}
// 連線資料庫
bool Connection::connect(string ip,
unsigned short port,
string username,
string password,
string dbname)
{
MYSQL* p = mysql_real_connect(_conn, ip。c_str(), username。c_str(),
password。c_str(), dbname。c_str(),
port, nullptr, 0);
//mysql_query(_conn, “set interactive_timeout=24*3600”);
return p != nullptr;
}
// 更新操作 insert、delete、update
bool Connection::update(string sql)
{
bool a = mysql_query(_conn, sql。c_str());
if (mysql_query(_conn, sql。c_str()))
{
LOG(“更新失敗:” + sql + “\nmysql_error:” + mysql_error(_conn));
return false;
}
return true;
}
// 查詢操作 select
MYSQL_RES* Connection::query(string sql)
{
// 查詢操作 select
// 如果查詢成功,返回0。如果出現錯誤,返回非0值
if (mysql_query(_conn, sql。c_str()))
{
LOG(“查詢失敗” + sql + “\nmysql_error:” + mysql_error(_conn));
return nullptr;
}
return mysql_use_result(_conn);
}
// 重新整理連線的起始空閒時刻
void Connection::refreshAliveTime()
{
_alivetime = clock();
}
// 返回連線空閒的時長
clock_t Connection::getAliveTime()
{
return clock() - _alivetime;
}
4.3連線池的功能
連線池的主要引數:
1。初始連線數:連線池事先會準備一些連線備用。最小連線數需要根據實際情況不斷測試決定,設定太多的話會出現很多空連線,浪費資源。
2。最大連線數:當併發請求太多了之後,初始量不夠用了。這時候會根據需求建立更多的連線,但不能無限建立,因為考慮到資源浪費問題。
3。最大空閒時間: 當併發請求增多以後,連線數會變多。由於“歸還”原因,這些連線不會被直接釋放,而是歸還到佇列中。假設後面的併發請求沒那麼多,那麼之前產生的多的連線會造成資源冗餘浪費。需要我們設定一個最大空閒時間。如果在最大空閒時間內,該連線還沒有被使用的話,就需要被回收掉,節約資源。考慮容器基於佇列實現,當隊頭元素的存活時間都沒超過最大空閒時間的話,後面的連線肯定也沒超過該最大空閒時間。
4。連線超時時間: 當併發請求太多了,且連線池的連線數已經超過最大連線數了,導致已經沒有空閒的連線可以使用了。那麼此時執行緒請求再連線會失敗。此時需設定一個連線超時時間,如果超時了,那麼獲取失敗,無法連線資料庫。
待實現的連線池的主要功能如下:
1。建立一個連線池物件。(因為是一個單例模式)
2。初始化連線數以及生產新連線(生產過程是連線池類內部多執行緒建立的,所以許可權為private;另外需要定義一個連線數計算器,使用原子變atomic,就不需要用互斥鎖來保護該計數器了)
3。從連線池中獲取一個可用連線(消費過程是使用者請求,許可權為public;用完後歸還到佇列中)
4。回收連線(透過定義一個掃描函式,獲取每個連線的空閒時間,用於多餘連線的釋放)
5。載入初始配置項,主要是資料庫連線引數如使用者名稱密碼等(可選)。
4.4 連線池的程式碼如下:
CommonConnectionPool。h
#pragma once
#include “Connection。h”
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
// 實現連線池功能模組
class ConnectionPool
{
public:
// 獲取連線池物件例項(懶漢式單例模式,在獲取例項時才例項化物件)
static ConnectionPool* getConnectionPool();
// 給外部提供介面,從連線池中獲取一個可用的空閒連線
//注意,這裡不要直接返回指標,否則我們還需要定義一個(歸還連線)的方法,還要自己去釋放該指標。
//這裡直接返回一個智慧指標,智慧指標出作用域自動析構,(我們只需重定義析構即可——不釋放而是歸還)
shared_ptr
private:
// 單例模式——建構函式私有化
ConnectionPool();
// 從配置檔案中載入配置項
bool loadConfigFile();
// 執行在獨立的執行緒中,專門負責生產新連線
// 非靜態成員方法,其呼叫依賴物件,要把其設計為一個執行緒函式,需要繫結this指標。
// 把該執行緒函式寫為類的成員方法,最大的好處是 非常方便訪問當前物件的成員變數。(資料)
void produceConnectionTask();
// 掃描超過maxIdleTime時間的空閒連線,進行對於連線的回收
void scannerConnectionTask();
string _ip; // MySQL的ip地址
unsigned short _port; // MySQL的埠號,預設為3306
string _username; // MySQL登陸使用者名稱
string _password; // MySQL登陸密碼
string _dbname; // 連線的資料庫名稱
int _initSize; // 連線池的最大初始連線量
int _maxSize; // 連線池的最大連線量
int _maxIdleTime; // 連線池的最大空閒時間
int _connectionTimeout; // 連線池獲取連線的超時時間
// 儲存MySQL連線的佇列
queue
// 維護連線佇列的執行緒安全互斥鎖
mutex _queueMutex;
// 記錄connection連線的總數量
atomic_int _connectionCnt;
// 設定條件變數,用於連線生產執行緒和連線消費執行緒的通訊
condition_variable cv;
};
【文章福利】需要C/C++ Linux伺服器架構師學習資料加群812855908(資料包括C/C++,Linux,golang技術,核心,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等)
CommonConnectionPool。cpp
#include “CommonConnectionPool。h”
#include “public。h”
// 執行緒安全的懶漢單例函式介面
ConnectionPool* ConnectionPool::getConnectionPool()
{
// 對於靜態區域性變數的初始化,編譯器自動進行lock和unlock
static ConnectionPool pool;
return &pool;
}
// 從配置檔案中載入配置項
bool ConnectionPool::loadConfigFile()
{
FILE* pf = fopen(“mysql。ini”, “r”);
if (pf == nullptr)
{
LOG(“File ‘mysql。ini’ is not existing!”);
return false;
}
// 逐行處理配置檔案中的配置字串
while (!feof(pf))
{
// 配置字串舉例:username=root\n
// 從檔案中獲取一行配置字串
char line[1024] = { 0 };
fgets(line, 1024, pf);
string str = line;
// 找到配置字串中的‘=’
int idx = str。find(‘=’, 0);
// 無效的配置項
if (idx == -1)
{
// 當配置字串中找不到‘=’時說明該配置字串有問題或者是註釋,將其忽略
continue;
}
// 分別取出該行配置中的key和value
int endIdx = str。find(‘\n’, idx);
string key = str。substr(0, idx);
string value = str。substr(idx + 1, endIdx - idx - 1);
if (key == “ip”)
{
_ip = value;
}
else if (key == “port”)
{
_port = atoi(value。c_str());
}
else if (key == “username”)
{
_username = value;
}
else if (key == “password”)
{
_password = value;
}
else if (key == “dbname”)
{
_dbname = value;
}
else if (key == “maxSize”)
{
_maxSize = atoi(value。c_str());
}
else if (key == “maxIdleTime”)
{
_maxIdleTime = atoi(value。c_str());
}
else if (key == “connectionTimeout”)
{
_connectionTimeout = atoi(value。c_str());
}
else if (key == “initSize”)
{
_initSize = atoi(value。c_str());
}
}
return true;
}
// 連線池的建構函式
ConnectionPool::ConnectionPool()
{
// 載入配置項
if (!loadConfigFile())
{
return;
}
// 建立初始數量的連線
for (int i = 0; i < _initSize; ++i)
{
Connection* p = new Connection();
p->connect(_ip, _port, _username, _password, _dbname);
p->refreshAliveTime(); // 記錄連線的起始空閒時刻
_connectionQue。push(p);
_connectionCnt++;
}
// 啟動一個新的執行緒,作為連線的生產者
thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));
produce。detach(); //守護執行緒
// 啟動一個新的定時執行緒,掃描超過maxIdleTime時間的空閒連線,並對其進行回收
thread scanner(std::bind(&ConnectionPool::produceConnectionTask, this));
scanner。detach();
}
// 執行在獨立的執行緒中,專門負責產生新連線
void ConnectionPool::produceConnectionTask()
{
for (;;)
{
unique_lock
while (!_connectionQue。empty())
{
// 佇列非空時,此處生產執行緒進入等待狀態
cv。wait(lock); //進入等待時,釋放鎖,保證消費者執行緒正常執行
}
// 連線數量沒有到達上限,繼續建立新的連線
if (_connectionCnt < _maxSize)
{
Connection* p = new Connection();
p->connect(_ip, _port, _username, _password, _dbname);
_connectionQue。push(p);
_connectionCnt++;
}
// 通知消費者執行緒,可以消費連線了
cv。notify_all();
}
}
// 給外部提供介面,從連線池中獲取一個可用的空閒連線
shared_ptr
{
unique_lock
while (_connectionQue。empty())
{
if (cv_status::timeout == cv。wait_for(lock, std::chrono::milliseconds(_connectionTimeout))) //超時喚醒
{
if (_connectionQue。empty())
{
LOG(“Failed to get connection:got idle connection timeout!”);
return nullptr;
}
}
}
/*
* shared_ptr智慧指標析構時,會把connection資源直接delete掉,
* 相當於呼叫connection的解構函式,connection就被close掉了。
* 這裡需要自定義shared_ptr的釋放資源的方式,把connection直接歸還到queue當中*/
shared_ptr
[&](Connection* pcon)
{
// 這裡是在伺服器應用執行緒中呼叫的,所以一定要考慮佇列的執行緒安全操作
unique_lock
pcon->refreshAliveTime(); //在歸還回空閒連線佇列之前要記錄一下連線開始空閒的時刻
_connectionQue。push(pcon);
});
_connectionQue。pop();
// 消費者取出一個連線之後,通知生產者,生產者檢查佇列,如果為空則生產
cv。notify_all();
return sp;
}
// 掃描超過maxIdleTime時間的空閒連線,進行對於連線的回收
void ConnectionPool::scannerConnectionTask()
{
for (;;)
{
// 透過sleep實現定時效果
this_thread::sleep_for(chrono::seconds(_maxIdleTime));
// 掃描整個佇列,釋放多餘的連線
unique_lock
while (_connectionCnt > _initSize)
{
Connection* p = _connectionQue。front();
if (p->getAliveTime() >= (_maxIdleTime * 1000))
{
_connectionQue。pop();
_connectionCnt——;
delete p; // 呼叫~Connection()釋放連線
}
else
{
// 隊頭的連線沒有超過_maxIdleTime,其它連線肯定也沒有
break;
}
}
}
}
4.5 配置檔案 mysql.ini
ip=127。0。0。1
port=3306
username=root
password=111111
dbname=chat
initSize=10
maxSize=1024
maxIdleTime=60
connectionTimeout=100
5。測試函式 main。cpp及結果
#pragma once
#include
#include
#include
#include “Connection。h”
#include “CommonConnectionPool。h”
int n = 10000;//資料量
int main()
{
//不使用連線池,單執行緒:
clock_t begin = clock();
for (int i = 0; i < n; i++)
{
Connection conn;
char sql[1024] = { 0 };
sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,
1, “a”);
conn。connect(“127。0。0。1”, 3306, “root”, “zh601572”, “chat”);
conn。update(sql);
}
clock_t end = clock();
cout << end - begin << “ms” << endl;
return 0;
//不使用連線池,4執行緒:
//Connection conn;
//conn。connect(“localhost”, 3306, “root”, “zh601572”, “chat”);
//clock_t begin = clock();
//thread t1([]()
// {
// for (int i = 0; i < n/4; ++i)
// {
// Connection conn;
// char sql[1024] = { 0 };
// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,
// 5, “a”);
// conn。connect(“localhost”, 3306, “root”, “zh601572”, “chat”);
// conn。update(sql);
// }
// });
//thread t2([]()
// {
// for (int i = 0; i < n / 4; ++i)
// {
// Connection conn;
// char sql[1024] = { 0 };
// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,
// 6, “a”);
// conn。connect(“localhost”, 3306, “root”, “zh601572”, “chat”);
// conn。update(sql);
// }
// });
//thread t3([]()
// {
// for (int i = 0; i < n / 4; ++i)
// {
// Connection conn;
// char sql[1024] = { 0 };
// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,
// 7, “a”);
// conn。connect(“localhost”, 3306, “root”, “zh601572”, “chat”);
// conn。update(sql);
// }
// });
//thread t4([]()
// {
// for (int i = 0; i < n / 4; ++i)
// {
// Connection conn;
// char sql[1024] = { 0 };
// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,
// 8, “a”);
// conn。connect(“localhost”, 3306, “root”, “zh601572”, “chat”);
// conn。update(sql);
// }
// });
//t1。join();
//t2。join();
//t3。join();
//t4。join();
//clock_t end = clock();
//cout << end - begin << “ms” << endl;
//return 0;
//使用連線池,單執行緒:
//clock_t begin = clock();
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n; i++)
//{
// shared_ptr
// char sql[1024] = { 0 };
// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,
// 4, “zhouhui”);
// sp ->update(sql);
//}
//clock_t end = clock();
//cout << end - begin << “ms” << endl;
//return 0;
//使用連線池,四執行緒:
//clock_t begin = clock();
//
//thread t1([]()
// {
// ConnectionPool* cp = ConnectionPool::getConnectionPool();
// for (int i = 0; i < n / 4; i++)
// {
// shared_ptr
// char sql[1024] = { 0 };
// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,4, “zhouhui”);
// sp ->update(sql);
// }
// }
//);
//thread t2([]()
// {
// ConnectionPool* cp = ConnectionPool::getConnectionPool();
// for (int i = 0; i < n /4; i++)
// {
// shared_ptr
// char sql[1024] = { 0 };
// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”, 4, “zhouhui”);
// sp->update(sql);
// }
// }
//);
//thread t3([]()
// {
// ConnectionPool* cp = ConnectionPool::getConnectionPool();
// for (int i = 0; i < n / 4; i++)
// {
// shared_ptr
// char sql[1024] = { 0 };
// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”, 4, “zhouhui”);
// sp->update(sql);
// }
// }
//);
//thread t4([]()
// {
// ConnectionPool* cp = ConnectionPool::getConnectionPool();
// for (int i = 0; i < n / 4; i++)
// {
// shared_ptr
// char sql[1024] = { 0 };
// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”, 4, “zhouhui”);
// sp->update(sql);
// }
// }
//);
//t1。join();
//t2。join();
//t3。join();
//t4。join();
//clock_t end = clock();
//cout << (end - begin) << “ms” << endl;
//return 0;
}
結果如下
可以看到還是節約很多時間資源的。
6 思考
資料庫連線池設定多少連線才合適?