日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

消息隊列批量收發消息,請避開這五個坑!

來源: 責編: 時間:2023-11-30 09:26:20 229觀看
導讀大家好,我是君哥。使用消息隊列時,為了提高生產和消費的性能,有時會開啟批量處理。在生產端,生產者發送的消息先發送到一個消息列表,積累到一定的消息量之后再批量發送給 Broker,如下圖:在消費端,消費者拉取消息后先不立即處

ltD28資訊網——每日最新資訊28at.com

大家好,我是君哥。ltD28資訊網——每日最新資訊28at.com

使用消息隊列時,為了提高生產和消費的性能,有時會開啟批量處理。ltD28資訊網——每日最新資訊28at.com

在生產端,生產者發送的消息先發送到一個消息列表,積累到一定的消息量之后再批量發送給 Broker,如下圖:ltD28資訊網——每日最新資訊28at.com

ltD28資訊網——每日最新資訊28at.com

在消費端,消費者拉取消息后先不立即處理,而是把消息轉存到一個內存隊列或數據庫,由業務線程去處理,如下圖:ltD28資訊網——每日最新資訊28at.com

ltD28資訊網——每日最新資訊28at.com

無論是生產者做批量發送,還是消費者做批量處理,都需要考慮使用批量消息的業務場景,避免踩坑。下面看一下批量操作可能會遇到哪些坑。ltD28資訊網——每日最新資訊28at.com

批量大小

當生產者采用批量發送的方式來提高發送性能時,一定要考慮發送消息的批量大小。下面是 RocketMQ 批量發送的官方示例:ltD28資訊網——每日最新資訊28at.com

String topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));try {    producer.send(messages);} catch (Exception e) {    e.printStackTrace();    //handle the error}

RocketMQ 默認消息大小是 4M,由 maxMessageSize 參數控制,如果批量消息大小超過 maxMessageSize,則會拋出異常。ltD28資訊網——每日最新資訊28at.com

如果遇到消息大小超過 maxMessageSize 的情況時,可以用下面方法進行處理:ltD28資訊網——每日最新資訊28at.com

  • 把這個參數改大,但需要考慮 Broker 的性能和網絡帶寬;
  • 將消息進行拆分后分批發送;
  • 對消息進行壓縮處理。

RabbitMQ 相關的 API 則提供了更加靈活的批量控制,對消息數量和消息大小都做了控制,下面看一下源碼:ltD28資訊網——每日最新資訊28at.com

ltD28資訊網——每日最新資訊28at.com

ltD28資訊網——每日最新資訊28at.com

冪等

消費端可以批量拉取消息進行消費,這樣可以減少拉取消息時的 RPC 次數,提升消費性能。比如在 RocketMQ 中,可以通過 Consumer 中的 pullBatchSize 來設置一次拉取的消息數量,通過 consumeMessageBatchMaxSize 參數來設置一次消費的消息數量。ltD28資訊網——每日最新資訊28at.com

但需要注意的是,如果批量消息中一條消息消費失敗了,這一批消息都需要進行重試,已經消費成功的消息會被重復消費,帶來業務問題。ltD28資訊網——每日最新資訊28at.com

為了不對業務造成影響,必須考慮冪等。一個簡單的方法是在消息中增加全局唯一 id 屬性,對消息消費結果進行記錄,消費成功后保存 id。這樣在消費消息之前先查詢是否存在消費成功的記錄,如果存在則直接返回處理成功。ltD28資訊網——每日最新資訊28at.com

時延

在使用消息隊列進行批量操作時,必須要考慮到時延問題。比如我們設置一個批次 100 條消息,積累夠 100 條消息后再發送,在消息量小的情況下,可能積累夠 100 條消息會很長時間,導致消費端拉取到一條消息時延很大。ltD28資訊網——每日最新資訊28at.com

雖然消息隊列的一個重要作用是削峰填谷,但在一些場景下,對消息的實時性也有要求。比如在車聯網的充電場景,車聯網平臺需要實時感知充電樁的狀態,如果充電樁積累夠一批消息再上報平臺,平臺獲取到的狀態會不準確,如果心跳消息延時太久,平臺會認為充電樁離線。ltD28資訊網——每日最新資訊28at.com

對于有時延要求又需要批量操作的場景,可以設置一個超時時間,超時后即使消息數量不夠,也會發送出去。看下 RabbitMQ 的處理:ltD28資訊網——每日最新資訊28at.com

public synchronized void send(String exchange, String routingKey, Message message, CorrelationData correlationData)  throws AmqpException { if (correlationData != null) {  //...  super.send(exchange, routingKey, message, correlationData); } else {  if (this.scheduledTask != null) {   this.scheduledTask.cancel(false);  }  MessageBatch batch = this.batchingStrategy.addToBatch(exchange, routingKey, message);  if (batch != null) {   super.send(batch.getExchange(), batch.getRoutingKey(), batch.getMessage(), null);  }  //這里獲取到超時時間,到達超時時間后使用定時器將消息發送出去  Date next = this.batchingStrategy.nextRelease();  if (next != null) {   this.scheduledTask = this.scheduler.schedule((Runnable) () -> releaseBatches(), next);  } }}

可靠性

使用批處理一定要考慮可靠性的問題。ltD28資訊網——每日最新資訊28at.com

在消費端,消費者批量拉取一批消息后把消息暫存到一個內存臨時隊列,然后多線程去臨時隊列消費消息,如果服務宕機,臨時隊列中的消息會丟失。ltD28資訊網——每日最新資訊28at.com

為了避免宕機引發的損失,可以拉取一批消息后保存到數據庫,然后給 Broker 返回 ACK,之后業務代碼去數據庫查詢消息并消費,不過要考慮數據庫大事務、鎖競爭等問題。ltD28資訊網——每日最新資訊28at.com

當然,對于一些消息丟失不敏感的場景,比如日志收集之類的,可靠性這個指標是不用太關注的。ltD28資訊網——每日最新資訊28at.com

特殊場景

因為批量消息有一些復雜性,消息隊列的部分特性不支持。ltD28資訊網——每日最新資訊28at.com

事務消息

批量消息會增加消息重試的難度,所以對于事務消息,建議使用單條消息,一條消息對應一個事務。ltD28資訊網——每日最新資訊28at.com

順序消息

順序消息的實現思路一般是生產者將消息發送到同一個分區,消費者綁定這個分區并使用單線程消費這個分區的消息。如果對同一個 Topic 下的同一個分區來實現批量發送,難度會增大。所以建議順序消息使用單條消息進行發送。ltD28資訊網——每日最新資訊28at.com

延時消息

如果延時消息使用批量進行發送,這一批消息的延時時間必須相同,同時要考慮批量消息的超時時間,超時時間太大會影響延時時間的準確性,生產端實現復雜度大大增加。ltD28資訊網——每日最新資訊28at.com

總結

使用批量消息,在一定程度上可以提高性能和吞吐量,但是確實也會存在一些問題,使用的時候要結合業務場景避開這些坑。ltD28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-35258-0.html消息隊列批量收發消息,請避開這五個坑!

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 利用屬性選擇器對外部鏈接進行樣式設計

下一篇: 全網最詳細的OpenFeign講解,肯定有你不知道的

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 手机| 泰顺县| 湘潭县| 绥芬河市| 银川市| 麻阳| 哈尔滨市| 温泉县| 高州市| 宽甸| 凤阳县| 遵义县| 苏尼特左旗| 资阳市| 兰州市| 武胜县| 禹州市| 绿春县| 泸溪县| 新兴县| 永丰县| 绥阳县| 大安市| 林芝县| 同德县| 永修县| 扶沟县| 信阳市| 河西区| 旬邑县| 乐亭县| 延边| 西畴县| 黔江区| 长宁区| 泸溪县| 濉溪县| 泾阳县| 葵青区| 兴安盟| 鄄城县|