Django REST framework 限流功能的使用

 更新时间:2021年06月24日 08:57:08   作者:火腿蛋炒饭  
DRF常用功能的案例基本用法都有讲解,关于限流(Throttling)这个功能其实在真实的业务场景中能真正用到的其实不算多。今天说这个话题其实一方面是讨论功能,另一方面也是希望换个角度去审视我们的开发过程,希望大家可以在使用DRF功能的同时,也了解一下功能背后的实现

正文开始

先说一个限流这个概念,最早接触这个概念是在前端。真实的业务场景是在搜索框中输入文字进行搜索时,并不希望每输一个字符都去调用后端接口,而是有停顿后才真正的调用接口。这个功能很有必要,一方面减少前端请求与渲染的压力,同时减轻后端接口访问的压力。类似前端的功能的代码如下:

// 前端函数限流示例
function throttle(fn, delay) {
    var timer;
    return function () {
        var _this = this;
        var args = arguments;
        if (timer) {
            return;
        }
        timer = setTimeout(function () {
            fn.apply(_this, args);
            timer = null;
        }, delay)
    }
}

但是后端的限流从目的上来说与前端类似,但是实现上会有所不同,让我们看看 DRF 的限流。

1. DRF 中的限流

项目配置

# demo/settings.py

REST_FRAMEWORK = {
    # ...
    'DEFAULT_THROTTLE_CLASSES': (
        'rest_framework.throttling.AnonRateThrottle',
        'rest_framework.throttling.UserRateThrottle',
         'rest_framework.throttling.ScopedRateThrottle',
    ),
    'DEFAULT_THROTTLE_RATES': {
        'anon': '10/day',
        'user': '2/day'
    },
}

# article/views.py

# 基于ViewSet的限流
class ArticleViewSet(viewsets.ModelViewSet, ExceptionMixin):
    """
    允许用户查看或编辑的API路径。
    """
    queryset = Article.objects.all()
    # 使用默认的用户限流
    throttle_classes = (UserRateThrottle,)
    serializer_class = ArticleSerializer

# 基于view的限流
@throttle_classes([UserRateThrottle])

因为我配置的用户每天只能请求两次,所以在请求第三次之后就会给出 429 Too Many Requests的异常,具体的异常信息为下一次可用时间为 86398 秒后。

2. 限流进阶配置

上述演示的限流配置适用于对用户的限流,比如我换个用户继续访问,依然是有两次的机会。

$ curl -H 'Accept: application/json; indent=4' -u root:root   http://127.0.0.1:8000/api/article/1/ 
{
    "id": 1,
    "creator": "admin",
    "tag": "现代诗",
    "title": "如果",
    "content": "今生今世 永不再将你想起\n除了\n除了在有些个\n因落泪而湿润的夜里 如果\n如果你愿意"
}

分别介绍一下三种限流类

  • AnonRateThrottle 适用于任何用户对接口访问的限制
  • UserRateThrottle 适用于请求认证结束后对接口访问的限制
  • ScopedRateThrottle 适用于对多个接口访问的限制

所以三种不同的类适用于不同的业务场景,具体使用根据不同的业务场景选择,通过配置相对应 scope 的频率的配置就可以达到预期的效果。

3. 限流思路分析

试想一下如果是你编码实现这个需求应该怎么实现?

其实这个功能不难,核心的参数就是 时间、次数、使用范围,下面演示对函数调用次数的限制。

from functools import wraps

TOTAL_RATE = 2

FUNC_SCOPE = ['test', 'test1']


def rate_count(func):
    func_num = {
        # 需要注意函数名不能重复
        func.__name__: 0
    }

    @wraps(func)
    def wrapper():
        if func.__name__ in FUNC_SCOPE:
            if func_num[func.__name__] >= TOTAL_RATE:
                raise Exception(f"{func.__name__}函数调用超过设定次数")
            result = func()
            func_num[func.__name__] += 1
            print(f" 函数 {func.__name__} 调用次数为: {func_num[func.__name__]}")
            return result
        else:
            # 不在计数限制的函数不受限制
            return func()

    return wrapper


@rate_count
def test1():
    pass


@rate_count
def test2():
    print("test2")
    pass


if __name__ == "__main__":
    try:
        test2()
        test2()
        test1()
        test1()
        test1()
    except Exception as e:
        print(e)
    test2()
    test2()
    
"""
test2
test2
 函数 test1 调用次数为: 1
 函数 test1 调用次数为: 2
test1函数调用超过设定次数
test2
test2
"""

这里实现了对函数调用次数的监控同时设置了能够使用该功能的函数。当函数调用次数超过设定阀值久抛出异常。只是这里没有对时间做限制。

4. 源码分析

刚才分析了如何实现对函数调用次数的限制,对于一个请求来说可能会复杂一点,下面就看看 DRF 如何实现的:

class SimpleRateThrottle(BaseThrottle):
   
    # ......
    
    def allow_request(self, request, view):
        """
        Implement the check to see if the request should be throttled.

        On success calls `throttle_success`.
        On failure calls `throttle_failure`.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        self.history = self.cache.get(self.key, [])
        self.now = self.timer()

        # 根据设置时间的限制改变请求次数的缓存
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        # 核心逻辑就是这里判断请求次数
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success()
    
    # ......
    
class UserRateThrottle(SimpleRateThrottle):
    """
    Limits the rate of API calls that may be made by a given user.

    The user id will be used as a unique cache key if the user is
    authenticated.  For anonymous requests, the IP address of the request will
    be used.
    """
    scope = 'user'

    def get_cache_key(self, request, view):
        if request.user.is_authenticated:
            ident = request.user.pk
        else:
            # 考虑到用户没有认证的情况 与 AnonRateThrottle 中 key 一致
            ident = self.get_ident(request)
        # 根据设置的范围构建缓存的 key
        return self.cache_format % {
            'scope': self.scope,
            'ident': ident
        }

综上所述:

  • 核心的判断逻辑依旧是缓存中获取每个用户调用次数,根据范围与时间判断是否超过设置定的阀值。
  • 不同类型的限流,在缓存 key 的设计上会有区别,默认的 key 为请求中REMOTE_ADDR。

5. 其它注意事项

  • 因为这里的实现用到缓存,所以需要注意在多实例部署的情况下需要配置统一的缓存服务(默认的缓存为 Django 基于内存实现的)。
  • 缓存服务的重启可能会导致已有的计数清零,如果有较强的业务逻辑需要,还请自己实现限流的逻辑。
  • 如果是自定义的用户表,需要重写缓存中 get_cache_key 的逻辑。
  • 如果需要统计分析用户被限流情况也是需要重新设计限流的逻辑。
  • 限流的逻辑在生产环境中慎用,因为会限制用户使用产品,对用户不够友好。

参考资料

DRF 限流
Django 缓存

以上就是Django REST framework 限流功能的使用的详细内容,更多关于Django REST framework 限流功能的资料请关注脚本之家其它相关文章!

相关文章

最新评论