Redis简易延时队列的实现示例

 更新时间:2023年12月10日 10:44:39   作者:!chen  
在实际的业务场景中,经常会遇到需要延时处理的业务,本文就来介绍有下Redis简易延时队列的实现示例,具有一定的参考价值,感兴趣的可以了解一下

一、背景

在实际的业务场景中,经常会遇到需要延时处理的业务,比如订单超时未支付,需要取消订单,或者是用户注册后,需要在一段时间内激活账号,否则账号失效等等。这些业务场景都可以通过延时队列来实现。
最近在实际业务当中就遇到了这样的一个场景,需要实现一个延时队列,用来处理订单超时未支付的业务。在网上找了一些资料,发现大部分都是使用了mq来实现,比如rabbitmq,rocketmq等等,但是这些mq都是需要安装的,而且还需要配置,对于此项目来说不想增加额外的依赖,所以就想到了使用redis来实现一个简易的延时队列。

二、实现思路

1. 业务场景

订单超时未支付,需要取消订单,这个业务场景可以分为两个步骤来实现:

  • 用户下单后,将订单信息存入数据库,并将订单信息存入延时队列中,设置延时时间为30分钟。
  • 30分钟后,从延时队列中取出订单信息,判断订单是否已支付,如果未支付,则取消订单。
  • 如果用户在30分钟内支付了订单,则将订单从延时队列中删除。

2. 实现思路

  • 使用redis的zset来实现延时队列,zset的score用来存储订单的超时时间,value用来存储订单信息。
  • 使用redis的set来存储已支付的订单,set中的value为订单id。

三、实现代码

1. 使用了两个注解类分别标记生产者类、生产者方法,消费者方法

/**
 * @program: 
 * @description: redis延时队列生产者类注解,标记生产者类,用来扫描生产者类中的生产者方法,将生产者方法注册到redis延时队列中
 * @author: jiangchengxuan
 * @created: 2023/12/09 10:32
 */
@Component
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisMessageQueue {}
/**
 * @program: 
 * @description: 
 * 带有此注解的方法,方法的入参首先会被转换为json字符串,然后存入redis的zset中,score为当前时间+延时时间,value为json字符串
 * 当延时时间到达后,会从redis的zset中取出value,然后将value转换为入参类型,调用此方法,执行业务逻辑
 * 此注解只能标记在方法上,且方法必须为public,且只能有一个参数
 * 此注解标记的方法,必须在redis延时队列生产者类中,否则不会生效
 * @author: jiangchengxuan
 * @created:  2023/12/09 10:37
 */
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisMessageQueueMethod {
    String threadName() default "redis消息队列默认线程";
    String queueKey();   // 队列key值
    int threadNum() default 1;      //默认线程数量
    int threadSleepTime() default 500;  //默认线程休眠时间默认500ms
}

2. 生产者类具体实现

/**
 * @program: 
 * @description:  生产者类具体实现
 * @author: jiangchengxuan
 * @created: 2023/12/09 10:44
 */
@Slf4j
@Component
public class DelayQueueWorkerConfig implements InitializingBean {
    private volatile boolean monitorStarted = false;

    private volatile boolean monitorShutDowned = false;

    private ExecutorService executorService;

    // 需要监控的延时队列
    @Autowired
    protected IDelayQueue<String> monitorQueue;

    @Autowired
    private ApplicationContext applicationContext;


    @Override
    public void afterPropertiesSet(){
        //spring工具类,可以获取指定注解的类
        Map<String, Object> allNeedClass = applicationContext.getBeansWithAnnotation(RedisMessageQueue.class);
        for (Map.Entry<String, Object> entry : allNeedClass.entrySet()) {
            Object bean = entry.getValue();
            Method[] methods = bean.getClass().getMethods();
            for (Method method : methods) {
                Annotation[] annotations = method.getDeclaredAnnotations();
                for (Annotation annotation : annotations) {
                    if (annotation instanceof RedisMessageQueueMethod) {
                        RedisMessageQueueMethod queueMethod = (RedisMessageQueueMethod) annotation;
                        //找的需要使用消息队列的方法后,
                        initExecuteQueue(queueMethod, method, bean);
                    }
                    }
                }
            }
        }


    /**
     * 初始化执行造作
     * @param queueAnnotations 注解
     * @param method 方法
     * @param bean 对象
     */
    void initExecuteQueue(RedisMessageQueueMethod queueAnnotations ,Method method,Object bean) {
        String threadName = queueAnnotations.threadName();
        int threadNum = queueAnnotations.threadNum();
        int threadSheepTime = queueAnnotations.threadSleepTime();
        String queueKey = queueAnnotations.queueKey();
        //获取所有消息队列名称
        executorService = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                Thread.currentThread().setName(threadName + "[" + num + "]");
                //如果没有设置队列queuekey或者已经暂停则不执行
                while (!monitorShutDowned) {
                    String value = null;
                    try {
                        value = monitorQueue.get(queueKey);
                        // 获取数据时进行删除操作,删除成功,则进行处理,业务逻辑处理失败则继续添加回队列但是时间设置最大以达到保存现场的目的,防止并发获取重复数据
                        if (StringUtils.isNotEmpty(value)) {
                            if (log.isDebugEnabled()) {
                                log.debug("Monitor Thread[" + Thread.currentThread().getName() + "], get from queue,value = {}", value);
                            }
                            boolean success = (Boolean) method.invoke(bean, value);
                            // 失败重试
                            if (!success) {
                                success =  (Boolean) method.invoke(bean, value);;
                                if (!success) {
                                    log.warn("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = {}", value);
                                    monitorQueue.add(TimeUnit.DAYS,365, value, queueKey);
                                }
                            } else {
                                if (log.isDebugEnabled()) {
                                    log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:execute successfully!values = {}", value);
                                }
                            }
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:monitorThreadRunning = {}", monitorStarted);
                            }
                            Thread.sleep(threadSheepTime);
                        }
                    } catch (Exception e) {
                        log.error("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = " + value, e);
                    }
                }
                log.info("Monitor Thread[" + Thread.currentThread().getName() + "] Completed...");
            });
        }
        log.info("thread pool is started...");
    }

}
/**
 * @program: 
 * @description: 
 * 延时队列接口实现类,
 * 使用redis的zset实现延时队列,
 * @author: jiangchengxuan
 * @created:  2023/12/09 23:34
 */
public interface IDelayQueue <E> {
    /**
     * 向延时队列中添加数据
     *
     * @param score 分数
     * @param data  数据
     * @return true 成功 false 失败
     */
    boolean add(long score, E data,String queueKey);


    /**
     * 向延时队列中添加数据
     *
     * @param timeUnit 时间单位
     * @param time     延后时间
     * @param data     数据
     * @param queueKey
     * @return true 成功 false 失败
     */
    boolean add(TimeUnit timeUnit, long time, E data, String queueKey);

    /**
     * 从延时队列中获取数据
     * @param queueKey 队列key
     * @return 数据
     */
    String get(String queueKey);

    /**
     * 删除数据
     *
     * @param key
     * @param data 数据
     * @return
     */
    public<T> boolean rem(String key, T data) ;
}
/**
 * @program: 
 * @description:  redis操作类,封装了redis的操作方法,使用时直接注入即可使用,不需要关心redis的操作细节,使用时只需要关心业务逻辑即可
 * @author: jiangchengxuan
 * @created: 2023/12/09 23:35
 */
@Service
public class RedisDelayQueue implements IDelayQueue<String> {

    @Autowired
    private RedisService redisService;


    @Override
    public boolean add(long score, String data,String queueKey) {
        return redisService.opsForZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, data, score);
    }

    @Override
    public boolean add(TimeUnit timeUnit, long time, String data, String queueKey) {
        switch (timeUnit) {
            case SECONDS:
                return add(LocalDateTime.now().plusSeconds(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data, queueKey);
            case MINUTES:
                return add(LocalDateTime.now().plusMinutes(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            case HOURS:
                return add(LocalDateTime.now().plusHours(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            case DAYS:
                return add(LocalDateTime.now().plusDays(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            default:
                return false;
        }
    }


    @Override
    public String get(String queueKey) {
        long now = System.currentTimeMillis();
        long min = Long.MIN_VALUE;
        Set<String> res = redisService.rangeByScoreZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, min, now, 0, 10);
        if (!CollectionUtils.isEmpty(res)) {
            for (String data : res){
                // 删除成功,则进行处理,防止并发获取重复数据
                if (rem(queueKey, data)){
                    return data;
                }
            }
        }
        return null;
    }


    @Override
    public<T> boolean rem(String key, T data) {
        return redisService.remZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+key, data);
    }
}
  • 使用
@RedisMessageQueue
public class SomethingClass
{
    @Autowired
    private IDelayQueue<String> messageQueue;

    /**
     * 生产者,向队列中添加数据,30秒后消费者进行消费
     */
    public void test(){
        messageQueue.add(TimeUnit.SECONDS,30L,"这是参数数据","new_queue");
    }
    
    /**
     * 消费者,如果按此配置的话,会启动一个线程,线程名称为:测试线程名称,线程数量为1,线程休眠时间为10毫秒
     * 注意:queueKey需要与生产者中的queueKey保持一致才能进行消费
     * @param data 
     */
    @Override
    @RedisMessageQueueMethod(threadName = "测试线程名称",queueKey = "new_queue",threadNum = 1,threadSleepTime = 10)
    public void testMethod(String data) {
        //do something
    }

}

 到此这篇关于Redis简易延时队列的实现示例的文章就介绍到这了,更多相关Redis 延时队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • redis 替代php文件存储session的实例

    redis 替代php文件存储session的实例

    这篇文章主要介绍了redis 替代php文件存储session的实例的相关资料,希望通过本文能帮助到大家,让大家掌握这样的方法,需要的朋友可以参考下
    2017-10-10
  • 如何基于Session实现短信登录功能

    如何基于Session实现短信登录功能

    对比起Cookie,Session是存储在服务器端的会话,相对安全,并且不像Cookie那样有存储长度限制,下面这篇文章主要给大家介绍了关于如何基于Session实现短信登录功能的相关资料,需要的朋友可以参考下
    2022-10-10
  • 动态添加Redis密码认证的方法

    动态添加Redis密码认证的方法

    本篇文章主要介绍了动态添加Redis密码认证的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-06-06
  • Redis的数据过期清除策略实现

    Redis的数据过期清除策略实现

    Redis实现了数据过期清除策略,本文将深入解析Redis的数据过期清除策略,包括过期键的删除方式、清除策略的选择以及相关配置参数的介绍,感兴趣的可以了解一下
    2024-05-05
  • Redis实现信息已读未读状态提示

    Redis实现信息已读未读状态提示

    这篇文章主要介绍了Redis实现信息已读未读状态提示的相关资料,需要的朋友可以参考下
    2016-04-04
  • Redis哨兵模式实现一主二从三哨兵

    Redis哨兵模式实现一主二从三哨兵

    本文主要介绍了Redis哨兵模式实现一主二从三哨兵,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • 利用控制台如何对Redis执行增删改查命令

    利用控制台如何对Redis执行增删改查命令

    这篇文章主要给大家介绍了关于利用控制台如何对Redis执行增删改查命令的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-08-08
  • redis3.2配置文件redis.conf详细说明

    redis3.2配置文件redis.conf详细说明

    redis3.2配置详解,Redis启动的时候,可以指定配置文件,详细说明请看本文说明
    2018-03-03
  • reids自定义RedisTemplate以及乱码问题解决

    reids自定义RedisTemplate以及乱码问题解决

    本文主要介绍了reids自定义RedisTemplate以及乱码问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-04-04
  • redis使用skiplist跳表的原因解析

    redis使用skiplist跳表的原因解析

    经常会有人问这个问题,redis中为什么要使用跳表?这个问题,redis作者已经给出过明确答案,今天通过本文再给大家讲解下这个问题,对redis skiplist跳表知识感兴趣的朋友一起看看吧
    2022-10-10

最新评论