消息隊列是一種典型的發(fā)布/訂閱模式,是專門為異步化應(yīng)用和分布式系統(tǒng)設(shè)計的,具有高性能、穩(wěn)定性及可伸縮性的特點,是開發(fā)分布式系統(tǒng)和應(yīng)用系統(tǒng)必備的技術(shù)之一。目前,針對不同的業(yè)務(wù)場景,比較成熟可靠的消息中間件產(chǎn)品有RocketMQ、Kafka、RabbitMq等,基于Redis再去實現(xiàn)一個消息隊列少有提及,那么已經(jīng)有很成熟的產(chǎn)品可以選擇,還有必要再基于Redis自己來實現(xiàn)一個消息隊列嗎?基于Redis實現(xiàn)的消息隊列有什么特別的地方嗎?
先來回顧一個Redis有哪些特性:
總結(jié)一下:redis的特點就是:快、簡單、穩(wěn)定;
以RocketMQ為代表,作為專業(yè)的消息中間件而言,有哪些特性呢:
總結(jié)一下:RocketMQ的特點就是除了性能非常高、系統(tǒng)本身的功能比較專業(yè)、完善,能適應(yīng)非常多的場景;
從上述分析可以看出,Redis隊列和MQ消息隊列各有優(yōu)勢,Redis的最大特點就是快,所以基于Redis的消息隊列相比MQ消息隊列而言,更適合實時處理,但是基于Redis的消息隊列更易受服務(wù)器內(nèi)存限制;而RocketMQ消息隊列作為專業(yè)的消息中間件產(chǎn)品,功能更完善,更適合應(yīng)用于比較復(fù)雜的業(yè)務(wù)場景,可以實現(xiàn)離線消息發(fā)送、消息可靠投遞以及消息的安全性,但MQ消息隊列的讀寫性能略低于Redis隊列。在技術(shù)選型時,除了上述的因素外,還有一個需要注意:大多數(shù)系統(tǒng)都會引入Redis作為基礎(chǔ)的緩存中間件使用,如果要選用RocketMQ的話,還需要額外再申請資源進(jìn)行部署。
很多時候,所謂的優(yōu)點和缺點,只是針對特定場景而言,如果場景不一樣了,優(yōu)點可能會變成缺點,缺點也可能會變成優(yōu)點。因此,除了專業(yè)的消息中間件外,基于Redis實現(xiàn)一個消息隊列也是有必要的,在某些特殊的業(yè)務(wù)場景,比如一些并發(fā)量不是很高的管理系統(tǒng),某些業(yè)務(wù)流程需要異步化處理,這時選擇基于Redis自己實現(xiàn)一個消息隊列,也是一個比較好的選擇。這也是本篇文章主要分享的內(nèi)容。
隊列(Queue)是一種數(shù)據(jù)結(jié)構(gòu),遵循先進(jìn)先出(FIFO)的原則。在隊列中,元素被添加到末尾(入隊),并從開頭移除(出隊)。
Java中有哪些隊列?
以LinkedBlockingQueue為例,其使用方法是這樣的:
創(chuàng)建了一個生產(chǎn)者線程和一個消費(fèi)者線程,生產(chǎn)者線程和消費(fèi)者線程分別對同一個LinkedBlockingQueue對象進(jìn)行操作。生產(chǎn)者線程通過調(diào)用put()方法將元素添加到隊列中,而消費(fèi)者線程通過調(diào)用take()方法從隊列中取出元素。這兩個方法都會阻塞線程,直到隊列中有元素可供取出或有空間可供添加元素。
import java.util.concurrent.LinkedBlockingQueue; public class LinkedBlockingQueueExample { public static void main(String[] args) { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 生產(chǎn)者線程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { queue.put("Element " + i); System.out.println("Produced: Element " + i); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 消費(fèi)者線程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { String element = queue.take(); System.out.println("Consumed: " + element); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
圖片
無法持久化保存消息,如果 Redis 服務(wù)器宕機(jī)或重啟,那么所有的消息將會丟失;
發(fā)布訂閱模式是“發(fā)后既忘”的工作模式,如果有訂閱者離線重連之后就不能消費(fèi)之前的歷史消息;
不支持消費(fèi)者確認(rèn)機(jī)制,穩(wěn)定性不能得到保證,例如當(dāng)消費(fèi)者獲取到消息之后,還沒來得及執(zhí)行就宕機(jī)了。因為沒有消費(fèi)者確認(rèn)機(jī)制,Redis 就會誤以為消費(fèi)者已經(jīng)執(zhí)行了,因此就不會重復(fù)發(fā)送未被正常消費(fèi)的消息了,這樣整體的 Redis 穩(wěn)定性就被沒有辦法得到保障了。
基于Stream 類型實現(xiàn):使用 Stream 的 xadd 和 xrange 來實現(xiàn)消息的存入和讀取了,并且 Stream 提供了 xack 手動確認(rèn)消息消費(fèi)的命令,用它我們就可以實現(xiàn)消費(fèi)者確認(rèn)的功能了,使用命令如下:
127.0.0.1:6379> xack mq group1 1580959593553-0(integer) 1
消費(fèi)確認(rèn)增加了消息的可靠性,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 ack 確認(rèn)消息已經(jīng)被消費(fèi)完成,整個流程的執(zhí)行如下圖所示:
其中“Group”為群組,消費(fèi)者也就是接收者需要訂閱到群組才能正常獲取到消息。
以上就是基于Redis實現(xiàn)消息隊列的幾種方式的簡單對比介紹,下面主要是分享一下基于Redis的List數(shù)據(jù)類型實現(xiàn),其他幾種方式,有興趣的小伙可以自己嘗試一下。
基于Redis的List數(shù)據(jù)類型實現(xiàn)消費(fèi)隊列的工作原理是什么?
Redis基于List結(jié)構(gòu)實現(xiàn)隊列的原理主要依賴于List的push和pop操作。
在Redis中,你可以使用LPUSH命令將一個或多個元素推入列表的左邊,也就是列表頭部。同樣,你可以使用RPUSH命令將一個或多個元素推入列表的右邊,也就是列表尾部。
對于隊列來說,新元素總是從隊列的頭部進(jìn)入,而讀取操作總是從隊列的尾部開始。因此,當(dāng)你想將一個新元素加入隊列時,你可以使用LPUSH命令。當(dāng)你想從隊列中取出一個元素時,你可以使用RPOP命令。
此外,Redis還提供了BRPOP命令,這是一個阻塞的RPOP版本。如果給定列表內(nèi)沒有任何元素可供彈出的話,將阻塞連接直到等待超時或發(fā)現(xiàn)可彈出元素為止。
需要注意的是,雖然Redis能夠提供原子性的push和pop操作,但是在并發(fā)環(huán)境下使用隊列時,仍然需要考慮線程安全和并發(fā)控制的問題。你可能需要使用Lua腳本或者其他機(jī)制來確保并發(fā)操作的正確性。
總的來說,Redis通過提供List數(shù)據(jù)結(jié)構(gòu)以及一系列相關(guān)命令,可以很方便地實現(xiàn)隊列的功能。
下面是Redis關(guān)于List數(shù)據(jù)結(jié)構(gòu)操作的命令主要包括以下幾種:
以一個實際需求為例,演示一個基于Redis的延遲隊列是怎么使用的?
有一個XX任務(wù)管理的功能,主要的業(yè)務(wù)過程:
1、創(chuàng)建任務(wù)后;
2、不斷檢查任務(wù)的狀態(tài),任務(wù)的狀態(tài)有三種:待執(zhí)行、執(zhí)行中、執(zhí)行完成;
3、如果任務(wù)狀態(tài)是執(zhí)行完成后,主動獲取任務(wù)執(zhí)行結(jié)果,對任務(wù)執(zhí)行結(jié)果進(jìn)行處理;如果任務(wù)狀態(tài)是待執(zhí)行、執(zhí)行中,則延遲5秒后,再次查詢?nèi)蝿?wù)執(zhí)行狀態(tài);
圖片
1、依賴引入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.4.7.RELEASE</version></dependency><dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.23.1</version></dependency>
2、定義三個延遲隊列BeforeQueue、RunningQueue、CompleteQueue,對隊列的任務(wù)進(jìn)行存取,BeforeQueue用于對待執(zhí)行狀態(tài)的任務(wù)的存取,Running用于對執(zhí)行中狀態(tài)的任務(wù)的存取,CompleteQueue用于對執(zhí)行完成狀態(tài)的任務(wù)的存取,在三個任務(wù)隊列中,取出元素是阻塞的,即如果隊列中沒有新的任務(wù),當(dāng)前線程會一直阻塞等待,直到有新的任務(wù)進(jìn)入;如果是隊列中還有元素,則遵循先進(jìn)先出的原則逐個取出進(jìn)行處理;
@Component@Slf4jpublic class BeforeQueue { @Autowired private RedissonClient redissonClient; /** * <p>取出元素</p> * <p>如果隊列中沒有元素,就阻塞等待,直</p> * @return */ public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue1"); Object obj = null; try { obj = queue1.take(); log.info("從myqueue1取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } /** * <p>放入元素</p> * @param obj */ public void offer(Object obj){ RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue1"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向myqueue1設(shè)置元素:{}",obj.toString()); }}
@Component@Slf4jpublic class RunningQueue { @Autowired private RedissonClient redissonClient; public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue2"); Object obj = null; try { obj = queue1.take(); log.info("從myqueue2取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } public void offer(Object obj){ RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue2"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向myqueue2設(shè)置元素:{}",obj.toString()); }}
@Component@Slf4jpublic class CompleteQueue { @Autowired private RedissonClient redissonClient; public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue3"); Object obj = null; try { obj = queue1.take(); log.info("從CompleteQueue取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } public void offer(Object obj){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingDeque("queue3"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向CompleteQueue設(shè)置元素:{}",obj.toString()); }}
3、定義三個監(jiān)聽器BeforeQueueListener、RunningQueueListener、CompleteQueueListener,監(jiān)聽器的主要作用主要就是負(fù)責(zé)監(jiān)聽三個隊列中是否有新的任務(wù) 元素進(jìn)入,如果有,則立即取出消費(fèi);如果沒有,則阻塞等待新的元素進(jìn)入,具體的實現(xiàn)邏輯是:新創(chuàng)建的任務(wù)會先放置到BeforeQueue中,BeforeQueueListener監(jiān)聽到有新的任務(wù)進(jìn)入,會取出任務(wù)作一些業(yè)務(wù)處理,業(yè)務(wù)處理完一放入到RunningQueue中,RunningQueueListener監(jiān)聽到有新的任務(wù)進(jìn)入,會取出任務(wù)再進(jìn)行處理,這里的處理主要是查詢?nèi)蝿?wù)執(zhí)行狀態(tài),查詢狀態(tài)結(jié)果主要分兩種情況:1、執(zhí)行中、待執(zhí)行狀態(tài),則把任務(wù)重新放入RunningQueue隊列中,延遲5秒;2、執(zhí)行完成狀態(tài),則把任務(wù)放置到CompleteQueue中;CompleteQueueListener監(jiān)聽到有新的任務(wù)進(jìn)入后,會主動獲取任務(wù)執(zhí)行結(jié)果,作最后業(yè)務(wù)處理;
4、監(jiān)聽器在在處理隊列中的數(shù)據(jù)相關(guān)的業(yè)務(wù)時,如果發(fā)生異常,則需要把取出的元素再重新入入到當(dāng)前隊列中,等待下一輪的重試;
@Component@Slf4jpublic class BeforeQueueListener implements Listener{ @Autowired private BeforeQueue beforeQueue; @Autowired private RunningQueue runningQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true){ log.info("監(jiān)聽器進(jìn)入阻塞:BeforeQueueListener"); Object obj = beforeQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("開始休眠1s模擬業(yè)務(wù)處理:BeforeQueueListener,元素:{}",obj.toString()); Thread.currentThread().sleep(1000); log.info("業(yè)務(wù)處理完成:BeforeQueueListener,元素:{}",obj.toString()); runningQueue.offer(obj); } catch (InterruptedException e) { log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到BeforeQueue隊列中"); log.error(e.getMessage()); beforeQueue.offer(obj); } } } } }).start(); }}
@Component@Slf4jpublic class RunningQueueListener implements Listener { @Autowired private RunningQueue runningQueue; @Autowired private CompleteQueue completeQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true) { log.info("監(jiān)聽器進(jìn)入阻塞:RunningQueueListener"); Object obj = runningQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("開始休眠1s模擬業(yè)務(wù)處理:RunningQueueListener,元素:{}", obj.toString()); Thread.currentThread().sleep(1000); Random random = new Random(); int i = random.nextInt(2); if (i==0) { test(); } log.info("業(yè)務(wù)處理完成:RunningQueueListener,元素:{}", obj.toString()); completeQueue.offer(obj); } catch (Exception e) { log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到RunningQueue隊列中"); log.error(e.getMessage()); runningQueue.offer(obj); } } } } }).start(); } public void test(){ try { int i=1/0; } catch (Exception e) { throw new RuntimeException("除數(shù)異常"); } }}
@Component@Slf4jpublic class CompleteQueueListener implements Listener{ @Autowired private CompleteQueue completeQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true){ log.info("監(jiān)聽器進(jìn)入阻塞:CompleteQueueListener"); Object obj = completeQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("開始休眠1s模擬業(yè)務(wù)處理:CompleteQueueListener,元素:{}",obj.toString()); Thread.currentThread().sleep(1000); log.info("業(yè)務(wù)處理完成:listener3,元素:{}",obj.toString()); } catch (InterruptedException e) { log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到CompleteQueue隊列中"); log.error(e.getMessage()); completeQueue.offer(obj); } log.info("CompleteQueueListener任務(wù)結(jié)束,元素:{}",obj.toString()); } } } }).start(); }}
5、利用Springboot的擴(kuò)展點ApplicationRunner,在項目啟動完成后,分別啟動BeforeQueueListener、RunningQueueListener、CompleteQueueListener,讓三個監(jiān)聽器進(jìn)入阻塞監(jiān)聽狀態(tài)
@Componentpublic class MyRunner implements ApplicationRunner { @Autowired private ApplicationContext applicationContext; @Override public void run(ApplicationArguments args) throws Exception { Map<String, Listener> beansOfType = applicationContext.getBeansOfType(Listener.class); for (String s : beansOfType.keySet()) { Listener listener = beansOfType.get(s); listener.start(); } }}
結(jié)果驗證
圖片
三個任務(wù)隊列分別有三個線程來進(jìn)行阻塞監(jiān)聽,即如果任務(wù)隊列中有任務(wù)元素,則取出進(jìn)行處理;如果沒有,則阻塞等待,主線程只負(fù)責(zé)把任務(wù)設(shè)置到任務(wù)隊列中,出現(xiàn)的問題是:控制臺的日志輸出顯示任務(wù)元素已經(jīng)放置到第一個BeforeQueue中,按照預(yù)期的結(jié)果應(yīng)該是,控制臺的日志輸出會顯示,從BeforeQueue取出元素進(jìn)行業(yè)務(wù)處理、以及業(yè)務(wù)處理的日志,然后放置到RunningQueue中,再從RunningQueue中取出進(jìn)行業(yè)務(wù)處理,接著放置到CompleteQueue隊列中,最后從CompleteQueue中取出進(jìn)行業(yè)務(wù)處理,最后結(jié)束;實際情況是:總是缺少從BeforeQueue取出元素進(jìn)行業(yè)務(wù)處理、以及業(yè)務(wù)處理的日志,其他的日志輸出都很正常、執(zhí)行結(jié)果也正常;
經(jīng)過排查分析,最后找到了原因:
是logback線程安全問題, Logback 的大部分組件都是線程安全的,但某些特定的配置可能會導(dǎo)致線程安全問題。例如,如果你在同一個 Appender 中處理多個線程的日志事件,那么可能會出現(xiàn)線程安全問題,導(dǎo)致某些日志事件丟失。
問題原因找到了,其實解決方法也就找到,具體就是logback的異步日志,logback.xml配置如下:
<?xml versinotallow="1.0" encoding="UTF-8"?><configuration scan="true" scanPeriod="60 seconds" debug="false"> <!-- 日志存放路徑 --> <property name="log.path" value="logs/"/> <!-- 日志輸出格式 --> <property name="console.log.pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %magenta(${PID:-}) - %green([%-21thread]) %cyan(%-35logger{30}) %msg%n"/> <!-- 控制臺輸出 --> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${console.log.pattern}</pattern> <charset>utf-8</charset> </encoder> </appender> <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> <queueSize>500</queueSize> <discardingThreshold>0</discardingThreshold> <neverBlock>true</neverBlock> <appender-ref ref="console" /> </appender> <!--系統(tǒng)操作日志--> <root level="info"> <appender-ref ref="ASYNC" /> </root></configuration>
文章中展示了關(guān)鍵性代碼,示例全部代碼地址:https://gitcode.net/fox9916/redisson-demo.git
本文鏈接:http://www.www897cc.com/showinfo-26-55111-0.html基于Redis實現(xiàn)消息隊列的實踐
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com