SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用

 更新时间:2021年12月20日 15:20:30   作者:实习小生  
本文主要介绍了SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

注意:redis的发布订阅模式不可以将消息进行持久化,订阅者发生网络断开、宕机等可能导致错过消息。

Redis命令行下使用发布订阅

publish 发布

发布者通过以下命令可以往指定channel发布message

redis> publish channel message

subscribe 订阅

订阅者通过以下命令可以订阅一个或多个频道,如果频道不存在则会创建

redis> subscribe channel [channel ...]

对于redis的发布订阅的命令就这么简单。那么接下来我们在springboot中如何使用发布订阅的功能呢?

SpringBoot中使用Redis的发布订阅功能

添加依赖配置redis信息和连接池什么的就不说了,如果添加的有commons-pool2依赖的话,会自动帮我们配置redis连接池的

发布者

相对于订阅者来说,发布者的实现方式很简单,以下方式就可以往channel中发送message了。

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public void publish(){
    // 使用高级的redisTemplate
    redisTemplate.convertAndSend("channel","message");
    
    // 使用低级的connection 实际上redisTemplate的底层就是使用的下面的方式
    redisTemplate.execute(new RedisCallback<Object>() {
          @Override
          public Object doInRedis(RedisConnection connection) throws DataAccessException {
              connection.publish("channel".getBytes(StandardCharsets.UTF_8), "message".getBytes(StandardCharsets.UTF_8));
              return null;
         }
     }, true);
     // true这个参数意思是 是否将redis连接暴露给回调代码,大多数情况下设置true就可以了,往后深入的话可以看到
     RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); 如果为false的话会创建redis连接的代理
}

订阅者

订阅者因为涉及到连接、线程等 所以内容相对会多一点

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public void subscribe() {
        redisTemplate.execute(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                // 我定义了一个全局的 ConcurrentHashMap 用来存放连接 因为后面的取消订阅的线程要和订阅的线程用同一个连接
                map.put("connection",connection);

                // subscribe 按频道订阅 该方法会阻塞该线程 只有取消订阅才会释放该线程
                connection.subscribe(new MessageListener() {
                    @Override
                    public void onMessage(Message message, byte[] pattern) {
                        log.info("接收到消息");
                        System.out.println(new String(message.getBody()));
                    }
                }, "channelOne".getBytes(StandardCharsets.UTF_8), "channelTwo".getBytes(StandardCharsets.UTF_8));

                // 按模式订阅 pSubscribe 只有取消订阅才会释放该线程
//                connection.pSubscribe(new MessageListener() {
//                    @Override
//                    public void onMessage(Message message, byte[] pattern) {
//                        System.out.println(new String(message.getBody()));
//                    }
//                }, "patternOne".getBytes(StandardCharsets.UTF_8), "patternOne".getBytes(StandardCharsets.UTF_8));
                return null;
            }
        }, true);
    }

如何取消订阅呢?从刚才的map里取到连接

    RedisConnection the = map.get("connection");
    Subscription subscription = the.getSubscription();
    subscription.unsubscribe();

消息监听容器

上面的那种订阅为低级订阅,由于连接在调用subscribe的时候会导致当前线程阻塞,这种方式需要对每个监听器连接和线程管理,所以spring提供了RedisMessageListenerContainer类来帮我们完成这些工作。

RedisMessageListenerContainer顾名思义可以知道它是一个消息监听容器
详情请参考官方文档

如何实现

@Configuration
public class DefaultMessageListenerContainerConfig {

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        // 官方推荐我们使用自定义的线程池或者使用TaskExecutor
        container.setTaskExecutor(executor());
        container.addMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                System.out.println(Thread.currentThread().getName() + ": " + new String(message.getBody()));
            }
        }, new ChannelTopic("message"));
        return container;
    }

    @Bean
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }
}

这个时候我们在redis命令行内使用 publish channel message 的时候,我们的spring程序就可以订阅到消息了。

再说下 MessageListenerAdapter
我们可以通过 MessageListenerAdapter 消息接收者包装进去,消息接收者不会和redis有任何耦合。
官方文档给了spring传统的xml的方式配置的,下面我给出基于configuration配置的代码

public interface MessageDelegate {
    void handleMessage(String message);
}

public class DefaultMessageDelegate implements MessageDelegate {
    @Override
    public void handleMessage(String message) {
        System.out.println(message);
    }
}

@Configuration
public class MessageListenerContainerConfig {

    @Autowired
    private DefaultMessageDelegate defaultMessageDelegate;

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                   MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.setTaskExecutor(executor());

        Map<MessageListenerAdapter, Collection<? extends Topic>> map = new HashMap<>();
        List<ChannelTopic> channelTopics = new ArrayList<>();
        ChannelTopic channelTopic = new ChannelTopic("message");
        channelTopics.add(channelTopic);
        map.put(messageListenerAdapter, channelTopics);
        container.setMessageListeners(map);

        return container;
    }

    @Bean
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        // handleMessage 参数消息来的时候要调用的方法 默认是 handleMessage
        return new MessageListenerAdapter(defaultMessageDelegate, "handleMessage");
    }
}

如果我们要在程序运行时添加订阅或者取消订阅的时候该怎么办呢?
我们需要提前准备好消息侦听器,添加的时候把侦听器注入到消息容器
取消的时候就调用消息容器的remove方法把侦听器删除掉即可。

到此这篇关于SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用的文章就介绍到这了,更多相关SpringBoot Redis发布订阅模式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:

相关文章

  • Java通过SSM完成水果商城批发平台流程

    Java通过SSM完成水果商城批发平台流程

    这是一个使用了java+SSM开发的网上水果商城批发平台,是一个实战小练习,具有水果商城批发该有的所有功能,感兴趣的朋友快来看看吧
    2022-06-06
  • Spring Boot应用启动时自动执行代码的五种方式(常见方法)

    Spring Boot应用启动时自动执行代码的五种方式(常见方法)

    Spring Boot为开发者提供了多种方式在应用启动时执行自定义代码,这些方式包括注解、接口实现和事件监听器,本文我们将探讨一些常见的方法,以及如何利用它们在应用启动时执行初始化逻辑,感兴趣的朋友一起看看吧
    2024-04-04
  • ArrayList的自动扩充机制实例解析

    ArrayList的自动扩充机制实例解析

    本文主要介绍了ArrayList的自动扩充机制,由一个题目切入主题,逐步向大家展示了ArrayList的相关内容,具有一定参考价值,需要的朋友可以了解下。
    2017-10-10
  • 聊聊Java的switch为什么不支持long

    聊聊Java的switch为什么不支持long

    这篇文章主要介绍了Java的switch为什么不支持long,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • springBoot Junit测试用例出现@Autowired不生效的解决

    springBoot Junit测试用例出现@Autowired不生效的解决

    这篇文章主要介绍了springBoot Junit测试用例出现@Autowired不生效的解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Spring中@Autowired和@Resource注解相同点和不同点

    Spring中@Autowired和@Resource注解相同点和不同点

    这篇文章主要介绍了Spring中@Autowired和@Resource注解相同点和不同点,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2024-01-01
  • 你肯定能看懂的Java IO相关知识总结

    你肯定能看懂的Java IO相关知识总结

    群里有大佬说想让我写一篇NIO,一直也没写,但是和同事聊天也说对Java的IO不是很清晰,因此今天就写下Java的io,先打个基础,下次写NIO,需要的朋友可以参考下
    2021-05-05
  • MyBatis-Plus 查询指定字段的实现

    MyBatis-Plus 查询指定字段的实现

    这篇文章主要介绍了MyBatis-Plus 查询指定字段的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • 浅谈java String不可变的好处

    浅谈java String不可变的好处

    这篇文章主要介绍了java String不可变的好处,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • java实现简单的爬虫之今日头条

    java实现简单的爬虫之今日头条

    最近在学习搜索方面的东西,需要了解网络爬虫方面的知识,虽然有很多开源的强大的爬虫,但本着学习的态度,想到之前在做资讯站的时候需要用到爬虫来获取一些文章,今天刚好有空就研究了一下.在网上看到了一个demo,使用的是Jsoup,我拿过来修改了一下,有需要的朋友可以参考
    2016-11-11

最新评论