日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

Spring實現Kafka重試Topic,真的太香了

來源: 責編: 時間:2024-01-26 09:02:59 227觀看
導讀概述Kafka的強大功能之一是每個分區都有一個Consumer的偏移值。該偏移值是消費者將讀取的下一條消息的值。可以自動或手動增加該值。如果我們由于錯誤而無法處理消息并想重試,我們可以選擇手動管理,并在成功的情況下增

概述

Kafka的強大功能之一是每個分區都有一個Consumer的偏移值。該偏移值是消費者將讀取的下一條消息的值。可以自動或手動增加該值。如果我們由于錯誤而無法處理消息并想重試,我們可以選擇手動管理,并在成功的情況下增加偏移量。但是,這會暫時阻止隊列消息的處理。我們可以選擇異步方法。6Gx28資訊網——每日最新資訊28at.com

為什么我們需要它?

如果發生錯誤,而不是停止隊列消息的處理;我們可以將錯誤消息轉移到不同的主題并再次處理。6Gx28資訊網——每日最新資訊28at.com

如果在處理 Kafka 消息時出現錯誤,可以使用 RetryableTopic 注解以一定的時間間隔和一定的次數再次處理消息。如果完成嘗試次數后錯誤仍然存在,則消息將發送到 DLT 隊列。6Gx28資訊網——每日最新資訊28at.com

如何使用?

我們首先回顧一下RetryableTopic注解可以取的一些值,以便您可以做出最適合您的設置:6Gx28資訊網——每日最新資訊28at.com

attempts:嘗試處理消息的次數。它的默認值為 3。如果完成所有嘗試后仍然收到錯誤,則消息將發送到 DLT 隊列。6Gx28資訊網——每日最新資訊28at.com

backoff:用于確定處理消息的時間間隔。從 Backoff 類獲取一個值。您可以在下面找到退避的詳細示例。6Gx28資訊網——每日最新資訊28at.com

排除/排除名稱:允許您排除指定的異常類。當您添加到列表中的任何錯誤被拋出時,重試機制將不會被激活。6Gx28資訊網——每日最新資訊28at.com

include / includeNames:僅當拋出指定的異常時才會激活重試機制。6Gx28資訊網——每日最新資訊28at.com

kafkaTemplate:雖然您可以給出現有 kafkaTemplate bean 的名稱,但您也可以為特定于重試的 Kafka 模板定義不同的 bean。6Gx28資訊網——每日最新資訊28at.com

autoCreateTopics:決定是否自動創建Retry和DLT主題。6Gx28資訊網——每日最新資訊28at.com

retryTopicSuffix / dltTopicSuffix:用于確定要添加到自動創建的主題末尾的后綴。6Gx28資訊網——每日最新資訊28at.com

dltStrategy:如果不需要DLT,可以定義為NO_DLT。6Gx28資訊網——每日最新資訊28at.com

SameIntervalTopicReuseStrategy/fixedDelayTopicStrategy(3.0.4之前):用于確定要創建的重試主題策略。創建 (SINGLE_TOPIC) 或盡可能多的嘗試值 (MULTIPLE_TOPICS) 重試主題。6Gx28資訊網——每日最新資訊28at.com

Backoff的示例:

  • 具有固定的增量值
Backoff(delay = 600000 ) // 每 10 分鐘
  • 具有指數價值
Backoff(delay = 60000 , multiplier = 2 ) // 1、2、4、8... 分鐘后重復。
  • 用占位符定義值
Backoff(delayExpression = "${delay}", multiplierExpression = "${multiplier}")

@RetryableTopic 示例:

@RetryableTopic(     backoff = @Backoff(delay = 300000),     attempts = 12,     sameIntervalTopicReuseStrategy =          SameIntervalTopicReuseStrategy.SINGLE_TOPIC,     kafkaTemplate = "kafkaRetryableTopicTemplate",     exclude = { SerializationException.class,                  DeserializationException.class,                  NullPointerException.class                } ) @KafkaListener(topics = "my-topic") public void processMessage(RetryableDto retryableDto) {     log.info("Retrying process RetryableDto : {}", retryableDto);     // process message }

在上面的例子中,消息將每5分鐘重新處理一次,總共12次,即1小時。如果任何嘗試均順利完成,則試用將終止。6Gx28資訊網——每日最新資訊28at.com

由于定義了 SINGLE_TOPIC,因此將創建單個主題以進行重試。如果沒有進行此定義,則會創建 12 個重試主題。6Gx28資訊網——每日最新資訊28at.com

如果拋出了排除中定義的任何錯誤,則不會執行重做。6Gx28資訊網——每日最新資訊28at.com

如果需要,您可以編寫自己的 RetryableException 并在包含中定義此值,以便僅在引發此錯誤時才重試。6Gx28資訊網——每日最新資訊28at.com

DLT隊列處理

如果完成了定義的嘗試次數并且繼續收到錯誤,則消息將發送到 DLT 隊列。如果要處理這些消息,可以使用DltHandler注解。6Gx28資訊網——每日最新資訊28at.com

用法示例:6Gx28資訊網——每日最新資訊28at.com

@DltHandler  public  void  handleDltMessage (RetryableDto retryableDto) {      log.error("DLT處理程序消息:{}", retryableDto); }

注意事項

雖然使用 RetryableTopic 的異步處理優勢為我們帶來了性能提升,但這種使用也有一些缺點。6Gx28資訊網——每日最新資訊28at.com

使用RetryableTopic可能會破壞消息的處理順序。6Gx28資訊網——每日最新資訊28at.com

讓我們用一個例子來解釋這種情況:當主主題在時間 t 處理時,一條消息出錯并被發送到重試主題。在時間 t + 1 時,另一條消息來到主主題并成功處理。讓我們在重試主題中的消息在時間 t + 2 時被成功處理。在這種情況下,第一條傳入消息將在第二條消息之后處理。如果訂購對您很重要,我建議您在消息處理過程中進行必要的檢查。6Gx28資訊網——每日最新資訊28at.com

另一個缺點是消息雙重處理的風險。您可以通過考慮這種可能性來進行改進。6Gx28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-68327-0.htmlSpring實現Kafka重試Topic,真的太香了

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 如何使用Python、Apache Kafka和云平臺構建健壯的實時數據管道

下一篇: 性能篇:解密Stream,提升集合遍歷效率的秘訣!

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 南通市| 辉南县| 汉阴县| 景德镇市| 增城市| 高安市| 嘉黎县| 孙吴县| 焦作市| 邳州市| 盐山县| 元阳县| 桃园县| 镇康县| 林口县| 平阴县| 桃源县| 伊吾县| 墨脱县| 甘南县| 冀州市| 新巴尔虎左旗| 秀山| 丹江口市| 京山县| 正阳县| 监利县| 贡山| 巫山县| 体育| 朝阳县| 潞城市| 固安县| 马鞍山市| 兴安县| 塔城市| 封开县| 宜君县| 囊谦县| 上林县| 临江市|