消息隊列是一種典型的發布/訂閱模式,是專門為異步化應用和分布式系統設計的,具有高性能、穩定性及可伸縮性的特點,是開發分布式系統和應用系統必備的技術之一。目前,針對不同的業務場景,比較成熟可靠的消息中間件產品有RocketMQ、Kafka、RabbitMq等,基于Redis再去實現一個消息隊列少有提及,那么已經有很成熟的產品可以選擇,還有必要再基于Redis自己來實現一個消息隊列嗎?基于Redis實現的消息隊列有什么特別的地方嗎?
先來回顧一個Redis有哪些特性:
總結一下:redis的特點就是:快、簡單、穩定;
以RocketMQ為代表,作為專業的消息中間件而言,有哪些特性呢:
總結一下:RocketMQ的特點就是除了性能非常高、系統本身的功能比較專業、完善,能適應非常多的場景;
從上述分析可以看出,Redis隊列和MQ消息隊列各有優勢,Redis的最大特點就是快,所以基于Redis的消息隊列相比MQ消息隊列而言,更適合實時處理,但是基于Redis的消息隊列更易受服務器內存限制;而RocketMQ消息隊列作為專業的消息中間件產品,功能更完善,更適合應用于比較復雜的業務場景,可以實現離線消息發送、消息可靠投遞以及消息的安全性,但MQ消息隊列的讀寫性能略低于Redis隊列。在技術選型時,除了上述的因素外,還有一個需要注意:大多數系統都會引入Redis作為基礎的緩存中間件使用,如果要選用RocketMQ的話,還需要額外再申請資源進行部署。
很多時候,所謂的優點和缺點,只是針對特定場景而言,如果場景不一樣了,優點可能會變成缺點,缺點也可能會變成優點。因此,除了專業的消息中間件外,基于Redis實現一個消息隊列也是有必要的,在某些特殊的業務場景,比如一些并發量不是很高的管理系統,某些業務流程需要異步化處理,這時選擇基于Redis自己實現一個消息隊列,也是一個比較好的選擇。這也是本篇文章主要分享的內容。
隊列(Queue)是一種數據結構,遵循先進先出(FIFO)的原則。在隊列中,元素被添加到末尾(入隊),并從開頭移除(出隊)。
Java中有哪些隊列?
以LinkedBlockingQueue為例,其使用方法是這樣的:
創建了一個生產者線程和一個消費者線程,生產者線程和消費者線程分別對同一個LinkedBlockingQueue對象進行操作。生產者線程通過調用put()方法將元素添加到隊列中,而消費者線程通過調用take()方法從隊列中取出元素。這兩個方法都會阻塞線程,直到隊列中有元素可供取出或有空間可供添加元素。
import java.util.concurrent.LinkedBlockingQueue; public class LinkedBlockingQueueExample { public static void main(String[] args) { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 生產者線程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { queue.put("Element " + i); System.out.println("Produced: Element " + i); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 消費者線程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { String element = queue.take(); System.out.println("Consumed: " + element); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
圖片
無法持久化保存消息,如果 Redis 服務器宕機或重啟,那么所有的消息將會丟失;
發布訂閱模式是“發后既忘”的工作模式,如果有訂閱者離線重連之后就不能消費之前的歷史消息;
不支持消費者確認機制,穩定性不能得到保證,例如當消費者獲取到消息之后,還沒來得及執行就宕機了。因為沒有消費者確認機制,Redis 就會誤以為消費者已經執行了,因此就不會重復發送未被正常消費的消息了,這樣整體的 Redis 穩定性就被沒有辦法得到保障了。
基于Stream 類型實現:使用 Stream 的 xadd 和 xrange 來實現消息的存入和讀取了,并且 Stream 提供了 xack 手動確認消息消費的命令,用它我們就可以實現消費者確認的功能了,使用命令如下:
127.0.0.1:6379> xack mq group1 1580959593553-0(integer) 1
消費確認增加了消息的可靠性,一般在業務處理完成之后,需要執行 ack 確認消息已經被消費完成,整個流程的執行如下圖所示:
其中“Group”為群組,消費者也就是接收者需要訂閱到群組才能正常獲取到消息。
以上就是基于Redis實現消息隊列的幾種方式的簡單對比介紹,下面主要是分享一下基于Redis的List數據類型實現,其他幾種方式,有興趣的小伙可以自己嘗試一下。
基于Redis的List數據類型實現消費隊列的工作原理是什么?
Redis基于List結構實現隊列的原理主要依賴于List的push和pop操作。
在Redis中,你可以使用LPUSH命令將一個或多個元素推入列表的左邊,也就是列表頭部。同樣,你可以使用RPUSH命令將一個或多個元素推入列表的右邊,也就是列表尾部。
對于隊列來說,新元素總是從隊列的頭部進入,而讀取操作總是從隊列的尾部開始。因此,當你想將一個新元素加入隊列時,你可以使用LPUSH命令。當你想從隊列中取出一個元素時,你可以使用RPOP命令。
此外,Redis還提供了BRPOP命令,這是一個阻塞的RPOP版本。如果給定列表內沒有任何元素可供彈出的話,將阻塞連接直到等待超時或發現可彈出元素為止。
需要注意的是,雖然Redis能夠提供原子性的push和pop操作,但是在并發環境下使用隊列時,仍然需要考慮線程安全和并發控制的問題。你可能需要使用Lua腳本或者其他機制來確保并發操作的正確性。
總的來說,Redis通過提供List數據結構以及一系列相關命令,可以很方便地實現隊列的功能。
下面是Redis關于List數據結構操作的命令主要包括以下幾種:
以一個實際需求為例,演示一個基于Redis的延遲隊列是怎么使用的?
有一個XX任務管理的功能,主要的業務過程:
1、創建任務后;
2、不斷檢查任務的狀態,任務的狀態有三種:待執行、執行中、執行完成;
3、如果任務狀態是執行完成后,主動獲取任務執行結果,對任務執行結果進行處理;如果任務狀態是待執行、執行中,則延遲5秒后,再次查詢任務執行狀態;
圖片
1、依賴引入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.4.7.RELEASE</version></dependency><dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.23.1</version></dependency>
2、定義三個延遲隊列BeforeQueue、RunningQueue、CompleteQueue,對隊列的任務進行存取,BeforeQueue用于對待執行狀態的任務的存取,Running用于對執行中狀態的任務的存取,CompleteQueue用于對執行完成狀態的任務的存取,在三個任務隊列中,取出元素是阻塞的,即如果隊列中沒有新的任務,當前線程會一直阻塞等待,直到有新的任務進入;如果是隊列中還有元素,則遵循先進先出的原則逐個取出進行處理;
@Component@Slf4jpublic class BeforeQueue { @Autowired private RedissonClient redissonClient; /** * <p>取出元素</p> * <p>如果隊列中沒有元素,就阻塞等待,直</p> * @return */ public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue1"); Object obj = null; try { obj = queue1.take(); log.info("從myqueue1取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } /** * <p>放入元素</p> * @param obj */ public void offer(Object obj){ RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue1"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向myqueue1設置元素:{}",obj.toString()); }}
@Component@Slf4jpublic class RunningQueue { @Autowired private RedissonClient redissonClient; public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue2"); Object obj = null; try { obj = queue1.take(); log.info("從myqueue2取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } public void offer(Object obj){ RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue2"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向myqueue2設置元素:{}",obj.toString()); }}
@Component@Slf4jpublic class CompleteQueue { @Autowired private RedissonClient redissonClient; public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue3"); Object obj = null; try { obj = queue1.take(); log.info("從CompleteQueue取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } public void offer(Object obj){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingDeque("queue3"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向CompleteQueue設置元素:{}",obj.toString()); }}
3、定義三個監聽器BeforeQueueListener、RunningQueueListener、CompleteQueueListener,監聽器的主要作用主要就是負責監聽三個隊列中是否有新的任務 元素進入,如果有,則立即取出消費;如果沒有,則阻塞等待新的元素進入,具體的實現邏輯是:新創建的任務會先放置到BeforeQueue中,BeforeQueueListener監聽到有新的任務進入,會取出任務作一些業務處理,業務處理完一放入到RunningQueue中,RunningQueueListener監聽到有新的任務進入,會取出任務再進行處理,這里的處理主要是查詢任務執行狀態,查詢狀態結果主要分兩種情況:1、執行中、待執行狀態,則把任務重新放入RunningQueue隊列中,延遲5秒;2、執行完成狀態,則把任務放置到CompleteQueue中;CompleteQueueListener監聽到有新的任務進入后,會主動獲取任務執行結果,作最后業務處理;
4、監聽器在在處理隊列中的數據相關的業務時,如果發生異常,則需要把取出的元素再重新入入到當前隊列中,等待下一輪的重試;
@Component@Slf4jpublic class BeforeQueueListener implements Listener{ @Autowired private BeforeQueue beforeQueue; @Autowired private RunningQueue runningQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true){ log.info("監聽器進入阻塞:BeforeQueueListener"); Object obj = beforeQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("開始休眠1s模擬業務處理:BeforeQueueListener,元素:{}",obj.toString()); Thread.currentThread().sleep(1000); log.info("業務處理完成:BeforeQueueListener,元素:{}",obj.toString()); runningQueue.offer(obj); } catch (InterruptedException e) { log.error("業務處理發生異常,重置元素到BeforeQueue隊列中"); log.error(e.getMessage()); beforeQueue.offer(obj); } } } } }).start(); }}
@Component@Slf4jpublic class RunningQueueListener implements Listener { @Autowired private RunningQueue runningQueue; @Autowired private CompleteQueue completeQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true) { log.info("監聽器進入阻塞:RunningQueueListener"); Object obj = runningQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("開始休眠1s模擬業務處理:RunningQueueListener,元素:{}", obj.toString()); Thread.currentThread().sleep(1000); Random random = new Random(); int i = random.nextInt(2); if (i==0) { test(); } log.info("業務處理完成:RunningQueueListener,元素:{}", obj.toString()); completeQueue.offer(obj); } catch (Exception e) { log.error("業務處理發生異常,重置元素到RunningQueue隊列中"); log.error(e.getMessage()); runningQueue.offer(obj); } } } } }).start(); } public void test(){ try { int i=1/0; } catch (Exception e) { throw new RuntimeException("除數異常"); } }}
@Component@Slf4jpublic class CompleteQueueListener implements Listener{ @Autowired private CompleteQueue completeQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true){ log.info("監聽器進入阻塞:CompleteQueueListener"); Object obj = completeQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("開始休眠1s模擬業務處理:CompleteQueueListener,元素:{}",obj.toString()); Thread.currentThread().sleep(1000); log.info("業務處理完成:listener3,元素:{}",obj.toString()); } catch (InterruptedException e) { log.error("業務處理發生異常,重置元素到CompleteQueue隊列中"); log.error(e.getMessage()); completeQueue.offer(obj); } log.info("CompleteQueueListener任務結束,元素:{}",obj.toString()); } } } }).start(); }}
5、利用Springboot的擴展點ApplicationRunner,在項目啟動完成后,分別啟動BeforeQueueListener、RunningQueueListener、CompleteQueueListener,讓三個監聽器進入阻塞監聽狀態
@Componentpublic class MyRunner implements ApplicationRunner { @Autowired private ApplicationContext applicationContext; @Override public void run(ApplicationArguments args) throws Exception { Map<String, Listener> beansOfType = applicationContext.getBeansOfType(Listener.class); for (String s : beansOfType.keySet()) { Listener listener = beansOfType.get(s); listener.start(); } }}
結果驗證
圖片
三個任務隊列分別有三個線程來進行阻塞監聽,即如果任務隊列中有任務元素,則取出進行處理;如果沒有,則阻塞等待,主線程只負責把任務設置到任務隊列中,出現的問題是:控制臺的日志輸出顯示任務元素已經放置到第一個BeforeQueue中,按照預期的結果應該是,控制臺的日志輸出會顯示,從BeforeQueue取出元素進行業務處理、以及業務處理的日志,然后放置到RunningQueue中,再從RunningQueue中取出進行業務處理,接著放置到CompleteQueue隊列中,最后從CompleteQueue中取出進行業務處理,最后結束;實際情況是:總是缺少從BeforeQueue取出元素進行業務處理、以及業務處理的日志,其他的日志輸出都很正常、執行結果也正常;
經過排查分析,最后找到了原因:
是logback線程安全問題, Logback 的大部分組件都是線程安全的,但某些特定的配置可能會導致線程安全問題。例如,如果你在同一個 Appender 中處理多個線程的日志事件,那么可能會出現線程安全問題,導致某些日志事件丟失。
問題原因找到了,其實解決方法也就找到,具體就是logback的異步日志,logback.xml配置如下:
<?xml versinotallow="1.0" encoding="UTF-8"?><configuration scan="true" scanPeriod="60 seconds" debug="false"> <!-- 日志存放路徑 --> <property name="log.path" value="logs/"/> <!-- 日志輸出格式 --> <property name="console.log.pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %magenta(${PID:-}) - %green([%-21thread]) %cyan(%-35logger{30}) %msg%n"/> <!-- 控制臺輸出 --> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${console.log.pattern}</pattern> <charset>utf-8</charset> </encoder> </appender> <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> <queueSize>500</queueSize> <discardingThreshold>0</discardingThreshold> <neverBlock>true</neverBlock> <appender-ref ref="console" /> </appender> <!--系統操作日志--> <root level="info"> <appender-ref ref="ASYNC" /> </root></configuration>
文章中展示了關鍵性代碼,示例全部代碼地址:https://gitcode.net/fox9916/redisson-demo.git
本文鏈接:http://www.www897cc.com/showinfo-26-55111-0.html基于Redis實現消息隊列的實踐
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com