redis之基于SpringBoot实现Redis stream实时流事件处理方式

 更新时间:2023年06月27日 15:50:47   作者:RachelHwang  
这篇文章主要介绍了redis之基于SpringBoot实现Redis stream实时流事件处理方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

1、redis stream简介

Redis Stream 是 Redis 5.0 版本新增加的数据结构。Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

Redis5.0中发布的Stream类型,也用来实现典型的消息队列。

提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:

  • 消息ID的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 未完成消息的处理
  • 消息队列监控

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

在某些特定场景可以使用redis的stream代替kafka等消息队列,减少系统复杂性,增强系统的稳定性

在这里插入图片描述

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

上图解析:

  • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

2、redis stream基础命令

添加:XADD

命令格式:XADD stream_name id key-value [key-value …]

127.0.0.1:6379> XADD mytopic * acctid 012 age 1
1527837352024-0

查看队列长度:xlen

命令格式:xlen xxx

127.0.0.1:6379> xlen mytopic
(integer) 1
127.0.0.1:6379> 

获取数据:xrange xrevrange

1.xrange 命令格式:

xrange mytopic - +
xrange mytopic 生成的ID + count 2

2.xrevrange命令格式:

xrevrange mytopic + 1527837440632 count 3

该命令的意思为:反向查询ID以无限大为开始,以1527837440632为结束的entry,但只取出查询结果集(降序排列)中的前三个entry;

获取数据:xread

1.非阻塞

从stream 中拿ID比0大的4个Entry,按升序排列

xread count 4 streams mytopic 0

2.阻塞

监听name为mystream的stream,从stream中拿比ID比"$"(特殊ID:该stream中此刻最大ID)还大的Entry

xread block 0 streams mystream $ 

block 0:block表示命令要阻塞,0表示阻塞时间为无限大,不超时,如果设置为>0的整数,即为阻塞超时时间

监听生效后,拿到数据监听就失效,与zk的watcher雷同。意思是该命令执行后,只能拿到一条ID比设置ID更大的entry,要想继续拿,必须执行xread命令,官方推荐下一次拿entry使用上一次得到的ID。注意千万别乱设置很大的ID ,否则你可能永远拿不到entry。

xread block 0 streams mystream mytopic $ $

收到任何一个stream的消息,本次监听就失效,只能拿到一条数据,后面还需要拿数据,可以将各自stream拿到的ID作为最大ID,重新执行命令

消费者组:Consumer groups

redis5引入了消费者组的概念,一个stream的数据每一个消费者组都发一份,消费者组里面的消费者竞争同一份数据,亦即在同一个消费者组内,一个消息是不可能发给多个消费者的:

在这里插入图片描述

消费者组提供了如下5点保障

  • 组内消费者消费的消息不重复
  • 组内消费者名称必须唯一
  • 消费者拿到的消息肯定是没有被组内其他消费者消费过的消息
  • 消费者成功消费消息之后要求发送ACK,然后这条消息才会从消费者组中移除,也就是说消息至少被消费一次,和kafka一样
  • 消费者组会跟踪所有待处理的消息

基础命令

1.创建消费者组

xgroup create mytopic mygroup $

该命令的意思是:使用xgroup命令创建了一个mygroup消费者组,该消费者组与mytopic stream进行了关联,以后mygroup消费者组中的消费者就会mytopic stream中拿数据;

符号" $ "代表mytopic stream中目前最大的ID,消费者拿到的entry的id一定会大于此刻$代表的最大ID。你也可以指定这个最大的ID,比如0;

2.从消费者组读数据

使用xreadgroup命令让消费者consumer_a从mygroup消费者组的mytopic stream中拿最新的,并且没有被发送给其他消费者处理的entry:

xreadgroup group mygroup consumer_a count 1 streams mytopic >

参数:

  • group:该参数是必选项
  • “>”:该符号只有在消费者组命令xreadgroup中有效,意思为mytopic stream中最新且没有被其他消费者处理的ID,千万记住不要与上面"$"最大ID搞混了,否则拿出来的数据与你的期望值不符,如果使用的是任何数组ID,那么该消费者就无法拿到任何新的消息,只是从它的已经处理过的消息中拿,并且不会有ACK机制;

如果想一个消费者组关联多个stream可以这样做:

xgroup create mystream mygroup $
xgroup create mytopic mygroup $
xreadgroup group mygroup consumer_a block 0 count 1 streams mytopic mystream > >

读消息的参数多了一个block 0,就是说读数据需要阻塞。

3.发送ACK

将指定ID对应的entry从consumer的已处理消息列表中删除

XACK mystream mygroup 1527864992409-0

3、结合Spring Boot进行redis实时流处理

样例应用:

在这里插入图片描述

项目依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

3.1 生产者

理想情况下,生产者和消费者将是两个不同的微服务/应用程序。

在这里,我们把消费和生产都弄在同一个项目中。

但是,我们基于名为“ app.role ”的自定义属性来控制应用程序的行为,使其像生产者或消费者。

基于该值,将在Spring中创建相应的组件。

@Service
@ConditionalOnProperty(name="app.role", havingValue="producer")
public class PurchaseEventProducer {
    private AtomicInteger atomicInteger = new AtomicInteger(0);
    @Value("${stream.key}")
    private String streamKey;
    @Autowired
    private ProductRepository repository;
    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;
    @Scheduled(fixedRateString= "${publish.rate}")
    public void publishEvent(){
        Product product = this.repository.getRandomProduct();
        ObjectRecord<String, Product> record = StreamRecords.newRecord()
                .ofObject(product)
                .withStreamKey(streamKey);
        this.redisTemplate
                .opsForStream()
                .add(record)
                .subscribe(System.out::println);
        atomicInteger.incrementAndGet();
    }
    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar(){
        System.out.println(
                "Total Events :: " + atomicInteger.get()
        );
    }
}
  • publishEvent方法定期发布一些随机购买的产品订单。
  • showPublishedEventsSoFar方法仅显示到目前为止已下的订单数。

3.2 消费者

我们的发布者已经准备好。让我们创建一个消费者。

要使用RedisStreams,我们需要实现StreamListener接口。

@Service
@ConditionalOnProperty(name="app.role", havingValue="consumer")
public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> {
    private AtomicInteger atomicInteger = new AtomicInteger(0);
    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;
    @Override
    @SneakyThrows
    public void onMessage(ObjectRecord<String, Product> record) {
        System.out.println(
                InetAddress.getLocalHost().getHostName() + " - consumed :" +
                record.getValue()
        );
        this.redisTemplate
                .opsForZSet()
                .incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice())
                .subscribe();
        atomicInteger.incrementAndGet();
    }
    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar(){
        System.out.println(
                "Total Consumed :: " + atomicInteger.get()
        );
    }
}

在消费者端,我们只简单地显示消费记录情况。

然后,我们获得支付价格并将其添加到redis排序集中。

像发布者一样,我们会定期显示此使用者消费到的事件数。

3.3 Redis流配置

创建使用者后,我们需要通过将上述使用者添加到StreamMessageListenerContainer实例中来创建订阅。

@Configuration
@ConditionalOnProperty(name="app.role", havingValue="consumer")
public class RedisStreamConfig {
    @Value("${stream.key}")
    private String streamKey;
    @Autowired
    private StreamListener<String, ObjectRecord<String, Product>> streamListener;
    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        var options = StreamMessageListenerContainer
                            .StreamMessageListenerContainerOptions
                            .builder()
                            .pollTimeout(Duration.ofSeconds(1))
                            .targetType(Product.class)
                            .build();
        var listenerContainer = StreamMessageListenerContainer
                                    .create(redisConnectionFactory, options);
        var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                streamListener);
        listenerContainer.start();
        return subscription;
    }
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • java中常用XML解析器的使用

    java中常用XML解析器的使用

    这篇文章主要介绍了java中常用XML解析器的使用的相关资料,需要的朋友可以参考下
    2023-02-02
  • springboot项目idea热部署的教程详解

    springboot项目idea热部署的教程详解

    这篇文章主要介绍了springboot项目idea热部署,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • 浅谈Mybatis版本升级踩坑及背后原理分析

    浅谈Mybatis版本升级踩坑及背后原理分析

    这篇文章主要介绍了浅谈Mybatis版本升级踩坑及背后原理分析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-05-05
  • SpringBoot集成elasticsearch使用图文详解

    SpringBoot集成elasticsearch使用图文详解

    Spring Boot集成Elasticsearch其实非常简单,这篇文章主要给大家介绍了关于SpringBoot集成elasticsearch使用的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2023-04-04
  • Java Springboot自动装配原理详解

    Java Springboot自动装配原理详解

    这篇文章主要介绍了详解SpringBoot自动配置原理,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2021-10-10
  • java发送kafka事务消息的实现方法

    java发送kafka事务消息的实现方法

    本文主要介绍了java发送kafka事务消息的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • Java中四种引用类型详细介绍

    Java中四种引用类型详细介绍

    这篇文章主要介绍了Java中四种引用类型详细介绍的相关资料,需要的朋友可以参考下
    2017-03-03
  • java并发分段锁实践代码

    java并发分段锁实践代码

    这篇文章主要介绍了java并发分段锁实践代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • SpringBoot全局处理统一返回类型方式

    SpringBoot全局处理统一返回类型方式

    这篇文章主要介绍了SpringBoot全局处理统一返回类型方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • Java获得元素属性的注解信息的步骤

    Java获得元素属性的注解信息的步骤

    在Java编程中,注解是一种为代码添加元数据的方式,通过反射机制,我们可以获取元素属性上的注解信息,这个过程对于框架开发和元数据处理非常有用,能够实现更灵活的功能,对java获得元素属性的注解信息相关知识感兴趣的朋友一起看看吧
    2024-09-09

最新评论