Redis分布式限流的几种实现

 更新时间:2023年12月11日 09:32:11   作者:格林希尔  
分布式限流是指通过将限流策略嵌入到分布式系统中,以控制流量或保护服务,本文就来介绍一下Redis分布式限流的几种实现,感兴趣的可以了解一下

一、 简介

分布式限流是指通过将限流策略嵌入到分布式系统中,以控制流量或保护服务,保证系统在高并发访问情况下不被过载。

分布式限流可以防止系统因大量请求同时到达导致压力过大而崩溃,从而提高系统的稳定性和可靠性。同时,它可以使得业务资源能够更好地分配,提高系统的效率。

二、分布式限流

1 数据结构

1.1 Redis List

Redis List 是一个可以输入相同元素和非唯一元素的集合且支持在两端进行快速(O(1))插入和删除元素。

1.2 Redis Set

Redis Set 是一个无序,但不允许重复元素的集合。您可以使用这些命令对Redis集进行常规操作: SADD,SREM,SISMEMBER,SMEMBERS等。

1.3 Redis Sorted Set

Redis Sorted Set 是一个有序的、不重复的元素集合。每个元素都关联着一个浮点数值(称为 Score)。通过 Score 可以从小到大排序得到一个有序集合。

2 实现分布式限流

在Redis中可以使用令牌桶算法来实现分布式限速。具体方法为:

Step 1:创建一个列表作为Redis令牌桶

String key = "rate_limit:" + userId;
// 模拟用户请求访问
List<String> tokens = redis.lrange(key, 0, -1);

Step 2:设定令牌桶的基准参数

int maxTokens = 50;
long timeInterval = 1 * 1000;
long now = System.currentTimeMillis();

Step 3:计算Redis中令牌数量

long expiredTokens = tokens.stream().filter(t -> Long.parseLong(t) < now - timeInterval).count();
tokens = tokens.subList((int) expiredTokens, tokens.size());
long remainTokens = maxTokens - tokens.size();

Step 4:基于令牌数量,判断是否超过限制

if (remainTokens < 1) {
    throw new RateLimitException("请求太频繁,请稍后再试!");
}

Step 5:如果没有超出限制,则更新Redis中令牌数并设置过期时间

Long expiresIn = now + timeInterval;
redis.multi();
redis.rpush(key, String.valueOf(expiresIn));
redis.pexpire(key, timeInterval);
redis.exec();

3 实现原理分析

以上代码所示首先需要建立Redis List用于存储Token,其次需要设定令牌桶的基准参数(比如最大Token数量和Token过期间隔等)。在用户访问请求时,需要计算Redis中的令牌数量,根据规则对访问量进行限制。如果没有超过限制,则需要更新Redis List中令牌数并设置过期时间;如果超过了限制,则需要返回错误信息并拒绝服务。

整个过程中,需要注意并发访问情况下的线程安全问题,并确保流量控制配置的公共协商,如最大QPS(Queries Per Second),哪些接口需限制流量等。

三、分布式限流算法

在实际的系统设计中,为了防止某一时刻出现大量请求导致系统崩溃,我们通常会采用限流策略来控制流量,而Redis作为分布式NoSQL数据库,在限流中也有着广泛的应用。下面介绍一些Redis分布式限流的经典算法。

1. 计数器算法

计数器算法比较简单,直接利用Redis存储每个IP或者用户的请求次数,当请求次数超过预设阈值时拒绝服务。代码如下:

public boolean isAllowed(String key, int limit, int timeout) {
    Jedis jedis = getJedis();
    long count = jedis.incr(key);
    if (count == 1) {
        jedis.expire(key, timeout);
    }
    boolean allowed = count <= limit;
    if (!allowed) {
        jedis.del(key);
    }
    jedis.close();
    return allowed;
}
  • key:需要限流的用户标识,可根据IP、UserID等进行定义
  • limit:阈值,即允许的最大请求数
  • timeout:过期时间,对于计数器算法,一定要设置过期时间,否则缓存中的请求次数会一直不断累加

2. 漏斗算法

漏斗算法的核心思想是将请求按照恒定的速率转换为水流,有效控制请求超出服务处理能力的情况。漏斗算法实现代码如下:

public boolean isAllowed(String key, int capacity, double leakRate, int reqCount) {
    Jedis jedis = getJedis();
    long nowTime = System.currentTimeMillis();
    String luaScript =
          "local currentCapacity = tonumber(redis.call('hget', KEYS[1], 'leftCapacity'))\n"
        + "if currentCapacity == nil then\n"
        + "    redis.call('hset', KEYS[1], 'lastTime', ARGV[2])\n"
        + "    redis.call('hset', KEYS[1], 'leftCapacity', ARGV[1] - 1)\n"
        + "    return 1\n"
        + "end\n"
        + "local changeTime = tonumber(redis.call('hget', KEYS[1], 'lastTime'))\n"
        + "local delayMillSeconds = nowTime - changeTime\n"
        + "local currentDelayCount = tonumber(delayMillSeconds*ARGV[3])\n"
        + "local currentCapacity = math.min(currentDelayCount+currentCapacity, ARGV[1])\n"
        + "if currentCapacity >= ARGV[4] then\n"
        + "    return 0\n"
        + "else\n"
        + "    redis.call('hset', KEYS[1], 'leftCapacity', currentCapacity-1)\n"
        + "    redis.call('hset', KEYS[1], 'lastTime', nowTime)\n"
        + "    return 1\n"
        + "end";
    Object result = jedis.eval(
        luaScript,
        Collections.singletonList(key),
        Arrays.asList(String.valueOf(capacity), String.valueOf(nowTime), String.valueOf(leakRate), String.valueOf(reqCount))
    );
    boolean allowed = (result instanceof Long ? (Long) result : 0L) == 1L;
    jedis.close();
    return allowed;
}
  • key:需要进行限流的用户标识
  • capacity:漏斗容量,即最大允许请求数量
  • leakRate:漏嘴流水速率,保证有序的请求到达
  • reqCount:预计请求量,用于计算漏斗每次流出的数量

3. 令牌桶算法

令牌桶算法的特点是以一个固定的速率不断产生令牌,并将令牌放入到桶中,访问时若桶为空,则表示请求数超限。令牌桶算法实现代码如下:

public boolean isAllowed(String key, int capacity, double rate, int reqCount) {
    long nowTime = System.currentTimeMillis();
    Jedis jedis = getJedis();
    String luaScript =
          "local currentLimit = tonumber(redis.call('get', KEYS[1]) or '0')\n"
        + "if currentLimit + ARGV[1] > tonumber(KEYS[2]) then\n"
        + "    return false\n"
        + "else\n"
        + "    redis.call('incrby', KEYS[1], ARGV[1])\n"
        + "    redis.call('expire', KEYS[1], ARGV[2])\n"
        + "    return true\n"
        + "end";
    Object result = jedis.eval(luaScript, 2, key, String.valueOf(capacity), String.valueOf(reqCount), String.valueOf(rate * (nowTime / 1000)));
    boolean allowed = (result instanceof Boolean ? (Boolean) result : false);
    jedis.close();
    return allowed;
}
  • key:需要进行限流的用户标识
  • capacity:桶容量
  • rate:令牌发放速率
  • reqCount:请求数量

四、分布式限流实战

1. 单机限流实现

假设我们有一个需求,需要限制每个IP一分钟内最多只能发送100个请求。可以通过Redis的INCR、EXPIRE等API操作来简单实现单机限流。

public boolean isAllowed(String ip, int limit, int interval) {
    Jedis jedis = getJedis();
    String key = "ip:" + ip;
    long count = jedis.incr(key);
    if (count == 1) {
        jedis.expire(key, interval);
    }
    boolean allowed = count <= limit;
    if (!allowed) {
        jedis.del(key);
    }
    jedis.close();
    return allowed;
}

2. 基于Redis Clusters的分布式限流实现

当业务规模扩大时,单机的限流已经无法满足需求,这时候需要考虑使用Redis Clusters实现分布式限流。Clusers扩展了原先Redis的功能,不仅支持横向扩展,而且提高了整个集群的可用性。限流算法同上,只是需要使用把数据分配到Cluser内不同的节点上。

public boolean isAllowed(String ip, int limit, int interval) {
    JedisCluster jedis = getJedisCluster();
    String key = "ip:" + ip;
    long count = jedis.incr(key);
    if (count == 1) {
        jedis.expire(key, interval);
    }
    boolean allowed = count <= limit;
    if (!allowed) {
        jedis.del(key);
    }
    return allowed;
}

五、基于Redis分布式限流的优化

1. 缓存击穿

1.1 问题描述

在高并发场景下,如果存在大量的缓存未命中请求,将会导致访问底层数据存储系统,这种情况被称为缓存击穿

1.2 解决方案

1.2.1 使用互斥锁

/**
 * 获取缓存值方法
 * @param key 缓存键值
 * @return 缓存值
 */
public String getCacheValue(String key) {
    String value = cache.get(key);
    if (value == null) { //缓存未命中
        //使用互斥锁
        Lock lock = redisson.getLock(key);
        if (lock.tryLock()) { //尝试获取锁
            try {
                value = cache.get(key); //再次尝试获取缓存
                if (value == null) { //如果仍然未命中,从数据库中获取
                    value = db.get(key);
                    cache.put(key, value); //将查询结果放入缓存
                }
            } finally {
                lock.unlock(); //释放锁
            }
        } else {
            Thread.sleep(100); //自旋一段时间后重试
            return getCacheValue(key);
        }
    }
    return value;
}

1.2.2 使用预热机制

预热机制是指在系统启动的时候,提前加载热点数据到缓存中,以减少缓存未命中请求。预热的方式可以使用定时任务或者其他方式,在系统低峰期进行加载。

2. 热点key问题的解决方案

2.1 问题描述

在高并发场景下,如果某个key的请求量过大,将会导致这个key成为热点key,从而导致缓存雪崩问题。

2.2 解决方案

2.2.1 分布式锁

/**
 * 获取缓存值方法
 * @param key 缓存键值
 * @return 缓存值
 */
public String getCacheValue(String key) {
    String value = cache.get(key);
    if (value == null) { //缓存未命中
        //使用分布式锁
        RLock lock = redisson.getFairLock(key);
        if (lock.tryLock()) { //尝试获取锁
            try {
                value = cache.get(key); //再次尝试获取缓存
                if (value == null) { //如果仍然未命中,从数据库中获取
                    value = db.get(key);
                    cache.put(key, value); //将查询结果放入缓存
                }
            } finally {
                lock.unlock(); //释放锁
            }
        } else {
            Thread.sleep(100); //自旋一段时间后重试
            return getCacheValue(key);
        }
    }
    return value;
}

2.2.2 分布式缓存

使用分布式缓存将数据均匀地分散到多个节点上,从而避免单点瓶颈问题。

3. 并发竞争优化

3.1 问题描述

在高并发场景下,对于某些资源的并发访问将会导致性能瓶颈,需要进行并发竞争优化。

3.2 解决方案

3.2.1 使用限流器

限流器类似于信号灯,用于控制并发请求的数量,在高峰期可以采用漏桶算法或令牌桶算法进行限流。

3.2.2 使用异步线程池

对于一些耗时的操作,可以使用异步线程池进行处理,从而避免阻塞主线程,提升系统的并发能力。

到此这篇关于Redis分布式限流的几种实现的文章就介绍到这了,更多相关Redis分布式限流内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Redis的setNX分布式锁超时时间失效 -1问题及解决

    Redis的setNX分布式锁超时时间失效 -1问题及解决

    这篇文章主要介绍了Redis的setNX分布式锁超时时间失效 -1问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-01-01
  • Redis过期数据是否会被立马删除

    Redis过期数据是否会被立马删除

    这篇文章主要为大家介绍了Redis过期数据会被立马删除么的问题解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-07-07
  • redis主从复制的原理及实现

    redis主从复制的原理及实现

    Redis主从复制是一种数据同步机制,它通过将一个Redis实例的数据复制到其他Redis,本文主要介绍了redis主从复制的原理及实现,具有一定的参考价值,感兴趣的可以了解一下
    2023-08-08
  • Redis的Python客户端redis-py安装使用说明文档

    Redis的Python客户端redis-py安装使用说明文档

    这篇文章主要介绍了Redis的Python客户端redis-py安装使用说明文档,本文讲解了安装方法、入门使用实例、API参考和详细说明,需要的朋友可以参考下
    2015-06-06
  • Redis key的过期时间和永久有效的实现

    Redis key的过期时间和永久有效的实现

    在Redis中,键可以设置过期时间或被永久保存,`EXPIRE`和`PEXPIRE`命令分别用于设置键的过期时间,具有一定的参考价值,感兴趣的可以了解一下
    2024-09-09
  • Redis批量删除key的命令详解

    Redis批量删除key的命令详解

    这篇文章主要介绍了Redis批量删除key的命令详解,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-03-03
  • Redis 数据类型的详解

    Redis 数据类型的详解

    这篇文章主要介绍了Redis 数据类型的详解的相关资料,支持五种数据类型,字符串,哈希,列表,集合及zset,需要的朋友可以参考下
    2017-08-08
  • 详解redis在微服务领域的贡献

    详解redis在微服务领域的贡献

    本文以dubbo为例看下redis是如何利用自身特性来完成注册中心的功能,对redis微服务相关知识感兴趣的朋友一起看看吧
    2021-10-10
  • Linux下Redis集群搭建全过程(主从+哨兵)

    Linux下Redis集群搭建全过程(主从+哨兵)

    这篇文章主要介绍了Linux下Redis集群搭建全过程(主从+哨兵),具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • 解析高可用Redis服务架构分析与搭建方案

    解析高可用Redis服务架构分析与搭建方案

    我们按照由简至繁的步骤,搭建一个最小型的高可用的Redis服务。 本文通过四种方案给大家介绍包含每种方案的优缺点及详细解说,具体内容详情跟随小编一起看看吧
    2021-06-06

最新评论