日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當(dāng)前位置:首頁 > 科技  > 軟件

基于Redis實現(xiàn)消息隊列的實踐

來源: 責(zé)編: 時間:2024-01-02 09:30:45 231觀看
導(dǎo)讀為什么要基于Redis實現(xiàn)消費(fèi)隊列?消息隊列是一種典型的發(fā)布/訂閱模式,是專門為異步化應(yīng)用和分布式系統(tǒng)設(shè)計的,具有高性能、穩(wěn)定性及可伸縮性的特點,是開發(fā)分布式系統(tǒng)和應(yīng)用系統(tǒng)必備的技術(shù)之一。目前,針對不同的業(yè)務(wù)場景,比較

為什么要基于Redis實現(xiàn)消費(fèi)隊列?

消息隊列是一種典型的發(fā)布/訂閱模式,是專門為異步化應(yīng)用和分布式系統(tǒng)設(shè)計的,具有高性能、穩(wěn)定性及可伸縮性的特點,是開發(fā)分布式系統(tǒng)和應(yīng)用系統(tǒng)必備的技術(shù)之一。目前,針對不同的業(yè)務(wù)場景,比較成熟可靠的消息中間件產(chǎn)品有RocketMQ、Kafka、RabbitMq等,基于Redis再去實現(xiàn)一個消息隊列少有提及,那么已經(jīng)有很成熟的產(chǎn)品可以選擇,還有必要再基于Redis自己來實現(xiàn)一個消息隊列嗎?基于Redis實現(xiàn)的消息隊列有什么特別的地方嗎?L3L28資訊網(wǎng)——每日最新資訊28at.com

先來回顧一個Redis有哪些特性:L3L28資訊網(wǎng)——每日最新資訊28at.com

  1. 速度快:Redis是基于內(nèi)存的key-value類型的數(shù)據(jù)庫,數(shù)據(jù)都存放在內(nèi)存中,使得讀寫速度非常快,能夠達(dá)到每秒數(shù)十萬次的讀寫操作。
  2. 鍵值對的數(shù)據(jù)結(jié)構(gòu):Redis中的數(shù)據(jù)以鍵值對的形式存儲,使得查詢和操作數(shù)據(jù)非常方便和高效。
  3. 功能豐富:Redis具有許多實用的功能,例如鍵過期、發(fā)布訂閱、Lua腳本、事務(wù)和管道等。這些功能使得Redis能夠廣泛應(yīng)用于各種場景,如緩存、消息系統(tǒng)等。
  4. 持久化:Redis提供了兩種持久化方案,即RDB(根據(jù)時間生成數(shù)據(jù)快照)和AOF(以追加方式記錄每次寫操作)。兩種方案可以互相配合,確保數(shù)據(jù)的安全性。
  5. 主從復(fù)制:Redis支持主從復(fù)制功能,可以輕松實現(xiàn)數(shù)據(jù)備份和擴(kuò)展。主節(jié)點會將其數(shù)據(jù)復(fù)制給從節(jié)點,從而實現(xiàn)數(shù)據(jù)的冗余和備份。
  6. 高可用和分布式:Redis從2.8版本開始提供了高可用實現(xiàn)哨兵模式,可以保證節(jié)點的故障發(fā)現(xiàn)和故障自動轉(zhuǎn)移。此外,Redis從3.0版本開始支持集群模式,可以輕松實現(xiàn)數(shù)據(jù)的分布式存儲和擴(kuò)展。

總結(jié)一下:redis的特點就是:快、簡單、穩(wěn)定;L3L28資訊網(wǎng)——每日最新資訊28at.com

以RocketMQ為代表,作為專業(yè)的消息中間件而言,有哪些特性呢:L3L28資訊網(wǎng)——每日最新資訊28at.com

  1. 高性能、高可靠:RocketMQ采用分布式架構(gòu),能夠高效地處理大量消息,同時也具有高可靠性的特性,能夠保證消息的不丟失和正確傳遞。
  2. 高實時:RocketMQ支持消息的實時傳遞,能夠滿足實時交易系統(tǒng)的需求,為系統(tǒng)提供及時、準(zhǔn)確的消息。
  3. 事務(wù)消息:RocketMQ支持事務(wù)消息,能夠在消息發(fā)送和接收過程中保持事務(wù)的一致性,確保消息的可靠性和系統(tǒng)的穩(wěn)定性。
  4. 順序消息:RocketMQ可以保證消息的有序性,無論是在一個生產(chǎn)者還是多個生產(chǎn)者之間,都能保證消息按照發(fā)送順序進(jìn)行消費(fèi)。
  5. 批量消息:RocketMQ支持批量消息,能夠一次性發(fā)送多條消息,提高消息發(fā)送效率。
  6. 定時消息:RocketMQ支持定時消息,能夠在指定的時間將消息發(fā)送到指定的Topic,滿足定時任務(wù)的需求。
  7. 消息回溯:RocketMQ支持消息回溯,能夠根據(jù)需要將消息重新發(fā)送到指定的Topic,便于調(diào)試和錯誤處理。
  8. 多種消息模式:RocketMQ支持發(fā)布/訂閱、點對點、群聊等多種消息模式,適用于不同的業(yè)務(wù)場景。
  9. 可擴(kuò)展性:RocketMQ采用分布式架構(gòu),能夠方便地擴(kuò)展消息處理能力,支持多個生產(chǎn)者和消費(fèi)者同時處理消息。
  10. 多語言支持:RocketMQ提供多種語言的客戶端庫,支持包括Java、Python、C++等在內(nèi)的多種編程語言。

總結(jié)一下:RocketMQ的特點就是除了性能非常高、系統(tǒng)本身的功能比較專業(yè)、完善,能適應(yīng)非常多的場景;L3L28資訊網(wǎng)——每日最新資訊28at.com

從上述分析可以看出,Redis隊列和MQ消息隊列各有優(yōu)勢,Redis的最大特點就是快,所以基于Redis的消息隊列相比MQ消息隊列而言,更適合實時處理,但是基于Redis的消息隊列更易受服務(wù)器內(nèi)存限制;而RocketMQ消息隊列作為專業(yè)的消息中間件產(chǎn)品,功能更完善,更適合應(yīng)用于比較復(fù)雜的業(yè)務(wù)場景,可以實現(xiàn)離線消息發(fā)送、消息可靠投遞以及消息的安全性,但MQ消息隊列的讀寫性能略低于Redis隊列。在技術(shù)選型時,除了上述的因素外,還有一個需要注意:大多數(shù)系統(tǒng)都會引入Redis作為基礎(chǔ)的緩存中間件使用,如果要選用RocketMQ的話,還需要額外再申請資源進(jìn)行部署。L3L28資訊網(wǎng)——每日最新資訊28at.com

很多時候,所謂的優(yōu)點和缺點,只是針對特定場景而言,如果場景不一樣了,優(yōu)點可能會變成缺點,缺點也可能會變成優(yōu)點。因此,除了專業(yè)的消息中間件外,基于Redis實現(xiàn)一個消息隊列也是有必要的,在某些特殊的業(yè)務(wù)場景,比如一些并發(fā)量不是很高的管理系統(tǒng),某些業(yè)務(wù)流程需要異步化處理,這時選擇基于Redis自己實現(xiàn)一個消息隊列,也是一個比較好的選擇。這也是本篇文章主要分享的內(nèi)容。L3L28資訊網(wǎng)——每日最新資訊28at.com

消息隊列的基礎(chǔ)知識:

什么是隊列?

隊列(Queue)是一種數(shù)據(jù)結(jié)構(gòu),遵循先進(jìn)先出(FIFO)的原則。在隊列中,元素被添加到末尾(入隊),并從開頭移除(出隊)。L3L28資訊網(wǎng)——每日最新資訊28at.com

圖片L3L28資訊網(wǎng)——每日最新資訊28at.com

Java中有哪些隊列?L3L28資訊網(wǎng)——每日最新資訊28at.com

  1. LinkedList:LinkedList實現(xiàn)了Deque接口,可以作為隊列(FIFO)或棧(LIFO)使用。它是一個雙向鏈表,所以插入和刪除操作具有很高的效率。
  2. ArrayDeque:ArrayDeque也是一個雙端隊列,具有高效的插入和刪除操作。與LinkedList相比,ArrayDeque通常在大多數(shù)操作中表現(xiàn)得更快,因為它在內(nèi)部使用動態(tài)數(shù)組。
  3. PriorityQueue:PriorityQueue是一個優(yōu)先隊列,它保證隊列頭部總是最小元素。你可以自定義元素的排序規(guī)則。
  4. ConcurrentLinkedQueue:ConcurrentLinkedQueue是一個線程安全的隊列,它使用無鎖算法進(jìn)行并發(fā)控制。它適用于高并發(fā)場景,但在低并發(fā)場景中可能比其他隊列慢。
  5. LinkedBlockingQueue:LinkedBlockingQueue是一個線程安全的阻塞隊列,它使用鏈表數(shù)據(jù)結(jié)構(gòu)來存儲數(shù)據(jù)。當(dāng)隊列為空時,獲取元素的操作將會被阻塞;當(dāng)隊列已滿時,插入元素的操作將會被阻塞。
  6. ArrayBlockingQueue:ArrayBlockingQueue是一個線程安全的阻塞隊列,它使用數(shù)組數(shù)據(jù)結(jié)構(gòu)來存儲數(shù)據(jù)。與LinkedBlockingQueue相比,ArrayBlockingQueue的容量是固定的。
  7. PriorityBlockingQueue:PriorityBlockingQueue是一個線程安全的優(yōu)先阻塞隊列。與PriorityQueue類似,它保證隊列頭部總是最小元素。
  8. SynchronousQueue:SynchronousQueue是一個線程安全的阻塞隊列,它只包含一個元素。當(dāng)隊列為空時,獲取元素的操作將會被阻塞;當(dāng)隊列已滿時,插入元素的操作將會被阻塞。
  9. DelayQueue:DelayQueue是一個無界阻塞隊列,用于放置實現(xiàn)了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。

LinkedBlockingQueue

以LinkedBlockingQueue為例,其使用方法是這樣的:L3L28資訊網(wǎng)——每日最新資訊28at.com

創(chuàng)建了一個生產(chǎn)者線程和一個消費(fèi)者線程,生產(chǎn)者線程和消費(fèi)者線程分別對同一個LinkedBlockingQueue對象進(jìn)行操作。生產(chǎn)者線程通過調(diào)用put()方法將元素添加到隊列中,而消費(fèi)者線程通過調(diào)用take()方法從隊列中取出元素。這兩個方法都會阻塞線程,直到隊列中有元素可供取出或有空間可供添加元素。L3L28資訊網(wǎng)——每日最新資訊28at.com

import java.util.concurrent.LinkedBlockingQueue;    public class LinkedBlockingQueueExample {      public static void main(String[] args) {          LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();            // 生產(chǎn)者線程          new Thread(() -> {              for (int i = 0; i < 10; i++) {                  try {                      queue.put("Element " + i);                      System.out.println("Produced: Element " + i);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }          }).start();          // 消費(fèi)者線程          new Thread(() -> {              for (int i = 0; i < 10; i++) {                  try {                      String element = queue.take();                      System.out.println("Consumed: " + element);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }          }).start();      }  }


L3L28資訊網(wǎng)——每日最新資訊28at.com

基于Redis實現(xiàn)消息隊列的幾種方式

基于List數(shù)據(jù)類型

  • List 類型實現(xiàn)的方式最為簡單和直接,它主要是通過 lpush、rpop 存入和讀取實現(xiàn)消息隊列的,如下圖所示:

圖片圖片L3L28資訊網(wǎng)——每日最新資訊28at.com

  • lpush 可以把最新的消息存儲到消息隊列(List 集合)的首部,而 rpop 可以讀取消息隊列的尾部,這樣就實現(xiàn)了先進(jìn)先出;
  • 優(yōu)點:使用 List 實現(xiàn)消息隊列的優(yōu)點是消息可以被持久化,List 可以借助 Redis 本身的持久化功能,AOF 或者是 RDB 或混合持久化的方式,用于把數(shù)據(jù)保存至磁盤,這樣當(dāng) Redis 重啟之后,消息不會丟失。
  • 缺點:基于List類型實現(xiàn)的消息隊列不支持重復(fù)消費(fèi)、沒有按照主題訂閱的功能、不支持消費(fèi)消息確認(rèn)等功能,如果確實需要,需要自己實現(xiàn)。

基于Zset數(shù)據(jù)類型

  • 基于ZSet數(shù)據(jù)類型實現(xiàn)消息隊列,是利用 zadd 和 zrangebyscore 來實現(xiàn)存入和讀取消息的。
  • 優(yōu)點:和基于List數(shù)據(jù)類型差不多,同樣具備持久化的功能,不同的是消息數(shù)據(jù)存儲的結(jié)構(gòu)類型不一樣;
  • 缺點:List 存在的問題它也同樣存在,不支持重復(fù)消費(fèi),沒有主題訂閱功能,不支持消費(fèi)消息確認(rèn),并且使用 ZSet 還不能存儲相同元素的值。因為它是有序集合,有序集合的存儲元素值是不能重復(fù)的,但分值可以重復(fù),也就是說當(dāng)消息值重復(fù)時,只能存儲一條信息在 ZSet 中。

基于發(fā)布訂閱模式

  • 基于發(fā)布訂閱模式,是使用Pattern Subscribe 的功能實現(xiàn)主題訂閱的功能,也就是 。因此我們可以使用一個消費(fèi)者“queue_*”來訂閱所有以“queue_”開頭的消息隊列,如下圖所示:
  • 優(yōu)點:可以按照主題訂閱方式
  • 缺點:

無法持久化保存消息,如果 Redis 服務(wù)器宕機(jī)或重啟,那么所有的消息將會丟失;L3L28資訊網(wǎng)——每日最新資訊28at.com

發(fā)布訂閱模式是“發(fā)后既忘”的工作模式,如果有訂閱者離線重連之后就不能消費(fèi)之前的歷史消息;L3L28資訊網(wǎng)——每日最新資訊28at.com

不支持消費(fèi)者確認(rèn)機(jī)制,穩(wěn)定性不能得到保證,例如當(dāng)消費(fèi)者獲取到消息之后,還沒來得及執(zhí)行就宕機(jī)了。因為沒有消費(fèi)者確認(rèn)機(jī)制,Redis 就會誤以為消費(fèi)者已經(jīng)執(zhí)行了,因此就不會重復(fù)發(fā)送未被正常消費(fèi)的消息了,這樣整體的 Redis 穩(wěn)定性就被沒有辦法得到保障了。L3L28資訊網(wǎng)——每日最新資訊28at.com

基于Stream類型

基于Stream 類型實現(xiàn):使用 Stream 的 xadd 和 xrange 來實現(xiàn)消息的存入和讀取了,并且 Stream 提供了 xack 手動確認(rèn)消息消費(fèi)的命令,用它我們就可以實現(xiàn)消費(fèi)者確認(rèn)的功能了,使用命令如下:L3L28資訊網(wǎng)——每日最新資訊28at.com

127.0.0.1:6379> xack mq group1 1580959593553-0(integer) 1

消費(fèi)確認(rèn)增加了消息的可靠性,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 ack 確認(rèn)消息已經(jīng)被消費(fèi)完成,整個流程的執(zhí)行如下圖所示:L3L28資訊網(wǎng)——每日最新資訊28at.com

其中“Group”為群組,消費(fèi)者也就是接收者需要訂閱到群組才能正常獲取到消息。L3L28資訊網(wǎng)——每日最新資訊28at.com

以上就是基于Redis實現(xiàn)消息隊列的幾種方式的簡單對比介紹,下面主要是分享一下基于Redis的List數(shù)據(jù)類型實現(xiàn),其他幾種方式,有興趣的小伙可以自己嘗試一下。L3L28資訊網(wǎng)——每日最新資訊28at.com

基于Redis的List數(shù)據(jù)類型實現(xiàn)消費(fèi)隊列的工作原理是什么?L3L28資訊網(wǎng)——每日最新資訊28at.com

Redis基于List結(jié)構(gòu)實現(xiàn)隊列的原理主要依賴于List的push和pop操作。L3L28資訊網(wǎng)——每日最新資訊28at.com

在Redis中,你可以使用LPUSH命令將一個或多個元素推入列表的左邊,也就是列表頭部。同樣,你可以使用RPUSH命令將一個或多個元素推入列表的右邊,也就是列表尾部。L3L28資訊網(wǎng)——每日最新資訊28at.com

對于隊列來說,新元素總是從隊列的頭部進(jìn)入,而讀取操作總是從隊列的尾部開始。因此,當(dāng)你想將一個新元素加入隊列時,你可以使用LPUSH命令。當(dāng)你想從隊列中取出一個元素時,你可以使用RPOP命令。L3L28資訊網(wǎng)——每日最新資訊28at.com

此外,Redis還提供了BRPOP命令,這是一個阻塞的RPOP版本。如果給定列表內(nèi)沒有任何元素可供彈出的話,將阻塞連接直到等待超時或發(fā)現(xiàn)可彈出元素為止。L3L28資訊網(wǎng)——每日最新資訊28at.com

需要注意的是,雖然Redis能夠提供原子性的push和pop操作,但是在并發(fā)環(huán)境下使用隊列時,仍然需要考慮線程安全和并發(fā)控制的問題。你可能需要使用Lua腳本或者其他機(jī)制來確保并發(fā)操作的正確性。L3L28資訊網(wǎng)——每日最新資訊28at.com

總的來說,Redis通過提供List數(shù)據(jù)結(jié)構(gòu)以及一系列相關(guān)命令,可以很方便地實現(xiàn)隊列的功能。L3L28資訊網(wǎng)——每日最新資訊28at.com

下面是Redis關(guān)于List數(shù)據(jù)結(jié)構(gòu)操作的命令主要包括以下幾種:L3L28資訊網(wǎng)——每日最新資訊28at.com

  • LPUSH key value:將一個或多個值插入到列表的頭部。
  • RPUSH key value:將一個或多個值插入到列表的尾部。
  • LPOP key:移除并獲取列表的第一個元素。
  • RPOP key:移除并獲取列表的最后一個元素。
  • LRANGE key start stop:獲取指定索引范圍內(nèi)的元素。
  • LINDEX key index:獲取指定索引位置的元素。
  • LLEN key:獲取列表的長度。
  • LREM key count value:移除列表中指定數(shù)量的特定元素。
  • BRPOP key [key ...] timeout:移出并獲取列表的最后一個元素,如果列表沒有元素會阻塞直到等待超時或發(fā)現(xiàn)可彈出元素為止。

基于Redis的List數(shù)據(jù)類型實現(xiàn)延遲消息隊列實戰(zhàn)

需求描述

以一個實際需求為例,演示一個基于Redis的延遲隊列是怎么使用的?L3L28資訊網(wǎng)——每日最新資訊28at.com

有一個XX任務(wù)管理的功能,主要的業(yè)務(wù)過程:L3L28資訊網(wǎng)——每日最新資訊28at.com

1、創(chuàng)建任務(wù)后;L3L28資訊網(wǎng)——每日最新資訊28at.com

2、不斷檢查任務(wù)的狀態(tài),任務(wù)的狀態(tài)有三種:待執(zhí)行、執(zhí)行中、執(zhí)行完成;L3L28資訊網(wǎng)——每日最新資訊28at.com

3、如果任務(wù)狀態(tài)是執(zhí)行完成后,主動獲取任務(wù)執(zhí)行結(jié)果,對任務(wù)執(zhí)行結(jié)果進(jìn)行處理;如果任務(wù)狀態(tài)是待執(zhí)行、執(zhí)行中,則延遲5秒后,再次查詢?nèi)蝿?wù)執(zhí)行狀態(tài);L3L28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片L3L28資訊網(wǎng)——每日最新資訊28at.com

實現(xiàn)方案

1、依賴引入L3L28資訊網(wǎng)——每日最新資訊28at.com

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-redis</artifactId>    <version>1.4.7.RELEASE</version></dependency><dependency>    <groupId>org.redisson</groupId>    <artifactId>redisson</artifactId>    <version>3.23.1</version></dependency>

2、定義三個延遲隊列BeforeQueue、RunningQueue、CompleteQueue,對隊列的任務(wù)進(jìn)行存取,BeforeQueue用于對待執(zhí)行狀態(tài)的任務(wù)的存取,Running用于對執(zhí)行中狀態(tài)的任務(wù)的存取,CompleteQueue用于對執(zhí)行完成狀態(tài)的任務(wù)的存取,在三個任務(wù)隊列中,取出元素是阻塞的,即如果隊列中沒有新的任務(wù),當(dāng)前線程會一直阻塞等待,直到有新的任務(wù)進(jìn)入;如果是隊列中還有元素,則遵循先進(jìn)先出的原則逐個取出進(jìn)行處理;L3L28資訊網(wǎng)——每日最新資訊28at.com

@Component@Slf4jpublic class BeforeQueue {    @Autowired    private RedissonClient redissonClient;    /**     * <p>取出元素</p>     * <p>如果隊列中沒有元素,就阻塞等待,直</p>     * @return     */    public Object take(){        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue1");        Object obj = null;        try {            obj = queue1.take();            log.info("從myqueue1取出元素:{}",obj.toString());        } catch (InterruptedException e) {            e.printStackTrace();        }        return obj;    }    /**     * <p>放入元素</p>     * @param obj     */    public void offer(Object obj){        RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue1");        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);        delayedQueue.offer(obj,5, TimeUnit.SECONDS);        log.info("向myqueue1設(shè)置元素:{}",obj.toString());    }}
@Component@Slf4jpublic class RunningQueue {    @Autowired    private RedissonClient redissonClient;    public Object take(){        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue2");        Object obj = null;        try {            obj = queue1.take();            log.info("從myqueue2取出元素:{}",obj.toString());        } catch (InterruptedException e) {            e.printStackTrace();        }        return obj;    }    public void offer(Object obj){        RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue2");        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);        delayedQueue.offer(obj,5, TimeUnit.SECONDS);        log.info("向myqueue2設(shè)置元素:{}",obj.toString());    }}
@Component@Slf4jpublic class CompleteQueue {    @Autowired    private RedissonClient redissonClient;    public Object take(){        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue3");        Object obj = null;        try {            obj = queue1.take();            log.info("從CompleteQueue取出元素:{}",obj.toString());        } catch (InterruptedException e) {            e.printStackTrace();        }        return obj;    }    public void offer(Object obj){        RBlockingQueue<Object> queue1 = redissonClient.getBlockingDeque("queue3");        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);        delayedQueue.offer(obj,5, TimeUnit.SECONDS);        log.info("向CompleteQueue設(shè)置元素:{}",obj.toString());    }}

3、定義三個監(jiān)聽器BeforeQueueListener、RunningQueueListener、CompleteQueueListener,監(jiān)聽器的主要作用主要就是負(fù)責(zé)監(jiān)聽三個隊列中是否有新的任務(wù) 元素進(jìn)入,如果有,則立即取出消費(fèi);如果沒有,則阻塞等待新的元素進(jìn)入,具體的實現(xiàn)邏輯是:新創(chuàng)建的任務(wù)會先放置到BeforeQueue中,BeforeQueueListener監(jiān)聽到有新的任務(wù)進(jìn)入,會取出任務(wù)作一些業(yè)務(wù)處理,業(yè)務(wù)處理完一放入到RunningQueue中,RunningQueueListener監(jiān)聽到有新的任務(wù)進(jìn)入,會取出任務(wù)再進(jìn)行處理,這里的處理主要是查詢?nèi)蝿?wù)執(zhí)行狀態(tài),查詢狀態(tài)結(jié)果主要分兩種情況:1、執(zhí)行中、待執(zhí)行狀態(tài),則把任務(wù)重新放入RunningQueue隊列中,延遲5秒;2、執(zhí)行完成狀態(tài),則把任務(wù)放置到CompleteQueue中;CompleteQueueListener監(jiān)聽到有新的任務(wù)進(jìn)入后,會主動獲取任務(wù)執(zhí)行結(jié)果,作最后業(yè)務(wù)處理;L3L28資訊網(wǎng)——每日最新資訊28at.com

4、監(jiān)聽器在在處理隊列中的數(shù)據(jù)相關(guān)的業(yè)務(wù)時,如果發(fā)生異常,則需要把取出的元素再重新入入到當(dāng)前隊列中,等待下一輪的重試;L3L28資訊網(wǎng)——每日最新資訊28at.com

@Component@Slf4jpublic class BeforeQueueListener implements Listener{    @Autowired    private BeforeQueue beforeQueue;    @Autowired    private RunningQueue runningQueue;    @Override    public void start() {        new Thread(new Runnable() {            @Override            public void run() {                while (true){                    log.info("監(jiān)聽器進(jìn)入阻塞:BeforeQueueListener");                    Object obj = beforeQueue.take();                    if (ObjectUtil.isNotNull(obj)) {                        try {                            log.info("開始休眠1s模擬業(yè)務(wù)處理:BeforeQueueListener,元素:{}",obj.toString());                            Thread.currentThread().sleep(1000);                            log.info("業(yè)務(wù)處理完成:BeforeQueueListener,元素:{}",obj.toString());                            runningQueue.offer(obj);                        } catch (InterruptedException e) {                            log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到BeforeQueue隊列中");                            log.error(e.getMessage());                            beforeQueue.offer(obj);                        }                    }                }            }        }).start();    }}
@Component@Slf4jpublic class RunningQueueListener implements Listener {    @Autowired    private RunningQueue runningQueue;    @Autowired    private CompleteQueue completeQueue;    @Override    public void start() {        new Thread(new Runnable() {            @Override            public void run() {                while (true) {                    log.info("監(jiān)聽器進(jìn)入阻塞:RunningQueueListener");                    Object obj = runningQueue.take();                    if (ObjectUtil.isNotNull(obj)) {                        try {                            log.info("開始休眠1s模擬業(yè)務(wù)處理:RunningQueueListener,元素:{}", obj.toString());                            Thread.currentThread().sleep(1000);                            Random random = new Random();                            int i = random.nextInt(2);                            if (i==0) {                                test();                            }                            log.info("業(yè)務(wù)處理完成:RunningQueueListener,元素:{}", obj.toString());                            completeQueue.offer(obj);                        } catch (Exception e) {                            log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到RunningQueue隊列中");                            log.error(e.getMessage());                            runningQueue.offer(obj);                        }                    }                }            }        }).start();    }    public void test(){        try {            int i=1/0;        } catch (Exception e) {           throw  new RuntimeException("除數(shù)異常");        }    }}
@Component@Slf4jpublic class CompleteQueueListener implements Listener{    @Autowired    private CompleteQueue completeQueue;    @Override    public void start() {        new Thread(new Runnable() {            @Override            public void run() {                while (true){                    log.info("監(jiān)聽器進(jìn)入阻塞:CompleteQueueListener");                    Object obj = completeQueue.take();                    if (ObjectUtil.isNotNull(obj)) {                        try {                            log.info("開始休眠1s模擬業(yè)務(wù)處理:CompleteQueueListener,元素:{}",obj.toString());                            Thread.currentThread().sleep(1000);                            log.info("業(yè)務(wù)處理完成:listener3,元素:{}",obj.toString());                        } catch (InterruptedException e) {                            log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到CompleteQueue隊列中");                            log.error(e.getMessage());                            completeQueue.offer(obj);                        }                       log.info("CompleteQueueListener任務(wù)結(jié)束,元素:{}",obj.toString());                    }                }            }        }).start();    }}

5、利用Springboot的擴(kuò)展點ApplicationRunner,在項目啟動完成后,分別啟動BeforeQueueListener、RunningQueueListener、CompleteQueueListener,讓三個監(jiān)聽器進(jìn)入阻塞監(jiān)聽狀態(tài)L3L28資訊網(wǎng)——每日最新資訊28at.com

@Componentpublic class MyRunner implements ApplicationRunner {    @Autowired    private ApplicationContext applicationContext;    @Override    public void run(ApplicationArguments args) throws Exception {        Map<String, Listener> beansOfType = applicationContext.getBeansOfType(Listener.class);        for (String s : beansOfType.keySet()) {            Listener listener = beansOfType.get(s);            listener.start();        }    }}

結(jié)果驗證L3L28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片L3L28資訊網(wǎng)——每日最新資訊28at.com

一個比較有意思的問題

日志丟失的問題

三個任務(wù)隊列分別有三個線程來進(jìn)行阻塞監(jiān)聽,即如果任務(wù)隊列中有任務(wù)元素,則取出進(jìn)行處理;如果沒有,則阻塞等待,主線程只負(fù)責(zé)把任務(wù)設(shè)置到任務(wù)隊列中,出現(xiàn)的問題是:控制臺的日志輸出顯示任務(wù)元素已經(jīng)放置到第一個BeforeQueue中,按照預(yù)期的結(jié)果應(yīng)該是,控制臺的日志輸出會顯示,從BeforeQueue取出元素進(jìn)行業(yè)務(wù)處理、以及業(yè)務(wù)處理的日志,然后放置到RunningQueue中,再從RunningQueue中取出進(jìn)行業(yè)務(wù)處理,接著放置到CompleteQueue隊列中,最后從CompleteQueue中取出進(jìn)行業(yè)務(wù)處理,最后結(jié)束;實際情況是:總是缺少從BeforeQueue取出元素進(jìn)行業(yè)務(wù)處理、以及業(yè)務(wù)處理的日志,其他的日志輸出都很正常、執(zhí)行結(jié)果也正常;L3L28資訊網(wǎng)——每日最新資訊28at.com

問題原因

經(jīng)過排查分析,最后找到了原因:L3L28資訊網(wǎng)——每日最新資訊28at.com

是logback線程安全問題, Logback 的大部分組件都是線程安全的,但某些特定的配置可能會導(dǎo)致線程安全問題。例如,如果你在同一個 Appender 中處理多個線程的日志事件,那么可能會出現(xiàn)線程安全問題,導(dǎo)致某些日志事件丟失。L3L28資訊網(wǎng)——每日最新資訊28at.com

解決方法

問題原因找到了,其實解決方法也就找到,具體就是logback的異步日志,logback.xml配置如下:L3L28資訊網(wǎng)——每日最新資訊28at.com

<?xml versinotallow="1.0" encoding="UTF-8"?><configuration scan="true" scanPeriod="60 seconds" debug="false">    <!-- 日志存放路徑 -->    <property name="log.path" value="logs/"/>    <!-- 日志輸出格式 -->    <property name="console.log.pattern"              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %magenta(${PID:-}) - %green([%-21thread]) %cyan(%-35logger{30}) %msg%n"/>    <!-- 控制臺輸出 -->    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">        <encoder>            <pattern>${console.log.pattern}</pattern>            <charset>utf-8</charset>        </encoder>    </appender>    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">        <queueSize>500</queueSize>        <discardingThreshold>0</discardingThreshold>        <neverBlock>true</neverBlock>        <appender-ref ref="console" />    </appender>    <!--系統(tǒng)操作日志-->    <root level="info">        <appender-ref ref="ASYNC" />    </root></configuration>

文章中展示了關(guān)鍵性代碼,示例全部代碼地址:https://gitcode.net/fox9916/redisson-demo.gitL3L28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-55111-0.html基于Redis實現(xiàn)消息隊列的實踐

聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: “推薦大戰(zhàn):抖音vs.快手”——背后的秘密全揭曉!

下一篇: 五分鐘學(xué)會JSON格式的全部知識,你學(xué)會了嗎?

標(biāo)簽:
  • 熱門焦點
  • 0糖0卡0脂 旭日森林仙草烏龍茶優(yōu)惠:15瓶到手29元

    旭日森林無糖仙草烏龍茶510ml*15瓶平時要賣為79.9元,今日下單領(lǐng)取50元優(yōu)惠券,到手價為29.9元。產(chǎn)品規(guī)格:0糖0卡0脂,添加草本仙草汁,清涼爽口,富含茶多酚,保留
  • 一篇聊聊Go錯誤封裝機(jī)制

    %w 是用于錯誤包裝(Error Wrapping)的格式化動詞。它是用于 fmt.Errorf 和 fmt.Sprintf 函數(shù)中的一個特殊格式化動詞,用于將一個錯誤(或其他可打印的值)包裝在一個新的錯誤中。使
  • 微信語音大揭秘:為什么禁止轉(zhuǎn)發(fā)?

    大家好,我是你們的小米。今天,我要和大家聊一個有趣的話題:為什么微信語音不可以轉(zhuǎn)發(fā)?這是一個我們經(jīng)常在日常使用中遇到的問題,也是一個讓很多人好奇的問題。讓我們一起來揭開這
  • JavaScript學(xué)習(xí) -AES加密算法

    引言在當(dāng)今數(shù)字化時代,前端應(yīng)用程序扮演著重要角色,用戶的敏感數(shù)據(jù)經(jīng)常在前端進(jìn)行加密和解密操作。然而,這樣的操作在網(wǎng)絡(luò)傳輸和存儲中可能會受到惡意攻擊的威脅。為了確保數(shù)據(jù)
  • 中國家電海外掘金正當(dāng)時|出海專題

    作者|吳南南編輯|胡展嘉運(yùn)營|陳佳慧出品|零態(tài)LT(ID:LingTai_LT)2023年,出海市場戰(zhàn)況空前,中國創(chuàng)業(yè)者在海外紛紛摩拳擦掌,以期能夠把中國的商業(yè)模式、創(chuàng)業(yè)理念、戰(zhàn)略打法輸出海外,他們依
  • 年輕人的“職場羞恥感”,無處不在

    作者:馮曉亭 陶 淘 李 欣 張 琳 馬舒葉來源:燃次元&ldquo;人在職場,應(yīng)該選擇什么樣的著裝?&rdquo;近日,在網(wǎng)絡(luò)上,一個與著裝相關(guān)的帖子引發(fā)關(guān)注,在該帖子里,一位在高級寫字樓亞洲金
  • 消息稱小米汽車開始篩選交付中心:需至少120個車位

    IT之家 7 月 7 日消息,日前,有微博簡介為“汽車行業(yè)從業(yè)者、長三角一體化擁護(hù)者”的微博用戶 @長三角行健者 發(fā)文表示,據(jù)經(jīng)銷商集團(tuán)反饋,小米汽車目前
  • 英特爾Xe HPG游戲顯卡:擁有512EU,單風(fēng)扇版本

    據(jù)10 月 30 日外媒 TheVerge 消息報道,英特爾 Xe HPG Arc Alchemist 的正面實被曝光,不僅擁有 512 EU 版顯卡,還擁有 128EU 的單風(fēng)扇版本。另外,這款顯卡 PCB
  • 三翼鳥智能家居亮相電博會,讓用戶體驗更真實

    2021電博會在青島國際會展中心開幕中,三翼鳥直接把“家”搬到了現(xiàn)場,成為了展會的一大看點。這也是三翼鳥繼9月9日發(fā)布了行業(yè)首個一站式定制智慧家平臺后的
Top 主站蜘蛛池模板: 秦安县| 万山特区| 岱山县| 金湖县| 扬州市| 伊春市| 静海县| 香港| 乃东县| 洛宁县| 松溪县| 南涧| 元谋县| 安溪县| 正宁县| 手机| 池州市| 淮阳县| 民权县| 镇远县| 滕州市| 焦作市| 湖南省| 南郑县| 共和县| 措美县| 玉溪市| 桂林市| 桓台县| 西城区| 康马县| 苏州市| 辽阳市| 苗栗市| 自贡市| 盐源县| 徐州市| 仁怀市| 德令哈市| 镶黄旗| 自贡市|