Redisson分布式信号量RSemaphore的使用超详细讲解

 更新时间:2023年02月11日 11:19:18   作者:每天都要进步一点点  
这篇文章主要介绍了Redisson分布式信号量RSemaphore的使用,基于Redis的Redisson的分布式信号量RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法

本篇文章基于redisson-3.17.6版本源码进行分析

一、RSemaphore的使用

@Test
public void testRSemaphore() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    RedissonClient redissonClient = Redisson.create(config);
    RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
    // 设置5个许可,模拟五个停车位
    rSemaphore.trySetPermits(5);
    // 创建10个线程,模拟10辆车过来停车
    for (int i = 1; i <= 10; i++) {
        new Thread(() -> {
            try {
                rSemaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "进入停车场...");
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
                System.out.println(Thread.currentThread().getName() + "离开停车场...");
                rSemaphore.release();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "A" + i).start();
    }
    try {
        TimeUnit.MINUTES.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

二、RSemaphore设置许可数量

初始化RSemaphore,需要调用trySetPermits()设置许可数量:

/**
 * 尝试设置许可数量,设置成功,返回true,否则返回false
 */
boolean trySetPermits(int permits);

trySetPermits()内部调用了trySetPermitsAsync():

// 异步设置许可
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 判断分布式信号量的key是否存在,如果不存在,才设置
            "local value = redis.call('get', KEYS[1]); " +
                    "if (value == false) then "
                    // set "semaphore" permits
                    // 使用String数据结构设置信号量的许可数
                    + "redis.call('set', KEYS[1], ARGV[1]); "
                    // 发布一条消息到redisson_sc:{semaphore}通道
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    // 设置成功,返回1
                    + "return 1;"
                    + "end;"
                    // 否则返回0
                    + "return 0;",
            Arrays.asList(getRawName(), getChannelName()), permits);
    if (log.isDebugEnabled()) {
        future.thenAccept(r -> {
            if (r) {
                log.debug("permits set, permits: {}, name: {}", permits, getName());
            } else {
                log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
            }
        });
    }
    return future;
}

可以看到,设置许可数量底层使用LUA脚本,实际上就是使用redis的String数据结构,保存了我们指定的许可数量。如下图:

参数说明:

  • KEYS[1]: 我们指定的分布式信号量key,例如redissonClient.getSemaphore("semaphore")中的"semaphore")
  • KEYS[2]: 释放锁的channel名称,redisson_sc:{分布式信号量key},在本例中,就是redisson_sc:{semaphore}
  • ARGV[1]: 设置的许可数量

总结设置许可执行流程为:

  • get semaphore,获取到semaphore信号量的当前的值
  • 第一次数据为0, 然后使用set semaphore 3,将这个信号量同时能够允许获取锁的客户端的数量设置为3。(注意到,如果之前设置过了信号量,将无法再次设置,直接返回0。想要更改信号量总数可以使用addPermits方法)
  • 然后redis发布一些消息,返回1

三、RSemaphore的加锁流程

许可数量设置好之后,我们就可以调用acquire()方法获取了,如果未传入许可数量,默认获取一个许可。

public void acquire() throws InterruptedException {
    acquire(1);
}
public void acquire(int permits) throws InterruptedException {
    // 尝试获取锁成功,直接返回
    if (tryAcquire(permits)) {
        return;
    }
    // 对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息
    CompletableFuture<RedissonLockEntry> future = subscribe();
    semaphorePubSub.timeout(future);
    RedissonLockEntry entry = commandExecutor.getInterrupted(future);
    try {
        // 不断循环尝试获取许可
        while (true) {
            if (tryAcquire(permits)) {
                return;
            }
            entry.getLatch().acquire();
        }
    } finally {
        // 取消订阅
        unsubscribe(entry);
    }
//        get(acquireAsync(permits));
}

可以看到,获取许可的核心逻辑在tryAcquire()方法中,如果tryAcquire()返回true说明获取许可成功,直接返回;如果返回false,说明当前没有许可可以使用,则对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息,并通过死循环不断尝试获取锁。

我们看一下tryAcquire()方法的逻辑,内部调用了tryAcquireAsync()方法:

// 异步获取许可
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>(true);
    }
    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              // 获取当前剩余的许可数量
              "local value = redis.call('get', KEYS[1]); " +
              // 许可不为空,并且许可数量 大于等于 当前线程申请的许可数量        
              "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                  // 通过decrby减少剩余可用许可    
                  "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                  // 返回1    
                  "return 1; " +
              "end; " +
              // 其它情况,返回0        
              "return 0;",
              Collections.<Object>singletonList(getRawName()), permits);
}

从源码可以看到,获取许可就是操作redis中的数据,首先获取到redis中剩余的许可数量,只有当剩余的许可数量大于线程申请的许可数量时,才获取成功,返回1;否则获取失败,返回0;

总结加锁执行流程为:

  • get semaphore,获取到一个当前的值,比如说是3,3 > 1
  • decrby semaphore 1,将信号量允许获取锁的客户端的数量递减1,变成2
  • decrby semaphore 1
  • decrby semaphore 1
  • 执行3次加锁后,semaphore值为0
  • 此时如果再来进行加锁则直接返回0,然后进入死循环去获取锁

四、RSemaphore的解锁流程

通过前面对RSemaphore获取锁的分析,我们很容易能猜到,释放锁,无非就是归还许可数量到redis中。我们查看具体的源码:

public RFuture<Void> releaseAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>((Void) null);
    }
    RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
            // 通过incrby增加许可数量
            "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
                    // 发布一条消息到redisson_sc:{semaphore}中
                    "redis.call('publish', KEYS[2], value); ",
            Arrays.asList(getRawName(), getChannelName()), permits);
    if (log.isDebugEnabled()) {
        future.thenAccept(o -> {
            log.debug("released, permits: {}, name: {}", permits, getName());
        });
    }
    return future;
}

到此这篇关于Redisson分布式信号量RSemaphore的使用超详细讲解的文章就介绍到这了,更多相关Redisson RSemaphore内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java实现树形菜单的方法总结

    Java实现树形菜单的方法总结

    当我们想要展示层级结构,如文件目录、组织结构或分类目录时,树形菜单是一个直观且有效的解决方案,本文为大家整理了java中几种常见方法,希望对大家有所帮助
    2023-08-08
  • 在SpringBoot中添加Redis及配置方法

    在SpringBoot中添加Redis及配置方法

    这篇文章主要介绍了在SpringBoot中添加Redis及配置redis的代码,需要的朋友可以参考下
    2018-10-10
  • Java控制台输入数组并逆序输出的方法实例

    Java控制台输入数组并逆序输出的方法实例

    这篇文章主要介绍了Java手动输入数组并逆向输出的方法实例,需要的朋友可以参考下。
    2017-08-08
  • SpringBoot整合数据库访问层的实战

    SpringBoot整合数据库访问层的实战

    本文主要介绍了SpringBoot整合数据库访问层的实战,主要包含JdbcTemplate和mybatis框架的整合应用,具有一定的参考价值,感兴趣的可以了解一下
    2022-03-03
  • 深入理解Java Socket通信

    深入理解Java Socket通信

    本篇文章主要介绍了深入理解Java Socket,Java中的网络通信是通过Socket实现的,Socket分为ServerSocket和Socket两大类,有兴趣的可以了解一下
    2017-02-02
  • Springboot通过ObjectMapper配置json序列化详解

    Springboot通过ObjectMapper配置json序列化详解

    SpringBoot默认集成Jackson库,其中ObjectMapper类是核心,用于Java对象与JSON字符串的互转,提供配置序列化特性、注册模块等方法,在SpringBoot中可以全局配置JSON格式,如日期格式化、将Long转为字符串,还可以配置序列化时的各种规则,感兴趣的可以了解一下
    2024-10-10
  • Java Grpc实例创建负载均衡详解

    Java Grpc实例创建负载均衡详解

    这篇文章主要介绍了Java Grpc实例创建负载均衡详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • logback的addtivity属性定义源码解读

    logback的addtivity属性定义源码解读

    这篇文章主要为大家介绍了logback的addtivity属性定义源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-11-11
  • springboot(thymeleaf)中th:field和th:value的区别及说明

    springboot(thymeleaf)中th:field和th:value的区别及说明

    这篇文章主要介绍了springboot(thymeleaf)中th:field和th:value的区别及说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-10-10
  • Java中的SimpleDateFormat使用详解

    Java中的SimpleDateFormat使用详解

    SimpleDateFormat 是一个以国别敏感的方式格式化和分析数据的具体类。这篇文章主要介绍了Java中的SimpleDateFormat使用详解,需要的朋友可以参考下
    2017-03-03

最新评论