Redis分布式可重入锁实现方案

 更新时间:2024年02月19日 09:40:03   作者:程序员小潘  
在单进程环境下,要保证一个代码块的同步执行,直接用synchronized 关键字或ReetrantLock 即可,在分布式环境下,要保证多个节点的线程对代码块的同步访问,就必须要用到分布式锁方案,本文介绍一下基于 Redis实现的分布式锁方案,感兴趣的朋友一起看看吧

前言

在单进程环境下,要保证一个代码块的同步执行,直接用synchronized 关键字或ReetrantLock 即可。在分布式环境下,要保证多个节点的线程对代码块的同步访问,就必须要用到分布式锁方案。
分布式锁实现方案有很多,有基于关系型数据库行锁实现的;有基于ZooKeeper临时顺序节点实现的;还有基于 Redis setnx 命令实现的。本文介绍一下基于 Redis 实现的分布式锁方案。

理解分布式锁

实现分布式锁有几个要求

  • 互斥性:任意时刻,最多只会有一个客户端线程可以获得锁
  • 可重入:同一客户端的同一线程,获得锁后能够再次获得锁
  • 避免死锁:客户端获得锁后即使宕机,后续客户端也可以获得锁
  • 避免误解锁:客户端A加的锁只能由A自己释放
  • 释放锁通知:持有锁的客户端释放锁后,最好可以通知其它客户端继续抢锁
  • 高性能和高可用

Redis 服务端命令是单线程串行执行的,天生就是原子的,并且支持执行自定义的 lua 脚本,功能上更加强大。
关于互斥性,我们可以用 setnx 命令实现,Redis 可以保证只会有一个客户端 set 成功。但是由于我们要实现的是一个分布式的可重入锁,数据结构得用 hash,用客户端ID+线程ID作为 field,value 记作锁的重入次数即可。
关于死锁,代码里建议把锁的释放写在 finally 里面确保一定执行,针对客户端抢到锁后宕机的场景,可以给 redis key 设置一个超时时间来解决。
关于误解锁,客户端在释放锁时,必须判断 field 是否当前客户端ID以及线程ID一致,不一致就不执行删除,这里需要用到 lua 脚本判断。
关于释放锁通知,可以利用 Redis 发布订阅模式,给每个锁创建一个频道,释放锁的客户端负责往频道里发送消息通知等待抢锁的客户端。
最后关于高性能和高可用,因为 Redis 是基于内存的,天生就是高性能的。但是 Redis 服务本身一旦出现问题,分布式锁也就不可用了,此时可以多部署几台独立的示例,使用 RedLock 算法来解决高可用的问题。

设计实现

首先我们定义一个 RedisLock 锁对象的抽象接口,只有尝试加锁和释放锁方法

public interface RedisLock {
    boolean tryLock();
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);
    void unlock();
}

然后提供一个默认实现 DefaultRedisLock

public class DefaultRedisLock implements RedisLock {
    // 客户端ID UUID
    private final String clientId;
    private final StringRedisTemplate redisTemplate;
    // 锁频道订阅器 接收释放锁通知
    private final LockSubscriber lockSubscriber;
    // 加锁的key
    private final String lockKey;
}

关于tryLock() ,首先执行lua脚本尝试获取锁,如果加锁失败则返回其它客户端持有锁的过期时间,客户端订阅锁对应的频道,然后sleep,直到收到锁释放的通知再继续抢锁。最终不管有没有抢到锁,都会在 finally 取消频道订阅。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
    final long timeout = System.currentTimeMillis() + unit.toMillis(waitTime);
    final long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
        return true;
    }
    if (System.currentTimeMillis() >= timeout) {
        return false;
    }
    final Semaphore semaphore = lockSubscriber.subscribe(getChannel(lockKey), threadId);
    try {
        while (true) {
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                return true;
            }
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            semaphore.tryAcquire(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lockSubscriber.unsubscribe(getChannel(lockKey), threadId);
    }
    return false;
}

tryAcquire() 就是执行lua脚本来加锁,解释一下这段脚本的逻辑:首先判断 lockKey 是否存在,不存在则直接设置 lockKey并且设置过期时间,返回空,表示加锁成功。存在则判断 field 是否和当前客户端ID+线程ID一致,一致则代表锁重入,递增一下value即可,不一致代表加锁失败,返回锁的过期时间

private Long tryAcquire(long leaseTime, TimeUnit timeUnit, long threadId) {
    return redisTemplate.execute(RedisScript.of(
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "return redis.call('pttl', KEYS[1]);", Long.class), Collections.singletonList(lockKey),
            String.valueOf(timeUnit.toMillis(leaseTime)), getLockName(threadId));
}

lockName是由客户端ID和线程ID组成的:

private String getLockName(long threadId) {
    return clientId + ":" + threadId;
}

如果加锁失败,客户端会尝试订阅对应的频道,名称规则是:

private String getChannel(String lockKey) {
    return "__lock_channel__:" + lockKey;
}

订阅方法是LockSubscriber#subscribe ,同一个频道无需订阅多个监听器,所以用一个 Map 记录。订阅成功以后,会返回当前线程对应的一个 Semaphore 对象,默认许可数是0,当前线程会调用Semaphore#tryAcquire 等待许可数,监听器在收到锁释放消息后会给 Semaphore 对象增加许可数,唤醒线程继续抢锁。

@Component
public class LockSubscriber {
    @Autowired
    private RedisMessageListenerContainer messageListenerContainer;
    private final Map<String, Map<Long, Semaphore>> channelSemaphores = new HashMap<>();
    private final Map<String, MessageListener> listeners = new HashMap<>();
    private final StringRedisSerializer serializer = new StringRedisSerializer();
    public synchronized Semaphore subscribe(String channelName, long threadId) {
        MessageListener old = listeners.put(channelName, new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                String channel = serializer.deserialize(message.getChannel());
                String ignore = serializer.deserialize(message.getBody());
                Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channel);
                if (semaphoreMap != null && !semaphoreMap.isEmpty()) {
                    semaphoreMap.values().stream().findFirst().ifPresent(Semaphore::release);
                }
            }
        });
        if (old == null) {
            messageListenerContainer.addMessageListener(listeners.get(channelName), new ChannelTopic(channelName));
        }
        Semaphore semaphore = new Semaphore(0);
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.getOrDefault(channelName, new HashMap<>());
        semaphoreMap.put(threadId, semaphore);
        channelSemaphores.put(channelName, semaphoreMap);
        return semaphore;
    }
    public synchronized void unsubscribe(String channelName, long threadId) {
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channelName);
        if (semaphoreMap != null) {
            semaphoreMap.remove(threadId);
            if (semaphoreMap.isEmpty()) {
                MessageListener listener = listeners.remove(channelName);
                if (listener != null) {
                    messageListenerContainer.removeMessageListener(listener);
                }
            }
        }
    }
}

对于 unlock,就只是一段 lua 脚本,这里解释一下:判断当前客户端ID+线程ID 这个 field 是否存在,存在说明是自己加的锁,可以释放。不存在说明不是自己加的锁,无需做任何处理。因为是可重入锁,每次 unlock 都只是递减一下 value,只有当 value 等于0时才是真正的释放锁。释放锁的时候会 del lockKey,再 publish 发送锁释放通知,让其他客户端可以继续抢锁。

@Override
public void unlock() {
    long threadId = Thread.currentThread().getId();
    redisTemplate.execute(RedisScript.of(
                    "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
                            "return nil;end;" +
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); " +
                            "if (counter > 0) then " +
                            "return 0; " +
                            "else " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], 1); " +
                            "return 1; " +
                            "end; " +
                            "return nil;", Long.class), Arrays.asList(lockKey, getChannel(lockKey)),
            getLockName(threadId));
}

最后,我们需要一个 RedisLockFactory 来创建锁对象,它同时会生成客户端ID

@Component
public class RedisLockFactory {
    private static final String CLIENT_ID = UUID.randomUUID().toString();
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private LockSubscriber lockSubscriber;
    public RedisLock getLock(String lockKey) {
        return new DefaultRedisLock(CLIENT_ID, redisTemplate, lockSubscriber, lockKey);
    }
}

至此,一个基于 Redis 实现的分布式可重入锁就完成了。

尾巴

目前这个版本的分布式锁,保证了互斥性、可重入、避免死锁和误解锁、实现了释放锁通知,但是并没有高可用的保证。如果 Redis 是单实例部署,就会存在单点问题,Redis 一旦故障,整个分布式锁将不可用。如果 Redis 是主从集群模式部署,虽然有主从自动切换,但是 Master 和 Slave 之间的数据同步是存在延迟的,分布式锁可能会出现问题。比如:客户端A加锁成功,lockKey 写入了 Master,此时 Master 宕机,其它 Slave 升级成了 Master,但是还没有同步到 lockKey,客户端B来加锁也会成功,这就没有保证互斥性。针对这个问题,可以参考 RedLock 算法,部署多个单独的 Redis 示例,只要一半以上的Redis节点加锁成功就算成功,来尽可能的保证服务高可用。

到此这篇关于Redis分布式可重入锁实现方案的文章就介绍到这了,更多相关Redis重入锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • redis中redis-cli使用小结

    redis中redis-cli使用小结

    redis-cli 是Redis命令行界面,一个简单的程序,允许直接从终端向Redis发送命令,并读取服务器发送的回复,本文主要介绍了redis中redis-cli使用小结,感兴趣的可以了解一下
    2023-10-10
  • redis开启和禁用登陆密码校验的方法

    redis开启和禁用登陆密码校验的方法

    今天小编就为大家分享一篇redis开启和禁用登陆密码校验的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-05-05
  • 详解用Redis实现Session功能

    详解用Redis实现Session功能

    本篇文章主要介绍了用Redis实现Session功能,具有一定的参考价值,小编觉得挺不错的,现在分享给大家,也给大家做个参考。
    2016-12-12
  • k8s部署redis哨兵的实现

    k8s部署redis哨兵的实现

    本文主要介绍了k8s部署redis哨兵的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • Redis的RDB持久化与AOF持久化详解

    Redis的RDB持久化与AOF持久化详解

    这篇文章主要介绍了Redis的RDB持久化与AOF持久化详解,Redis是许多公司都在使用的一款高性能、非关系型数据库,其中最为重要的一个特性就是它支持持久化,本文将深入介绍Redis持久化原理,包括RDB和AOF两种方式的实现,需要的朋友可以参考下
    2023-07-07
  • 查看redis占用内存的实现方法

    查看redis占用内存的实现方法

    这篇文章主要介绍了查看redis占用内存的实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • Redis实现每周热评的项目实践

    Redis实现每周热评的项目实践

    实时统计和展示热门内容是一种常见的需求,本文主要介绍了Redis实现每周热评的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-03-03
  • Linux中Redis安装部署的操作步骤

    Linux中Redis安装部署的操作步骤

    公司一直在使用redis集群,寻思着自己也部署一套练练手,下面这篇文章主要给大家介绍了关于Linux中Redis安装部署的操作步骤,需要的朋友可以参考下
    2022-04-04
  • Redis Template使用详解示例教程

    Redis Template使用详解示例教程

    RedisTemplate的底层通过RedisConnectionFactory对多种Redis驱动进行集成,上层通过RedisOperations提供丰富的API,并结合Spring基于泛型的bean注入,为开发提供了极大的便利,这篇文章主要介绍了Redis Template使用详解示例教程,需要的朋友可以参考下
    2023-11-11
  • redis配置standAlone版的jedisPool示例

    redis配置standAlone版的jedisPool示例

    这篇文章主要为大家介绍了redis配置standAlone版的jedisPool示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07

最新评论