> 文档中心 > Redis实现消息队列(订阅/发布模式 、LPUSH+BRPOP)

Redis实现消息队列(订阅/发布模式 、LPUSH+BRPOP)

目录

  • 1、生产者+消费者工程搭建
  • 2、订阅/发布模式
    • 2.1、消费者代码
    • 2.2、生产者代码
    • 2.3、测试
  • 3、LPUSH+BRPOP模式
    • 3.1、消费者代码
    • 3.2、生产者代码
    • 3.3、测试

1、生产者+消费者工程搭建

创建两个SpringBoot工程,名称叫做producerconsumer,并且都引入相应的pom、配置yaml文件、配置redisConfig。

pom:

<dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency>     <groupId>org.apache.commons</groupId>     <artifactId>commons-pool2</artifactId> </dependency>

yaml配置:

server:  port: xxx  servlet:    context-path: /xxxspring:  #redis配置  redis:    # Redis数据库索引(默认为0)    database: 1    # 连接地址    host: 127.0.0.1    #端口号    port: 6389    ##连接超时时间    timeout: 3600ms    #密码    password:    lettuce:      pool: # 连接池最大连接数(使用负值表示没有限制) max-active: 8 # 连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1ms # 连接池中的最大空闲连接 max-idle: 8 # 连接池中的最小空闲连接 min-idle: 1      #关闭超时      shutdown-timeout: 500ms

配置redisConfig:

@Configurationpublic class RedisTemplateConfig {    @Bean("redisTemplate")    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); // 设置连接工厂 template.setConnectionFactory(connectionFactory); // 设置序列化方式 StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // key序列化 template.setKeySerializer(stringRedisSerializer); // value序列化 template.setValueSerializer(getJackson2JsonRedisSerializer()); // Hash key序列化 template.setHashKeySerializer(stringRedisSerializer); // Hash value序列化 template.setHashValueSerializer(getJackson2JsonRedisSerializer()); template.afterPropertiesSet(); return template;    }    /**     * 缓存管理器     *     * @param redisConnectionFactory     * @return     */    @Bean    public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) { RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig(); // 设置缓存管理器管理 defaultCacheConfig = defaultCacheConfig  // 缓存的默认过期时间:  .entryTtl(Duration.ofSeconds(36000))  // 设置 key为string序列化  .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))  // 设置value为json序列化  .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(getJackson2JsonRedisSerializer()))  // 不缓存空值  .disableCachingNullValues(); Set<String> cacheNames = new HashSet<>(); // 对每个缓存空间应用不同的配置 Map<String, RedisCacheConfiguration> configMap = new HashMap<>(8); RedisCacheManager cacheManager = RedisCacheManager.builder(redisConnectionFactory)  .cacheDefaults(defaultCacheConfig)  .initialCacheNames(cacheNames)  .withInitialCacheConfigurations(configMap)  .build(); return cacheManager;    }    /**     * 获取Jackson2JsonRedisSerializer序列化对象     *     * @return o     */    private Jackson2JsonRedisSerializer<Object> getJackson2JsonRedisSerializer() { /* 明文存取 */ Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper om = new ObjectMapper(); //指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); jackson2JsonRedisSerializer.setObjectMapper(om); return jackson2JsonRedisSerializer;    }}

2、订阅/发布模式

生产者通过convertAndSend方法发送消息

消费者者需要配置ReceiverListenerAdapter对相应的频道进行监听,有消息时就会接收处理。

优点:

  • 可以实现广播模式,一个消息可以发布到多个消费者。
  • 多频道订阅,一个消费者可以同时订阅多个频道。

缺点:

  • 消息必须及时消费,不能做消息储存。

2.1、消费者代码

我们测试代码,设置两个频道:smswx

启动类开启异步(@EnableAsync):

@EnableAsync@SpringBootApplicationpublic class RedisConsumerApplication {    public static void main(String[] args) { SpringApplication.run(RedisConsumerApplication.class, args);    }}

消息接收配置:

配置消息异步接收器 (RedisQueueReceiver):

@Component@Slf4jpublic class RedisQueueReceiver {   /**    * 接收wx消息,开启异步监听    *    * @param message    */   @Async   public void wxMessage(String message) {log.info("消息接收:wx,消息内容为:" + message);   }   /**    * 接收sms消息,开启异步监听    *    * @param message    */   @Async   public void smsMessage(String message) {log.info("消息接收:sms,消息内容为:" + message);   }}

配置sms类型消息监听器(RedisSmsQueueListener):

@Configurationpublic class RedisSmsQueueListener {   /**    * 系统消息适配器,如果有多个监听器使用改adapter,可以将adapter进行独立出来    *    * @param receiver    * @return    */   @Bean(name = "smsAdapter")   public MessageListenerAdapter adapter(RedisQueueReceiver receiver) {//指定RedisQueueReceiver类中回调接收消息的方法,监听器收到的所有内容,>RedisQueueReceiver的wxMessage()方法中进行处理MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "smsMessage");return adapter;   }   /**    * 构建redis消息监听器容器    *    * @param connectionFactory    * @param smsAdapter 绑定指定的adapter    * @return    */   @Bean("smsContainer")   public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter smsAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 一个频道可以有多个监听方法,多个频道可以同时被一个方法监听//  new PatternTopic()中的值与监听的生产者的channel一致container.addMessageListener(smsAdapter, new PatternTopic("sms"));return container;   }}

配置wx类型消息监听器(RedisWxQueueListener):

@Configurationpublic class RedisWxQueueListener {   /**    * 系统消息适配器,如果有多个监听器使用改adapter,可以将adapter进行独立出来    *    * @param receiver    * @return    */   @Bean(name = "wxAdapter")   public MessageListenerAdapter adapter(RedisQueueReceiver receiver) {//指定RedisQueueReceiver类中回调接收消息的方法,监听器收到的所有内容,>RedisQueueReceiver的wxMessage()方法中进行处理MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "wxMessage");//adapter.afterPropertiesSet();return adapter;   }   /**    * 构建redis消息监听器容器    *    * @param connectionFactory    * @param wxAdapter  绑定指定的adapter    * @return    */   @Bean("wxContainer")   public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter wxAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//  new PatternTopic()中的值与监听的生产者的channel一致// 一个频道可以有多个监听方法,多个频道可以同时被一个方法监听,比如下列情况:container.addMessageListener(wxAdapter, new PatternTopic("wx1"));container.addMessageListener(wxAdapter, new PatternTopic("wx2"));return container;   }}

2.2、生产者代码

注意:生产者的中channel与监听器中的new PatternTopic("xxx")设置的xxx一致

@RestController@RequestMapping@Slf4jpublic class ProducerController {    @Resource    private RedisTemplate redisTemplate;    /**     * 发送异步队列消息 —— 发布订阅模式     * 类型为sms,频道为sms,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致     *     * @return     */    @GetMapping("/sms")    public Object sms() { String channel = "sms"; String msg = "这里是 SMS"; log.info("频道:{}发送内容:{}", channel, msg); // 生成者发送消息 redisTemplate.convertAndSend(channel, msg); return "成功";    }    /**     * 发送异步队列消息 —— 发布订阅模式     * 类型为wx,频道为wx1,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致     *     * @return     */    @GetMapping("/wx1")    public Object wx1() { String channel = "wx1"; String msg = "这里是 WX1"; log.info("频道:{}发送内容:{}", channel, msg); redisTemplate.convertAndSend(channel, msg); return "成功";    }    /**     * 发送异步队列消息 —— 发布订阅模式     * 类型为wx,频道为wx2,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致     *     * @return     */    @GetMapping("/wx2")    public Object wx2() { String channel = "wx2"; String msg = "这里是 WX2"; log.info("频道:{}发送内容:{}", channel, msg); redisTemplate.convertAndSend(channel, msg); return "成功";    }}

2.3、测试

通过浏览器依次访问:smswx1wx2这三个接口,观察打印台:
在这里插入图片描述

3、LPUSH+BRPOP模式

优点:

  • 可以做消息储存,直到消息被消费。
    缺点:
  • 一个消息只能一个消费者被消费一次。

3.1、消费者代码

消费者创建Listener,内部通过线程进行监听。

@Component@Slf4jpublic class RedisPushPopListener {    @Resource    private RedisTemplate redisTemplate;    /**     * 启动时开启一个线程来消费消息,实际开发中可以实现线程池来进行处理     */    @PostConstruct    public void init() { new Thread(() -> {     while (true) {     // 与生产者中的channel 需要一致  String channel = "push-pop";  Object pop = redisTemplate.opsForList().rightPop(channel, 1, TimeUnit.SECONDS);  if (pop != null) {      log.info("频道为:{},消息内容为:{}", channel, pop);  }     } }).start();    }}

3.2、生产者代码

@RestController@RequestMapping@Slf4jpublic class ProducerController {    @Resource    private RedisTemplate redisTemplate;    /**     * 发送异步队列消息 —— LPUSH+RPOP 模式     * 频道为push-pop     *     * @return     */    @GetMapping("/push/pop")    public Object pushPop() { String channel = "push-pop"; String msg = "这里是 push-pop"; log.info("频道:{}发送内容:{}", channel, msg); redisTemplate.opsForList().leftPush(channel, msg); // Object o = redisTemplate.opsForList().rightPop(channel); // log.info("接收:" + o); return "成功";    }}

3.3、测试

通过浏览器访问:/push/pop接口,观察打印台:
在这里插入图片描述

补充:LPUSH+BRPOP模式相比于订阅/发布模式,可以实现消息的储存,我们先启动生产者发生消息,然后再启动消费者,此时消费者才会进行消息的消费。而订阅/发布模式是必须生产者和消费者都在线。

狗狗宠物资料大全