日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

故障現場 | 消息發送居然有這么大的坑

來源: 責編: 時間:2024-03-18 17:43:48 211觀看
導讀1. 問題&分析基于 MQ 進行系統間的解耦真的是太香了,小艾還沉浸在喜悅中久久不能自拔。但,打臉的事已經在路上了。。。。1.1. 案例昨天下班,在電梯里,物流組的晨姐偶遇了小艾,就一個技術問題向小艾進行了反饋。具體來說,物

1. 問題&分析

基于 MQ 進行系統間的解耦真的是太香了,小艾還沉浸在喜悅中久久不能自拔。但,打臉的事已經在路上了。。。。MbA28資訊網——每日最新資訊28at.com

1.1. 案例

昨天下班,在電梯里,物流組的晨姐偶遇了小艾,就一個技術問題向小艾進行了反饋。具體來說,物流系統有一項功能,即實時監控訂單的支付成功事件,一旦檢測到,便會為顧客準備物資,進而安排快遞發貨。今天系統出現了幾次空指針異常。查閱日志,似乎是在反查訂單信息時,沒有獲取到預期的訂單數據。但查詢物流系統,物流單已經成功生成,對業務操作并未造成實際影響,但這個問題還是值得注意。由于這個問題并沒有立即影響到業務流程,所以晨姐沒有在第一時間聯系小艾進行確認。MbA28資訊網——每日最新資訊28at.com

在小艾正準備啟動IDEA尋找線索的時候,算法組的負責人龍哥急匆匆地走了過來,向小艾反映了他們團隊遇到的一個重要問題。為了提升推薦效果,算法組也會實時監控訂單支付成功事件,并以此為依據重新計算用戶的推薦商品。然而,今天早上,他們突然收到了一系列的報警信息,問題同樣是無法查詢到訂單信息,這個現象與物流系統的問題高度相似。MbA28資訊網——每日最新資訊28at.com

小艾隨口問道:“這個問題會自己修復嗎?”龍哥愣了一下,回答說:“以前會自動修復,但剛剛那條數據還在報錯。”隨后,龍哥提供了報錯的訂單ID,小艾立即去數據庫中查詢,卻驚訝地發現,這條數據竟然不存在!MbA28資訊網——每日最新資訊28at.com

看到這種場景,小艾有些慌神,連龍哥什么時候走的都沒有注意到。目光直勾勾的盯著電腦屏幕發呆:MbA28資訊網——每日最新資訊28at.com

@Transactionalpublic void paySuccess(String orderId, String token){    // 驗證 token,保障有效性    checkToke(token);    // 加載訂單信息    Order order = this.orderRepository.getById(orderId);    if (order == null){        throw new RuntimeException("訂單不存在");    }    // 支付成功,更新訂單狀態    order.paySuccess();    // 將變更更新到數據庫    this.orderRepository.update(order);    // 發送支付成功事件    this.eventPublisher.publishEvent(new OrderPaidEvent(order));    // 執行其他業務邏輯    doSomething();}// 監聽變更,發布 MQ@EventListenerpublic void handle(OrderPaidEvent event){    rocketMQTemplate.convertAndSend("order_event", event);}

1.2. 問題分析

兩個問題看起來一樣,但又有區別。當下游在收到 MQ 消息時MbA28資訊網——每日最新資訊28at.com

  1. 無法查不到訂單,但稍后能自我修復
  2. 一直查不到訂單,數據庫里還沒有,無法進行自我修復

小艾無意間看到 paySuccess 方法上的 @Transactional 頓時茅塞頓開。MbA28資訊網——每日最新資訊28at.com

圖片圖片MbA28資訊網——每日最新資訊28at.com

正如上圖所示:MbA28資訊網——每日最新資訊28at.com

  1. 在數據庫更新完成后,系統會立即發送消息隊列(MQ)消息,同時主流程會繼續執行后續的耗時操作。
  2. 當下游接收到MQ消息時,會進行數據查詢。然而,由于此時主流程尚未完成事務提交,因此無法查詢到相關數據,導致下游出現錯誤。
  3. 如果MQ消息消費失敗,系統會自動進行重試。如果在此期間主流程已經完成了事務提交,那么就能夠成功查詢到數據,從而使得業務流程得以恢復正常。

這就完美的解釋了物流問題,那為什么算法組收到消息里的訂單ID在數據庫不存在呢?MbA28資訊網——每日最新資訊28at.com

圖片圖片MbA28資訊網——每日最新資訊28at.com

如上圖所示:MbA28資訊網——每日最新資訊28at.com

  1. 在數據庫更新完成后,系統會立即發送消息隊列(MQ)消息,而主流程將同時繼續執行后續的耗時操作。
  2. 若主流程在執行后續邏輯時發生異常,將導致整個事務回滾,進而中斷處理過程。
  3. 下游系統接收到消息后,進行數據反查,但由于事務已回滾,因此無法查詢到任何數據。
  4. 因為發生事務回滾,數據庫中根本就沒有這條記錄,所以即使后面有自動重試機制,也無法恢復處理邏輯。

小艾終于鎖定了問題所在,深深地吸了一口氣,釋放了緊繃的神經。就在這時,晨姐的電話打了進來。小艾喃喃自語:“毫無疑問,和算法部門遇到的情況一樣,被XXX訂單給堵住了。”說罷,她信心滿滿地接起了電話…MbA28資訊網——每日最新資訊28at.com

本質:該問題根本原因是==沒有保障 更新數據庫操作 與 發送消息操作這兩個業務單元之間的一致性。==MbA28資訊網——每日最新資訊28at.com

2. 解決方案

定位后,解決方案就變的非常清晰。MbA28資訊網——每日最新資訊28at.com

2.1. 方案1:使用 @TransactionalEventListener

最簡的方案就是將 @EventListener 注解 換成 @TransactionalEventListener。MbA28資訊網——每日最新資訊28at.com

2.1.1. EventListener 和 TransactionalEventListener

EventListener 和 TransactionalEventListener 都是 Spring 中用于處理事件的監聽器。它們之間的主要區別在于它們處理事件的方式和事務管理。MbA28資訊網——每日最新資訊28at.com

  1. EventListener:這是一個通用的事件監聽器,當事件發布時,它會立即執行相應的處理方法。它不會參與到事務管理中,也就是說,即使在事務執行過程中發生異常,EventListener 依然會執行。
  2. TransactionalEventListener:這是一個具有事務管理功能的事件監聽器。當事件發布時,它會等待當前事務完成后再執行相應的處理方法。這意味著,如果在事務執行過程中發生異常,TransactionalEventListener 將不會執行,從而確保事務的一致性。

總之,EventListener 和 TransactionalEventListener 的主要區別在于它們處理事件的方式和事務管理。在選擇使用哪種監聽器時,需要根據實際需求和事務一致性的要求來決定。MbA28資訊網——每日最新資訊28at.com

2.1.2. 源碼示例

了解兩者的區別后,只需做一點調整便可以解決這個問題,調整如下:MbA28資訊網——每日最新資訊28at.com

/** * 使用 @TransactionalEventListener 替代 @EventListener 監聽訂單支付事件,然后發送消息到 RocketMQ * @param event */@TransactionalEventListenerpublic void handle(OrderPaidEvent event){    rocketMQTemplate.convertAndSend("order_event", event);}

如果沒有使用 Spring 的 Event 機制,但仍想實現 @TransactionEventlistner 的效果,可以直接使用 Spring API:MbA28資訊網——每日最新資訊28at.com

private void doIfCommitted(Runnable task) {        if (TransactionSynchronizationManager.isSynchronizationActive()) {            TransactionSynchronization transactionSynchronization = new TransactionSynchronizationAdapter(){                @Override                public void afterCommit() {                    task.run();                }            };            TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);        }else {            task.run();        }    }

2.1.3. 問題&挑戰

這個方案確實解決了上述問題,但從一致性角度分析,還是存在設計缺陷,只是發生的概率變低而已,沒有從根本上解決問題。MbA28資訊網——每日最新資訊28at.com

在事務提交后發送 MQ 時,可能會遇到以下幾種情況,導致兩個操作(數據庫操作和 MQ 發送操作)之間的一致性問題:MbA28資訊網——每日最新資訊28at.com

  1. 數據庫事務提交成功,但在發送 MQ 消息時發生網絡故障。此時,數據庫操作已經完成,但 MQ 消息未能成功發送。
  2. 數據庫事務提交成功,但在發送 MQ 消息時發生 MQ 服務器故障。此時,數據庫操作已經完成,但 MQ 消息未能成功發送。
  3. 數據庫事務提交成功,但在發送 MQ 消息時發生應用程序故障。此時,數據庫操作已經完成,但 MQ 消息未能成功發送。
  4. 數據庫事務提交成功,但在發送 MQ 消息時發生消息丟失。此時,數據庫操作已經完成,但 MQ 消息未能成功發送。

這個方案極為簡單,但大幅降低了錯誤概率,主要應用于要求并不嚴格的業務場景。MbA28資訊網——每日最新資訊28at.com

2.2 方案2:RocketMQ事務消息

RocketMQ 的事務消息就是針對這個問題設計的,可以非常高效的解決這個問題。MbA28資訊網——每日最新資訊28at.com

2.2.1. 半消息以及工作原理

RocketMQ事務消息是一種支持分布式事務的消息模型,將消息生產和消費與業務邏輯綁定在一起,確保消息發送和事務執行的原子性,保證消息的可靠性。MbA28資訊網——每日最新資訊28at.com

事務消息分為兩個階段:發送消息和確認消息,確認消息分為提交和回滾兩個操作。在提交操作執行完畢后,消息才會被消費端消費,而在回滾操作執行完畢后,消息會被刪除,從而達到了事務的一致性和可靠性。MbA28資訊網——每日最新資訊28at.com

事務消息的發生流程如下:MbA28資訊網——每日最新資訊28at.com

圖片圖片MbA28資訊網——每日最新資訊28at.com

  1. 生產者發送prepare消息到RocketMQ服務端,RocketMQ將消息存儲到本地并返回結果;
  2. 生產者開始執行本地事務,并根據本地事務的結果將狀態信息提交給RocketMQ服務端;
  3. 如果本地事務執行成功,生產者向RocketMQ服務端發送commit消息;
  4. 如果本地事務執行失敗,生產者向RocketMQ服務端發送rollback消息;
  5. RocketMQ接收到commit或rollback消息后,對消息進行投放或刪除;

如果生成者發送 prepare 消息后,未在規定時間內發送 commit 或 rollback 消息,RocketMQ 將進入恢復流程,具體如下:MbA28資訊網——每日最新資訊28at.com

圖片圖片MbA28資訊網——每日最新資訊28at.com

  1. 如果在回查的時間之前沒有收到相應的 commit 或 rollback 消息,則 RocketMQ 會將對該 prepare 消息進行回查;
  2. 應用程序接收到回查指令,從業務庫中獲取數據,并根據業務邏輯進行判斷,最終是 commit 還是 rollback;
  3. RocketMQ 接收到 commit 或 rollback 回復后,進行相應動作,從而實現業務操作和消息發送的一致性;

2.2.2. 源碼示例

一個簡單的示例代碼如下:MbA28資訊網——每日最新資訊28at.com

// 編寫事務監聽器類public class TransactionListenerImpl implements TransactionListener {    private AtomicInteger transactionIndex = new AtomicInteger(0);    // 執行本地事務    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        int value = transactionIndex.getAndIncrement();        System.out.println("executeLocalTransaction " + value);        // TODO 執行本地事務,并返回事務狀態        // 本例假定 index 為偶數的消息執行成功,奇數的消息執行失敗        if (value % 2 == 0) {            return LocalTransactionState.COMMIT_MESSAGE;        }        return LocalTransactionState.ROLLBACK_MESSAGE;    }    // 檢查本地事務狀態    public LocalTransactionState checkLocalTransaction(MessageExt msg) {        System.out.println("checkLocalTransaction " + msg.getTransactionId());        // 模擬檢查本地事務狀態,返回事務狀態        boolean committed = prepare(true);        if (committed) {            return LocalTransactionState.COMMIT_MESSAGE;        }        return LocalTransactionState.UNKNOW;    }    // 模擬操作預處理邏輯    private boolean prepare(boolean commit) {        System.out.println("prepare " + (commit ? "commit" : "rollback"));        return commit;    }}// 編寫發送消息的代碼public class Producer {    private static final String NAME_SERVER_ADDR = "localhost:9876";    public static void main(String[] args) throws Exception {        TransactionMQProducer producer = new TransactionMQProducer("MyGroup");        producer.setNamesrvAddr(NAME_SERVER_ADDR);        // 注冊事務監聽器        producer.setTransactionListener(new TransactionListenerImpl());        producer.start();        // 發送事務消息        String[] tags = {"TagA", "TagB", "TagC"};        for (int i = 0; i < 3; i++) {            Message msg = new Message("TopicTest", tags[i], ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));            // 在消息發送時傳遞給事務監聽器的參數            SendResult sendResult = producer.sendMessageInTransaction(msg, null);            System.out.printf("%s%n", sendResult);        }        // 關閉生產者        producer.shutdown();    }}

單看代碼很難理解,簡單畫了張圖,具體如下:MbA28資訊網——每日最新資訊28at.com

圖片圖片MbA28資訊網——每日最新資訊28at.com

2.2.3. 問題&挑戰

事務消息并不完美,存在一定的問題:MbA28資訊網——每日最新資訊28at.com

  1. 與 MQ 實現強相關,并不是每個 MQ 實現都對事務消息提供支持;
  2. API 比較晦澀,存在一定的學習成本,同時需要對業務邏輯拆分到 Listener 中,增加理解成本;

2.3. 方案3:本地消息表

事務消息表方案是一種常用的保證消息發送與業務操作一致性的方法。該方案基于數據庫事務和消息隊列,將消息發送和業務操作放入同一個事務中,并將業務操作和消息發送的狀態記錄在數據庫的消息表中,以實現消息的可靠性和冪等性。MbA28資訊網——每日最新資訊28at.com

2.3.1. 設計&核心流程

整體如下圖所示:MbA28資訊網——每日最新資訊28at.com

圖片圖片MbA28資訊網——每日最新資訊28at.com

imageMbA28資訊網——每日最新資訊28at.com

核心流程如下:MbA28資訊網——每日最新資訊28at.com

  1. 應用程序開啟一個數據庫事務,并在事務中執行業務操作和消息發送;
  2. 在事務中,將業務操作和消息發送的狀態記錄到消息表中;
  3. 如果業務操作執行成功,并且消息發送成功,提交事務,否則回滾事務;
  4. 定時掃描消息表,并根據消息狀態重新發送未被確認的消息。如果消息發送成功,更新消息狀態;否則根據重試次數更新消息狀態或者丟棄消息;

通過事務消息表方案,可以保證消息的可靠性。即使在消息發送失敗或應用程序崩潰的情況下,也可以通過重新發送消息將業務操作和消息發送的狀態同步。同時,該方案可以避免消息重復發送和漏發的情況。MbA28資訊網——每日最新資訊28at.com

2.3.2. 功能封裝

清晰的流程為復用打下了基礎,lego 對其做了封裝。MbA28資訊網——每日最新資訊28at.com

2.3.2.1. 環境準備

首先,需要引入 lego 相關依賴:MbA28資訊網——每日最新資訊28at.com

<dependency>    <groupId>com.geekhalo.lego</groupId>    <artifactId>lego-starter</artifactId>    <version>0.1.12 以上版本</version></dependency>

其次,在業務數據庫上新建一張表用于存儲消息,示例如下:MbA28資訊網——每日最新資訊28at.com

create table test_message(    id           bigint auto_increment primary key,    orderly      tinyint      not null comment '是否為順序消息',    topic        varchar(64)  not null comment 'MQ topic',    sharding_key varchar(128) not null comment 'ShardingKey,用于選擇不同的 partition',    tag          varchar(128) not null comment 'Message Tag 信息',    msg_id       varchar(64)  not null comment 'Msg ID 只有發送成功后才有數據',    msg_key      varchar(64)  not null comment 'MSG Key,用于查詢數據',    msg          longtext     not null comment '要發送的消息',    retry_time   tinyint      not null comment '重試次數',    status       tinyint      not null comment '發送狀態:0-初始化,1-發送成功,2-發送失敗',    create_time  datetime     not null,    update_time  datetime     not null,    index idx_update_time_status(update_time, status));

為了兼容多種MQ類型,對發送者進行了抽象,因此需要實現自己的 MessageSender。MbA28資訊網——每日最新資訊28at.com

@Component@Getter@Slf4jpublic class TestMessageSender implements MessageSender {    @Override    public String send(Message message) {        // 發送消息    }}

最后,就是對所有的組件進行配置,示例代碼如下:MbA28資訊網——每日最新資訊28at.com

@Configuration@Slf4jpublic class LocalTableBasedReliableMessageConfiguration        extends LocalTableBasedReliableMessageConfigurationSupport {    @Autowired    private DataSource dataSource;    @Autowired    private MessageSender messageSender;    @Override    protected DataSource dataSource() {        return this.dataSource;    }    @Override    protected String messageTable() {        return "test_message";    }    @Override    protected MessageSender createMessageSend() {        return this.messageSender;    }}

其中,包括:MbA28資訊網——每日最新資訊28at.com

  1. 繼承自 LocalTableBasedReliableMessageConfigurationSupport,由父類完成基本配置;
  2. 實現 DataSource dataSource() 方法,返回業務數據源(備注:必須與業務使用同一個數據源)
  3. 實現 String messageTable() 方法,配置本地消息表表名;
  4. 實現 MessageSender createMessageSend() 方法,返回 MessageSender 實例,執行真正的消費發送;
2.2.3.2. 具體使用

ReliableMessageSender#send 在業務方法中使用,執行可靠消息發送;MbA28資訊網——每日最新資訊28at.com

@Transactionalpublic void testSuccess(){    // 業務邏輯    Message message = buildMessage();    // 業務邏輯    this.reliableMessageSender.send(message);}

除發送流程外,還需要配置補充機制。MbA28資訊網——每日最新資訊28at.com

ReliableMessageCompensator#compensate 周期性調度,對未發送或發送失敗的消息進行補充;MbA28資訊網——每日最新資訊28at.com

4. 示例&源碼

代碼倉庫:https://gitee.com/litao851025/learnFromBugMbA28資訊網——每日最新資訊28at.com

代碼地址:https://gitee.com/litao851025/learnFromBug/tree/master/src/main/java/com/geekhalo/demo/mq/senderMbA28資訊網——每日最新資訊28at.com


MbA28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-77527-0.html故障現場 | 消息發送居然有這么大的坑

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 探秘HashMap:有趣的算法之旅

下一篇: 寫了個簡單爬蟲,收集 Boss 直聘自動駕駛崗位

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 公主岭市| 鄂州市| 肇庆市| 松滋市| 论坛| 丹棱县| 富顺县| 昌宁县| 清流县| 冀州市| 思茅市| 克东县| 武威市| 阿拉善左旗| 灵璧县| 镇赉县| 龙岩市| 中牟县| 鲁甸县| 宁强县| 吴桥县| 襄城县| 泸州市| 大足县| 连平县| 洛隆县| 江津市| 河南省| 望城县| 华蓥市| 顺昌县| 且末县| 宣汉县| 建宁县| 广河县| 滕州市| 青冈县| 务川| 泰州市| 襄垣县| 攀枝花市|