Redisson分布式锁源码解析

 更新时间:2018年08月02日 08:25:22   投稿:laozhang  
文章给大家分享了关于Redisson分布式锁源码相关的知识点内容,有兴趣的朋友们可以参考学习下。

Redisson锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等。加上Redisson中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁。 下面我们对其加锁、解锁过程中的源码细节进行一一分析。

锁的接口定义了一下方法:

分布式锁当中加锁,我们常用的加锁接口:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

下面我们来看一下方法的具体实现:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  long time = unit.toMillis(waitTime);
  long current = System.currentTimeMillis();
  final long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
   return true;
  }
  
  time -= (System.currentTimeMillis() - current);
  if (time <= 0) {
   acquireFailed(threadId);
   return false;
  }
  
  current = System.currentTimeMillis();
  final RFuture subscribeFuture = subscribe(threadId);
  if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
   if (!subscribeFuture.cancel(false)) {
    subscribeFuture.addListener(new FutureListener() {
     @Override
     public void operationComplete(Future future) throws Exception {
      if (subscribeFuture.isSuccess()) {
       unsubscribe(subscribeFuture, threadId);
      }
     }
    });
   }
   acquireFailed(threadId);
   return false;
  }

  try {
   time -= (System.currentTimeMillis() - current);
   if (time <= 0) {
    acquireFailed(threadId);
    return false;
   }
  
   while (true) {
    long currentTime = System.currentTimeMillis();
    ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
     return true;
    }

    time -= (System.currentTimeMillis() - currentTime);
    if (time = 0 && ttl < time) {
     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
     getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }

    time -= (System.currentTimeMillis() - currentTime);
    if (time <= 0) {
     acquireFailed(threadId);
     return false;
    }
   }
  } finally {
   unsubscribe(subscribeFuture, threadId);
  }
//  return get(tryLockAsync(waitTime, leaseTime, unit));
 }

首先我们看到调用tryAcquire尝试获取锁,在这里是否能获取到锁,是根据锁名称的过期时间TTL来判定的(TTL

下面我们接着看一下tryAcquire的实现:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
 return get(tryAcquireAsync(leaseTime, unit, threadId));
}

可以看到真正获取锁的操作经过一层get操作里面执行的,这里为何要这么操作,本人也不是太理解,如有理解错误,欢迎指正。

get 是由CommandAsyncExecutor(一个线程Executor)封装的一个Executor

设置一个单线程的同步控制器CountDownLatch,用于控制单个线程的中断信息。个人理解经过中间的这么一步:主要是为了支持线程可中断操作。

public V get(RFuture future) {
 if (!future.isDone()) {
  final CountDownLatch l = new CountDownLatch(1);
  future.addListener(new FutureListener() {
   @Override
   public void operationComplete(Future future) throws Exception {
    l.countDown();
   }
  });
  
  boolean interrupted = false;
  while (!future.isDone()) {
   try {
    l.await();
   } catch (InterruptedException e) {
    interrupted = true;
   }
  }
  
  if (interrupted) {
   Thread.currentThread().interrupt();
  }
 }

 // commented out due to blocking issues up to 200 ms per minute for each thread:由于每个线程的阻塞问题,每分钟高达200毫秒
 // future.awaitUninterruptibly();
 if (future.isSuccess()) {
  return future.getNow();
 }

 throw convertException(future);
}

我们进一步往下看:

private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
 if (leaseTime != -1) {
  return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
 }
 RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
 ttlRemainingFuture.addListener(new FutureListener() {
  @Override
  public void operationComplete(Future future) throws Exception {
   if (!future.isSuccess()) {
    return;
   }

   Long ttlRemaining = future.getNow();
   // lock acquired
   if (ttlRemaining == null) {
    scheduleExpirationRenewal(threadId);
   }
  }
 });
 return ttlRemainingFuture;
}

首先判断锁是否有超时时间,有过期时间的话,会在后面获取锁的时候设置进去。没有过期时间的话,则会用默认的

private long lockWatchdogTimeout = 30 * 1000;

下面我们在进一步往下分析真正获取锁的操作:

RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
 internalLockLeaseTime = unit.toMillis(leaseTime);

 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    "if (redis.call('exists', KEYS[1]) == 0) then " +
     "redis.call('hset', KEYS[1], ARGV[2], 1); " +
     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     "return nil; " +
    "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     "return nil; " +
    "end; " +
    "return redis.call('pttl', KEYS[1]);",
    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

我把里面的重点信息做了以下三点总结:

1:真正执行的是一段具有原子性的Lua脚本,并且最终也是由CommandAsynExecutor去执行。

2:锁真正持久化到Redis时,用的hash类型key field value

3:获取锁的三个参数:getName()是逻辑锁名称,例如:分布式锁要锁住的methodName+params;internalLockLeaseTime是毫秒单位的锁过期时间;getLockName则是锁对应的线程级别的名称,因为支持相同线程可重入,不同线程不可重入,所以这里的锁的生成方式是:UUID+":"threadId。有的同学可能会问,这样不是很缜密:不同的JVM可能会生成相同的threadId,所以Redission这里加了一个区分度很高的UUID;

Lua脚本中的执行分为以下三步:

1:exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称KEYS[1],线程级别的锁名称[ARGV[2],value=1,设置到redis。并设置逻辑锁名称的过期时间ARGV[2],返回;

2:如果检查到存在KEYS[1],[ARGV[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间

3:key不存,直接返回key的剩余过期时间(-2)

相关文章

  • 分布式系统中的降级熔断设计问题面试

    分布式系统中的降级熔断设计问题面试

    这篇文章主要为大家介绍了分布式系统中的降级熔断设计问题面试解答,有需要的朋友可以借鉴参考下,希望能有所帮助,祝大家多多进步,早日升职加薪
    2022-03-03
  • 解决Mybatis的@Param()注解导致分页失效的问题

    解决Mybatis的@Param()注解导致分页失效的问题

    这篇文章主要介绍了解决Mybatis的@Param()注解导致分页失效的问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-04-04
  • spring拓展之如何定义自己的namespace

    spring拓展之如何定义自己的namespace

    这篇文章主要介绍了spring拓展之如何定义自己的namespace方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java四舍五入时保留指定小数位数的五种方式

    Java四舍五入时保留指定小数位数的五种方式

    这篇文章主要介绍了Java四舍五入时保留指定小数位数的五种方式,帮助大家更好的理解和使用Java,感兴趣的朋友可以了解下
    2020-09-09
  • SpringMVC数据响应详细介绍

    SpringMVC数据响应详细介绍

    Spring MVC 是 Spring 提供的一个基于 MVC 设计模式的轻量级 Web 开发框架,本质上相当于 Servlet,Spring MVC 角色划分清晰,分工明细,本章来讲解SpringMVC数据响应
    2023-02-02
  • 详解Spring Boot中MyBatis的使用方法

    详解Spring Boot中MyBatis的使用方法

    mybatis初期使用比较麻烦,需要各种配置文件、实体类、dao层映射关联、还有一大推其它配置。当然mybatis也发现了这种弊端。下面通过本文给大家详细介绍Spring Boot中MyBatis的使用方法,感兴趣的朋友一起看看吧
    2017-07-07
  • 基于@Bean修饰的方法参数的注入方式

    基于@Bean修饰的方法参数的注入方式

    这篇文章主要介绍了@Bean修饰的方法参数的注入方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java 在游戏中探索数组二维数组

    Java 在游戏中探索数组二维数组

    数组和二维数组感觉用王者荣耀的装备栏来举例解释,应该更易懂一些。从基础开始讲,后续会讲到JAVA高级,中间会穿插面试题和项目实战,希望能给大家带来帮助
    2022-03-03
  • Java实现局域网聊天室功能(私聊、群聊)

    Java实现局域网聊天室功能(私聊、群聊)

    这篇文章主要为大家详细介绍了Java实现局域网聊天室功能,包括私聊、群聊,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05
  • Spring AI 使用本地 Ollama Embeddings的操作方法

    Spring AI 使用本地 Ollama Embeddings的操作方法

    使用 OpenAI 的 Embeddings 接口是有费用的,如果想对大量文档进行测试,使用本地部署的 Embeddings 就能省去大量的费用,所以我们尝试使用本地的 Ollama Embeddings,这篇文章主要介绍了Spring AI 使用本地 Ollama Embeddings,需要的朋友可以参考下
    2024-05-05

最新评论