這篇文章,我們聊聊如何應對 RocketMQ 消息堆積。
圖片
消費者在消費的過程中,消費的速度跟不上服務端的發送速度,未處理的消息會越來越多,消息出現堆積進而會造成消息消費延遲。
雖然筆者經常講:RocketMQ 、Kafka 具備堆積的能力,但是以下場景需要重點關注消息堆積和延遲的問題:
圖片
客戶端使用 Push 模式 啟動后,消費消息時,分為以下兩個階段:
通過以上客戶端消費原理可以看出,消息堆積的主要瓶頸在于本地客戶端的消費能力,即消費耗時和消費并發度。
想要避免和解決消息堆積問題,必須合理的控制消費耗時和消息并發度,其中消費耗時的優先級高于消費并發度,必須先保證消費耗時的合理性,再考慮消費并發度問題。
影響消費耗時的消費邏輯主要分為 CPU 內存計算和外部 I/O 操作,通常情況下代碼中如果沒有復雜的遞歸和循環的話,內部計算耗時相對外部 I/O 操作來說幾乎可以忽略。
外部 I/O 操作通常包括如下業務邏輯:
這類外部調用的邏輯和系統容量需要提前梳理,掌握每個調用操作預期的耗時,這樣才能判斷消費邏輯中I/O操作的耗時是否合理。
通常消費堆積都是由于這些下游系統出現了服務異常、容量限制導致的消費耗時增加。
例如:某業務消費邏輯中需要調用下游 Dubbo 接口 ,單次消費耗時為 20 ms,平時消息量小未出現異常。業務側進行大促活動時,下游 Dubbo 服務未進行優化,消費單條消息的耗時增加到 200 ms,業務側可以明顯感受到消費速度大幅下跌。此時,通過提升消費并行度并不能解決問題,需要大幅提高下游 Dubbo 服務性能才行。
絕大部分消息消費行為都屬于 IO 密集型,即可能是操作數據庫,或者調用 RPC,這類消費行為的消費速度在于后端數據庫或者外系統的吞吐量,通過增加消費并行度,可以提高總的消費吞吐量,但是并行度增加到一定程度,反而會下降。
所以,應用必須要設置合理的并行度。如下有幾種修改消費并行度的方法:
當面對消息堆積問題時,我們需要明確到底哪個環節出現問題了,不要慌張,也不要貿然動手。
首先,我們需要查看消費耗時,確認消息的消費耗時是否合理。查看消費耗時一般來講有兩種方式:
1、打印日志
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt messageExt : msgs) { long start = System.currentTimeMillis(); // TODO 業務邏輯 logger.info("MessageId:" + messageExt.getMsgId() + " costTime:" + (System.currentTimeMillis() - start)); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { logger.error("consumeMessage error:", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; }}
2、查看消息軌跡
圖片
當確定好消費耗時后,可以根據耗時大小,采取不同的措施。
假如消費耗時非常高,需要查看 Consumer 實例 JVM 的堆棧 。
cat stack.log | grep ConsumeMessageThread -A 10 --color
常見的異常堆棧信息如下:
圖片
圖片
圖片
客戶端使用 Push模式 啟動后,消費消息時,分為以下兩個階段:拉取消息和消費消息。
客戶端消費原理可以看出,消息堆積的主要瓶頸在于本地客戶端的消費能力,即消費耗時和消費并發度。
首先分析消費耗時,然后根據耗時大小,采取不同的措施。
參考文檔:
阿里云官方文檔:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/use-cases/message-accumulation-and-latency#concept-2004064
本文鏈接:http://www.www897cc.com/showinfo-26-51256-0.html如何應對 RocketMQ 消息堆積
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: 一文搞懂 Java8 reduce操作