Java Semaphore信号量使用分析讲解
前言
大家应该都用过synchronized
关键字加锁,用来保证某个时刻只允许一个线程运行。那么如果控制某个时刻允许指定数量的线程执行,有什么好的办法呢? 答案就是JUC提供的信号量Semaphore
。
介绍和使用
Semaphore
(信号量)可以用来限制能同时访问共享资源的线程上限,它内部维护了一个许可的变量,也就是线程许可的数量
Semaphore
的许可数量如果小于0个,就会阻塞获取,直到有线程释放许可
Semaphore
是一个非重入锁
API介绍
构造方法
public Semaphore(int permits)
:permits
表示许可线程的数量public Semaphore(int permits, boolean fair)
:fair
表示公平性,如果设为true
,表示是公平,那么等待最久的线程先执行
常用API
public void acquire()
:表示一个线程获取1个许可,那么线程许可数量相应减少一个public void release()
:表示释放1个许可,那么线程许可数量相应会增加
其他API
void acquire(int permits)
:表示一个线程获取n个许可,这个数量由参数permits
决定void release(int permits)
:表示一个线程释放n个许可,这个数量由参数permits
决定int availablePermits()
:返回当前信号量线程许可数量int getQueueLength()
: 返回等待获取许可的线程数的预估值
基本使用
public static void main(String[] args) { // 1. 创建 semaphore 对象 Semaphore semaphore = new Semaphore(2); // 2. 10个线程同时运行 for (int i = 0; i < 8; i++) { new Thread(() -> { // 3. 获取许可 try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..."); sleep(1); log.debug("end..."); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 4. 释放许可 semaphore.release(); } }).start(); } }
运行结果:
原理介绍
上面是Semaphore
的类结构图,其中FairSync
和NonfairSync
是它的内部类,他们共同继承了AQS类,AQS的共享模式提供了Semaphore
的加锁、解锁。
为了更好的搞懂原理,我们通过一个例子来帮助我们理解。
假设Semaphore
的 permits
为 3,这时 5 个线程来获取资源,其中Thread-1
,Thread-2
,Thread-4
CAS 竞争成功,permits
变为 0,而 Thread-0
和 Thread-3
竞争失败。
获取许可acquire()
acquire()
主方法会调用sync.acquireSharedInterruptibly(1)
方法acquireSharedInterruptibly()
方法会先调用tryAcquireShared()
方法返回许可的数量,如果小于0个,调用doAcquireSharedInterruptibly()
方法进入阻塞
// acquire() -> sync.acquireSharedInterruptibly(1),可中断 public final void acquireSharedInterruptibly(int arg) { if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取通行证,获取成功返回 >= 0的值 if (tryAcquireShared(arg) < 0) // 获取许可证失败,进入阻塞 doAcquireSharedInterruptibly(arg); }
tryAcquireShared()
方法在终会调用到Sync#nonfairTryAcquireShared()
方法nonfairTryAcquireShared()
方法中会减去获取的许可数量,返回剩余的许可数量
// tryAcquireShared() -> nonfairTryAcquireShared() // 非公平,公平锁会在循环内 hasQueuedPredecessors()方法判断阻塞队列是否有临头节点(第二个节点) final int nonfairTryAcquireShared(int acquires) { for (;;) { // 获取 state ,state 这里【表示通行证】 int available = getState(); // 计算当前线程获取通行证完成之后,通行证还剩余数量 int remaining = available - acquires; // 如果许可已经用完, 返回负数, 表示获取失败, if (remaining < 0 || // 许可证足够分配的,如果 cas 重试成功, 返回正数, 表示获取成功 compareAndSetState(available, remaining)) return remaining; } }
- 如果剩余的许可数量<0, 会调用
doAcquireSharedInterruptibly()
方法将当前线程加入到阻塞队列中阻塞 - 方法中调用
parkAndCheckInterrupt()
阻塞当前线程
private void doAcquireSharedInterruptibly(int arg) { // 将调用 Semaphore.aquire 方法的线程,包装成 node 加入到 AQS 的阻塞队列中 final Node node = addWaiter(Node.SHARED); // 获取标记 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 前驱节点是头节点可以再次获取许可 if (p == head) { // 再次尝试获取许可,【返回剩余的许可证数量】 int r = tryAcquireShared(arg); if (r >= 0) { // 成功后本线程出队(AQS), 所在 Node设置为 head // r 表示【可用资源数】, 为 0 则不会继续传播 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { // 被打断后进入该逻辑 if (failed) cancelAcquire(node); } }
最终的AQS状态如下图所示:
Thread-1
、Thread-2
、Thread-4
正常运行- AQS的
state
也就是等于0 Thread-0
、Thread-3
再阻塞队列中
释放许可release()
现在Thread-4
运行完毕,要释放许可,Thread-0
、Thread-3
又是如何恢复执行的呢?
- 调用
release()
方法释放许可,最终调用Sync#releaseShared()
方法 - 如果方法
tryReleaseShared(arg)
尝试释放许可成功,那么调用doReleaseShared();
进行唤醒
// release() -> releaseShared() public final boolean releaseShared(int arg) { // 尝试释放锁 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared()
方法主要是尝试释放许可- 获取当前许可数量 + 释放的数量,然后通过cas设置回去
protected final boolean tryReleaseShared(int releases) { for (;;) { // 获取当前锁资源的可用许可证数量 int current = getState(); int next = current + releases; // 索引越界判断 if (next < current) throw new Error("Maximum permit count exceeded"); // 释放锁 if (compareAndSetState(current, next)) return true; } }
- 调用
doReleaseShared()
方法唤醒队列中的线程 - 其中
unparkSuccessor()
方法是唤醒的核心操作
// 唤醒 private void doReleaseShared() { // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark // 如果 head.waitStatus == 0 ==> Node.PROPAGATE for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 防止 unparkSuccessor 被多次执行 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒后继节点 unparkSuccessor(h); } // 如果已经是 0 了,改为 -3,用来解决传播性 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
最终AQS状态如下图所示:
- 许可state变回1
- 然后
Thread-0
开始竞争,如果竞争成功,如下图所示:
- 由于Thread-0竞争成功,再次获取到许可,许可数量减1,最终又变回0
- 然后等待队列中剩余
Thread-3
总结
Semaphore
信号量类基于AQS的共享锁实现,有公平锁和非公平锁两个版本,它用来限制能同时访问共享资源的线程上限,典型的应用场景是可以用来保护有限的公共资源,比如数据库连接等。
到此这篇关于Java Semaphore信号量使用分析讲解的文章就介绍到这了,更多相关Java Semaphore信号量内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
MyBatis-Plus多表联查的实现方法(动态查询和静态查询)
本文用示例介绍使用MyBatis-Plus进行多表查询的方法,包括静态查询和动态查询,通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧2022-03-03spring使用@Async注解导致循环依赖问题异常的排查记录
这篇文章主要介绍了spring使用@Async注解导致循环依赖问题异常的排查记录,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2023-08-08
最新评论