您當前的位置:首頁 > 遊戲

基於c++的資料庫連線池的實現與理解

作者:由 linux 發表于 遊戲時間:2021-09-06

推薦影片:

高併發技術之資料庫連線池設計與實現

如何最大限度榨乾資料庫效能,全網最細節講解快取方案實現

c/c++ linux伺服器開發學習地址:c/c++ linux後臺伺服器高階架構師

1。專案目的

在高併發的情況,大量的TCP三次握手,MySQL server連線認證,MySQL server連線關閉回收資源,TCP四次揮手會耗費效能。本專案的目的是為了避免頻繁的向資料庫申請資源,釋放資源帶來效能損耗。

2。基本思路

為資料庫的連線建立一個快取池,預先在該快取中放入一定數量的連線。當多個任務需要訪問mysql時,不需要每個任務都去直接透過TCP連線mysql server,而是在該快取池中取對應數量的連線即可。用完之後不需要釋放該連線,只需要歸還到連線池即可。

3關鍵點分析

1。利用mysql提供的api,自定義一個“連線”類。後面把該連線類放入容器中作為連線池。

2。基於上述分析,連線池的設計採用單例模式設計。

3。擬採用生產者-消費者執行緒模型,生產者負責產生連線,消費者負責使用連線。考慮併發情況,使用互斥鎖和條件變數實現執行緒安全和同步,即:生產後再消費的同步。

4。實現連線池的容器考慮佇列實現。在併發情況下,STL的queue不是執行緒安全的,可使用互斥鎖實現執行緒安全。

5。 由於連線用完後是歸還而不是釋放,擬採用智慧指標來管理連線,用lamda表示式來實現連線歸還的功能。(因為智慧指標出作用域自動析構,且申明指標智慧時可以指定刪除器,方便自定義歸還功能)

6。 連線池中連線的數量為多少時效能最佳?

4 程式碼實現

4.1 “連線” 類的功能

分析可知,連線池中的“連線” 使用類實現。利用mysql提供的API可實現。

主要功能包括:

1。“連線”的構造和析構功能

2。連線資料庫

3。對資料庫的操作

4。 返回一個連線的空閒時間(用於釋放多餘產生的連線,後文會說明)

4.2“連線” 類的程式碼如下

注:(標頭檔案是類的定義,原始檔是類中成員方法的實現):

Connection。h

#pragma once

#include

#include

#include

using namespace std;

class Connection

{

public:

// 初始化資料庫連線

Connection();

// 釋放資料庫連線資源

~Connection();

// 連線資料庫

bool connect(string ip,

unsigned short port,

string username,

string password,

string dbname);

// 更新操作 insert、delete、update

bool update(string sql);

// 查詢操作 select

MYSQL_RES* query(string sql);

// 重新整理連線的起始空閒時刻

// 記錄每個佇列的空閒時間,緩解伺服器資源,在入隊時

void refreshAliveTime();

// 返回連線空閒的時長

clock_t getAliveTime();

private:

MYSQL* _conn; // 表示和MySQL Server的一條連線

clock_t _alivetime; // 記錄進入空閒狀態後的起始存活時刻(即在佇列中出現的時刻)

};

Connection。cpp

#include “public。h”

#include “Connection。h”

// 初始化資料庫連線

Connection::Connection()

{

_conn = mysql_init(nullptr);

}

// 釋放資料庫連線資源

Connection::~Connection()

{

if (_conn != nullptr)

mysql_close(_conn);

}

// 連線資料庫

bool Connection::connect(string ip,

unsigned short port,

string username,

string password,

string dbname)

{

MYSQL* p = mysql_real_connect(_conn, ip。c_str(), username。c_str(),

password。c_str(), dbname。c_str(),

port, nullptr, 0);

//mysql_query(_conn, “set interactive_timeout=24*3600”);

return p != nullptr;

}

// 更新操作 insert、delete、update

bool Connection::update(string sql)

{

bool a = mysql_query(_conn, sql。c_str());

if (mysql_query(_conn, sql。c_str()))

{

LOG(“更新失敗:” + sql + “\nmysql_error:” + mysql_error(_conn));

return false;

}

return true;

}

// 查詢操作 select

MYSQL_RES* Connection::query(string sql)

{

// 查詢操作 select

// 如果查詢成功,返回0。如果出現錯誤,返回非0值

if (mysql_query(_conn, sql。c_str()))

{

LOG(“查詢失敗” + sql + “\nmysql_error:” + mysql_error(_conn));

return nullptr;

}

return mysql_use_result(_conn);

}

// 重新整理連線的起始空閒時刻

void Connection::refreshAliveTime()

{

_alivetime = clock();

}

// 返回連線空閒的時長

clock_t Connection::getAliveTime()

{

return clock() - _alivetime;

}

4.3連線池的功能

連線池的主要引數:

1。初始連線數:連線池事先會準備一些連線備用。最小連線數需要根據實際情況不斷測試決定,設定太多的話會出現很多空連線,浪費資源。

2。最大連線數:當併發請求太多了之後,初始量不夠用了。這時候會根據需求建立更多的連線,但不能無限建立,因為考慮到資源浪費問題。

3。最大空閒時間: 當併發請求增多以後,連線數會變多。由於“歸還”原因,這些連線不會被直接釋放,而是歸還到佇列中。假設後面的併發請求沒那麼多,那麼之前產生的多的連線會造成資源冗餘浪費。需要我們設定一個最大空閒時間。如果在最大空閒時間內,該連線還沒有被使用的話,就需要被回收掉,節約資源。考慮容器基於佇列實現,當隊頭元素的存活時間都沒超過最大空閒時間的話,後面的連線肯定也沒超過該最大空閒時間。

4。連線超時時間: 當併發請求太多了,且連線池的連線數已經超過最大連線數了,導致已經沒有空閒的連線可以使用了。那麼此時執行緒請求再連線會失敗。此時需設定一個連線超時時間,如果超時了,那麼獲取失敗,無法連線資料庫。

待實現的連線池的主要功能如下:

1。建立一個連線池物件。(因為是一個單例模式)

2。初始化連線數以及生產新連線(生產過程是連線池類內部多執行緒建立的,所以許可權為private;另外需要定義一個連線數計算器,使用原子變atomic,就不需要用互斥鎖來保護該計數器了)

3。從連線池中獲取一個可用連線(消費過程是使用者請求,許可權為public;用完後歸還到佇列中)

4。回收連線(透過定義一個掃描函式,獲取每個連線的空閒時間,用於多餘連線的釋放)

5。載入初始配置項,主要是資料庫連線引數如使用者名稱密碼等(可選)。

4.4 連線池的程式碼如下:

CommonConnectionPool。h

#pragma once

#include “Connection。h”

#include

#include

#include

#include

#include

#include

#include

#include

using namespace std;

// 實現連線池功能模組

class ConnectionPool

{

public:

// 獲取連線池物件例項(懶漢式單例模式,在獲取例項時才例項化物件)

static ConnectionPool* getConnectionPool();

// 給外部提供介面,從連線池中獲取一個可用的空閒連線

//注意,這裡不要直接返回指標,否則我們還需要定義一個(歸還連線)的方法,還要自己去釋放該指標。

//這裡直接返回一個智慧指標,智慧指標出作用域自動析構,(我們只需重定義析構即可——不釋放而是歸還)

shared_ptr getConnection();

private:

// 單例模式——建構函式私有化

ConnectionPool();

// 從配置檔案中載入配置項

bool loadConfigFile();

// 執行在獨立的執行緒中,專門負責生產新連線

// 非靜態成員方法,其呼叫依賴物件,要把其設計為一個執行緒函式,需要繫結this指標。

// 把該執行緒函式寫為類的成員方法,最大的好處是 非常方便訪問當前物件的成員變數。(資料)

void produceConnectionTask();

// 掃描超過maxIdleTime時間的空閒連線,進行對於連線的回收

void scannerConnectionTask();

string _ip; // MySQL的ip地址

unsigned short _port; // MySQL的埠號,預設為3306

string _username; // MySQL登陸使用者名稱

string _password; // MySQL登陸密碼

string _dbname; // 連線的資料庫名稱

int _initSize; // 連線池的最大初始連線量

int _maxSize; // 連線池的最大連線量

int _maxIdleTime; // 連線池的最大空閒時間

int _connectionTimeout; // 連線池獲取連線的超時時間

// 儲存MySQL連線的佇列

queue _connectionQue;

// 維護連線佇列的執行緒安全互斥鎖

mutex _queueMutex;

// 記錄connection連線的總數量

atomic_int _connectionCnt;

// 設定條件變數,用於連線生產執行緒和連線消費執行緒的通訊

condition_variable cv;

};

【文章福利】需要C/C++ Linux伺服器架構師學習資料加群812855908(資料包括C/C++,Linux,golang技術,核心,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等)

基於c++的資料庫連線池的實現與理解

CommonConnectionPool。cpp

#include “CommonConnectionPool。h”

#include “public。h”

// 執行緒安全的懶漢單例函式介面

ConnectionPool* ConnectionPool::getConnectionPool()

{

// 對於靜態區域性變數的初始化,編譯器自動進行lock和unlock

static ConnectionPool pool;

return &pool;

}

// 從配置檔案中載入配置項

bool ConnectionPool::loadConfigFile()

{

FILE* pf = fopen(“mysql。ini”, “r”);

if (pf == nullptr)

{

LOG(“File ‘mysql。ini’ is not existing!”);

return false;

}

// 逐行處理配置檔案中的配置字串

while (!feof(pf))

{

// 配置字串舉例:username=root\n

// 從檔案中獲取一行配置字串

char line[1024] = { 0 };

fgets(line, 1024, pf);

string str = line;

// 找到配置字串中的‘=’

int idx = str。find(‘=’, 0);

// 無效的配置項

if (idx == -1)

{

// 當配置字串中找不到‘=’時說明該配置字串有問題或者是註釋,將其忽略

continue;

}

// 分別取出該行配置中的key和value

int endIdx = str。find(‘\n’, idx);

string key = str。substr(0, idx);

string value = str。substr(idx + 1, endIdx - idx - 1);

if (key == “ip”)

{

_ip = value;

}

else if (key == “port”)

{

_port = atoi(value。c_str());

}

else if (key == “username”)

{

_username = value;

}

else if (key == “password”)

{

_password = value;

}

else if (key == “dbname”)

{

_dbname = value;

}

else if (key == “maxSize”)

{

_maxSize = atoi(value。c_str());

}

else if (key == “maxIdleTime”)

{

_maxIdleTime = atoi(value。c_str());

}

else if (key == “connectionTimeout”)

{

_connectionTimeout = atoi(value。c_str());

}

else if (key == “initSize”)

{

_initSize = atoi(value。c_str());

}

}

return true;

}

// 連線池的建構函式

ConnectionPool::ConnectionPool()

{

// 載入配置項

if (!loadConfigFile())

{

return;

}

// 建立初始數量的連線

for (int i = 0; i < _initSize; ++i)

{

Connection* p = new Connection();

p->connect(_ip, _port, _username, _password, _dbname);

p->refreshAliveTime(); // 記錄連線的起始空閒時刻

_connectionQue。push(p);

_connectionCnt++;

}

// 啟動一個新的執行緒,作為連線的生產者

thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));

produce。detach(); //守護執行緒

// 啟動一個新的定時執行緒,掃描超過maxIdleTime時間的空閒連線,並對其進行回收

thread scanner(std::bind(&ConnectionPool::produceConnectionTask, this));

scanner。detach();

}

// 執行在獨立的執行緒中,專門負責產生新連線

void ConnectionPool::produceConnectionTask()

{

for (;;)

{

unique_lock lock(_queueMutex); //條件變數需要和互斥鎖一塊使用

while (!_connectionQue。empty())

{

// 佇列非空時,此處生產執行緒進入等待狀態

cv。wait(lock); //進入等待時,釋放鎖,保證消費者執行緒正常執行

}

// 連線數量沒有到達上限,繼續建立新的連線

if (_connectionCnt < _maxSize)

{

Connection* p = new Connection();

p->connect(_ip, _port, _username, _password, _dbname);

_connectionQue。push(p);

_connectionCnt++;

}

// 通知消費者執行緒,可以消費連線了

cv。notify_all();

}

}

// 給外部提供介面,從連線池中獲取一個可用的空閒連線

shared_ptr ConnectionPool::getConnection()

{

unique_lock lock(_queueMutex);

while (_connectionQue。empty())

{

if (cv_status::timeout == cv。wait_for(lock, std::chrono::milliseconds(_connectionTimeout))) //超時喚醒

{

if (_connectionQue。empty())

{

LOG(“Failed to get connection:got idle connection timeout!”);

return nullptr;

}

}

}

/*

* shared_ptr智慧指標析構時,會把connection資源直接delete掉,

* 相當於呼叫connection的解構函式,connection就被close掉了。

* 這裡需要自定義shared_ptr的釋放資源的方式,把connection直接歸還到queue當中*/

shared_ptr sp(_connectionQue。front(),

[&](Connection* pcon)

{

// 這裡是在伺服器應用執行緒中呼叫的,所以一定要考慮佇列的執行緒安全操作

unique_lock lock(_queueMutex);

pcon->refreshAliveTime(); //在歸還回空閒連線佇列之前要記錄一下連線開始空閒的時刻

_connectionQue。push(pcon);

});

_connectionQue。pop();

// 消費者取出一個連線之後,通知生產者,生產者檢查佇列,如果為空則生產

cv。notify_all();

return sp;

}

// 掃描超過maxIdleTime時間的空閒連線,進行對於連線的回收

void ConnectionPool::scannerConnectionTask()

{

for (;;)

{

// 透過sleep實現定時效果

this_thread::sleep_for(chrono::seconds(_maxIdleTime));

// 掃描整個佇列,釋放多餘的連線

unique_lock lock(_queueMutex);

while (_connectionCnt > _initSize)

{

Connection* p = _connectionQue。front();

if (p->getAliveTime() >= (_maxIdleTime * 1000))

{

_connectionQue。pop();

_connectionCnt——;

delete p; // 呼叫~Connection()釋放連線

}

else

{

// 隊頭的連線沒有超過_maxIdleTime,其它連線肯定也沒有

break;

}

}

}

}

4.5 配置檔案 mysql.ini

ip=127。0。0。1

port=3306

username=root

password=111111

dbname=chat

initSize=10

maxSize=1024

maxIdleTime=60

connectionTimeout=100

5。測試函式 main。cpp及結果

#pragma once

#include

#include

#include

#include “Connection。h”

#include “CommonConnectionPool。h”

int n = 10000;//資料量

int main()

{

//不使用連線池,單執行緒:

clock_t begin = clock();

for (int i = 0; i < n; i++)

{

Connection conn;

char sql[1024] = { 0 };

sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,

1, “a”);

conn。connect(“127。0。0。1”, 3306, “root”, “zh601572”, “chat”);

conn。update(sql);

}

clock_t end = clock();

cout << end - begin << “ms” << endl;

return 0;

//不使用連線池,4執行緒:

//Connection conn;

//conn。connect(“localhost”, 3306, “root”, “zh601572”, “chat”);

//clock_t begin = clock();

//thread t1([]()

// {

// for (int i = 0; i < n/4; ++i)

// {

// Connection conn;

// char sql[1024] = { 0 };

// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,

// 5, “a”);

// conn。connect(“localhost”, 3306, “root”, “zh601572”, “chat”);

// conn。update(sql);

// }

// });

//thread t2([]()

// {

// for (int i = 0; i < n / 4; ++i)

// {

// Connection conn;

// char sql[1024] = { 0 };

// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,

// 6, “a”);

// conn。connect(“localhost”, 3306, “root”, “zh601572”, “chat”);

// conn。update(sql);

// }

// });

//thread t3([]()

// {

// for (int i = 0; i < n / 4; ++i)

// {

// Connection conn;

// char sql[1024] = { 0 };

// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,

// 7, “a”);

// conn。connect(“localhost”, 3306, “root”, “zh601572”, “chat”);

// conn。update(sql);

// }

// });

//thread t4([]()

// {

// for (int i = 0; i < n / 4; ++i)

// {

// Connection conn;

// char sql[1024] = { 0 };

// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,

// 8, “a”);

// conn。connect(“localhost”, 3306, “root”, “zh601572”, “chat”);

// conn。update(sql);

// }

// });

//t1。join();

//t2。join();

//t3。join();

//t4。join();

//clock_t end = clock();

//cout << end - begin << “ms” << endl;

//return 0;

//使用連線池,單執行緒:

//clock_t begin = clock();

//ConnectionPool* cp = ConnectionPool::getConnectionPool();

//for (int i = 0; i < n; i++)

//{

// shared_ptr sp = cp->getConnection();

// char sql[1024] = { 0 };

// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,

// 4, “zhouhui”);

// sp ->update(sql);

//}

//clock_t end = clock();

//cout << end - begin << “ms” << endl;

//return 0;

//使用連線池,四執行緒:

//clock_t begin = clock();

//

//thread t1([]()

// {

// ConnectionPool* cp = ConnectionPool::getConnectionPool();

// for (int i = 0; i < n / 4; i++)

// {

// shared_ptr sp = cp->getConnection();

// char sql[1024] = { 0 };

// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”,4, “zhouhui”);

// sp ->update(sql);

// }

// }

//);

//thread t2([]()

// {

// ConnectionPool* cp = ConnectionPool::getConnectionPool();

// for (int i = 0; i < n /4; i++)

// {

// shared_ptr sp = cp->getConnection();

// char sql[1024] = { 0 };

// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”, 4, “zhouhui”);

// sp->update(sql);

// }

// }

//);

//thread t3([]()

// {

// ConnectionPool* cp = ConnectionPool::getConnectionPool();

// for (int i = 0; i < n / 4; i++)

// {

// shared_ptr sp = cp->getConnection();

// char sql[1024] = { 0 };

// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”, 4, “zhouhui”);

// sp->update(sql);

// }

// }

//);

//thread t4([]()

// {

// ConnectionPool* cp = ConnectionPool::getConnectionPool();

// for (int i = 0; i < n / 4; i++)

// {

// shared_ptr sp = cp->getConnection();

// char sql[1024] = { 0 };

// sprintf(sql, “insert into t1(id,name) values(%d,‘%s’)”, 4, “zhouhui”);

// sp->update(sql);

// }

// }

//);

//t1。join();

//t2。join();

//t3。join();

//t4。join();

//clock_t end = clock();

//cout << (end - begin) << “ms” << endl;

//return 0;

}

結果如下

基於c++的資料庫連線池的實現與理解

可以看到還是節約很多時間資源的。

6 思考

資料庫連線池設定多少連線才合適?

標簽: 連線  SQL  連線池  connection  conn