JAVA多线程之JDK中的各种锁详解(看这一篇就够了)
1.概论
1.1.实现锁的要素
JAVA中的锁都是可重入的锁,因为不可重入的试用的时候很容易造成死锁。这个道理很好想明白:
当一个线程已经持有一个锁,并在持有该锁的过程中再次尝试获取同一把锁时,如果没有重入机制,第二次请求会被阻塞,因为锁已经被自己持有。这会导致线程自我死锁,因为它在等待自己释放的锁。
可重入是指获取锁的线程可以继续重复的获得此锁。其实我们想都能想到要实现一把锁需要些什么,首先肯定是:
标志位,也叫信号量,标记锁的状态和重入次数,这样才能完成持有锁和释放锁。
接下来要考虑的是拒接策略,当前锁被持有期间,后续的请求线程该怎么处理,当然可以直接拒绝,JAVA的选择委婉点,选择了允许这些线程躺在锁上阻塞等待锁被释放。要实现让线程躺在锁上等待,我们想想无非要:
需要支持对一个线程的阻塞、唤醒
需要记录当前哪个线程持有锁
需要一个队列维护所有阻塞在当前锁上的线程
OK,以上四点就是JAVA锁的核心,总结起来就是信号量+队列,分别用来记录持有者和等待者。
1.2.阻塞、唤醒操作
首先我们来看看阻塞和唤醒的操作,在JDK中提供了一个Unsafe类,该类中提供了阻塞或唤醒线程的一对操作 原语——park/unpark:
public native void unpark(Object var1); public native void park(boolean var1, long var2);
这对原语最终会调用操作系统的程序接口执行线程操作。
1.2.阻塞队列
拿来维护所有阻塞在当前锁上的线程的队列能是个普通队列吗?很显然不是,它的操作必须是线程安全的是吧,所以这个队列用阻塞队列实现才合适。什么是阻塞队列:
阻塞队列提供了线程安全的元素插入和移除操作,并且在特定条件下会阻塞线程,直到满足操作条件。
说到JDK中的阻塞队列,其核心就是AbstractQueuedSynchronizer,简称AQS,由双向链表实现的一个元素操作绝对安全的队列,用来在锁的实现中维护阻塞在锁上的线程上的队列的这个角色。
来看看AQS的源码:
它有指向前后节点的指针、有一个标志位state、还有一个提供线程操作原原语(阻塞、唤醒)的unsafe类。
所以其实AQS就长这样:
点进源码可以看到其随便一个方法都是线程安全的:
由于本文不是专门聊AQS这里就不扩展了,反正知道AQS是一个线程安全的阻塞队列就对了。
1.3.Lock接口和Sync类
JAVA中所有锁的顶级父接口,用来规范定义一把锁应该有那些行为职责:
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
JAVA中所有锁的实现都是依托AQS去作为阻塞队列,每个锁内部都会实现一个Sync内部类,在自身Sync内部以不同的策略去操作AQS实现不同种类的锁。
abstract static class Sync extends AbstractQueuedSynchronizer {......}
2.各种锁
2.1.互斥锁
2.1.1.概论
ReentrantLock,互斥锁,ReentrantLock本身没有任何代码逻辑,依靠内部类Sync干活儿:
public class ReentrantLock implements Lock, Serializable { private final ReentrantLock.Sync sync; public void lock() { this.sync.lock(); } public void unlock() { this.sync.release(1); } ...... }
ReentrantLock的Sync继承了AQS
abstract static class Sync extends AbstractQueuedSynchronizer {......}
Sync是抽象类,有两个实现:
NonfairSync,公平锁
FairSync,非公平锁
实例化ReentrantLock的实例时,根据传入的标志位可以创建公平和公平的实现
public class ReentrantLock implements Lock, java.io.Serializable{ public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ...... } }
2.1.2.源码
1.lock()
公平锁的lock():
static final class FairSync extends Sync { final void lock() { acquire(1);//进来直接排队 }
非公平锁的lock():
static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1))//进来直接抢锁 setExclusiveOwnerThread(Thread.currentThread());//将锁的持有者设置为当前线程 else acquire(1);//没抢过再去排队 } }
acquire()是AQS的模板方法:
tryAcquire,尝试再去获取一次锁,公平锁依然是排队抢,去看看阻塞队列是否为空;非公平锁依然是直接抢。
acquireQueued,将线程放入阻塞队列。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquireQueued(..)是lock()最关键的一部分,addWaiter(..)把Thread对象加入阻塞队列,acquireQueued(..)完成对线程的阻塞。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//如果发现自己在队头就去拿锁 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//调用原语,阻塞自己 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued(..)函数有一个返回值,表示什么意思 呢?虽然该函数不会中断响应,但它会记录被阻塞期间有没有其他线 程向它发送过中断信号。如果有,则该函数会返回true;否则,返回false。所以才有了以下逻辑:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
当 acquireQueued(..) 返回 true 时,会调用 selfInterrupt (),自己给自己发送中断信号,也就是自己把自己的中断标志位设 为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中 断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在loc k代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响 应该中断信号。
2.unlock()
unlock的逻辑很简单,每次unlock,state-1,直到state=0时,将锁的拥有者置null,释放锁。由于只有锁的持有线程才能操作lock,所以unlock()不需要用CAS,操作时直接判断一下是不是锁的持有线程在操作即可。
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) {//释放锁 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒阻塞队列中的后继者 return true; } return false; }
释放锁:
protected final boolean tryRelease(int releases) { int c = getState() - releases;//每次unlock,state减1 if (Thread.currentThread() != getExclusiveOwnerThread())//判断是不是锁的持有线程 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//state为0表示该锁没有被持有 free = true; setExclusiveOwnerThread(null);//将锁的持有者置null } setState(c); return free; }
唤醒后继者:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
2.2.读写锁
读写锁是一个实现读写互斥的锁,读写锁包含一个读锁、一个写锁:
public interface ReadWriteLock{ Lock readLock(); Lock writeLock(); }
读写锁的使用就是直接调用对应锁进行锁定和解锁:
ReadWriteLock rwLock=new ReetrantReadWriteLock(); Lock rLock=rwLock.readLock(); rLock.lock(); rLock.unLock(); Lock wLock=rwLock.writeLock(); wLock.lock(); wLock.unLock();
读写锁的Sync内部类对读锁和写锁采用同一个int型的信号量的高16位和低16位分别表示读写锁的状态和重入次数,这样一次CAS就能统一处理进行读写互斥操作:
abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
2.3.Condition
2.3.1.概论
condition用于更加细粒度的控制锁上面的线程阻塞、唤醒。
以下以一个经典的生产、消费者问题为例:
队列空的时候进来的消费者线程阻塞,有数据放进来后唤醒阻塞的消费者线程。
队列满的时候进来的生产者线程阻塞,有空位后唤醒阻塞的生产者线程。
锁粒度的实现:
public void enqueue(){ synchronized(queue){ while(queue.full()){ queue.wait(); } //入队列 ...... //通知消费者,队列中有数据了 queue.notify(); } } public void dequeue(){ synchronized(queue){ while(queue.empty()){ queue.wait(); } //出队列 ...... //通知生产者,队列中有空位了,可以继续放数据 queue.notify(); } }
可以发现,唤醒的时候把阻塞的生产消费线程一起唤醒了。
条件粒度的实现:
private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); // 用于等待队列不满 private final Condition notEmpty = lock.newCondition(); // 用于等待队列非空 public void enqueue(Object item) { try { while (queue.isFull()) { notFull.await(); // 等待队列不满 } // 入队列操作 // ... // 入队后,通知等待的消费者 notEmpty.signal(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 保持中断状态 // 处理中断逻辑 } finally { queue.unlock(); } } public void dequeue() { try { while (queue.isEmpty()) { notEmpty.await(); // 等待队列非空 } // 出队列操作 // ... // 出队后,通知等待的生产者 notFull.signal(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 保持中断状态 // 处理中断逻辑 } finally { queue.unlock(); } }
2.3.2.底层实现
Condition由Lock产生,因此Lock中持有Condition:
public interface Lock { ...... Condition newCondition(); }
承担功能的其实就是Syn中的ConditionObject,也就是AQS中的ConditionObject:
final ConditionObject newCondition() { return new ConditionObject(this); }
一个Condition上面阻塞着多个线程,所以每个Condition内部都有一个队列,用来记录阻塞在这个condition上面的线程,这个队列其实也是AQS实现的,AQS中除了实现一个以Node为节点的队列,还实现了一个以ConditionObject为节点的队列:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; private transient Node firstWaiter; private transient Node lastWaiter; ...... } }
Condition是个接口,定义了一系列条件操作:
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long var1) throws InterruptedException; boolean await(long var1, TimeUnit var3) throws InterruptedException; boolean awaitUntil(Date var1) throws InterruptedException; void signal(); void signalAll(); }
总结
到此这篇关于JAVA多线程之JDK中各种锁的文章就介绍到这了,更多相关JAVA多线程JDK各种锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
SpringBoot如何启动自动加载自定义模块yml文件(PropertySourceFactory)
这篇文章主要介绍了SpringBoot如何启动自动加载自定义模块yml文件(PropertySourceFactory),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2024-07-07
最新评论