哈嘍,大家好,我是了不起。
Redis平常作為緩存使用較多,但是也可以作為發布訂閱的消息隊列來使用,本篇給大家介紹一下如何簡單使用!右手就能操作
本篇我們會使用Spring Data Redis中集成的發布訂閱功能來展示這個示例,
先看我們需要的依賴, 其實只需要引入spring-boot-starter-data-redis 就夠了,另外再寫一個接口來觸發消息發布。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
Spring Data 為 Redis 提供了專用的消息傳遞集成,其功能和命名與 Spring Framework 中的 JMS 集成類似。
Redis 消息傳遞大致可分為兩個功能領域:
其中主要的類都在這兩個包下面,感興趣的小伙伴可以去看看,原理就先不講了,下期再安排吧。
org.springframework.data.redis.connectionorg.springframework.data.redis.listener
發布消息我們可以直接使用RedisTemplate的 convertAndSend , 這個方法有兩個參數,分別是channel, 還有消息內容。
public Long convertAndSend(String channel, Object message) { Assert.hasText(channel, "a non-empty channel is required"); byte[] rawChannel = this.rawString(channel); byte[] rawMessage = this.rawValue(message); return (Long)this.execute((connection) -> { return connection.publish(rawChannel, rawMessage); }, true); }
本次我們使用如下類來發布消息。作為示例就要簡單粗暴。
public interface MessagePublisher { void publish(String message);}import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.listener.ChannelTopic;public class RedisMessagePublisher implements MessagePublisher { private RedisTemplate<String, Object> redisTemplate; private ChannelTopic topic; public RedisMessagePublisher() { } public RedisMessagePublisher( RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) { this.redisTemplate = redisTemplate; this.topic = topic; } public void publish(String message) { redisTemplate.convertAndSend(topic.getTopic(), message); }}
訂閱消息需要實現MessageListener的接口 ,onMessage的方法是收到消息后的消費方法。
import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.stereotype.Service;@Servicepublic class RedisMessageSubscriber implements MessageListener { public void onMessage(Message message, byte[] pattern) { System.*out*.println("Message received: " + message.toString()); }}// 消息訂閱2@Service("redisMessageSubscriber2")public class RedisMessageSubscriber2 implements MessageListener { public void onMessage(Message message, byte[] pattern) { System.out.println("Message received2: " + message.toString()); }}
另外就是訂閱方訂閱發布者,SpringDataRedis這里使用了一個消息監聽容器和適配器來處理。我們直接貼出代碼:
import com.north.redis.message.MessagePublisher;import com.north.redis.message.RedisMessagePublisher;import com.north.redis.message.RedisMessageSubscriber;import jakarta.annotation.Resource;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.listener.ChannelTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer;@Configurationpublic class RedisConfig { @Autowired private RedisConnectionFactory redisConnectionFactory; @Resource MessageListener redisMessageSubscriber2; @Bean public RedisTemplate<String, Object> redisTemplate() { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory); // 使用StringRedisSerializer來序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); // 使用GenericJackson2JsonRedisSerializer來序列化和反序列化redis的value值 template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.afterPropertiesSet(); return template; } @Bean MessageListenerAdapter messageListener() { return new MessageListenerAdapter(new RedisMessageSubscriber()); } @Bean RedisMessageListenerContainer redisContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); container.addMessageListener(messageListener(), topic()); container.addMessageListener(redisMessageSubscriber2, topic()); return container; } @Bean MessagePublisher redisPublisher() { return new RedisMessagePublisher(redisTemplate(), topic()); } @Bean ChannelTopic topic() { return new ChannelTopic("northQueue"); }}
以上代碼中有幾個點:
RedisMessageListenerContainer 是處理消費者和發布者的關系的類 ,使用起來也比較簡單。
下面我們做一個小測試:
寫一個接口來出發消息發布,使用多個訂閱者
@RestControllerpublic class TestController { @Resource private MessagePublisher redisMessagePublisher; @GetMapping("/hello") public Flux<String> hello(@RequestParam String message) { redisMessagePublisher.publish(message); return Flux.*just*("Hello", "Webflux"); }}
啟動SpringBoot項目后我們發送消息測試:
圖片
兩個消費者都接到了消息:
圖片
本文鏈接:http://www.www897cc.com/showinfo-26-59651-0.htmlRedis發布訂閱,右手就行!
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com