Python利用py-redis实现分布式锁

 更新时间:2023年08月07日 08:18:41   作者:so1n  
随着业务的增长,后端技术架构会慢慢的从单体服务转向多服务或者微服务的分布式架构,本文主要为大家介绍了如何利用Py-Redis实现简单的分布式锁,需要的可以参考一下

前记

随着业务的增长,后端技术架构会慢慢的从单体服务转向多服务或者微服务的分布式架构,此时语言级别的锁无法管理所有资源的竞争,只能采用分布式锁。而分布式锁的主体思想虽然与语言级别的锁类似,但还需要考虑到一些网络因素,使其变得复杂。

1.为何需要分布式锁

举一个栗子,比如在做聊天服务时,需要统计一个聊天会话的在线人数,它的简单示例代码如下:

class Counter(object):
    def __init__(self, namespace: str) -> None:
        self.namespace: int = namespace
        self.count: int = 0
    def login(self) -> None:
        self.count += 1
    def logout(self) -> None:
        self.count -= 1

这份代码比较简单,它是每个namespace的全局计数器实现,每有一个用户成功登陆就会调用login方法使计数器+1,而每次退出就会调用logout方法使计数器-1。这个计数器看起来实现了需求,但是它也符合了最简单的线程不安全模型,意味着在多线程/进程等环境下无法得出正确的结果。

这个操作之所以线程不安全,是因为self.count+=1的这类操作不是原子性的,它在运行之前会被编译为self.count = self.count + 1,这是一个先更改再赋值的操作,实际在执行的时候CPU会分为下面三个步骤去执行:

1.将count的值从内存读到CPU对应的寄存器上。

2.CPU操作寄存器上的count并进行+1操作。

3.把寄存器里的指写回内存中。

这样在多线程/进程的场景下可能出现了CPU核心1和CPU核心2同时从内存读到对应值0,并放到了自己的寄存器上面,然后再对它进行+1操作,最后又把值(此时已经为1)写回到内存中,导致self.count的结果变为1而不是真正想要的值2了。

1.self.count-=1同理。

2.线程是操作系统的最小调度单位,在多核心系统时,会出现多核心同时调用线程去进行资源争夺。

3.Python 3.11做了优化,可能进行了几百次加减处理,结果也是对的。

这就是多线程环境对同一个资源竞争从而产生数据安全性的就问题,许多语言为了解决这个问题引入了锁机制,并使用锁机制保护了多线程环境下对同一个资源竞争的数据安全性。 开发者可以非常方便的通过锁机制给一些代码块加上锁从而使这些操作变成了原子性,比如对示例代码的Counter进行了如下修改:

from threading import Lock
class Counter(object):
    def __init__(self, namespace: str) -> None:
        self.namespace: int = namespace
        self.count: int = 0
        self._lock: Lock = Lock()
    def login(self) -> None:
        with lock():
            self.count += 1
    def logout(self) -> None:
        with lock():
            self.count -= 1

代码引入了Lock对象,并把它套在了资源冲突的self.count+=1self.count-=1上面,使得线程只有持有锁的时候才能对self.count进行操作,而拿不到锁的线程则需要等待到获取锁才能继续操作,这样一来就不会产生多个线程同时操作一份数据而导致了结果不一致的问题。

不过现在的大多数服务不再是单体应用,更多的是以多服务,微服务的形式存在,这时上述的问题就会从不同的线程/进程争夺一个资源变为不同的机器上的服务对同一个资源进行竞争。 而语言级别的锁只存在于进程中,无法跨进程,只能管理自己进程里面的资源竞争,无法解决跨服务资源竞争的问题,只能使用一个带有锁机制的中间人来协调各个服务的资源竞争的问题,而这个中间人就是分布式锁。

2.分布式锁的实现

为了解决多服务,微服务的资源竞争这问题,分布式锁诞生了,分布式锁与语言级别的锁一样都是在某块空间打上标记,然后再通过打标记是否成功来判断是否获取锁,与语言级别锁唯一不同的是分布式锁需要通过网络进行通信,而网络是复杂的,这也就导致分布式锁的实现变得复杂。

为了降低分布式锁实现的复杂度,大多数分布式锁的方案都会基于拥有存储媒介和防止资源冲突的数据库进行开发,比如关系数据库MySQL,KV数据库RedisEtcd等,它们都有一套逻辑来确保数据的一致性和可用性,同时也有一套完善的传输协议,这样就可以不去考虑网络传输的问题和数据冲突与丢失的问题,只专注分布式锁功能的实现了。 不过实际业务中需要分布式锁有较高的性能,所以大多数会分布式锁都会基于KV数据库开发,目前常用的分布式锁使用的KV数据库是Redis

2.1.基于Redis的分布式锁

Redis本质上就是使用一个单进程对一块内存进行读写(只考虑基本的读写),且每个操作都是以一个协程去操作内存,这保证了客户端提交的每个操作都是拥有原子性的。同时Redis还支持使用Lua脚本去编写复杂的操作,这两个特性组合起来意味着可以通过Redis实现出一个高性能且功能复杂的分布式锁。

PythonRedis客户端库py-redis中,提供了一个简单的Redis锁封装,开发者通过这个封装可以很方便的使用基于Redis的分布式锁。如以上面计数器示例代码进行修改后的代码:

import asyncio
from redis.asyncio import Redis
class Counter(object):
    def __init__(self, namespace: str, redis: Redis) -> None:
        self._redis = redis
        self.namespace: int = namespace
        self.count: int = 0
    async def login(self) -> None:
        async with self._redis.lock("demo"):
            self.count += 1
    async def logout(self) -> None:
        async with self._redis.lock("demo"):
            self.count -= 1
async def main():
    counter = Counter("demo", Redis())
    await counter.login()
    await counter.logout()
asyncio.run(main())

在这个示例代码中可以发现,锁的使用方法很简单,只要通过redis.lock方法就可以获取到分布式锁的实例,而这个锁实例的使用方法与thread.Lock类似,不用大改代码。

不过lock方法只是提供了一个简单的调用,实际上它返回的是符合如下函数签名的对象:

class Lock:
    async def __aenter__(self):
        pass
    async def __aexit__(self, exc_type, exc_value, traceback):
        pass
    async def acquire(
        self,
        blocking: Optional[bool] = None,
        blocking_timeout: Optional[float] = None,
        token: Optional[Union[str, bytes]] = None,
    ):
        pass
    async def locked(self) -> bool:
        pass
    async def owned(self) -> bool:
        pass
    def release(self) -> Awaitable[None]:
        pass

这个对象拥有多个方法,首先是针对async with语法提供了__aenter____aexit__方法,它们分别在进入和离开async with语法块时被调用,它们的源码如下:

class Lock:
    ...
    async def __aenter__(self):
        if await self.acquire():
            return self
        raise LockError("Unable to acquire lock within the time specified")
    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.release()

通过源码可以看到它们的实现很简单,__aenter__只调用acquire方法,如果返回True就允许进入代码块,如果返回False则抛出获取锁错误,而__aexit__则更简单,它只是调用release方法执行分布式锁的释放。

接下来就是acquirerelease这两个分别代表获取锁和释放锁的核心方法了,其中acquire的源码如下:

class Lock:
    ...
    async def acquire(
        self,
        blocking: Optional[bool] = None,
        blocking_timeout: Optional[float] = None,
        token: Optional[Union[str, bytes]] = None,
    ):
        ################################
        # 第一部分,这里主要是初始化各种参数 #
        ################################
        sleep = self.sleep
        if token is None:
            token = uuid.uuid1().hex.encode()
        else:
            try:
                encoder = self.redis.connection_pool.get_encoder()
            except AttributeError:
                # Cluster
                encoder = self.redis.get_encoder()
            token = encoder.encode(token)
        if blocking is None:
            blocking = self.blocking
        if blocking_timeout is None:
            blocking_timeout = self.blocking_timeout
        stop_trying_at = None
        if blocking_timeout is not None:
            stop_trying_at = asyncio.get_running_loop().time() + blocking_timeout
        #####################################################
        # 第二部分,这里通过一个循环去争夺锁,在争夺成功时会返回True #
        #####################################################
        while True:
            if await self.do_acquire(token):
                # 争夺成功,把Token存入当前线程存储中,并返回True
                self.local.token = token
                return True
            # 争夺失败且设置blocking为False,则返回获取锁失败
            if not blocking:
                return False
            # 在设置的时间内争夺失败,返回获取锁失败
            next_try_at = asyncio.get_running_loop().time() + sleep
            if stop_trying_at is not None and next_try_at > stop_trying_at:
                return False
            # 每次争夺锁的间隔,这是在初始化Lock对象时指定的,
            # 建议根据业务设置比较大的sleep时间,防止获取锁失败时频繁的与Redis通信。
            await asyncio.sleep(sleep)
    async def do_acquire(self, token: Union[str, bytes]) -> bool:
        if self.timeout:
            # convert to milliseconds
            timeout = int(self.timeout * 1000)
        else:
            timeout = None
        # 真正通过Redis设置锁的方法,
        if await self.redis.set(self.name, token, nx=True, px=timeout):
            return True
        return False

通过源码可以发现acquire方法主要是做三件事:

1.初始化各种参数,其中token是采用uuid1生成的,该方法虽然会泄露主机信息,但它是能确保每个客户端生成的ID唯一且速度很快的方法,同时Redis一般都在内网运行的,只要能确保内网安全,一般也没啥事。

2.通过do_acquire方法去获取锁,当获取成功就会返回True,获取失败且设置不阻塞就返回False,而获取失败且设置阻塞就会通过循环去竞争锁。

3.获取锁的重点(do_acquire方法),这里通过Redisset <key> <value> nx xxx ps xxx的方法向Redis设置了一个K-V,并返回设置是否成功。这个命令是原子性的执行三个操作,从而保证获取锁的操作要么成功要么失败。 命令中的nx是确保Key不存在时且该命令能正常写入才返回True,而ps是设置了Key的过期时间,防止客户端假死或宕机而导致整个锁无法被释放(避免死锁的一种技术手段)。

了解完获取锁acquire方法的执行原理后再看释放锁release方法的源码,由于release操作要多个操作,所以采用了Lua脚本,如下:

class Lock:
    ...
    # 初始化时会初始化lua脚本--`LUA_RELEASE_SCRIPT`,
    # 并把Redis返回的ID存放到lua_release中,减少后续调用的网络传输
    lua_release = None
    # lua脚本的逻辑是通过name获取token,如果token不存在或者不是实例产生的则返回0,
    # 否则代表成功获取到锁,然后会执行删除锁并返回1告诉客户端说锁释放成功。
    LUA_RELEASE_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        redis.call('del', KEYS[1])
        return 1
    """
    ...
    def release(self) -> Awaitable[None]:
        # 如果没有从本地线程存储获取到Token,则代表还没有获取到锁,抛出对应的异常
        expected_token = self.local.token
        if expected_token is None:
            raise LockError("Cannot release an unlocked lock")
        # 先清楚本地线程存放的Token,再释放锁
        self.local.token = None
        return self.do_release(expected_token)
    async def do_release(self, expected_token: bytes) -> None:
        # 调用lua脚本并根据响应结果判断释放锁是否成功。
        if not bool(
            await self.lua_release(
                keys=[self.name], args=[expected_token], client=self.redis
            )
        ):
            raise LockNotOwnedError("Cannot release a lock that's no longer owned")

release也比较简单,它的整个逻辑是先通过本地线程存储中获取token,如果该值为空,则证明有可能没有执行acquire获取锁,需要抛出锁已经被释放的异常,如果不为空则置空,再通过lua脚本去释放锁,在释放锁时会校验token的值防止释放一个不是自己产生的锁。

此外,Lock中的token存放在本地线程存储的原因是为了防止多线程调用同个Loc实例导致的问题,如下:

1.当A线程以timeout为30秒获取了锁。

2.B线程获取了锁,但是由于A已经获取了锁了,所以通过自旋进行等待。

3.A线程执行的逻辑超过30秒还未执行完成,而锁已经过期而被释放。

4.B线程发现锁已经被释放,开始获取锁并执行,最终在A线程执行完成之前运行完毕,并执行了释放锁的操作。

5.A线程执行了释放锁的操作,发现锁已经被释放了。

可以看到这个方法只是防止动作没执行完,但锁却过期的一种情况,它并不能真正的解决问题,如果要真正的解决这个问题,则需要引入WatchDog机制。

2.2.WatchDog实现

py-redisLock对象支持Timeout参数,Timeout参数的作用是标记锁在被获取的n秒后被自动释放,这样加锁的程序即使崩溃了也能确保锁会在一定的时间后被释放,避免了死锁问题。 不过需要注意的是,Timeout参数就不能设置太长,如果设置太长,且程序在获取锁后崩溃而无法释放锁时,其他等待获取锁的程序会花时间进行无效的等待。 然而Timeout参数设置得太短也不行,如果程序的执行时间超过了Timeout设置的时间,那么就会出现程序还在运行着,但是锁却提前释放了,最终就会导致多个程序争夺同一个资源,也就是锁机制无效了。

由于Timeout参数设置太短太长都有问题,这意味着Timeout参数并不能完美的解决问题,这时就需要一个更好的机制--WatchDog来完善Timeout参数的不足。 WatchDog机制会在程序获取锁之后启动,在释放锁之前关闭,也就是跟随程序获取锁的行为一起运行,然后它会在程序执行期间按照一定的时间间隔帮锁自动续约(也就是增加锁的过期时间),从而防止业务代码没执行完,锁却过期的情况。

py-redis库的Lock追求的是简单的原则,它没有提供一套完整的WatchDog实现,但是提供了一个续约机制,代码如下:

class Lock:
    # 效果与lua_release一致 
    lua_extend = None
    # 续约的Lua代码主要逻辑为:
    # 1.判断token是否一致,不一致则不是自己获取的锁,返回0。
    # 2.判读锁的过期时间,如果小于0则锁不存在,返回0
    # 3.如果当前锁的过期时间还未变为0,则为当前的锁续约
    #   续约的方式有两种,一种是在原来的基础增加一个固定值,另一种是把过期时间设置为固定值。
    #   最后再返回1,代表续约成功
    LUA_EXTEND_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        local expiration = redis.call('pttl', KEYS[1])
        if not expiration then
            expiration = 0
        end
        if expiration < 0 then
            return 0
        end
        local newttl = ARGV[2]
        if ARGV[3] == "0" then
            newttl = ARGV[2] + expiration
        end
        redis.call('pexpire', KEYS[1], newttl)
        return 1
    """
    def extend(
        self, additional_time: float, replace_ttl: bool = False
    ) -> Awaitable[bool]:
        # 判断是否持有锁,没有锁不能续约
        if self.local.token is None:
            raise LockError("Cannot extend an unlocked lock")
        # 没设置超时不能续约
        if self.timeout is None:
            raise LockError("Cannot extend a lock with no timeout")
        return self.do_extend(additional_time, replace_ttl)
    async def do_extend(self, additional_time, replace_ttl) -> bool:
        # 通过Lua脚本执行续约的逻辑
        additional_time = int(additional_time * 1000)
        if not bool(
            await self.lua_extend(
                keys=[self.name],
                args=[self.local.token, additional_time, replace_ttl and "1" or "0"],
                client=self.redis,
            )
        ):
            raise LockNotOwnedError("Cannot extend a lock that's no longer owned")
        return True

它的逻辑非常简单,就是先校验当前是不是自己持有锁以及锁是否还在,当所有条件都满足时才续约,不过续约有两种方案,一种是把key的过期时间设置为指定的时间,另一种是在剩余的过期时间基础上再添加指定的时间。了解了py-redis提供的续约机制后,我们还需要考虑WatchDog剩余的逻辑,一个是什么时候开启/关闭WatchDog,另一个是如何制订WatchDog的执行周期。

前面说到WatchDog会伴随着加锁一直运行着,那么意味着WatchDog会在加锁成功后开始运行,并在释放锁之前停止,在根据之前针对Lock的代码分析可以判断,WatchDog需要在acquire后开始运行,在do_release之前停止运行。 至于WatchDog的间隔时间,大部分框架都是采用用户定义Timeout时间的1/3,这是考虑到网络通信的不可靠以及防止发送太多请求而权衡的结果。

现在WatchDog的原理分析完毕,可以着手实现WatchDog了,WatchDog最终的代码实现如下:

import asyncio
from typing import Optional, Union
from redis.asyncio.lock import Lock
class MyLock(Lock):
    # 存储循环的协程对象
    _watch_dog: Optional[asyncio.Task]
    async def _watch(self) -> None:
        """
        这是一个一直在循环的方法,它每次循环都会执行续约,然后休眠超时时间的1/3,然后再执行下一个循环。
        """
        while True:
            await self.extend(self.timeout)
            await asyncio.sleep(self.timeout / 3)
    def _cancel_watch_dog(self) -> None:
        """
        取消正在运行WatchDog的协程
        """
        _old_watch_dog: Optional[asyncio.Future] = getattr(self, "_watch_dog", None)
        if _old_watch_dog and not _old_watch_dog.cancelled():
            _old_watch_dog.cancel()
    async def acquire(
        self,
        blocking: Optional[bool] = None,
        blocking_timeout: Optional[float] = None,
        token: Optional[Union[str, bytes]] = None,
    ) -> bool:
        result = await super().acquire(blocking, blocking_timeout, token)
        if result:
            # 获取锁成功就开始启用WatchDog机制
            # 它先把旧的WatchDog取消,再通过`asyncio.create_task`方法使用一个单独的协程执行WatchDog
            self._cancel_watch_dog()
            self._watch_dog = asyncio.create_task(self._watch())
        return result
    async def do_release(self, expected_token: bytes) -> None:
        # 在释放之前取消WatchDod机制
        # 即使取消TachDog成功,但是锁释放失败也没关系,因为有超时机制兜底
        self._cancel_watch_dog()
        return await super().do_release(expected_token)
    def __del__(self) -> None:
        try:
            self._cancel_watch_dog()
        except Exception:
            pass

接着编写一个Demo文件来验证编写的WatchDog是否有效,Demo文件代码如下:

import asyncio
import time
from typing import Optional, Union
from redis.asyncio.lock import Lock
from redis.asyncio import Redis
# WatchDog实现省略...
async def main():
    _redis = Redis()
    s_t = time.time()
    # pyredis的`lock`方法通过`lock_class`参数支持自定义Lock类
    async with _redis.lock("demo", lock_class=MyLock, timeout=3):
        print("lock")
        await asyncio.sleep(5)
    print("ok", time.time() - s_t)
if __name__ == "__main__":
    asyncio.run(main())

然后在终端直接运行,会看到终端有如下输出:

lock
ok 5.005310297012329

通过输出可以发现,虽然锁设定的timeout参数为3秒,但是被锁住的代码能够正常的执行了5秒,也就代表WatchDog的实现是成功的。

需要注意的是,协程的创建销毁成本很低,所以使用一个协程执行一个WatchDog,如果是在线程模型下,则不能使用一个单独的线程来执行WatchDog,这样会导致频繁的开启和关闭线程,建议使用一个线程池来管理所有锁的WatchDog的运行。不过WatchDog是每隔一段时间运行的,所以也可以使用时间轮+单独的Worker来执行WatchDog。

3.总结

到目前为止,实现的分布式锁基本完备,也没有什么缺陷,同时它的性能也是非常高的。不过由于py-redis的锁实现比较简单,导致拓展性比较低,无法兼容部分场景,同时py-redis库并没有打算开发出包含更多功能的分布锁实现。这意味着开发者只能根据他提供的Lock对象进行重新开发,并通过redis.lock中的lock_class参数传递重新开发后的锁实现。

以上就是Python利用py-redis实现分布式锁的详细内容,更多关于Python分布式锁的资料请关注脚本之家其它相关文章!

相关文章

  • PyTorch搭建LSTM实现多变量时序负荷预测

    PyTorch搭建LSTM实现多变量时序负荷预测

    这篇文章主要为大家介绍了PyTorch搭建LSTM实现多变量时间序列预测及负荷预测,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • Python实现PS图像明亮度调整效果示例

    Python实现PS图像明亮度调整效果示例

    这篇文章主要介绍了Python实现PS图像明亮度调整效果,结合实例形式分析了Python基于skimage模块调整图片明亮度的原理与具体操作技巧,需要的朋友可以参考下
    2018-01-01
  • Python深入学习之内存管理

    Python深入学习之内存管理

    这篇文章主要介绍了Python深入学习之内存管理,本文比较详细的讲解了Python的内存管理相关知识,需要的朋友可以参考下
    2014-08-08
  • 详解Python中用于计算指数的exp()方法

    详解Python中用于计算指数的exp()方法

    这篇文章主要介绍了详解Python中用于计算指数的exp()方法,是Python入门中必会的基本方法,需要的朋友可以参考下
    2015-05-05
  • Python实现绘制多角星实例

    Python实现绘制多角星实例

    这篇文章要给大家分享Python实现绘制多角星的实例,在具备一定的Python编程基础以后,我们可以结合for循环进行多角星的编写,只要简单的几次循环,即可以极大的解决重复编写相同代码方面的问题,下面小编将以三角星,五角星为例,进而引入如何绘制多角星,需要的朋友可以参考一下
    2021-11-11
  • python基础教程项目四之新闻聚合

    python基础教程项目四之新闻聚合

    这篇文章主要为大家详细介绍了python基础教程项目四之新闻聚合,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-04-04
  • 查看已安装tensorflow版本的方法示例

    查看已安装tensorflow版本的方法示例

    这篇文章主要介绍了查看已安装tensorflow版本的方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-04-04
  • python使用xlrd模块读写Excel文件的方法

    python使用xlrd模块读写Excel文件的方法

    这篇文章主要介绍了python使用xlrd模块读写Excel文件的方法,较为详细的分析了xlrd模块的安装、使用与操作Excel文件的相关技巧,需要的朋友可以参考下
    2015-05-05
  • Python获取与处理文件路径/目录路径实例代码

    Python获取与处理文件路径/目录路径实例代码

    我们在用python进行数据处理时往往需要将文件中的数据取出来做一些处理,下面这篇文章主要给大家介绍了关于Python获取与处理文件路径/目录路径的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2023-04-04
  • pandas DataFrame的修改方法(值、列、索引)

    pandas DataFrame的修改方法(值、列、索引)

    这篇文章主要介绍了pandas DataFrame的修改方法(值、列、索引),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08

最新评论