Redis实现消息队列(订阅/发布模式 、LPUSH+BRPOP)
目录
1、生产者+消费者工程搭建
创建两个SpringBoot工程,名称叫做producer
和consumer
,并且都引入相应的pom、配置yaml文件、配置redisConfig。
pom:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
yaml配置:
server: port: xxx servlet: context-path: /xxxspring: #redis配置 redis: # Redis数据库索引(默认为0) database: 1 # 连接地址 host: 127.0.0.1 #端口号 port: 6389 ##连接超时时间 timeout: 3600ms #密码 password: lettuce: pool: # 连接池最大连接数(使用负值表示没有限制) max-active: 8 # 连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1ms # 连接池中的最大空闲连接 max-idle: 8 # 连接池中的最小空闲连接 min-idle: 1 #关闭超时 shutdown-timeout: 500ms
配置redisConfig:
@Configurationpublic class RedisTemplateConfig { @Bean("redisTemplate") public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); // 设置连接工厂 template.setConnectionFactory(connectionFactory); // 设置序列化方式 StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // key序列化 template.setKeySerializer(stringRedisSerializer); // value序列化 template.setValueSerializer(getJackson2JsonRedisSerializer()); // Hash key序列化 template.setHashKeySerializer(stringRedisSerializer); // Hash value序列化 template.setHashValueSerializer(getJackson2JsonRedisSerializer()); template.afterPropertiesSet(); return template; } /** * 缓存管理器 * * @param redisConnectionFactory * @return */ @Bean public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) { RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig(); // 设置缓存管理器管理 defaultCacheConfig = defaultCacheConfig // 缓存的默认过期时间: .entryTtl(Duration.ofSeconds(36000)) // 设置 key为string序列化 .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())) // 设置value为json序列化 .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(getJackson2JsonRedisSerializer())) // 不缓存空值 .disableCachingNullValues(); Set<String> cacheNames = new HashSet<>(); // 对每个缓存空间应用不同的配置 Map<String, RedisCacheConfiguration> configMap = new HashMap<>(8); RedisCacheManager cacheManager = RedisCacheManager.builder(redisConnectionFactory) .cacheDefaults(defaultCacheConfig) .initialCacheNames(cacheNames) .withInitialCacheConfigurations(configMap) .build(); return cacheManager; } /** * 获取Jackson2JsonRedisSerializer序列化对象 * * @return o */ private Jackson2JsonRedisSerializer<Object> getJackson2JsonRedisSerializer() { /* 明文存取 */ Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper om = new ObjectMapper(); //指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); jackson2JsonRedisSerializer.setObjectMapper(om); return jackson2JsonRedisSerializer; }}
2、订阅/发布模式
生产者通过convertAndSend
方法发送消息,
消费者者需要配置Receiver
、Listener
和Adapter
对相应的频道进行监听,有消息时就会接收处理。
优点:
- 可以实现广播模式,一个消息可以发布到多个消费者。
- 多频道订阅,一个消费者可以同时订阅多个频道。
缺点:
- 消息必须及时消费,不能做消息储存。
2.1、消费者代码
我们测试代码,设置两个频道:sms
和wx
启动类开启异步(@EnableAsync):
@EnableAsync@SpringBootApplicationpublic class RedisConsumerApplication { public static void main(String[] args) { SpringApplication.run(RedisConsumerApplication.class, args); }}
消息接收配置:
配置消息异步接收器 (RedisQueueReceiver):
@Component@Slf4jpublic class RedisQueueReceiver { /** * 接收wx消息,开启异步监听 * * @param message */ @Async public void wxMessage(String message) {log.info("消息接收:wx,消息内容为:" + message); } /** * 接收sms消息,开启异步监听 * * @param message */ @Async public void smsMessage(String message) {log.info("消息接收:sms,消息内容为:" + message); }}
配置sms类型消息监听器(RedisSmsQueueListener):
@Configurationpublic class RedisSmsQueueListener { /** * 系统消息适配器,如果有多个监听器使用改adapter,可以将adapter进行独立出来 * * @param receiver * @return */ @Bean(name = "smsAdapter") public MessageListenerAdapter adapter(RedisQueueReceiver receiver) {//指定RedisQueueReceiver类中回调接收消息的方法,监听器收到的所有内容,>RedisQueueReceiver的wxMessage()方法中进行处理MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "smsMessage");return adapter; } /** * 构建redis消息监听器容器 * * @param connectionFactory * @param smsAdapter 绑定指定的adapter * @return */ @Bean("smsContainer") public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter smsAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 一个频道可以有多个监听方法,多个频道可以同时被一个方法监听// new PatternTopic()中的值与监听的生产者的channel一致container.addMessageListener(smsAdapter, new PatternTopic("sms"));return container; }}
配置wx类型消息监听器(RedisWxQueueListener):
@Configurationpublic class RedisWxQueueListener { /** * 系统消息适配器,如果有多个监听器使用改adapter,可以将adapter进行独立出来 * * @param receiver * @return */ @Bean(name = "wxAdapter") public MessageListenerAdapter adapter(RedisQueueReceiver receiver) {//指定RedisQueueReceiver类中回调接收消息的方法,监听器收到的所有内容,>RedisQueueReceiver的wxMessage()方法中进行处理MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "wxMessage");//adapter.afterPropertiesSet();return adapter; } /** * 构建redis消息监听器容器 * * @param connectionFactory * @param wxAdapter 绑定指定的adapter * @return */ @Bean("wxContainer") public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter wxAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// new PatternTopic()中的值与监听的生产者的channel一致// 一个频道可以有多个监听方法,多个频道可以同时被一个方法监听,比如下列情况:container.addMessageListener(wxAdapter, new PatternTopic("wx1"));container.addMessageListener(wxAdapter, new PatternTopic("wx2"));return container; }}
2.2、生产者代码
注意:生产者的中channel与监听器中的new PatternTopic("xxx")设置的xxx一致
@RestController@RequestMapping@Slf4jpublic class ProducerController { @Resource private RedisTemplate redisTemplate; /** * 发送异步队列消息 —— 发布订阅模式 * 类型为sms,频道为sms,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致 * * @return */ @GetMapping("/sms") public Object sms() { String channel = "sms"; String msg = "这里是 SMS"; log.info("频道:{}发送内容:{}", channel, msg); // 生成者发送消息 redisTemplate.convertAndSend(channel, msg); return "成功"; } /** * 发送异步队列消息 —— 发布订阅模式 * 类型为wx,频道为wx1,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致 * * @return */ @GetMapping("/wx1") public Object wx1() { String channel = "wx1"; String msg = "这里是 WX1"; log.info("频道:{}发送内容:{}", channel, msg); redisTemplate.convertAndSend(channel, msg); return "成功"; } /** * 发送异步队列消息 —— 发布订阅模式 * 类型为wx,频道为wx2,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致 * * @return */ @GetMapping("/wx2") public Object wx2() { String channel = "wx2"; String msg = "这里是 WX2"; log.info("频道:{}发送内容:{}", channel, msg); redisTemplate.convertAndSend(channel, msg); return "成功"; }}
2.3、测试
通过浏览器依次访问:sms
、wx1
、wx2
这三个接口,观察打印台:
3、LPUSH+BRPOP模式
优点:
- 可以做消息储存,直到消息被消费。
缺点: - 一个消息只能一个消费者被消费一次。
3.1、消费者代码
消费者创建Listener
,内部通过线程进行监听。
@Component@Slf4jpublic class RedisPushPopListener { @Resource private RedisTemplate redisTemplate; /** * 启动时开启一个线程来消费消息,实际开发中可以实现线程池来进行处理 */ @PostConstruct public void init() { new Thread(() -> { while (true) { // 与生产者中的channel 需要一致 String channel = "push-pop"; Object pop = redisTemplate.opsForList().rightPop(channel, 1, TimeUnit.SECONDS); if (pop != null) { log.info("频道为:{},消息内容为:{}", channel, pop); } } }).start(); }}
3.2、生产者代码
@RestController@RequestMapping@Slf4jpublic class ProducerController { @Resource private RedisTemplate redisTemplate; /** * 发送异步队列消息 —— LPUSH+RPOP 模式 * 频道为push-pop * * @return */ @GetMapping("/push/pop") public Object pushPop() { String channel = "push-pop"; String msg = "这里是 push-pop"; log.info("频道:{}发送内容:{}", channel, msg); redisTemplate.opsForList().leftPush(channel, msg); // Object o = redisTemplate.opsForList().rightPop(channel); // log.info("接收:" + o); return "成功"; }}
3.3、测试
通过浏览器访问:/push/pop
接口,观察打印台:
补充:
LPUSH+BRPOP模式
相比于订阅/发布模式
,可以实现消息的储存,我们先启动生产者发生消息,然后再启动消费者,此时消费者才会进行消息的消费。而订阅/发布模式
是必须生产者和消费者都在线。