日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

消息隊列,聊聊發送消息的四種姿勢~

來源: 責編: 時間:2023-12-18 09:46:55 246觀看
導讀微服務開發中經常會使用消息隊列進行跨服務通信。在一個典型場景中,服務A執行一個業務邏輯,需要保存數據庫,然后通知服務B執行相應的業務邏輯。在這種場景下,我們需要考慮如何發送消息。圖片1. 基礎版首先,我們可能會考慮

微服務開發中經常會使用消息隊列進行跨服務通信。在一個典型場景中,服務A執行一個業務邏輯,需要保存數據庫,然后通知服務B執行相應的業務邏輯。在這種場景下,我們需要考慮如何發送消息。rsS28資訊網——每日最新資訊28at.com

圖片圖片rsS28資訊網——每日最新資訊28at.com

1. 基礎版

首先,我們可能會考慮將數據庫操作和消息發送放在同一個事務中,以下是偽代碼示例:rsS28資訊網——每日最新資訊28at.com

@Transactional  public void saveWithMessage(BusinessDO businessDO){  String id = IdUtils.nextId(); businessDO.setId(id);    xxxRepository.save(businessDO);          BusinessMessage businessMessage = new BusinessMessage();      businessMessage.setKey(id);      SendResult send = rocketMQTemplate.syncSend("test-topic", sendMessage);}

在這段代碼里通過@Transactional注解將數據庫的操作以及發送消息放到一個事務中,如果數據庫的保存或者消息發送失敗,則回滾事務。rsS28資訊網——每日最新資訊28at.com

乍一看似乎沒什么問題,但稍微推敲一下就會發現此方式有如下兩個缺陷:rsS28資訊網——每日最新資訊28at.com

1.1 數據不一致

首先最容易想到的是,這種消息發送方式無法保證數據的最終一致性。rsS28資訊網——每日最新資訊28at.com

這里先讓我來解釋一下基于消息隊列,生產者發送消息到消費者消費消息的過程:rsS28資訊網——每日最新資訊28at.com

  1. 生產者發送消息
  2. MQ收到消息并將數據持久化,在存儲中新增一條記錄
  3. 返回ACK給生產者
  4. MQ 推送消息給對應的消費者,等待消費者返回ACK
  5. 如果消費者在指定時間內成功返回ACK,那么MQ則認為消費成功,執行第6步刪除消息,如果MQ在指定時間內沒有收到ACK,則認為消息消費失敗,會重新推送消息,重復執行第4、5、6步操作。
  6. 刪除消息。

圖片圖片rsS28資訊網——每日最新資訊28at.com

好,現在回到上面發送消息的場景,假設數據庫處理成功,消息消費成功,但是MQ由于某些原因處理超時,導致ACK確認失敗,此時整個事務回滾,結果出現數據不一致問題。rsS28資訊網——每日最新資訊28at.com

這種數據不一致的問題在RPC調用的場景下也經常出現,其根本的原因在于:遠程調用,結果最終可能為成功、失敗、超時;而對于超時的情況,處理方最終的結果可能是成功,也可能是失敗,調用方是無法知曉的。rsS28資訊網——每日最新資訊28at.com

1.2 事務未提交

其次,使用以上方式還存在另一個問題,即消費者在處理消息時可能讀不到剛剛保存的數據,即消費者消費速度快于事務提交的速度。rsS28資訊網——每日最新資訊28at.com

舉例:rsS28資訊網——每日最新資訊28at.com

假設服務B需要通過消息中的數據ID獲取服務A數據庫保存的數據。這種情況下,數據庫操作與消息發送在同一事務中。可能出現服務B在處理消息時,服務A事務還未提交,導致服務B獲取的數據是空數據,無法執行相應業務邏輯。rsS28資訊網——每日最新資訊28at.com

1.3 適用場景

盡管這種發送方式存在上述兩個問題,但在某些場景下仍然適用。例如,消費者在處理時不依賴生產者的數據,且對數據一致性要求不高,這種情況下消息發送和數據庫保存可以不在同一個事務中。rsS28資訊網——每日最新資訊28at.com

2. 進階版

為解決事務未提交問題,我們可以確保事務提交后再發送消息。在SpringBoot項目中,有兩種常見解決方案。rsS28資訊網——每日最新資訊28at.com

2.1 事務同步器

基于事務同步器的方法:rsS28資訊網——每日最新資訊28at.com

@Transactional  public void saveWithMessage(BusinessDO businessDO){      xxxRepository.save(businessDO);          TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {          @Override          public void afterCommit() {              BusinessMessage businessMessage = new BusinessMessage();              businessMessage.setXXX();              rocketMQTemplate.syncSend("test-topic", sendMessage);          }      });  }

TransactionSynchronizationManager.registerSynchronization 是 Spring 框架中用于注冊事務同步的方法。通過這個方法,你可以在事務提交、回滾或完成時執行一些額外的邏輯。rsS28資訊網——每日最新資訊28at.com

在上述代碼中,使用了afterCommit方法,在事務成功提交后執行發送消息操作,確保在數據庫操作成功且事務穩定的情況下發送消息。rsS28資訊網——每日最新資訊28at.com

2.2 消息監聽器

另一種方法是基于ApplicationEventPublisher,在保存數據庫操作后發布一個事件,并通過@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)注解監聽事件。這樣可以確保消息在數據庫事務提交之后再發送。rsS28資訊網——每日最新資訊28at.com

@Transactional  public void saveWithMessage(BusinessDO businessDO){      xxxRepository.save(businessDO);   eventPublisher.publishEvent(new UserCreatedEvent(registerUser));}@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void handleUserRegisteredEvent(UserCreatedEvent userCreatedEvent) {      rocketMQTemplate.syncSend("test-topic", sendMessage);  }

這里需要說明一下:在默認情況下Spring的事件監聽機制并不是異步的(上次群友弄錯了),而是同步的將代碼進行解耦,@TransactionalEventListener也是通過同步的方式,但是加入了回調的方式來解決,這樣就能夠控制事務進行Commited、Rollback時才進行事件的處理,來達到事務同步的目的。rsS28資訊網——每日最新資訊28at.com

2.3 適用范圍

通過以上方式,相較于基礎版,可以確保消息在事務提交后發送,解決了消費者讀取空數據的問題。但仍然無法保證數據的一致性,適用于對數據一致性要求不高的場景。rsS28資訊網——每日最新資訊28at.com

3. 本地消息表+補償重試

如果需要保證最終一致性而非強一致性,可以采用本地消息表+補償重試的方式來發送消息。rsS28資訊網——每日最新資訊28at.com

這種方式的執行原理如下:rsS28資訊網——每日最新資訊28at.com

  1. 在執行業務操作的同時,在本地消息表中插入一條狀態為待發送的記錄,業務數據的記錄與消息記錄必須在同一個事務中完成,這是此方案的核心原則。由于消息表與業務表在同一個庫中,事務可以通過數據庫來保證。
  2. 事務提交后發送消息,如果消息發送成功,則將消息狀態標記為發送成功或刪除消息
  3. 在生產者服務中會創建一個定時任務,定時從消息表中檢索待發送的消息重新發送。
  4. 對于消費者消費失敗,則依賴MQ本身的重試機制來完成,保證數據的最終一致性。

圖片圖片rsS28資訊網——每日最新資訊28at.com

核心代碼如下:rsS28資訊網——每日最新資訊28at.com

@Transactional  public void saveWithMessage(BusinessDO businessDO){      TransactionMessage transactionMessage = new TransactionMessage();      transactionMessage.setStaus(MessageStatus.WAITING_SEND);      transactionMessage.setMessageKey(businessDO.getId());      ...        xxxRepository.save(businessDO);      messageRepository.save(TransactionMessage);      TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {          @Override          public void afterCommit() {              messageService.sendMessage(transactionMessage,businessDO);          }      });  }  public void sendMessage(TransactionMessage transactionMessage,BusinessDO businessDO){      BusinessMessage businessMessage = new BusinessMessage();      businessMessage.setXXX();      try{          rocketMQTemplate.syncSend("test-topic", sendMessage);          transactionMessage.setStatus(MessageStatus.SUCCESS);          messageRepository.update(transactionMessage);      }catch (Exception e){          // 執行失敗的業務邏輯      }    }

3.1 問題

雖然這種方式能夠保證消息在事務提交后發送,且能夠保證最終一致性,但仍然存在一些缺陷:rsS28資訊網——每日最新資訊28at.com

首先,需要額外的消息表,增加了系統復雜度。(針對此問題,我們又可以將該功能單獨提取出來,做成一個消息服務來統一處理,考慮篇幅問題,這里暫不展開。)rsS28資訊網——每日最新資訊28at.com

其次,通過定時任務輪詢消息表,對于處理成功但ACK超時的數據會重新發送消息,這對下游系統產生了強烈的冪等性保障要求,消費者的處理邏輯必須做好冪等控制。關于冪等的處理方案,我在[[Dailymart17:并發與冪等的實現方案]]一文中有詳細的說明,歡迎翻閱。rsS28資訊網——每日最新資訊28at.com

4. 基于事務消息發送

目前,RocketMQ是主流MQ中唯一一個支持事務消息的,如果你們項目恰好使用的是RocketMQ,可以采用事務消息來發送。rsS28資訊網——每日最新資訊28at.com

有關RocketMQ事務消息的詳細信息,可以參考我之前的文章[SpringCloud基于RocketMQ實現分布式事務,這里不再贅述。同時,由于在Dailymart項目中使用的是RocketMQ,也可以參考Dailymart的代碼實現。rsS28資訊網——每日最新資訊28at.com

小結

本文詳細介紹了微服務開發中常用的4種消息發送方式。對于那些對數據一致性要求不高的場景,可以選擇使用進階版的消息發送方式。而對于需要保證最終一致性的情況,推薦采用事務消息和本地消息表的方式進行消息發送。rsS28資訊網——每日最新資訊28at.com

在文中涉及的相關代碼,你可以在我星球 DDD&微服務 系列專欄中找到相應的實現,歡迎加入我們的討論。rsS28資訊網——每日最新資訊28at.com

DailyMart是一個基于 DDD 和Spring Cloud Alibaba的微服務商城系統,采用SpringBoot3.x以及JDK17。旨在為開發者提供集成式的學習體驗,并將其無縫地應用于實際項目中。該專欄包含領域驅動設計(DDD)、Spring Cloud Alibaba企業級開發實踐、設計模式實際應用場景解析、分庫分表戰術及實用技巧等內容。如果你對這個系列感興趣,可在本公眾號回復關鍵詞 DDD 獲取完整文檔以及相關源碼。rsS28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-48351-0.html消息隊列,聊聊發送消息的四種姿勢~

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: Eslint 會被 Oxlint 干掉嗎?

下一篇: 神技能!一招教你免費搞定PDF轉word

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 焦作市| 贵定县| 邻水| 邓州市| 仲巴县| 泊头市| 绩溪县| 淅川县| 甘孜| 田林县| 淮滨县| 凉城县| 乌什县| 东兴市| 武安市| 洪雅县| 顺昌县| 两当县| 上栗县| 江口县| 黎川县| 新源县| 洪泽县| 吉安县| 道孚县| 长白| 阿尔山市| 乌海市| 肥西县| 汤原县| 玉环县| 四平市| 司法| 丰台区| 伊川县| 松原市| 宜城市| 赤城县| 桃江县| 赤峰市| 开化县|