Java AQS 线程安全同步队列的实现

 更新时间:2023年08月30日 08:31:03   作者:Tans5  
AQS 同步队列是很多的 Java 线程安全对象的实现,例如 ReentrantLock, Semaphore, CountDownLatch, ReentrantReadWriteLock 等等,本文就介绍了Java AQS 线程安全同步队列的实现,感兴趣的可以了解一下

AQS 同步队列是很多的 Java 线程安全对象的实现,例如 ReentrantLock, Semaphore, CountDownLatch, ReentrantReadWriteLock 等等。

AQS 是 AbstractQueuedSynchronizer 的简称,它是一个抽象类,我们需要实现其中的一些关键方法来完成他的基本功能。

这里简单介绍一下它的实现方式,当一个线程想要获取该对象的锁的时候,会通过方法检查该线程是否能够获取锁,如果能够获取锁就结束了,完事儿;如果不能够获取锁,就加入同步队列等待,同时挂起该线程,如果这个时候还有别的线程在竞争该对象的锁接着加入同步队列,挂起,当占有这个锁的线程完事儿后会释放锁,释放时会去检查同步队列,取出最先进入队列的线程,然后把它唤醒,它就获得了锁,当它也完事儿释放后,又唤醒下一个,直到队列中的等待线程全部唤醒。

网上已经有很多的源码分析的文章了,所以我想尽可能的简化分析,很多的细节我就不说了。

1 自定义 AQS 的重要方法

val qs = object : AbstractQueuedSynchronizer() {
    /**
     * 尝试获取互斥锁,如果返回,如果获取失败,后续就会进入同步队列,同时挂起线程
     */
    override fun tryAcquire(arg: Int): Boolean {
        return super.tryAcquire(arg)
    }
    /**
     * 尝试释放互斥锁
     */
    override fun tryRelease(arg: Int): Boolean {
        return super.tryRelease(arg)
    }
    /**
     * 尝试获取共享锁,和同步锁一样,失败就进入队列
     */
    override fun tryAcquireShared(arg: Int): Int {
        return super.tryAcquireShared(arg)
    }
    /**
     * 尝试释放同步锁
     */
    override fun tryReleaseShared(arg: Int): Boolean {
        return super.tryReleaseShared(arg)
    }
    /**
     * 当前线程是否获得锁
     */
    override fun isHeldExclusively(): Boolean {
        return super.isHeldExclusively()
    }
}

下面是 JDK 中 ReentrantLock 中不公平锁的实现:

    class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        @ReservedStackAccess
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
         protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
        // Methods relayed from outer class
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
        final boolean isLocked() {
            return getState() != 0;
        }
    }

ReentrantLock 只实现了互斥锁。

  • 尝试获取互斥锁

acquires 这个值外部每次都是传的 1,首先通过 getState() 方法获取 state。
如果 state 为 0 就表示锁可以使用,这时通过 CAS 的方式设置新的状态,如果 CAS 竞争失败,说明有其他线程同时也在竞争这个锁,这时就直接返回失败,如果竞争成功就会通过setExclusiveOwnerThread 方法设置当前线程为拥有者,返回成功。
如果 state 不为 0 但是 owner 就是为当前线程,就表示当前线程调用了多次 lock() 方法,这次就是简单的在原有的 state 上再加 1,同时返回获取锁成功。
其他的情况也就是返回失败了,失败了就会和前面说到的就会进入同步队列,同时挂起当前线程。

  • 尝试释放互斥锁

同样的 releases 每次传的值也是 1。
首先判断当前线程是否拥有锁,如果不拥有锁直接抛出异常,通过 getState() 获取上次的状态,上次的状态再减去 releases,就是新的状态,如果新的状态为 0 就表示应该释放锁,同时通过 setExclusiveOwnerThread 方法把拥有线程设置为空,同时返回 true,其他情况返回 false。
根据前面介绍的,如果成功释放,这时 AQS 还得去检查同步队列,拿到最近一个等待锁的线程,并唤醒。

上面锁的代码很简单,相信你已经看懂了,这也解释了为什么叫可重入锁,也就是同一个线程可以多次调用 lock() 方法,同样的也要对等的调用 unlock() 方法完成解锁,lock() 和 unlock() 必须成对出现。共享锁的实现我就不贴了,大同小异。

2 互斥锁

2.1 没有出现锁竞争

前面的 Reentrant 源码也提到了,在没有锁竞争的时候和被其他线程占有锁的情况下,只是简单的设置 state 为 1 和 设置 owner 的线程。
这个过程的性能消耗是非常小的,几乎可以忽略不计。

2.2 锁竞争失败或者锁已经被占用

如果尝试获取锁失败,就会新建一个 Node 对象(AQS 的队列实现是双向链表,Node 就是他的节点实现),其中包含了节点状态和关联线程等关键信息,创建后将其加入到等待队列的队尾,同时将其线程挂起(挂起是使用的 LockSupport.park() 方法,其内部的实现是 C++ 代码,使用的是 Mutex 和 synchronized 用的是一样的方法),需要等待占用锁的线程释放锁后,根据同步队列的顺序把下一个同步队列的线程唤醒(唤醒使用的是 LockSupport.unpark() 方法)。

这种情况和没有锁竞争的情况性能消耗就要大一些了。在进入队列和释放锁的过程中可能会有多次的 CAS 自旋(也就是 CAS 失败后通过 while 循环重试,直到成功,这时 CPU 是在空转);还有关键一点是线程的挂起和唤醒是需要操作系统来操作的,也就是会涉及到用户态向内核态的转换,这个过程是比较消耗性能的。

3 共享锁

共享锁如果在理解了互斥锁的前提下是比较简单的。

在没有被互斥锁占用的情况下(tryAcquireShared() 方法返回 true),共享锁是每一个线程都不会被挂起。

在互斥锁被占用的情况下(tryAcquireShared() 方法返回 false),也会创建一个 Node 对象加入到队列中,不过添加了一个 waiter 对象来标记这是一个共享的节点,同样的这个线程也会被挂起,等待互斥锁被释放后,按照先后会唤醒该线程,当该线程被唤醒后如果他的下一个节点也是共享的节点也会被唤醒,就像多米诺骨牌一样挨个唤醒所有的共享节点,直到又被下一个互斥结点把互斥锁给占用。

4 Condition

AQS 的 Condition 的 await/signal 和 synchronized 的 wait/notify 有异曲同工之妙。在获取到互斥锁后可以通过 Condition#await 方法把锁释放出去,同时自己被挂起,当获取到锁的线程调用 Condition#signal 方法又可以唤醒之前 await 挂起的线程。

在每个 Condition 对象中也会维护一个队列(和 AQS 中的队列是分开的,但是都是 Node 对象),每次有获取锁的线程调用 await 方法后都会在其中添加一个 Node,会用 CONDITION 标注状态,同时释放当前占用的锁唤醒同步队列中的下一个线程,并把自己挂起。当有线程调用 signal 方法后,会把 Condition 对象中的头节点(如果是 signalAll 就是把全部的节点都加入到 AQS 队列中)加入到 AQS 的同步队列中,同时触发 await 方法重新去获取锁,这里也和前面说的获取同步锁一样,就相当于 signal 后,await 需要重新排队去获取互斥锁。

5 最后

最后再简单聊一下 synchronized 和 AQS, 在 Java 早期的版本,每次 synchronized 都会去请求 mutex,导致没有锁竞争的时候性能不好,在 1.6 版本后加入了多级锁,性能得到了不错的提升。
在 Java 对象中定义了一个 Mark Word 空间来记录对象的一些信息,其中就包括了重要的锁信息,在对象没有锁的时候,一个线程需获取锁默认就是偏向锁, 只需要在对象的 Mark Word 中通过 CAS 设置锁的类型和锁属于的线程 ID,当没有别的线程竞争那就皆大欢喜,完事儿了;如果在 偏向锁竞争失败或者占有偏向锁的线程还没有完事儿,那么锁就会升级成轻量锁,当然升级后还是之前持有偏向锁的线程继续持有,其中轻量锁需要在持有的线程中添加一个 Lock Record 来指向对应的对象,对象的 Mark Work 也会添加指向 Thread 对应的 Lock Record,在等待获取锁的线程也会通过 CAS 自旋的方式去修改这些值,来尝试获取轻量锁,当自旋超过一定次数了或者有别的线程来竞争,这时就会升级成 重量锁,重量锁也是用了 monitor 锁,内部也是用 mutex 实现。
synchronized 和 AQS 目前在性能上差距不大,当有多个线程竞争是都会升级成 mutex,不同的是 synchronized 使用起来非常简单,但是功能没有那么多,AQS 使用起来比较复杂,但是包含互斥锁和共享锁,他们之间的组合能够完成很多复杂的功能,JDK 中很多的线程安全对象也用到了 AQS。

到此这篇关于Java AQS 线程安全同步队列的实现的文章就介绍到这了,更多相关Java AQS 线程安全同步队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java绘制哆啦A梦 超可爱

    java绘制哆啦A梦 超可爱

    这篇文章主要介绍了java绘制哆啦A梦,特别的可爱,文中示例代码介绍的也非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-05-05
  • SpringBoot如何获取Kafka的Topic列表

    SpringBoot如何获取Kafka的Topic列表

    这篇文章主要介绍了SpringBoot如何获取Kafka的Topic列表问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-09-09
  • Java数据长度获取方式对比之length属性、length()和size()方法详解

    Java数据长度获取方式对比之length属性、length()和size()方法详解

    在Java编程语言中length、length()和size()是三个常见的用来获取不同数据类型对象长度或大小的方法,但它们各自适用于不同的上下文,这篇文章主要给大家介绍了关于Java数据长度获取方式对比之length属性、length()和size()方法详解
    2024-07-07
  • java复制文件和java移动文件的示例分享

    java复制文件和java移动文件的示例分享

    本文主要介绍了java将文件夹下面的所有的jar文件拷贝到指定的文件夹下面的方法,需要的朋友可以参考下
    2014-02-02
  • 浅析java中常用的定时任务框架-单体

    浅析java中常用的定时任务框架-单体

    这篇文章主要带大家了解常用的单体应用定时任务框架以及掌握定时任务在单体中如何使用,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2021-12-12
  • 一文搞懂Java SPI机制的原理与使用

    一文搞懂Java SPI机制的原理与使用

    Java 程序员在日常工作中经常会听到 SPI,而且很多框架都使用了 SPI 的技术,那么问题来了,到底什么是 SPI 呢?今天小编就带大家好好了解一下 SPI
    2022-10-10
  • 解决IDEA 2020.3 lombok失效问题

    解决IDEA 2020.3 lombok失效问题

    这篇文章主要介绍了IDEA 2020.3 lombok失效问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • Servlet+JDBC实现登陆功能的小例子(带验证码)

    Servlet+JDBC实现登陆功能的小例子(带验证码)

    这篇文章主要介绍了Servlet+JDBC实现登陆功能的小例子(带验证码),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-06-06
  • Java中String类常用方法总结详解

    Java中String类常用方法总结详解

    String类是一个很常用的类,是Java语言的核心类,用来保存代码中的字符串常量的,并且封装了很多操作字符串的方法。本文为大家总结了一些String类常用方法的使用,感兴趣的可以了解一下
    2022-08-08
  • java关于调用方法的汇总

    java关于调用方法的汇总

    本文小编给大家整理了在Java中关于静态调用和动态调用的方法汇总,值得大家学习和参考。
    2017-11-11

最新评论