日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

基于Redis實現(xiàn)消息隊列的實踐

來源: 責編: 時間:2024-01-02 09:30:45 229觀看
導讀為什么要基于Redis實現(xiàn)消費隊列?消息隊列是一種典型的發(fā)布/訂閱模式,是專門為異步化應用和分布式系統(tǒng)設計的,具有高性能、穩(wěn)定性及可伸縮性的特點,是開發(fā)分布式系統(tǒng)和應用系統(tǒng)必備的技術之一。目前,針對不同的業(yè)務場景,比較

為什么要基于Redis實現(xiàn)消費隊列?

消息隊列是一種典型的發(fā)布/訂閱模式,是專門為異步化應用和分布式系統(tǒng)設計的,具有高性能、穩(wěn)定性及可伸縮性的特點,是開發(fā)分布式系統(tǒng)和應用系統(tǒng)必備的技術之一。目前,針對不同的業(yè)務場景,比較成熟可靠的消息中間件產品有RocketMQ、Kafka、RabbitMq等,基于Redis再去實現(xiàn)一個消息隊列少有提及,那么已經有很成熟的產品可以選擇,還有必要再基于Redis自己來實現(xiàn)一個消息隊列嗎?基于Redis實現(xiàn)的消息隊列有什么特別的地方嗎?lQC28資訊網(wǎng)——每日最新資訊28at.com

先來回顧一個Redis有哪些特性:lQC28資訊網(wǎng)——每日最新資訊28at.com

  1. 速度快:Redis是基于內存的key-value類型的數(shù)據(jù)庫,數(shù)據(jù)都存放在內存中,使得讀寫速度非常快,能夠達到每秒數(shù)十萬次的讀寫操作。
  2. 鍵值對的數(shù)據(jù)結構:Redis中的數(shù)據(jù)以鍵值對的形式存儲,使得查詢和操作數(shù)據(jù)非常方便和高效。
  3. 功能豐富:Redis具有許多實用的功能,例如鍵過期、發(fā)布訂閱、Lua腳本、事務和管道等。這些功能使得Redis能夠廣泛應用于各種場景,如緩存、消息系統(tǒng)等。
  4. 持久化:Redis提供了兩種持久化方案,即RDB(根據(jù)時間生成數(shù)據(jù)快照)和AOF(以追加方式記錄每次寫操作)。兩種方案可以互相配合,確保數(shù)據(jù)的安全性。
  5. 主從復制:Redis支持主從復制功能,可以輕松實現(xiàn)數(shù)據(jù)備份和擴展。主節(jié)點會將其數(shù)據(jù)復制給從節(jié)點,從而實現(xiàn)數(shù)據(jù)的冗余和備份。
  6. 高可用和分布式:Redis從2.8版本開始提供了高可用實現(xiàn)哨兵模式,可以保證節(jié)點的故障發(fā)現(xiàn)和故障自動轉移。此外,Redis從3.0版本開始支持集群模式,可以輕松實現(xiàn)數(shù)據(jù)的分布式存儲和擴展。

總結一下:redis的特點就是:快、簡單、穩(wěn)定;lQC28資訊網(wǎng)——每日最新資訊28at.com

以RocketMQ為代表,作為專業(yè)的消息中間件而言,有哪些特性呢:lQC28資訊網(wǎng)——每日最新資訊28at.com

  1. 高性能、高可靠:RocketMQ采用分布式架構,能夠高效地處理大量消息,同時也具有高可靠性的特性,能夠保證消息的不丟失和正確傳遞。
  2. 高實時:RocketMQ支持消息的實時傳遞,能夠滿足實時交易系統(tǒng)的需求,為系統(tǒng)提供及時、準確的消息。
  3. 事務消息:RocketMQ支持事務消息,能夠在消息發(fā)送和接收過程中保持事務的一致性,確保消息的可靠性和系統(tǒng)的穩(wěn)定性。
  4. 順序消息:RocketMQ可以保證消息的有序性,無論是在一個生產者還是多個生產者之間,都能保證消息按照發(fā)送順序進行消費。
  5. 批量消息:RocketMQ支持批量消息,能夠一次性發(fā)送多條消息,提高消息發(fā)送效率。
  6. 定時消息:RocketMQ支持定時消息,能夠在指定的時間將消息發(fā)送到指定的Topic,滿足定時任務的需求。
  7. 消息回溯:RocketMQ支持消息回溯,能夠根據(jù)需要將消息重新發(fā)送到指定的Topic,便于調試和錯誤處理。
  8. 多種消息模式:RocketMQ支持發(fā)布/訂閱、點對點、群聊等多種消息模式,適用于不同的業(yè)務場景。
  9. 可擴展性:RocketMQ采用分布式架構,能夠方便地擴展消息處理能力,支持多個生產者和消費者同時處理消息。
  10. 多語言支持:RocketMQ提供多種語言的客戶端庫,支持包括Java、Python、C++等在內的多種編程語言。

總結一下:RocketMQ的特點就是除了性能非常高、系統(tǒng)本身的功能比較專業(yè)、完善,能適應非常多的場景;lQC28資訊網(wǎng)——每日最新資訊28at.com

從上述分析可以看出,Redis隊列和MQ消息隊列各有優(yōu)勢,Redis的最大特點就是快,所以基于Redis的消息隊列相比MQ消息隊列而言,更適合實時處理,但是基于Redis的消息隊列更易受服務器內存限制;而RocketMQ消息隊列作為專業(yè)的消息中間件產品,功能更完善,更適合應用于比較復雜的業(yè)務場景,可以實現(xiàn)離線消息發(fā)送、消息可靠投遞以及消息的安全性,但MQ消息隊列的讀寫性能略低于Redis隊列。在技術選型時,除了上述的因素外,還有一個需要注意:大多數(shù)系統(tǒng)都會引入Redis作為基礎的緩存中間件使用,如果要選用RocketMQ的話,還需要額外再申請資源進行部署。lQC28資訊網(wǎng)——每日最新資訊28at.com

很多時候,所謂的優(yōu)點和缺點,只是針對特定場景而言,如果場景不一樣了,優(yōu)點可能會變成缺點,缺點也可能會變成優(yōu)點。因此,除了專業(yè)的消息中間件外,基于Redis實現(xiàn)一個消息隊列也是有必要的,在某些特殊的業(yè)務場景,比如一些并發(fā)量不是很高的管理系統(tǒng),某些業(yè)務流程需要異步化處理,這時選擇基于Redis自己實現(xiàn)一個消息隊列,也是一個比較好的選擇。這也是本篇文章主要分享的內容。lQC28資訊網(wǎng)——每日最新資訊28at.com

消息隊列的基礎知識:

什么是隊列?

隊列(Queue)是一種數(shù)據(jù)結構,遵循先進先出(FIFO)的原則。在隊列中,元素被添加到末尾(入隊),并從開頭移除(出隊)。lQC28資訊網(wǎng)——每日最新資訊28at.com

圖片lQC28資訊網(wǎng)——每日最新資訊28at.com

Java中有哪些隊列?lQC28資訊網(wǎng)——每日最新資訊28at.com

  1. LinkedList:LinkedList實現(xiàn)了Deque接口,可以作為隊列(FIFO)或棧(LIFO)使用。它是一個雙向鏈表,所以插入和刪除操作具有很高的效率。
  2. ArrayDeque:ArrayDeque也是一個雙端隊列,具有高效的插入和刪除操作。與LinkedList相比,ArrayDeque通常在大多數(shù)操作中表現(xiàn)得更快,因為它在內部使用動態(tài)數(shù)組。
  3. PriorityQueue:PriorityQueue是一個優(yōu)先隊列,它保證隊列頭部總是最小元素。你可以自定義元素的排序規(guī)則。
  4. ConcurrentLinkedQueue:ConcurrentLinkedQueue是一個線程安全的隊列,它使用無鎖算法進行并發(fā)控制。它適用于高并發(fā)場景,但在低并發(fā)場景中可能比其他隊列慢。
  5. LinkedBlockingQueue:LinkedBlockingQueue是一個線程安全的阻塞隊列,它使用鏈表數(shù)據(jù)結構來存儲數(shù)據(jù)。當隊列為空時,獲取元素的操作將會被阻塞;當隊列已滿時,插入元素的操作將會被阻塞。
  6. ArrayBlockingQueue:ArrayBlockingQueue是一個線程安全的阻塞隊列,它使用數(shù)組數(shù)據(jù)結構來存儲數(shù)據(jù)。與LinkedBlockingQueue相比,ArrayBlockingQueue的容量是固定的。
  7. PriorityBlockingQueue:PriorityBlockingQueue是一個線程安全的優(yōu)先阻塞隊列。與PriorityQueue類似,它保證隊列頭部總是最小元素。
  8. SynchronousQueue:SynchronousQueue是一個線程安全的阻塞隊列,它只包含一個元素。當隊列為空時,獲取元素的操作將會被阻塞;當隊列已滿時,插入元素的操作將會被阻塞。
  9. DelayQueue:DelayQueue是一個無界阻塞隊列,用于放置實現(xiàn)了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。

LinkedBlockingQueue

以LinkedBlockingQueue為例,其使用方法是這樣的:lQC28資訊網(wǎng)——每日最新資訊28at.com

創(chuàng)建了一個生產者線程和一個消費者線程,生產者線程和消費者線程分別對同一個LinkedBlockingQueue對象進行操作。生產者線程通過調用put()方法將元素添加到隊列中,而消費者線程通過調用take()方法從隊列中取出元素。這兩個方法都會阻塞線程,直到隊列中有元素可供取出或有空間可供添加元素。lQC28資訊網(wǎng)——每日最新資訊28at.com

import java.util.concurrent.LinkedBlockingQueue;    public class LinkedBlockingQueueExample {      public static void main(String[] args) {          LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();            // 生產者線程          new Thread(() -> {              for (int i = 0; i < 10; i++) {                  try {                      queue.put("Element " + i);                      System.out.println("Produced: Element " + i);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }          }).start();          // 消費者線程          new Thread(() -> {              for (int i = 0; i < 10; i++) {                  try {                      String element = queue.take();                      System.out.println("Consumed: " + element);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }          }).start();      }  }


lQC28資訊網(wǎng)——每日最新資訊28at.com

基于Redis實現(xiàn)消息隊列的幾種方式

基于List數(shù)據(jù)類型

  • List 類型實現(xiàn)的方式最為簡單和直接,它主要是通過 lpush、rpop 存入和讀取實現(xiàn)消息隊列的,如下圖所示:

圖片圖片lQC28資訊網(wǎng)——每日最新資訊28at.com

  • lpush 可以把最新的消息存儲到消息隊列(List 集合)的首部,而 rpop 可以讀取消息隊列的尾部,這樣就實現(xiàn)了先進先出;
  • 優(yōu)點:使用 List 實現(xiàn)消息隊列的優(yōu)點是消息可以被持久化,List 可以借助 Redis 本身的持久化功能,AOF 或者是 RDB 或混合持久化的方式,用于把數(shù)據(jù)保存至磁盤,這樣當 Redis 重啟之后,消息不會丟失。
  • 缺點:基于List類型實現(xiàn)的消息隊列不支持重復消費、沒有按照主題訂閱的功能、不支持消費消息確認等功能,如果確實需要,需要自己實現(xiàn)。

基于Zset數(shù)據(jù)類型

  • 基于ZSet數(shù)據(jù)類型實現(xiàn)消息隊列,是利用 zadd 和 zrangebyscore 來實現(xiàn)存入和讀取消息的。
  • 優(yōu)點:和基于List數(shù)據(jù)類型差不多,同樣具備持久化的功能,不同的是消息數(shù)據(jù)存儲的結構類型不一樣;
  • 缺點:List 存在的問題它也同樣存在,不支持重復消費,沒有主題訂閱功能,不支持消費消息確認,并且使用 ZSet 還不能存儲相同元素的值。因為它是有序集合,有序集合的存儲元素值是不能重復的,但分值可以重復,也就是說當消息值重復時,只能存儲一條信息在 ZSet 中。

基于發(fā)布訂閱模式

  • 基于發(fā)布訂閱模式,是使用Pattern Subscribe 的功能實現(xiàn)主題訂閱的功能,也就是 。因此我們可以使用一個消費者“queue_*”來訂閱所有以“queue_”開頭的消息隊列,如下圖所示:
  • 優(yōu)點:可以按照主題訂閱方式
  • 缺點:

無法持久化保存消息,如果 Redis 服務器宕機或重啟,那么所有的消息將會丟失;lQC28資訊網(wǎng)——每日最新資訊28at.com

發(fā)布訂閱模式是“發(fā)后既忘”的工作模式,如果有訂閱者離線重連之后就不能消費之前的歷史消息;lQC28資訊網(wǎng)——每日最新資訊28at.com

不支持消費者確認機制,穩(wěn)定性不能得到保證,例如當消費者獲取到消息之后,還沒來得及執(zhí)行就宕機了。因為沒有消費者確認機制,Redis 就會誤以為消費者已經執(zhí)行了,因此就不會重復發(fā)送未被正常消費的消息了,這樣整體的 Redis 穩(wěn)定性就被沒有辦法得到保障了。lQC28資訊網(wǎng)——每日最新資訊28at.com

基于Stream類型

基于Stream 類型實現(xiàn):使用 Stream 的 xadd 和 xrange 來實現(xiàn)消息的存入和讀取了,并且 Stream 提供了 xack 手動確認消息消費的命令,用它我們就可以實現(xiàn)消費者確認的功能了,使用命令如下:lQC28資訊網(wǎng)——每日最新資訊28at.com

127.0.0.1:6379> xack mq group1 1580959593553-0(integer) 1

消費確認增加了消息的可靠性,一般在業(yè)務處理完成之后,需要執(zhí)行 ack 確認消息已經被消費完成,整個流程的執(zhí)行如下圖所示:lQC28資訊網(wǎng)——每日最新資訊28at.com

其中“Group”為群組,消費者也就是接收者需要訂閱到群組才能正常獲取到消息。lQC28資訊網(wǎng)——每日最新資訊28at.com

以上就是基于Redis實現(xiàn)消息隊列的幾種方式的簡單對比介紹,下面主要是分享一下基于Redis的List數(shù)據(jù)類型實現(xiàn),其他幾種方式,有興趣的小伙可以自己嘗試一下。lQC28資訊網(wǎng)——每日最新資訊28at.com

基于Redis的List數(shù)據(jù)類型實現(xiàn)消費隊列的工作原理是什么?lQC28資訊網(wǎng)——每日最新資訊28at.com

Redis基于List結構實現(xiàn)隊列的原理主要依賴于List的push和pop操作。lQC28資訊網(wǎng)——每日最新資訊28at.com

在Redis中,你可以使用LPUSH命令將一個或多個元素推入列表的左邊,也就是列表頭部。同樣,你可以使用RPUSH命令將一個或多個元素推入列表的右邊,也就是列表尾部。lQC28資訊網(wǎng)——每日最新資訊28at.com

對于隊列來說,新元素總是從隊列的頭部進入,而讀取操作總是從隊列的尾部開始。因此,當你想將一個新元素加入隊列時,你可以使用LPUSH命令。當你想從隊列中取出一個元素時,你可以使用RPOP命令。lQC28資訊網(wǎng)——每日最新資訊28at.com

此外,Redis還提供了BRPOP命令,這是一個阻塞的RPOP版本。如果給定列表內沒有任何元素可供彈出的話,將阻塞連接直到等待超時或發(fā)現(xiàn)可彈出元素為止。lQC28資訊網(wǎng)——每日最新資訊28at.com

需要注意的是,雖然Redis能夠提供原子性的push和pop操作,但是在并發(fā)環(huán)境下使用隊列時,仍然需要考慮線程安全和并發(fā)控制的問題。你可能需要使用Lua腳本或者其他機制來確保并發(fā)操作的正確性。lQC28資訊網(wǎng)——每日最新資訊28at.com

總的來說,Redis通過提供List數(shù)據(jù)結構以及一系列相關命令,可以很方便地實現(xiàn)隊列的功能。lQC28資訊網(wǎng)——每日最新資訊28at.com

下面是Redis關于List數(shù)據(jù)結構操作的命令主要包括以下幾種:lQC28資訊網(wǎng)——每日最新資訊28at.com

  • LPUSH key value:將一個或多個值插入到列表的頭部。
  • RPUSH key value:將一個或多個值插入到列表的尾部。
  • LPOP key:移除并獲取列表的第一個元素。
  • RPOP key:移除并獲取列表的最后一個元素。
  • LRANGE key start stop:獲取指定索引范圍內的元素。
  • LINDEX key index:獲取指定索引位置的元素。
  • LLEN key:獲取列表的長度。
  • LREM key count value:移除列表中指定數(shù)量的特定元素。
  • BRPOP key [key ...] timeout:移出并獲取列表的最后一個元素,如果列表沒有元素會阻塞直到等待超時或發(fā)現(xiàn)可彈出元素為止。

基于Redis的List數(shù)據(jù)類型實現(xiàn)延遲消息隊列實戰(zhàn)

需求描述

以一個實際需求為例,演示一個基于Redis的延遲隊列是怎么使用的?lQC28資訊網(wǎng)——每日最新資訊28at.com

有一個XX任務管理的功能,主要的業(yè)務過程:lQC28資訊網(wǎng)——每日最新資訊28at.com

1、創(chuàng)建任務后;lQC28資訊網(wǎng)——每日最新資訊28at.com

2、不斷檢查任務的狀態(tài),任務的狀態(tài)有三種:待執(zhí)行、執(zhí)行中、執(zhí)行完成;lQC28資訊網(wǎng)——每日最新資訊28at.com

3、如果任務狀態(tài)是執(zhí)行完成后,主動獲取任務執(zhí)行結果,對任務執(zhí)行結果進行處理;如果任務狀態(tài)是待執(zhí)行、執(zhí)行中,則延遲5秒后,再次查詢任務執(zhí)行狀態(tài);lQC28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片lQC28資訊網(wǎng)——每日最新資訊28at.com

實現(xiàn)方案

1、依賴引入lQC28資訊網(wǎng)——每日最新資訊28at.com

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-redis</artifactId>    <version>1.4.7.RELEASE</version></dependency><dependency>    <groupId>org.redisson</groupId>    <artifactId>redisson</artifactId>    <version>3.23.1</version></dependency>

2、定義三個延遲隊列BeforeQueue、RunningQueue、CompleteQueue,對隊列的任務進行存取,BeforeQueue用于對待執(zhí)行狀態(tài)的任務的存取,Running用于對執(zhí)行中狀態(tài)的任務的存取,CompleteQueue用于對執(zhí)行完成狀態(tài)的任務的存取,在三個任務隊列中,取出元素是阻塞的,即如果隊列中沒有新的任務,當前線程會一直阻塞等待,直到有新的任務進入;如果是隊列中還有元素,則遵循先進先出的原則逐個取出進行處理;lQC28資訊網(wǎng)——每日最新資訊28at.com

@Component@Slf4jpublic class BeforeQueue {    @Autowired    private RedissonClient redissonClient;    /**     * <p>取出元素</p>     * <p>如果隊列中沒有元素,就阻塞等待,直</p>     * @return     */    public Object take(){        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue1");        Object obj = null;        try {            obj = queue1.take();            log.info("從myqueue1取出元素:{}",obj.toString());        } catch (InterruptedException e) {            e.printStackTrace();        }        return obj;    }    /**     * <p>放入元素</p>     * @param obj     */    public void offer(Object obj){        RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue1");        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);        delayedQueue.offer(obj,5, TimeUnit.SECONDS);        log.info("向myqueue1設置元素:{}",obj.toString());    }}
@Component@Slf4jpublic class RunningQueue {    @Autowired    private RedissonClient redissonClient;    public Object take(){        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue2");        Object obj = null;        try {            obj = queue1.take();            log.info("從myqueue2取出元素:{}",obj.toString());        } catch (InterruptedException e) {            e.printStackTrace();        }        return obj;    }    public void offer(Object obj){        RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue2");        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);        delayedQueue.offer(obj,5, TimeUnit.SECONDS);        log.info("向myqueue2設置元素:{}",obj.toString());    }}
@Component@Slf4jpublic class CompleteQueue {    @Autowired    private RedissonClient redissonClient;    public Object take(){        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue3");        Object obj = null;        try {            obj = queue1.take();            log.info("從CompleteQueue取出元素:{}",obj.toString());        } catch (InterruptedException e) {            e.printStackTrace();        }        return obj;    }    public void offer(Object obj){        RBlockingQueue<Object> queue1 = redissonClient.getBlockingDeque("queue3");        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);        delayedQueue.offer(obj,5, TimeUnit.SECONDS);        log.info("向CompleteQueue設置元素:{}",obj.toString());    }}

3、定義三個監(jiān)聽器BeforeQueueListener、RunningQueueListener、CompleteQueueListener,監(jiān)聽器的主要作用主要就是負責監(jiān)聽三個隊列中是否有新的任務 元素進入,如果有,則立即取出消費;如果沒有,則阻塞等待新的元素進入,具體的實現(xiàn)邏輯是:新創(chuàng)建的任務會先放置到BeforeQueue中,BeforeQueueListener監(jiān)聽到有新的任務進入,會取出任務作一些業(yè)務處理,業(yè)務處理完一放入到RunningQueue中,RunningQueueListener監(jiān)聽到有新的任務進入,會取出任務再進行處理,這里的處理主要是查詢任務執(zhí)行狀態(tài),查詢狀態(tài)結果主要分兩種情況:1、執(zhí)行中、待執(zhí)行狀態(tài),則把任務重新放入RunningQueue隊列中,延遲5秒;2、執(zhí)行完成狀態(tài),則把任務放置到CompleteQueue中;CompleteQueueListener監(jiān)聽到有新的任務進入后,會主動獲取任務執(zhí)行結果,作最后業(yè)務處理;lQC28資訊網(wǎng)——每日最新資訊28at.com

4、監(jiān)聽器在在處理隊列中的數(shù)據(jù)相關的業(yè)務時,如果發(fā)生異常,則需要把取出的元素再重新入入到當前隊列中,等待下一輪的重試;lQC28資訊網(wǎng)——每日最新資訊28at.com

@Component@Slf4jpublic class BeforeQueueListener implements Listener{    @Autowired    private BeforeQueue beforeQueue;    @Autowired    private RunningQueue runningQueue;    @Override    public void start() {        new Thread(new Runnable() {            @Override            public void run() {                while (true){                    log.info("監(jiān)聽器進入阻塞:BeforeQueueListener");                    Object obj = beforeQueue.take();                    if (ObjectUtil.isNotNull(obj)) {                        try {                            log.info("開始休眠1s模擬業(yè)務處理:BeforeQueueListener,元素:{}",obj.toString());                            Thread.currentThread().sleep(1000);                            log.info("業(yè)務處理完成:BeforeQueueListener,元素:{}",obj.toString());                            runningQueue.offer(obj);                        } catch (InterruptedException e) {                            log.error("業(yè)務處理發(fā)生異常,重置元素到BeforeQueue隊列中");                            log.error(e.getMessage());                            beforeQueue.offer(obj);                        }                    }                }            }        }).start();    }}
@Component@Slf4jpublic class RunningQueueListener implements Listener {    @Autowired    private RunningQueue runningQueue;    @Autowired    private CompleteQueue completeQueue;    @Override    public void start() {        new Thread(new Runnable() {            @Override            public void run() {                while (true) {                    log.info("監(jiān)聽器進入阻塞:RunningQueueListener");                    Object obj = runningQueue.take();                    if (ObjectUtil.isNotNull(obj)) {                        try {                            log.info("開始休眠1s模擬業(yè)務處理:RunningQueueListener,元素:{}", obj.toString());                            Thread.currentThread().sleep(1000);                            Random random = new Random();                            int i = random.nextInt(2);                            if (i==0) {                                test();                            }                            log.info("業(yè)務處理完成:RunningQueueListener,元素:{}", obj.toString());                            completeQueue.offer(obj);                        } catch (Exception e) {                            log.error("業(yè)務處理發(fā)生異常,重置元素到RunningQueue隊列中");                            log.error(e.getMessage());                            runningQueue.offer(obj);                        }                    }                }            }        }).start();    }    public void test(){        try {            int i=1/0;        } catch (Exception e) {           throw  new RuntimeException("除數(shù)異常");        }    }}
@Component@Slf4jpublic class CompleteQueueListener implements Listener{    @Autowired    private CompleteQueue completeQueue;    @Override    public void start() {        new Thread(new Runnable() {            @Override            public void run() {                while (true){                    log.info("監(jiān)聽器進入阻塞:CompleteQueueListener");                    Object obj = completeQueue.take();                    if (ObjectUtil.isNotNull(obj)) {                        try {                            log.info("開始休眠1s模擬業(yè)務處理:CompleteQueueListener,元素:{}",obj.toString());                            Thread.currentThread().sleep(1000);                            log.info("業(yè)務處理完成:listener3,元素:{}",obj.toString());                        } catch (InterruptedException e) {                            log.error("業(yè)務處理發(fā)生異常,重置元素到CompleteQueue隊列中");                            log.error(e.getMessage());                            completeQueue.offer(obj);                        }                       log.info("CompleteQueueListener任務結束,元素:{}",obj.toString());                    }                }            }        }).start();    }}

5、利用Springboot的擴展點ApplicationRunner,在項目啟動完成后,分別啟動BeforeQueueListener、RunningQueueListener、CompleteQueueListener,讓三個監(jiān)聽器進入阻塞監(jiān)聽狀態(tài)lQC28資訊網(wǎng)——每日最新資訊28at.com

@Componentpublic class MyRunner implements ApplicationRunner {    @Autowired    private ApplicationContext applicationContext;    @Override    public void run(ApplicationArguments args) throws Exception {        Map<String, Listener> beansOfType = applicationContext.getBeansOfType(Listener.class);        for (String s : beansOfType.keySet()) {            Listener listener = beansOfType.get(s);            listener.start();        }    }}

結果驗證lQC28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片lQC28資訊網(wǎng)——每日最新資訊28at.com

一個比較有意思的問題

日志丟失的問題

三個任務隊列分別有三個線程來進行阻塞監(jiān)聽,即如果任務隊列中有任務元素,則取出進行處理;如果沒有,則阻塞等待,主線程只負責把任務設置到任務隊列中,出現(xiàn)的問題是:控制臺的日志輸出顯示任務元素已經放置到第一個BeforeQueue中,按照預期的結果應該是,控制臺的日志輸出會顯示,從BeforeQueue取出元素進行業(yè)務處理、以及業(yè)務處理的日志,然后放置到RunningQueue中,再從RunningQueue中取出進行業(yè)務處理,接著放置到CompleteQueue隊列中,最后從CompleteQueue中取出進行業(yè)務處理,最后結束;實際情況是:總是缺少從BeforeQueue取出元素進行業(yè)務處理、以及業(yè)務處理的日志,其他的日志輸出都很正常、執(zhí)行結果也正常;lQC28資訊網(wǎng)——每日最新資訊28at.com

問題原因

經過排查分析,最后找到了原因:lQC28資訊網(wǎng)——每日最新資訊28at.com

是logback線程安全問題, Logback 的大部分組件都是線程安全的,但某些特定的配置可能會導致線程安全問題。例如,如果你在同一個 Appender 中處理多個線程的日志事件,那么可能會出現(xiàn)線程安全問題,導致某些日志事件丟失。lQC28資訊網(wǎng)——每日最新資訊28at.com

解決方法

問題原因找到了,其實解決方法也就找到,具體就是logback的異步日志,logback.xml配置如下:lQC28資訊網(wǎng)——每日最新資訊28at.com

<?xml versinotallow="1.0" encoding="UTF-8"?><configuration scan="true" scanPeriod="60 seconds" debug="false">    <!-- 日志存放路徑 -->    <property name="log.path" value="logs/"/>    <!-- 日志輸出格式 -->    <property name="console.log.pattern"              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %magenta(${PID:-}) - %green([%-21thread]) %cyan(%-35logger{30}) %msg%n"/>    <!-- 控制臺輸出 -->    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">        <encoder>            <pattern>${console.log.pattern}</pattern>            <charset>utf-8</charset>        </encoder>    </appender>    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">        <queueSize>500</queueSize>        <discardingThreshold>0</discardingThreshold>        <neverBlock>true</neverBlock>        <appender-ref ref="console" />    </appender>    <!--系統(tǒng)操作日志-->    <root level="info">        <appender-ref ref="ASYNC" />    </root></configuration>

文章中展示了關鍵性代碼,示例全部代碼地址:https://gitcode.net/fox9916/redisson-demo.gitlQC28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-55129-0.html基于Redis實現(xiàn)消息隊列的實踐

聲明:本網(wǎng)頁內容旨在傳播知識,若有侵權等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: “推薦大戰(zhàn):抖音vs.快手”——背后的秘密全揭曉!

下一篇: 五分鐘學會JSON格式的全部知識,你學會了嗎?

標簽:
  • 熱門焦點
  • 鴻蒙OS 4.0公測機型公布:甚至連nova6都支持

    華為全新的HarmonyOS 4.0操作系統(tǒng)將于今天下午正式登場,官方在發(fā)布會之前也已經正式給出了可升級的機型產品,這意味著這些機型會率先支持升級享用。這次的HarmonyOS 4.0支持
  • Mate60手機殼曝光 致敬自己的經典設計

    8月3日消息,今天下午博主數(shù)碼閑聊站帶來了華為Mate60的第三方手機殼圖,可以讓我們在真機發(fā)布之前看看這款華為全新旗艦的大致輪廓。從曝光的圖片看,Mate 60背后攝像頭面積依然
  • Raft算法:保障分布式系統(tǒng)共識的穩(wěn)健之道

    1. 什么是Raft算法?Raft 是英文”Reliable、Replicated、Redundant、And Fault-Tolerant”(“可靠、可復制、可冗余、可容錯”)的首字母縮寫。Raft算法是一種用于在分布式系統(tǒng)
  • 摸魚心法第一章——和配置文件說拜拜

    為了能摸魚我們團隊做了容器化,但是帶來的問題是服務配置文件很麻煩,然后大家在群里進行了“親切友好”的溝通圖片圖片圖片圖片對比就對比,簡單對比下獨立配置中心和k8s作為配
  • K8S | Service服務發(fā)現(xiàn)

    一、背景在微服務架構中,這里以開發(fā)環(huán)境「Dev」為基礎來描述,在K8S集群中通常會開放:路由網(wǎng)關、注冊中心、配置中心等相關服務,可以被集群外部訪問;圖片對于測試「Tes」環(huán)境或者
  • 不容錯過的MSBuild技巧,必備用法詳解和實踐指南

    一、MSBuild簡介MSBuild是一種基于XML的構建引擎,用于在.NET Framework和.NET Core應用程序中自動化構建過程。它是Visual Studio的構建引擎,可在命令行或其他構建工具中使用
  • 使用AIGC工具提升安全工作效率

    在日常工作中,安全人員可能會涉及各種各樣的安全任務,包括但不限于:開發(fā)某些安全工具的插件,滿足自己特定的安全需求;自定義github搜索工具,快速查找所需的安全資料、漏洞poc、exp
  • 自律,給不了Keep自由!

    來源 | 互聯(lián)網(wǎng)品牌官作者 | 李大為編排 | 又耳 審核 | 谷曉輝自律能不能給用戶自由暫時不好說,但大概率不能給Keep自由。近日,全球最大的在線健身平臺Keep正式登陸港交所,努力
  • AI藝術欣賞體驗會在上海梅賽德斯奔馳中心音樂俱樂部上演

    光影交錯的鏡像世界,虛實幻化的視覺奇觀,虛擬偶像與真人共同主持,這些場景都出現(xiàn)在2019世界人工智能大會的舞臺上。8月29日至31日,“AI藝術欣賞體驗會”在上海
Top 主站蜘蛛池模板: 广汉市| 商南县| 印江| 资阳市| 江山市| 濮阳市| 随州市| 崇义县| 阿鲁科尔沁旗| 县级市| 荣成市| 开鲁县| 防城港市| 望奎县| 沾益县| 佛坪县| 山西省| 枞阳县| 宣恩县| 闵行区| 开化县| 河津市| 永登县| 福贡县| 方城县| 岱山县| 双江| 富民县| 江陵县| 灵璧县| 和田市| 沙坪坝区| 竹溪县| 介休市| 云阳县| 临猗县| 清远市| 淳化县| 政和县| 当阳市| 雅安市|