今天我們將解決使用RocketMQ事務消息時可能遇到的一個常見問題:如何讓其支持多事務消息?
在實際開發中,我們常常會面臨多事務消息的場景,例如在DailyMart的訂單模塊中,用戶支付后需要調用庫存服務進行庫存扣減,而在訂單確認收貨后需要調用用戶服務實現積分贈送。這兩個業務邏輯都需要通過事務消息來保證分布式事務。
為了處理這種情況,我們可能會考慮在訂單模塊中創建兩個事務消息監聽器,分別用于處理庫存扣減和積分贈送的事務處理和事務回查。
@Component@Slf4j//處理訂單支付的事務監聽器public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { ...... //處理訂單支付邏輯 } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { ...... //檢查訂單處理邏輯 }}@Component@Slf4j//處理訂單收貨的事務監聽器public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { ...... } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { ...... }}
然而,當我們信心滿滿地完成業務邏輯編寫并啟動服務時,可能會遇到如下錯誤:rocketMQTemplate already exists RocketMQLocalTransactionListener
圖片
在rocketmq-spring-boot-starter版本低于2.1.0的項目中,可以使用多個 @RocketMQTransactionListener 監聽不同的 txProducerGroup 來發送不同類型的事務消息到topic。然而,從 RocketMQ-Spring 2.1.0 版本開始,注解 @RocketMQTransactionListener 不能設置 txProducerGroup、ak、sk,這些值均需與對應的 RocketMQTemplate 保持一致。通過閱讀源碼 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已經存在了 RocketMQTransactionListener 則會出現上述錯誤。
圖片
為了在保證系統只有一個 RocketMQTransactionListener 的前提下實現多事務消息,我們可以將 RocketMQLocalTransactionListener 不處理具體業務邏輯,而是將其作為一個分發器使用。
在生產者發送事務消息時指定對應的事務處理器 ,并將事務處理器放置在消息頭上發送出去,在 RocketMQTransactionListener 中根據消息頭選擇具體的事務處理器來實現業務邏輯。
具體實現如下:
首先,定義公共的事務消息處理接口,所有事務消息都實現此接口而非 RocketMQ 默認的 RocketMQLocalTransactionListener。
public interface TransactionMessageHandler { /** * 執行本地事務 * @param payload 消息體 * @param arg 參數 */ RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg); /** * 檢查本地執行狀態 * @param payload 消息體 * @return 執行結果 */ RocketMQLocalTransactionState checkLocalTransaction(Object payload); }
public <T extends RemoteDomainEvent> TransactionSendResult sendTransaction(String topic, String tag, T message, Class<? extends TransactionMessageHandler> transactionMessageListener) { if(transactionMessageListener == null){ throw new IllegalArgumentException("transactionMessageListener must not null"); } String destination = buildDestination(topic, tag); Message<T> sendMessage = MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, message.getKey()) .setHeader(SOURCE_HEADER, message.getSource()) .setHeader(TRANSACTION_MESSAGE_HEADER, transactionMessageListener.getSimpleName()) .build(); TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, sendMessage, null); log.info("[{}]事務消息[{}]發送結果[{}]", destination, JSONObject.toJSON(message),JSONObject.toJSON(sendResult)); return sendResult;}
@Slf4j@RocketMQTransactionListenerpublic class DefaultRocketMQTransactionListener implements RocketMQLocalTransactionListener { private final Map<String, TransactionMessageHandler> transactionMessageHandlerMap; public DefaultRocketMQTransactionListener(Map<String, TransactionMessageHandler> transactionMessageHandlerMap) { this.transactionMessageHandlerMap = transactionMessageHandlerMap; } @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { log.info("消費者收到事務消息[{}]", JSONObject.toJSON(message)); String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER); if (null == listenerName) { throw new RuntimeException("not params transactionMessageListener"); } RocketMQLocalTransactionState state; Object payload = message.getPayload(); try { TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName); if (null == messageHandler) { throw new RuntimeException("not match condition TransactionMessageHandler"); } state = messageHandler.executeLocalTransaction(payload, arg); } catch (Exception e) { log.error("rocket transaction message executeLocal error:{}", e.getMessage()); return RocketMQLocalTransactionState.ROLLBACK; } return state; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { log.info("消費者收到事務回查消息[{}]", JsonUtils.obj2String(message.getHeaders())); String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER); if (null == listenerName) { throw new RuntimeException("not params transactionMessageListener"); } RocketMQLocalTransactionState state; try { TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName); if (null == messageHandler) { throw new RuntimeException("not match condition TransactionMessageHandler"); } state = messageHandler.checkLocalTransaction(message.getPayload()); } catch (Exception e) { log.error("rocket transaction message executeLocal error:{}", e.getMessage()); return RocketMQLocalTransactionState.ROLLBACK; } return state; } }
在上述代碼中,根據消息頭中的TRANSACTION_MESSAGE_HEADER參數選擇對應的事務處理器來處理事務消息。
在 DailyMart 中有一個公共組件 dailymart-rocketmq-spring-boot-starter 專門用于 RocketMQ 消息發送監聽的封裝,因此我們也將事務消息的處理邏輯封裝到了此組件中。
圖片
所有的事務消息處理邏輯都實現 TransactionMessageHandler 接口,以訂單支付的處理邏輯為例:
@Component@Slf4jpublic class OrderPaidTransactionConsumer implements TransactionMessageHandler { @Resource private TransactionTemplate transactionTemplate; @Override public RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg) { final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class); ... } @Override public RocketMQLocalTransactionState checkLocalTransaction(Object payload) { final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class); ... } }
TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID", orderPaidEvent, OrderPaidTransactionConsumer.class);
本文解決了在 RocketMQ 2.1.0 版本以后,無法簡單使用多個 @RocketMQTransactionListener 的問題。通過引入事務消息處理接口 TransactionMessageHandler,我們將原有的事務處理器改造成了一個分發器,使得在 DailyMart 項目中可以輕松處理多事務消息的場景。
本文鏈接:http://www.www897cc.com/showinfo-26-73332-0.html完美解決,RocketMQ如何支持多事務消息?
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com