日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當(dāng)前位置:首頁(yè) > 科技  > 軟件

實(shí)戰(zhàn)與原理:如何基于RocketMQ實(shí)現(xiàn)分布式事務(wù)?

來(lái)源: 責(zé)編: 時(shí)間:2024-01-26 08:59:37 207觀看
導(dǎo)讀使用事務(wù)消息在DailyMart系統(tǒng)中,用戶發(fā)起支付后,訂單系統(tǒng)需要調(diào)用庫(kù)存服務(wù)執(zhí)行庫(kù)存扣減邏輯。由于這是跨服務(wù)調(diào)用,因此會(huì)產(chǎn)生分布式事務(wù)。在這里,我們使用RocketMQ的事務(wù)消息來(lái)實(shí)現(xiàn)分布式事務(wù)。1、首先,在訂單服務(wù)的應(yīng)用服

使用事務(wù)消息

在DailyMart系統(tǒng)中,用戶發(fā)起支付后,訂單系統(tǒng)需要調(diào)用庫(kù)存服務(wù)執(zhí)行庫(kù)存扣減邏輯。圖片T8i28資訊網(wǎng)——每日最新資訊28at.com

由于這是跨服務(wù)調(diào)用,因此會(huì)產(chǎn)生分布式事務(wù)。在這里,我們使用RocketMQ的事務(wù)消息來(lái)實(shí)現(xiàn)分布式事務(wù)。T8i28資訊網(wǎng)——每日最新資訊28at.com

1、首先,在訂單服務(wù)的應(yīng)用服務(wù)層處理支付邏輯,并調(diào)用RocketMQ發(fā)送事務(wù)消息:T8i28資訊網(wǎng)——每日最新資訊28at.com

@Overridepublic String payment(String orderSn) {    // todo 集成支付寶支付    // 支付流水號(hào)    String outOrderNo = IdUtils.get32UUID();    TradeOrder tradeOrder = Optional.ofNullable(tradeOrderService.getByOrderSn(orderSn)).orElseThrow(() -> new BusinessException("訂單編號(hào)不存在"));    // 如果訂單處于待支付狀態(tài)    if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.WAITING_PAYMENT.getStatus())) {        OrderPaidEvent orderPaidEvent = new OrderPaidEvent(orderSn, outOrderNo);        TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID");        if (SendStatus.SEND_OK == sendResult.getSendStatus() && sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {            return tradeOrder.getOrderSn();        } else {            throw new BusinessException("支付失敗...");        }    } else {        throw new BusinessException("訂單已支付,請(qǐng)勿重復(fù)提交...");    }}

T8i28資訊網(wǎng)——每日最新資訊28at.com

2、在訂單服務(wù)的基礎(chǔ)設(shè)施層,創(chuàng)建一個(gè)類實(shí)現(xiàn) RocketMQLocalTransactionListener 接口:T8i28資訊網(wǎng)——每日最新資訊28at.com

該接口有兩個(gè)方法:T8i28資訊網(wǎng)——每日最新資訊28at.com

  • executeLocalTransaction:用于執(zhí)行本地事務(wù)。
  • checkLocalTransaction:在RocketMQ執(zhí)行消息回查時(shí)檢查本地事務(wù)執(zhí)行結(jié)果,用于確定消息提交還是回滾。
@Component@Slf4jpublic class OrderPaidTransactionConsumer implements RocketMQLocalTransactionListener {        @Resource    private TransactionTemplate transactionTemplate;    @Resource    private TradeOrderService tradeOrderService;         /**     * 執(zhí)行本地事務(wù)     * 將訂單狀態(tài)修改成已支付     */    @Override    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {                final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);        try {            // 放到同一個(gè)本地事務(wù)中            this.transactionTemplate.executeWithoutResult(status -> {                String orderSn = orderPaidEvent.getOrderSn();                // 修改成待發(fā)貨                tradeOrderService.changeOrderStatus(orderSn, OrderStatusEnum.AWAITING_SHIPMENT);            });            return RocketMQLocalTransactionState.COMMIT;        } catch (Exception e) {            log.error("修改訂單狀態(tài)失敗", e);            // ROLLBACK 則回滾消息,rocketmq將廢棄這條消息            return RocketMQLocalTransactionState.ROLLBACK;            // 如果是UNKNOWN, 則觸發(fā)回查        }    }    /**     * 檢查本地事務(wù)執(zhí)行狀態(tài)     * 消息回查時(shí),對(duì)于正在進(jìn)行中的事務(wù)不要返回Rollback或Commit結(jié)果,應(yīng)繼續(xù)保持Unknown的狀態(tài)。     */    @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);        String orderSn = orderPaidEvent.getOrderSn();        TradeOrder tradeOrder = tradeOrderService.getByOrderSn(orderSn);        // 如果已經(jīng)修改成待發(fā)貨說明本地事務(wù)執(zhí)行成功,此時(shí)消費(fèi)端可以直接消費(fèi)        if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.AWAITING_SHIPMENT.getStatus())) {            return RocketMQLocalTransactionState.COMMIT;        } else {            // 這里查不到的時(shí)候返回 UNKNOWN在于,有可能事務(wù)還沒有提交,回查就開始了            return RocketMQLocalTransactionState.UNKNOWN;        }    }}

T8i28資訊網(wǎng)——每日最新資訊28at.com

3、在庫(kù)存服務(wù)的基礎(chǔ)設(shè)施層,監(jiān)聽消息以執(zhí)行庫(kù)存扣減邏輯:T8i28資訊網(wǎng)——每日最新資訊28at.com

@Component@Slf4j@RocketMQMessageListener(consumerGroup = "dailymart_inventory_group", topic = "TRADE-ORDER", selectorExpression = "ORDER-PAID")public class InventoryDeductionConsumer extends EnhanceMessageHandler<OrderPaidEvent> implements RocketMQListener<OrderPaidEvent> {        @Resource    private InventoryDomainService inventoryDomainService;        @Override    public void onMessage(OrderPaidEvent orderPaidEvent) {        super.dispatchMessage(orderPaidEvent);    }        @Override    protected void handleMessage(OrderPaidEvent orderPaidEvent) throws Exception {        // 執(zhí)行庫(kù)存扣減邏輯        String orderSn = orderPaidEvent.getOrderSn();        inventoryDomainService.deductionInventory(orderSn);    }}

通過以上步驟,我們完成了RocketMQ事務(wù)消息的發(fā)送,利用事務(wù)消息的特性保證分布式事務(wù)的最終一致性。與普通消息相比,事務(wù)消息在處理時(shí)需要實(shí)現(xiàn) RocketMQLocalTransactionListener 接口,這是事務(wù)消息的核心。T8i28資訊網(wǎng)——每日最新資訊28at.com

介紹完事務(wù)消息的使用,接下來(lái)我們?cè)賮?lái)聊聊事務(wù)消息的原理。T8i28資訊網(wǎng)——每日最新資訊28at.com

事務(wù)消息的原理

首先,讓我們思考一下,如果不使用事務(wù)消息會(huì)有什么問題。T8i28資訊網(wǎng)——每日最新資訊28at.com

很容易想到的一個(gè)問題就是消息丟失。當(dāng)保存訂單后由于網(wǎng)絡(luò)問題導(dǎo)致消息丟失,如下圖所示:T8i28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片T8i28資訊網(wǎng)——每日最新資訊28at.com

在不使用RocketMQ的情況下,我們往往會(huì)通過 本地消息表 + 補(bǔ)償重試 的機(jī)制來(lái)保證消息一定會(huì)發(fā)送出去。其原理可以參考上篇文章 [Dailymart26:微服務(wù)中躲不過的坑 - 分布式事務(wù)]。T8i28資訊網(wǎng)——每日最新資訊28at.com

那RocketMQ是如何解決這個(gè)問題的呢?T8i28資訊網(wǎng)——每日最新資訊28at.com

1. 發(fā)送half消息,探測(cè)MQ是否正常

在基于RocketMQ的事務(wù)消息中,我們不是先執(zhí)行自身的訂單支付邏輯,而是先讓訂單系統(tǒng)發(fā)送一條 half消息 到MQ去。這個(gè)half消息本質(zhì)上是一個(gè)訂單支付成功的消息,只不過此時(shí)庫(kù)存系統(tǒng)是看不見這個(gè)half消息的。然后,我們等待接收這個(gè)half消息寫入成功的響應(yīng)通知。T8i28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片T8i28資訊網(wǎng)——每日最新資訊28at.com

發(fā)送half消息的本質(zhì)其實(shí)是為了探測(cè)MQ是否仍然正常運(yùn)行。但問題來(lái)了,如上所述,消息會(huì)發(fā)生丟失,那么half消息丟失怎么辦呢?T8i28資訊網(wǎng)——每日最新資訊28at.com

2. half消息發(fā)送失敗

在發(fā)送half消息時(shí),由于網(wǎng)絡(luò)原因或者M(jìn)Q直接掛了,就會(huì)導(dǎo)致half消息發(fā)送失敗。這個(gè)時(shí)候訂單系統(tǒng)需要執(zhí)行一系列的回滾操作。在我們的場(chǎng)景中,應(yīng)該執(zhí)行退款操作,將錢退還給用戶,并告知用戶交易失敗。T8i28資訊網(wǎng)——每日最新資訊28at.com

3. half消息成功,訂單系統(tǒng)執(zhí)行自己的業(yè)務(wù)邏輯

如果成功收到half消息的正常響應(yīng),此時(shí)訂單系統(tǒng)應(yīng)該執(zhí)行自己的業(yè)務(wù)邏輯。在我們這個(gè)場(chǎng)景中,就是修改訂單數(shù)據(jù)庫(kù)狀態(tài),將其修改為待發(fā)貨狀態(tài)。這部分邏輯就對(duì)應(yīng)上述代碼中的executeLocalTransaction()方法。T8i28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片T8i28資訊網(wǎng)——每日最新資訊28at.com

4. 訂單本地事務(wù)執(zhí)行失敗

如果訂單系統(tǒng)執(zhí)行本地事務(wù)失敗,則需要發(fā)送一個(gè)rollback請(qǐng)求給MQ,讓其刪除這條half消息。T8i28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片T8i28資訊網(wǎng)——每日最新資訊28at.com

5. 訂單本地事務(wù)執(zhí)行成功

如果訂單系統(tǒng)的本地事務(wù)執(zhí)行正常,此時(shí)需要發(fā)送一個(gè)commit請(qǐng)求給MQ,要求MQ對(duì)之前的half消息進(jìn)行commit操作,這樣庫(kù)存系統(tǒng)就可以消費(fèi)這條消息了。T8i28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片T8i28資訊網(wǎng)——每日最新資訊28at.com

訂單創(chuàng)建消息處于half狀態(tài)時(shí),庫(kù)存系統(tǒng)是看不見它的。必須等到訂單系統(tǒng)執(zhí)行commit請(qǐng)求,消息被commit后,庫(kù)存系統(tǒng)才能看到并獲取這條消息進(jìn)行后續(xù)處理。T8i28資訊網(wǎng)——每日最新資訊28at.com

6. half消息發(fā)送成功,但是沒收到half的響應(yīng)

以上就是RocketMQ事務(wù)消息的正向流程。T8i28資訊網(wǎng)——每日最新資訊28at.com

然而,還有一個(gè)問題:如果訂單系統(tǒng)發(fā)送half消息成功后卻沒有收到half消息的響應(yīng),該如何處理呢?T8i28資訊網(wǎng)——每日最新資訊28at.com

在這種情況下,訂單系統(tǒng)可能會(huì)誤以為是發(fā)送half消息到MQ失敗了。訂單系統(tǒng)就會(huì)執(zhí)行回滾流程,退還支付金額,關(guān)閉訂單。T8i28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片T8i28資訊網(wǎng)——每日最新資訊28at.com

然而,此時(shí)MQ系統(tǒng)中已經(jīng)存在了一條half消息。這條half消息又該如何處理呢?T8i28資訊網(wǎng)——每日最新資訊28at.com

在RocketMQ中,有一套補(bǔ)償流程。RocketMQ會(huì)定期掃描處于half狀態(tài)的消息。如果一直沒有對(duì)這個(gè)消息執(zhí)行 commit/rollback 操作,超過了一定的時(shí)間,RocketMQ就會(huì)回調(diào)你的訂單系統(tǒng)的一個(gè)接口,用以確認(rèn)你本地事務(wù)的情況。T8i28資訊網(wǎng)——每日最新資訊28at.com

當(dāng)訂單系統(tǒng)收到MQ的回查請(qǐng)求時(shí),就需要檢索一下數(shù)據(jù)庫(kù),根據(jù)訂單狀態(tài)決定執(zhí)行commit還是rollback。T8i28資訊網(wǎng)——每日最新資訊28at.com

這部分邏輯就對(duì)應(yīng)上述代碼中checkLocalTransaction()方法。T8i28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片T8i28資訊網(wǎng)——每日最新資訊28at.com

7. rollback 或者 commit 失敗怎么辦?

通過上述說明,可以看到,RocketMQ是根據(jù)rollback或commit操作來(lái)決定half消息的狀態(tài)的。如果業(yè)務(wù)系統(tǒng)執(zhí)行了commit操作,則將half消息設(shè)置為可見,庫(kù)存系統(tǒng)可以消費(fèi);如果業(yè)務(wù)系統(tǒng)執(zhí)行了rollback操作,MQ就會(huì)刪除half消息。那么問題來(lái)了:如果訂單系統(tǒng)在執(zhí)行rollback或commit操作時(shí)失敗又該如何處理呢?T8i28資訊網(wǎng)——每日最新資訊28at.com

這時(shí)候仍然依賴于前文提到的回查機(jī)制。T8i28資訊網(wǎng)——每日最新資訊28at.com

由于此時(shí)MQ中的消息一直處于half狀態(tài),超過一定的超時(shí)時(shí)間后,MQ會(huì)發(fā)現(xiàn)這個(gè)half消息有問題,然后回調(diào)你的訂單系統(tǒng)的接口。此時(shí)訂單系統(tǒng)需要根據(jù)訂單狀態(tài)來(lái)決定執(zhí)行commit請(qǐng)求還是rollback請(qǐng)求。T8i28資訊網(wǎng)——每日最新資訊28at.com

以上,就是RocketMQ事務(wù)消息的原理。結(jié)合文章開頭的代碼,是不是已經(jīng)很清晰了呢?T8i28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-68319-0.html實(shí)戰(zhàn)與原理:如何基于RocketMQ實(shí)現(xiàn)分布式事務(wù)?

聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com

上一篇: Python Pathlib模塊:一站式解決文件路徑難題

下一篇: 我愛說實(shí)話,Mica-Http 超好用!

標(biāo)簽:
  • 熱門焦點(diǎn)
  • 石頭智能洗地機(jī)A10 Plus體驗(yàn):雙向自清潔治好了我的懶癌

    一、前言和介紹專為家庭請(qǐng)假懶人而生的石頭科技在近日又帶來(lái)了自己的全新旗艦新品,石頭智能洗地機(jī)A10 Plus。從這個(gè)產(chǎn)品名上就不難看出,這次石頭推出的并不是常見的掃地機(jī)器
  • 6月安卓手機(jī)性能榜:vivo/iQOO霸占旗艦排行榜前三

    2023年上半年已經(jīng)正式過去了,我們也迎來(lái)了安兔兔V10版本,在新的驍龍8Gen3和天璣9300發(fā)布之前,性能榜的榜單大體會(huì)以驍龍8Gen2和天璣9200+為主,至于那顆3.36GHz的驍龍8Gen2領(lǐng)先
  • 這款新興工具平臺(tái),讓你的電腦效率翻倍

    隨著信息技術(shù)的發(fā)展,我們獲取信息的渠道越來(lái)越多,但是處理信息的效率卻成為一個(gè)瓶頸。于是各種工具應(yīng)運(yùn)而生,都在爭(zhēng)相解決我們的工作效率問題。今天我要給大家介紹一款效率
  • 一個(gè)注解實(shí)現(xiàn)接口冪等,這樣才優(yōu)雅!

    場(chǎng)景碼猿慢病云管理系統(tǒng)中其實(shí)高并發(fā)的場(chǎng)景不是很多,沒有必要每個(gè)接口都去考慮并發(fā)高的場(chǎng)景,比如添加住院患者的這個(gè)接口,具體的業(yè)務(wù)代碼就不貼了,業(yè)務(wù)偽代碼如下:圖片上述代碼有
  • 2天漲粉255萬(wàn),又一賽道在抖音爆火

    來(lái)源:運(yùn)營(yíng)研究社作者 | 張知白編輯 | 楊佩汶設(shè)計(jì) | 晏談夢(mèng)潔這個(gè)暑期,旅游賽道徹底火了:有的「地方」火了&mdash;&mdash;貴州村超旅游收入 1 個(gè)月超過 12 億;有的「博主」火了&m
  • 東方甄選單飛:有些鳥注定是關(guān)不住的

    文/彭寬鴻編輯/羅卿東方甄選創(chuàng)始人俞敏洪帶隊(duì)的&ldquo;7天甘肅行&rdquo;直播活動(dòng)已在近日順利收官。成立后一年多時(shí)間里,東方甄選要脫離抖音自立門戶的傳聞不絕于耳,&ldquo;7
  • 認(rèn)真聊聊東方甄選:如何告別低垂的果實(shí)

    來(lái)源:山核桃作者:財(cái)經(jīng)無(wú)忌爆火一年后,俞敏洪和他的東方甄選依舊是頗受外界關(guān)心的&ldquo;網(wǎng)紅&rdquo;。7月5日至9日,為期5天的東方甄選&ldquo;甘肅行&rdquo;首次在自有App內(nèi)直播,
  • 7月4日見!iQOO 11S官宣:“雞血版”驍龍8 Gen2+200W快充加持

    上半年已接近尾聲,截至目前各大品牌旗下的頂級(jí)旗艦都已悉數(shù)亮相,而下半年即將推出的頂級(jí)旗艦已經(jīng)成為了數(shù)碼圈爆料的主流,其中就包括全新的iQOO 11S系
  • 2022爆款:ROG魔霸6 冰川散熱系統(tǒng)持續(xù)護(hù)航

    喜逢開學(xué)季,各大商家開始推出自己的新產(chǎn)品,進(jìn)行打折促銷活動(dòng)。對(duì)于忠實(shí)的端游愛好者來(lái)說,能夠擁有一款夢(mèng)寐以求的筆記本電腦是一件十分開心的事。但是現(xiàn)在的
Top 主站蜘蛛池模板: 永城市| 东丰县| 上杭县| 镇雄县| 巨野县| 黄骅市| 镇宁| 涡阳县| 博罗县| 喜德县| 玉林市| 临夏市| 涪陵区| 淮阳县| 敖汉旗| 饶平县| 峡江县| 尼勒克县| 宜昌市| 通辽市| 锦州市| 邛崃市| 新民市| 庐江县| 榆中县| 平遥县| 常熟市| 云和县| 沁阳市| 临武县| 大悟县| 临邑县| 诸城市| 广德县| 靖州| 桂平市| 开原市| 和硕县| 西安市| 崇仁县| 英超|