国产欧美日韩第一页|日本一二三不卡视频|在线精品小视频,亚洲第一免费播放区,metcn人体亚洲一区,亚洲精品午夜视频

幫助中心 >  行業(yè)資訊 >  開發(fā) >  從代碼分析事務(wù)消息&消息延時(shí)

從代碼分析事務(wù)消息&消息延時(shí)

2021-04-26 15:07:30 6821

代碼分析事務(wù)消息

分布式事務(wù)模型圖如下:



1.png


我們先看事務(wù)消息客戶端的實(shí)現(xiàn)。即上圖的MQClient.

我們先看代碼的整體封裝。

下面代碼段做了兩件事:

1.本地?cái)?shù)據(jù)庫(kù)寫入 2.事務(wù)消息客戶端發(fā)送消息。

@Transactional表示開啟事務(wù)。在注釋下,開啟一個(gè)新的Order。用OrderDao將數(shù)據(jù)寫入本地?cái)?shù)據(jù)庫(kù)。然后調(diào)用transactionMsgClient.sendMsg將消息發(fā)出去(這是靜態(tài)方法調(diào)用,transactionMsgClient是一個(gè)類,sendMsg是它的方法)。這樣,本地?cái)?shù)據(jù)庫(kù)寫入和發(fā)消息這兩件事,就是個(gè)原子事務(wù),也就是說(shuō),兩件事要么一起成功,要么一起失敗。


2.png




上面代碼用到了MybatisTransactionMsgClient。MyBatis是一個(gè)Java持久化框架,它通過(guò)XML描述符或注解把對(duì)象與存儲(chǔ)過(guò)程或SQL語(yǔ)句關(guān)聯(lián)起來(lái),映射成數(shù)據(jù)庫(kù)內(nèi)對(duì)應(yīng)的記錄。

上面提到過(guò),transactionMsgClient.sendMsg是個(gè)類,這個(gè)類繼承了TransactionMsgClient。下面代碼中,transactionMsgClient.sendMsg調(diào)用了其父類的TransactionsendClient的sendMsg方法,寫事務(wù)消息表,并且發(fā)送消息。


3.png




我們接下來(lái)看sendMsg這個(gè)方法到底做了什么:

1、消息內(nèi)容落數(shù)據(jù)庫(kù)  2、發(fā)送消息

下面代碼會(huì)先做一個(gè)判斷,在if字段里:con.getAutoCommit。也就是說(shuō),只有當(dāng)沒(méi)有開啟自動(dòng)commit的時(shí)候(有自動(dòng)提交就破壞了事務(wù)的原子性),把信息寫在數(shù)據(jù)庫(kù)表里,然后構(gòu)造一個(gè)messages,發(fā)消息。而發(fā)消息的方法是將消息放到消息隊(duì)列中。


4.png




在事務(wù)消息設(shè)計(jì)中,后臺(tái)發(fā)送消息隊(duì)列設(shè)計(jì),如下圖所示:


5.png



參照上圖,我們可以看到后臺(tái)發(fā)送消息隊(duì)列有兩個(gè):


SendMsg隊(duì)列的消息消費(fèi)很快,基本上放進(jìn)去很快就會(huì)被消費(fèi)掉。這樣重試才能有效,否則一直重試,沒(méi)有意義。


6.png



下面代碼段的的作用:是發(fā)送消息時(shí),從隊(duì)列里中取消息,看是否到期,將到期的消息取出來(lái):


7.png



后臺(tái)優(yōu)先隊(duì)列的維護(hù)。


8.png



最后,事務(wù)消息表的設(shè)計(jì);


9.png



代碼分析事務(wù)消息

我們知道,RocketMQ支持延時(shí)消息。我們先看一下延時(shí)消息的應(yīng)用場(chǎng)景。

延時(shí)需求是在當(dāng)前時(shí)間后的某一時(shí)間點(diǎn)觸發(fā)指定的業(yè)務(wù)邏輯或操作。

  • 例如微信發(fā)消息,過(guò)一段時(shí)間沒(méi)發(fā)送成功的話,提示重新發(fā)送(如下圖的小紅點(diǎn)提示)。



  • 訂單狀態(tài)流轉(zhuǎn):如延時(shí)支付。京東上超過(guò)24小時(shí)的訂單會(huì)被自動(dòng)取消。

實(shí)際上,我們?cè)?/span>分布式事務(wù)的終極模型中,也用到了延時(shí)消息。

RocketMQ支持18個(gè)級(jí)別的延時(shí)等級(jí):messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 6h 12h。也就是說(shuō),消息延時(shí)發(fā)送有這18個(gè)級(jí)別。如果業(yè)務(wù)的延時(shí)消息需求與這18個(gè)級(jí)別不匹配,就需要自行基于RocketMQ進(jìn)行二次開發(fā)。


10.png



接下來(lái),我們看一下18個(gè)延時(shí)的實(shí)現(xiàn)原理。

RocketMQ的18個(gè)延時(shí)級(jí)別,每個(gè)級(jí)別對(duì)應(yīng)一個(gè)Queue,根據(jù)Level參數(shù),將消息放到對(duì)應(yīng)的18個(gè)隊(duì)列中的等級(jí)。每個(gè)隊(duì)列都對(duì)應(yīng)了到時(shí)出隊(duì)。例如1s隊(duì)列就是1s出隊(duì),2小時(shí)隊(duì)就是2小時(shí)出隊(duì)。消息出隊(duì)以后,當(dāng)成正常消息投遞。然后被投遞到了消費(fèi)隊(duì)列,被消費(fèi)者消費(fèi)掉。


11.png




提交成功!非常感謝您的反饋,我們會(huì)繼續(xù)努力做到更好!

這條文檔是否有幫助解決問(wèn)題?

非常抱歉未能幫助到您。為了給您提供更好的服務(wù),我們很需要您進(jìn)一步的反饋信息:

在文檔使用中是否遇到以下問(wèn)題: