Redisson分布式闭锁RCountDownLatch的使用详细讲解
更新时间:2023年02月11日 11:04:12 作者:每天都要进步一点点
分布式锁和我们java基础中学习到的synchronized略有不同,synchronized中我们的锁是个对象,当前系统部署在不同的服务实例上,单纯使用synchronized或者lock已经无法满足对库存一致性的判断。本次主要讲解基于rediss实现的分布式锁
本篇文章基于redisson-3.17.6版本源码进行分析
一、RCountDownLatch的使用
RCountDownLatch的功能跟CountDownLatch,用于实现某个线程需要等待其他线程都完成之后,我再去执行,这种场景就可以使用CountDownLatch。
@Test public void testRCountDownLatch() { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redissonClient = Redisson.create(config); RCountDownLatch rCountDownLatch = redissonClient.getCountDownLatch("anyCountDownLatch"); rCountDownLatch.trySetCount(5); for (int i = 1; i <= 5; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "离开教师..."); rCountDownLatch.countDown(); }, "A" + i).start(); } try { rCountDownLatch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("班长锁门..."); }
A1离开教师...
A2离开教师...
A4离开教师...
A3离开教师...
A5离开教师...
班长锁门...
二、trySetCount()设置计数器
/** * 仅当先前的计数已达到零或根本未设置时才设置新的计数值。 */ boolean trySetCount(long count);
public RFuture<Boolean> trySetCountAsync(long count) { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 往redis中写入一个String类型的数据 anyCountDownLatch:5 "if redis.call('exists', KEYS[1]) == 0 then " + "redis.call('set', KEYS[1], ARGV[2]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.asList(getRawName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count); }
同样,在redis中写入了一个{key}:{计数器总数}的String类型的数据。
三、countDown()源码
减少锁存器的计数器。当计数达到零时通知所有等待线程。
public RFuture<Void> countDownAsync() { return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 减少redis中计数器的值 "local v = redis.call('decr', KEYS[1]);" + // 计数器减为0后,删除对应的key "if v <= 0 then redis.call('del', KEYS[1]) end;" + "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;", Arrays.<Object>asList(getRawName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE); }
四、await()源码
等到计数器达到零。
public void await() throws InterruptedException { // 如果计数器为0,直接返回 if (getCount() == 0) { return; } // 订阅redisson_countdownlatch__channel__{anyCountDownLatch}的消息 CompletableFuture<RedissonCountDownLatchEntry> future = subscribe(); RedissonCountDownLatchEntry entry = commandExecutor.getInterrupted(future); try { // 不断循环判断计数器的值是否大于0,大于0说明还有线程没执行完成,在这里阻塞:LockSupport.park(this) while (getCount() > 0) { // waiting for open state entry.getLatch().await(); } } finally { unsubscribe(entry); } }
到此这篇关于Redisson分布式闭锁RCountDownLatch的使用详细讲解的文章就介绍到这了,更多相关Redisson RCountDownLatch内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
最新评论