基于 MQ 進行系統間的解耦真的是太香了,小艾還沉浸在喜悅中久久不能自拔。但,打臉的事已經在路上了。。。。
昨天下班,在電梯里,物流組的晨姐偶遇了小艾,就一個技術問題向小艾進行了反饋。具體來說,物流系統有一項功能,即實時監控訂單的支付成功事件,一旦檢測到,便會為顧客準備物資,進而安排快遞發貨。今天系統出現了幾次空指針異常。查閱日志,似乎是在反查訂單信息時,沒有獲取到預期的訂單數據。但查詢物流系統,物流單已經成功生成,對業務操作并未造成實際影響,但這個問題還是值得注意。由于這個問題并沒有立即影響到業務流程,所以晨姐沒有在第一時間聯系小艾進行確認。
在小艾正準備啟動IDEA尋找線索的時候,算法組的負責人龍哥急匆匆地走了過來,向小艾反映了他們團隊遇到的一個重要問題。為了提升推薦效果,算法組也會實時監控訂單支付成功事件,并以此為依據重新計算用戶的推薦商品。然而,今天早上,他們突然收到了一系列的報警信息,問題同樣是無法查詢到訂單信息,這個現象與物流系統的問題高度相似。
小艾隨口問道:“這個問題會自己修復嗎?”龍哥愣了一下,回答說:“以前會自動修復,但剛剛那條數據還在報錯。”隨后,龍哥提供了報錯的訂單ID,小艾立即去數據庫中查詢,卻驚訝地發現,這條數據竟然不存在!
看到這種場景,小艾有些慌神,連龍哥什么時候走的都沒有注意到。目光直勾勾的盯著電腦屏幕發呆:
@Transactionalpublic void paySuccess(String orderId, String token){ // 驗證 token,保障有效性 checkToke(token); // 加載訂單信息 Order order = this.orderRepository.getById(orderId); if (order == null){ throw new RuntimeException("訂單不存在"); } // 支付成功,更新訂單狀態 order.paySuccess(); // 將變更更新到數據庫 this.orderRepository.update(order); // 發送支付成功事件 this.eventPublisher.publishEvent(new OrderPaidEvent(order)); // 執行其他業務邏輯 doSomething();}// 監聽變更,發布 MQ@EventListenerpublic void handle(OrderPaidEvent event){ rocketMQTemplate.convertAndSend("order_event", event);}
兩個問題看起來一樣,但又有區別。當下游在收到 MQ 消息時
小艾無意間看到 paySuccess 方法上的 @Transactional 頓時茅塞頓開。
圖片
正如上圖所示:
這就完美的解釋了物流問題,那為什么算法組收到消息里的訂單ID在數據庫不存在呢?
圖片
如上圖所示:
小艾終于鎖定了問題所在,深深地吸了一口氣,釋放了緊繃的神經。就在這時,晨姐的電話打了進來。小艾喃喃自語:“毫無疑問,和算法部門遇到的情況一樣,被XXX訂單給堵住了。”說罷,她信心滿滿地接起了電話…
本質:該問題根本原因是==沒有保障 更新數據庫操作 與 發送消息操作這兩個業務單元之間的一致性。==
定位后,解決方案就變的非常清晰。
最簡的方案就是將 @EventListener 注解 換成 @TransactionalEventListener。
EventListener 和 TransactionalEventListener 都是 Spring 中用于處理事件的監聽器。它們之間的主要區別在于它們處理事件的方式和事務管理。
總之,EventListener 和 TransactionalEventListener 的主要區別在于它們處理事件的方式和事務管理。在選擇使用哪種監聽器時,需要根據實際需求和事務一致性的要求來決定。
了解兩者的區別后,只需做一點調整便可以解決這個問題,調整如下:
/** * 使用 @TransactionalEventListener 替代 @EventListener 監聽訂單支付事件,然后發送消息到 RocketMQ * @param event */@TransactionalEventListenerpublic void handle(OrderPaidEvent event){ rocketMQTemplate.convertAndSend("order_event", event);}
如果沒有使用 Spring 的 Event 機制,但仍想實現 @TransactionEventlistner 的效果,可以直接使用 Spring API:
private void doIfCommitted(Runnable task) { if (TransactionSynchronizationManager.isSynchronizationActive()) { TransactionSynchronization transactionSynchronization = new TransactionSynchronizationAdapter(){ @Override public void afterCommit() { task.run(); } }; TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); }else { task.run(); } }
這個方案確實解決了上述問題,但從一致性角度分析,還是存在設計缺陷,只是發生的概率變低而已,沒有從根本上解決問題。
在事務提交后發送 MQ 時,可能會遇到以下幾種情況,導致兩個操作(數據庫操作和 MQ 發送操作)之間的一致性問題:
這個方案極為簡單,但大幅降低了錯誤概率,主要應用于要求并不嚴格的業務場景。
RocketMQ 的事務消息就是針對這個問題設計的,可以非常高效的解決這個問題。
RocketMQ事務消息是一種支持分布式事務的消息模型,將消息生產和消費與業務邏輯綁定在一起,確保消息發送和事務執行的原子性,保證消息的可靠性。
事務消息分為兩個階段:發送消息和確認消息,確認消息分為提交和回滾兩個操作。在提交操作執行完畢后,消息才會被消費端消費,而在回滾操作執行完畢后,消息會被刪除,從而達到了事務的一致性和可靠性。
事務消息的發生流程如下:
圖片
如果生成者發送 prepare 消息后,未在規定時間內發送 commit 或 rollback 消息,RocketMQ 將進入恢復流程,具體如下:
圖片
一個簡單的示例代碼如下:
// 編寫事務監聽器類public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); // 執行本地事務 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); System.out.println("executeLocalTransaction " + value); // TODO 執行本地事務,并返回事務狀態 // 本例假定 index 為偶數的消息執行成功,奇數的消息執行失敗 if (value % 2 == 0) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } // 檢查本地事務狀態 public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("checkLocalTransaction " + msg.getTransactionId()); // 模擬檢查本地事務狀態,返回事務狀態 boolean committed = prepare(true); if (committed) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.UNKNOW; } // 模擬操作預處理邏輯 private boolean prepare(boolean commit) { System.out.println("prepare " + (commit ? "commit" : "rollback")); return commit; }}// 編寫發送消息的代碼public class Producer { private static final String NAME_SERVER_ADDR = "localhost:9876"; public static void main(String[] args) throws Exception { TransactionMQProducer producer = new TransactionMQProducer("MyGroup"); producer.setNamesrvAddr(NAME_SERVER_ADDR); // 注冊事務監聽器 producer.setTransactionListener(new TransactionListenerImpl()); producer.start(); // 發送事務消息 String[] tags = {"TagA", "TagB", "TagC"}; for (int i = 0; i < 3; i++) { Message msg = new Message("TopicTest", tags[i], ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8)); // 在消息發送時傳遞給事務監聽器的參數 SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); } // 關閉生產者 producer.shutdown(); }}
單看代碼很難理解,簡單畫了張圖,具體如下:
圖片
事務消息并不完美,存在一定的問題:
事務消息表方案是一種常用的保證消息發送與業務操作一致性的方法。該方案基于數據庫事務和消息隊列,將消息發送和業務操作放入同一個事務中,并將業務操作和消息發送的狀態記錄在數據庫的消息表中,以實現消息的可靠性和冪等性。
整體如下圖所示:
圖片
image
核心流程如下:
通過事務消息表方案,可以保證消息的可靠性。即使在消息發送失敗或應用程序崩潰的情況下,也可以通過重新發送消息將業務操作和消息發送的狀態同步。同時,該方案可以避免消息重復發送和漏發的情況。
清晰的流程為復用打下了基礎,lego 對其做了封裝。
首先,需要引入 lego 相關依賴:
<dependency> <groupId>com.geekhalo.lego</groupId> <artifactId>lego-starter</artifactId> <version>0.1.12 以上版本</version></dependency>
其次,在業務數據庫上新建一張表用于存儲消息,示例如下:
create table test_message( id bigint auto_increment primary key, orderly tinyint not null comment '是否為順序消息', topic varchar(64) not null comment 'MQ topic', sharding_key varchar(128) not null comment 'ShardingKey,用于選擇不同的 partition', tag varchar(128) not null comment 'Message Tag 信息', msg_id varchar(64) not null comment 'Msg ID 只有發送成功后才有數據', msg_key varchar(64) not null comment 'MSG Key,用于查詢數據', msg longtext not null comment '要發送的消息', retry_time tinyint not null comment '重試次數', status tinyint not null comment '發送狀態:0-初始化,1-發送成功,2-發送失敗', create_time datetime not null, update_time datetime not null, index idx_update_time_status(update_time, status));
為了兼容多種MQ類型,對發送者進行了抽象,因此需要實現自己的 MessageSender。
@Component@Getter@Slf4jpublic class TestMessageSender implements MessageSender { @Override public String send(Message message) { // 發送消息 }}
最后,就是對所有的組件進行配置,示例代碼如下:
@Configuration@Slf4jpublic class LocalTableBasedReliableMessageConfiguration extends LocalTableBasedReliableMessageConfigurationSupport { @Autowired private DataSource dataSource; @Autowired private MessageSender messageSender; @Override protected DataSource dataSource() { return this.dataSource; } @Override protected String messageTable() { return "test_message"; } @Override protected MessageSender createMessageSend() { return this.messageSender; }}
其中,包括:
ReliableMessageSender#send 在業務方法中使用,執行可靠消息發送;
@Transactionalpublic void testSuccess(){ // 業務邏輯 Message message = buildMessage(); // 業務邏輯 this.reliableMessageSender.send(message);}
除發送流程外,還需要配置補充機制。
ReliableMessageCompensator#compensate 周期性調度,對未發送或發送失敗的消息進行補充;
代碼倉庫:https://gitee.com/litao851025/learnFromBug
代碼地址:https://gitee.com/litao851025/learnFromBug/tree/master/src/main/java/com/geekhalo/demo/mq/sender
本文鏈接:http://www.www897cc.com/showinfo-26-77527-0.html故障現場 | 消息發送居然有這么大的坑
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: 探秘HashMap:有趣的算法之旅