您當前的位置:首頁 > 舞蹈

分散式事務之解決方案(TCC)

作者:由 海仔 發表于 舞蹈時間:2019-11-26

4。 分散式事務解決方案之TCC

4。1。 什麼是TCC事務

TCC是Try、Confirm、Cancel三個詞語的縮寫,TCC要求每個分支事務實現三個操作 :預處理Try、確認Confirm、撤銷Cancel。Try操作做業務檢查及資源預留,Confirm做業務確認操作,Cancel實現一個與Try相反的操作既回滾操作。TM首先發起所有的分支事務的try操作,任何一個分支事務的try操作執行失敗,TM將會發起所有分支事務的Cancel操作,若try操作全部成功,TM將會發起所有分支事務的Confirm操作,其中Confirm/Cancel操作若執行失敗,TM會進行重試。

分散式事務之解決方案(TCC)

分支事務失敗的情況 :

分散式事務之解決方案(TCC)

TCC分為三個階段 :

Try階段是做業務檢查(一致性)及資源預留(隔離),此階段僅是一個初步操作,它和後續的Confirm一起才能真正構成一個完整的業務邏輯。

Confirm階段是做確認提交,Try階段所有分支事務執行成功後開始執行Confirm。通常情況下,採用TCC則認為Confirm階段是不會出錯的。即 :只要Try成功,Confirm一定成功。若Confirm階段真的出錯了,需引入重試機制或人工處理。

Cancel階段是在業務執行錯誤需要回滾的狀態下執行分支事務的業務取消,預留資源釋放。通常情況下,採用TCC則認為Cancel階段也是一定成功的。若Cancel階段真的出錯了,需引入重試機制或人工處理。

TM事務管理器

TM事務管理器可以實現為獨立的服務,也可以讓全域性事務發起方充當TM的角色,TM獨立出來是為了成為公用元件,是為了考慮結構和軟體複用。

TM在發起全域性事務時生成全域性事務記錄,全域性事務ID貫穿整個分散式事務呼叫鏈條,用來記錄事務上下文,追蹤和記錄狀態,由於Confirm和Cancel失敗需進行重試,因此需要實現為冪等性是指同一個操作無論請求多少次,其結果都相同。

4。2。 TCC解決方案

目前市面上的TCC框架眾多比如下面這幾種 :

分散式事務之解決方案(TCC)

Seata也支援TCC,但Seata的TCC模式對Spring Cloud並沒有提供支援。我們的目標是理解TCC原理以及事務協調運作的過程,因此更傾向於輕量級易於理解的框架。

Hmily是一個高效能分散式事務TCC開源框架。基於Java語言來開發(JDK1。8),支援Dubbo,Spring Cloud等RPC框架進行分散式事務。它目前支援以下特性 :

支援巢狀事務(Nested transaction support)。

採用disruptor框架進行事務日誌的非同步讀寫,與RPC框架的效能毫無差別。

支援SpringBoot-starter專案啟動,使用簡單。

RPC框架支援 :dubbo、motan、springcloud。

本地事務儲存支援 :redis、mongodb、zookeeper、file、mysql。

事務日誌序列化支援 :java、hessian、kryo、protostuff。

採用Aspect AOP切面思想與Spring無縫整合,天然支援叢集。

RPC事務恢復,超時異常恢復等。

Hmily利用AOP對參與分散式事務的本地方法與遠端方法進行攔截處理,透過多方攔截,事務參與者能透明的呼叫到另一方的Try、Confirm、Cancel方法;傳遞事務上下文;並記錄事務日誌,酌情進行補償,重試等。

Hmily不需要事務協調服務,但需要提供一個數據庫(mysql/mongodb/zookeeper/redis/file)來進行日誌儲存。

Hmily實現的TCC服務與普通的服務一樣,只需要暴露一個介面,也就是它的Try業務。Confirm/Cancel業務邏輯,只是因為全域性事務提交/回滾的需要才提供的,因此Confirm/Cancel業務只需要被Hmily TCC事務框架發現即可,不需要被呼叫它的其他業務服務所感知。

官網介紹 :

https://

dromara。org/website/zh-

cn/docs/hmily/index。html

TCC需要注意三種異常處理分別是空回滾、冪等、懸掛 :

空回滾

在沒有呼叫TCC資源Try方法的情況下,呼叫來二階段的Cancel方法,Cancel方法需要識別出這是一個空回滾,然後直接返回成功。

出現原因是當一個分支事務所在服務宕機或網路異常,分支事務呼叫記錄為失敗,這個時候其實是沒有執行Try階段,當故障恢復後,分散式事務進行回滾則會呼叫二階段的Cancel方法,從而形成空回滾。

解決思路是關鍵就是要識別出這個空回滾。思路很簡單就是需要知道一階段是否執行,如果執行來,那就是正常回滾;如果沒執行,那就是空回滾。前面已經說過TM在發起全域性事務時生成全域性事務記錄,全域性事務ID貫穿整個分散式事務呼叫鏈條。再額外增加一張分支事務記錄表,其中有全域性事務ID和分支事務ID,第一階段Try方法裡會插入一條記錄,表示一階段執行來。Cancel接口裡讀取該記錄,如果該記錄存在,則正常回滾;如果該記錄不存在,則是空回滾。

冪等

透過前面介紹已經瞭解到,為了保證TCC二階段提交重試機制不會引發資料不一致,要求TCC的二階段Try、Confirm和Cancel介面保證冪等,這樣不會重複使用或者釋放資源。如果冪等控制沒有做好,很有可能導致資料不一致等嚴重問題。

解決思路在上述 “分支事務記錄”中增加執行狀態,每次執行前都查詢該狀態。

懸掛

懸掛就是對於一個分散式事務,其二階段Cancel介面比Try介面先執行。

出現原因是在RPC呼叫分支事務try時,先註冊分支事務,再執行RPC呼叫,如果此時RPC呼叫的網路發生擁堵,通常RPC呼叫是有超時時間的,RPC超時以後,TM就會通知RM回滾該分散式事務,可能回滾完成後,RPC請求才到達參與者真正執行,而一個Try方法預留的業務資源,只有該分散式事務才能使用,該分散式事務第一階段預留的業務資源就再也沒有人能夠處理了,對於這種情況,我們就稱為懸掛,即業務資源預留後無法繼續處理。

解決思路是如果二階段執行完成,那一階段就不能再繼續執行。在執行一階段事務時判斷在該全域性事務下,“分支事務記錄”表中是否已經有二階段事務記錄,如果有則不執行Try。

舉例,場景為A轉賬30元給B,A和B賬戶在不同的服務。

方案 1

賬戶A

try: 檢查餘額是否夠30元 扣減30元 confirm: 空 cancel: 增加30元

賬戶B

try: 增加30元 confirm: 空 cancel: 減少30元

方案1說明:

1)賬戶A,這裡的餘額就是所謂的業務資源,按照前面提到的原則,在第一階段需要檢查並預留業務資源,因此, 我們在扣錢 TCC 資源的 Try 接口裡先檢查 A 賬戶餘額是否足夠,如果足夠則扣除 30 元。 Confirm 介面表示正式 提交,由於業務資源已經在 Try 接口裡扣除掉了,那麼在第二階段的 Confirm 接口裡可以什麼都不用做。Cancel 介面的執行表示整個事務回滾,賬戶A回滾則需要把 Try 接口裡扣除掉的 30 元還給賬戶。

2)賬號B,在第一階段 Try 接口裡實現給賬戶B加錢,Cancel 介面的執行表示整個事務回滾,賬戶B回滾則需要把 Try 接口裡加的 30 元再減去。

方案1的問題分析:

1)如果賬戶A的try沒有執行在cancel則就多加了30元。

2)由於try,cancel、confirm都是由單獨的執行緒去呼叫,且會出現重複呼叫,所以都需要實現冪等。

3)賬號B在try中增加30元,當try執行完成後可能會其它執行緒給消費了。

4)如果賬戶B的try沒有執行在cancel則就多減了30元。

問題解決:

1)賬戶A的cancel方法需要判斷try方法是否執行,正常執行try後方可執行cancel。

2)try、cancel、confirm方法實現冪等。

3)賬戶B在try方法中不允許更新賬戶金額,在confirm中更新賬戶金額。

4)賬戶B的cancel方法需要判斷try方法是否執行,正常執行try後方可執行cancel。

最佳化方案:

賬戶A :

try: try冪等校驗 try懸掛處理 檢查餘額是否夠30元 扣減30元 confirm: 空 cancel: cancel冪等校驗 cancel空回滾處理 增加可用餘額30元

賬戶B :

try: 空 confirm: confirm冪等校驗 正式增加30元 cancel: 空

4。3。 Hmily實現TCC事務

4。3。1。業務說明

透過Hmily實現TCC分散式事務,模擬兩個賬戶的轉賬交易過程。

兩個賬戶分別在不同的銀行(張三在bank1、李四在bank2),bank1、bank2是兩個微服務。交易過程是,張三給李四轉賬制定金額。

上述交易步驟,要麼一起成功,要麼一起失敗,必須是一個整體性事務。

分散式事務之解決方案(TCC)

4。3。2。 程式組成部分

資料庫:MySQL-5。7。25

JDK:64位 jdk1。8。0_201 微服務:spring-boot-2。1。3、spring-cloud-Greenwich。RELEASE Hmily:hmily-springcloud。2。0。4-RELEASE

微服務及資料庫的關係 :

dtx/dtx-tcc-demo/dtx-tcc-demo-bank1 銀行1,操作張三賬戶, 連線資料庫bank1 dtx/dtx-tcc-demo/dtx-tcc-demo-bank2 銀行2,操作李四賬戶,連線資料庫bank2

服務註冊中心:dtx/discover-server

4。3。3。 建立資料庫

建立hmily資料庫,用於儲存hmily框架記錄的資料。

CREATE DATABASE

hmily

CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;

建立bank1庫,並匯入以下表結構和資料(包含張三賬戶)

CREATE DATABASE

bank1

CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;

建立bank2庫,並匯入以下表結構和資料(包含李四賬戶)

CREATE DATABASE

bank2

CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;

DROP TABLE IF EXISTS

account_info

; CREATE TABLE

account_info

id

bigint(20) NOT NULL AUTO_INCREMENT,

account_name

varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘戶 主姓名’,

account_no

varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘銀行 卡號’,

account_password

varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘帳戶密碼’,

account_balance

double NULL DEFAULT NULL COMMENT ‘帳戶餘額’,

PRIMARY KEY (

id

) USING BTREE

) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

INSERT INTO

account_info

VALUES (2, ‘張三的賬戶’, ‘1’, ‘’, 10000);

每個資料庫都建立try、confirm、cancel三張日誌表:

CREATE TABLE `local_try_log` ( `tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8

4。3。5 工程dtx-tcc-demo

(1)引入maven依賴

org。dromara hmily‐springcloud 2。0。4‐RELEASE

(2)配置hmily

application。yml :

org: dromara: hmily : serializer : kryo recoverDelayTime : 128 retryMax : 30 scheduledDelay : 128 scheduledThreadMax : 10 repositorySupport : db started: true hmilyDbConfig : driverClassName : com。mysql。jdbc。Driver url : jdbc:mysql://localhost:3306/bank?useUnicode=true username : root password : root

新增配置類接收application。yml中的Hmily配置資訊,並建立HmilyTransactionBootstrap Bean:

@Bean public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){ HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService); hmilyTransactionBootstrap。setSerializer(env。getProperty(“org。dromara。hmily。serializer”)); hmilyTransactionBootstrap。setRecoverDelayTime(Integer。parseInt(env。getProperty(“org。dromara。hmily。recoverDelayTime”))); hmilyTransactionBootstrap。setRetryMax(Integer。parseInt(env。getProperty(“org。dromara。hmily。retryMax”))); hmilyTransactionBootstrap。setScheduledDelay(Integer。parseInt(env。getProperty(“org。dromara。hmily。scheduledDelay”))); hmilyTransactionBootstrap。setScheduledThreadMax(Integer。parseInt(env。getProperty(“org。dromara。hmily。scheduledThreadMax”))); hmilyTransactionBootstrap。setRepositorySupport(env。getProperty(“org。dromara。hmily。repositorySupport”)); hmilyTransactionBootstrap。setStarted(Boolean。parseBoolean(env。getProperty(“org。dromara。hmily。started”))); HmilyDbConfig hmilyDbConfig = new HmilyDbConfig(); hmilyDbConfig。setDriverClassName(env。getProperty(“org。dromara。hmily。hmilyDbConfig。driverClassName”)); hmilyDbConfig。setUrl(env。getProperty(“org。dromara。hmily。hmilyDbConfig。url”)); hmilyDbConfig。setUsername(env。getProperty(“org。dromara。hmily。hmilyDbConfig。username”)); hmilyDbConfig。setPassword(env。getProperty(“org。dromara。hmily。hmilyDbConfig。password”)); hmilyTransactionBootstrap。setHmilyDbConfig(hmilyDbConfig); return hmilyTransactionBootstrap; }

啟動類增加@EnableAspectJAutoProxy並增加org。dromara。hmily的掃描項:

@SpringBootApplication @EnableDiscoveryClient @EnableHystrix @EnableFeignClients(basePackages = {“cn。itcast。dtx。tccdemo。bank1。spring”}) @ComponentScan({“cn。itcast。dtx。tccdemo。bank1”,“org。dromara。hmily”}) public class Bank1HmilyServer { public static void main(String[] args) { SpringApplication。run(Bank1HmilyServer。class, args); } }

4。3。7 dtx-tcc-demo-bank1

dtx-tcc-demo-bank1實現try和cancel方法,如下 :

try: try冪等校驗 try懸掛處理 檢查餘額是夠扣減金額 扣減金額 confirm: 空 cancel: cancel冪等校驗 cancel空回滾處理 增加可用餘額

Dao

@Mapper @Component public interface AccountInfoDao { @Update(“update account_info set account_balance=account_balance - #{amount} where account_balance>=#{amount} and account_no=#{accountNo} ”) int subtractAccountBalance(@Param(“accountNo”) String accountNo, @Param(“amount”) Double amount); @Update(“update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ”) int addAccountBalance(@Param(“accountNo”) String accountNo, @Param(“amount”) Double amount);

/** * 增加某分支事務try執行記錄 * @param localTradeNo 本地事務編號 * @return */

@Insert(“insert into local_try_log values(#{txNo},now());”) int addTry(String localTradeNo); @Insert(“insert into local_confirm_log values(#{txNo},now());”) int addConfirm(String localTradeNo); @Insert(“insert into local_cancel_log values(#{txNo},now());”) int addCancel(String localTradeNo);

/** * 查詢分支事務try是否已執行 * @param localTradeNo 本地事務編號 * @return */

@Select(“select count(1) from local_try_log where tx_no = #{txNo} ”) int isExistTry(String localTradeNo);

/** * 查詢分支事務confirm是否已執行 * @param localTradeNo 本地事務編號 * @return */

@Select(“select count(1) from local_confirm_log where tx_no = #{txNo} ”) int isExistConfirm(String localTradeNo);

/** * 查詢分支事務cancel是否已執行 * @param localTradeNo 本地事務編號 * @return */

@Select(“select count(1) from local_cancel_log where tx_no = #{txNo} ”) int isExistCancel(String localTradeNo); }

2)try和cancel方法

@Slf4j @Service public class AccountInfoServiceImpl implements AccountInfoService { @Autowired private AccountInfoDao accountInfoDao; @Autowired private Bank2Client bank2Client;

/** * 只要標記@Hmily就是try方法,在註解中指定confirm、cancel兩個方法的名字 * * @param accountNo * @param amount */

@Hmily(confirmMethod = “commit”, cancelMethod = “rollback”) @Transactional(rollbackFor = Exception。class) @Override public void updateAccountBalance(String accountNo, Double amount) {

// 事務id

String transId = HmilyTransactionContextLocal。getInstance()。get()。getTransId();

http://

log。info

(“Bank1 Service begin try 。。。” + transId); int existTry = accountInfoDao。isExistTry(transId);

// 冪等判斷 判斷local_try_log表中是否有try日誌記錄,如果有不再執行

// try冪等校驗

if (existTry > 0) {

http://

log。info

(“Bank1 Service 已經執行try,無需重複執行,事務id :{}”, transId); return; }

// try懸掛處理,如果cancel、confirm有一個已經執行了,try不再執行

if (accountInfoDao。isExistCancel(transId) > 0 || accountInfoDao。isExistConfirm(transId) > 0) {

http://

log。info

(“Bank1 Service 已經執行confirm或cancel,懸掛處理,事務id :{}”, transId); return; }

// 從賬戶扣減

if (accountInfoDao。subtractAccountBalance(accountNo, amount) <= 0) {

// 扣減失敗

throw new HmilyRuntimeException(“bank1 exception, 扣減失敗,事務id :{}” + transId); }

// 增加本地事務try成功記錄,用於冪等性控制標識

accountInfoDao。addTry(transId);

// 遠端呼叫bank2

if (bank2Client。transfer(amount)) { throw new HmilyRuntimeException(“bank2Client exception,事務id:{}”+transId); }

// 異常一定要拋在Hmily裡面

if (amount == 10) { throw new RuntimeException(“Bank2 make exception 10”); }

http://

log。info

(“Bank2 Service end try 。。” + transId); } @Transactional(rollbackFor = Exception。class) public void commit(String accountNo, double amount) { String transId = HmilyTransactionContextLocal。getInstance()。get()。getTransId();

http://

log。info

(“Bank1 Service begin commit 。。” + transId); } @Transactional(rollbackFor = Exception。class) public void rollback(String accountNo, double amount) { String transId = HmilyTransactionContextLocal。getInstance()。get()。getTransId();

http://

log。info

(“Bank1 Service begin rollback。。。” + transId);

// 空回滾處理,try階段沒有執行什麼也不用做。

if (accountInfoDao。isExistTry(transId) == 0) {

http://

log。info

(“Bank1 try 階段失敗 。。無需rollback ” + transId); return; }

// 冪等性校驗,已經執行過了,什麼也不用做

if (accountInfoDao。isExistCancel(transId) > 0) {

http://

log。info

(“Bank1 已經執行過rollback 。。無需再次rollback ” + transId); return; }

// 再將金額加回賬戶

accountInfoDao。addAccountBalance(accountNo, amount);

// 新增cancel日誌,用於冪等性控制標識

accountInfoDao。addCancel(transId);

http://

log。info

(“Bank1 Service end rollback 。。。 ” + transId); } }

3)feignClient

@FeignClient(value = “seata-demo-bank2”, fallback = Bank2Client。class) public interface Bank2Client { @GetMapping(“/bank2/transfer”) @Hmily Boolean transfer(@RequestParam(“amount”) Double amount); }

Controller

@RestController public class Bank1Controller { @Autowired private AccountInfoService accountInfoService; @RequestMapping(“/transfer”) public String test(@RequestParam(“amount”) Double amount) { accountInfoService。updateAccountBalance(“1”, amount); return “bank1” + amount; } }

4。3。8 dtx-tcc-demo-bank2

dtx-tcc-demo-bank2實現如下功能 :

try: 空 confirm: confirm冪等校驗 正式增加金額 cancel: 空

1)Dao

@Component @Mapper public interface AccountInfoDao { @Update(“update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ”) int addAccountBalance(@Param(“accountNo”) String accountNo, @Param(“amount”) Double amount);

/** * 增加某分支事務try執行記錄 * @param localTradeNo 本地事務編號 * @return */

@Insert(“insert into local_try_log values(#{txNo},now());”) int addTry(String localTradeNo); @Insert(“insert into local_confirm_log values(#{txNo},now());”) int addConfirm(String localTradeNo); @Insert(“insert into local_cancel_log values(#{txNo},now());”) int addCancel(String localTradeNo);

/** * 查詢分支事務try是否已執行 * @param localTradeNo 本地事務編號 * @return */

@Select(“select count(1) from local_try_log where tx_no = #{txNo} ”) int isExistTry(String localTradeNo);

/** * 查詢分支事務confirm是否已執行 * @param localTradeNo 本地事務編號 * @return */

@Select(“select count(1) from local_confirm_log where tx_no = #{txNo} ”) int isExistConfirm(String localTradeNo);

/** * 查詢分支事務cancel是否已執行 * @param localTradeNo 本地事務編號 * @return */

@Select(“select count(1) from local_cancel_log where tx_no = #{txNo} ”) int isExistCancel(String localTradeNo); }

2)實現confirm方法

@Slf4j @Service public class AccountInfoServiceImpl implements AccountInfoService { @Autowired private AccountInfoDao accountInfoDao; @Transactional(rollbackFor = Exception。class) @Hmily(confirmMethod = “confirmMethod”, cancelMethod = “cancelMethod”) @Override public void updateAccountBalance(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal。getInstance()。get()。getTransId();

http://

log。info

(“Bank2 Service Begin try 。。。 ” + transId); } @Transactional(rollbackFor = Exception。class) public void confirmMethod(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal。getInstance()。get()。getTransId();

http://

log。info

(“Bank2 Service commit 。。。” + transId);

// 冪等性校驗,已經執行過了,什麼也不用做

if (accountInfoDao。isExistConfirm(transId) > 0) {

http://

log。info

(“Bank2 已經執行過confirm 。。無需再次confirm ” + transId); return; }

// 正式增加金額

accountInfoDao。addAccountBalance(accountNo, amount);

// 新增confirm日誌

accountInfoDao。addConfirm(transId); } @Transactional(rollbackFor = Exception。class) public void cancelMethod(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal。getInstance()。get()。getTransId();

http://

log。info

(“Bank2 Service begin cancel 。。。 ” + transId); } }

3)Controller

@RestController public class Bank2Controller { @Autowired private AccountInfoService accountInfoService; @RequestMapping(“/transfer”) public Boolean test2(@RequestParam(“amount”) Double amount) { accountInfoService。updateAccountBalance(“2”, amount); return true; } }

3。3。9 測試場景

張三向李四轉賬成功。

李四事務失敗,張三事務回滾成功。

張三事務失敗,李四分支事務回滾成功。

分支事務超時測試。

4。4。 小結

如果拿TCC事務的處理流程與2PC兩階段提交做比較,2PC通常都是在跨庫的DB層面,而TCC則在應用層面的處 理,需要透過業務邏輯來實現。這種分散式事務的實現方式的優勢在於,可以讓

應用自己定義資料操作的粒度,使得降低鎖衝突、提高吞吐量成為可能

而不足之處則在於對應用的侵入性非常強,業務邏輯的每個分支都需要實現try、confirm、cancel三個操作。此外,其實現難度也比較大,需要按照網路狀態、系統故障等不同的失敗原因實現不同的回滾策略。

標簽: try  事務  cancel  amount  transId