Java中的Semaphore原理解析

 更新时间:2024年01月22日 10:28:32   作者:我不是欧拉_  
这篇文章主要介绍了Java中的Semaphore原理解析,Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,需要的朋友可以参考下

1. Semaphore是什么?

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Semaphore一般用于流量的控制,特别是公共资源有限的应用场景。例如数据库的连接,假设数据库的连接数上线为10个,多个线程并发操作数据库可以使用Semaphore来控制并发操作数据库的线程个数最多为10个。

2. 类图

通过类图可以看到,Semaphore与ReentrantLock的内部类的结构相同,类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。 

3. 实现原理

3.1 使用示例

    // 定义一个资源池类
    class Pool {
        // 可用资源数100
        private static final int MAX_AVAILABLE = 100;
        // 定义信号量100
        private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
        // 获取资源
        public Object getItem() throws InterruptedException {
            // 尝试获取
            available.acquire();
            // 返回可用资源
            return getNextAvailableItem();
        }
        // 释放资源
        public void putItem(Object x) {
            // 如果资源标记为未被使用
            if (markAsUnused(x))
            // 释放资源
            available.release();
        }
        // Not a particularly efficient data structure; just for demo
        // 定义资源类型,可以是满足业务的任何类型
        protected Object[] items = new Object[MAX_AVAILABLE] ... whatever kinds of items being managed
        // 是否被使用标记
        protected boolean[] used = new boolean[MAX_AVAILABLE];
        // 获取下一个可用资源
        protected synchronized Object getNextAvailableItem() {
            // 循环遍历
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                // 如果未被使用
                if (!used[i]) {
                    // 使用标记设置为true
                    used[i] = true;
                    // 返回当前的资源
                    return items[i];
                }
            }
            return null; // not reached
        }
        // 标记资源为未被使用
        protected synchronized boolean markAsUnused(Object item) {
            // 循环遍历
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                // 找到需要释放的资源
                if (item == items[i]) {
                    // 如果是被使用中
                    if (used[i]) {
                    // 使用标记设置为false
                    used[i] = false;
                    // 返回true表示标记成功
                    return true;
                } else
                    // 返回false表示标记失败
                    return false;
                }
            }
            return false;
        }
    }

3.2 Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        // 构造方法,调用父类AQS的setState方法,给共享变量state赋值
        // 即通过构造方法给锁的数量附初始值
        Sync(int permits) {
            setState(permits);
        }
        // 获取锁,也叫许可
        final int getPermits() {
            return getState();
        }
        // 共享模式下的非公平获取
        // 此方法也体现出与ReentrantLock中Sync的实现不同
        // ReentrantLock中Sync是独占模式下的获取
        // 具体实现的不同体现在int remaining = available - acquires;
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                // 获取锁的可用数量
                int available = getState();
                // 可用数量 - 请求的数量(acquires默认值为1) = 剩余量
                int remaining = available - acquires;
                // 如果remaining < 0即请求的锁大于可用的数量,马上返回负数,表示获取锁失败
                if (remaining < 0 ||
                    // 否则通过CAS的方式将可用数量换成剩余量,并返回剩余量
                    // 自旋 + CAS 保证线程安全,线程不用排队体现出非公平性
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        // 共享模式下释放锁
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // CAS修改锁数量,成功则返回,失败则继续自旋
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        // 根据指定的缩减量减小可用锁的数目
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        // 获取并返回立即可用的所有锁
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

3.3 NonfairSync

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        // 构造方法初始化锁数量
        NonfairSync(int permits) {
            super(permits);
        }
        // 直接调用nonfairTryAcquireShared方法,走非公平策略
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

3.4 FairSync

static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        // 构造方法初始化锁数量
        FairSync(int permits) {
            super(permits);
        }
        // 共享模式下的公平策略获取
        // 与非公平策略唯一的不同体现在线程是否需要排队
        // 即是否调用hasQueuedPredecessors()方法进行判断
        // 如果需要排队则立即返回继续排队
        // 否则通过CAS方式获取锁并返货锁的剩余量,结束自旋
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

通过分析代码发现,Semaphore与ReentrantLock的内部类的结构相同,具体实现的不同体现在 int remaining = available - acquires这行代码上。

ReentrantLock对于锁的控制是 int c = getState(); if (c == 0){....}。体现为一种独占的控制。

Semaphore对锁的控制是 for (;;) { int available = getState(); int remaining = available - acquires;......}。即所有线程都可以进入自旋,只要锁有剩余量都可以尝试获取锁,体现为一种共享的控制。

3.5 Semaphore

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /** All mechanics via AbstractQueuedSynchronizer subclass */
    // 同步队列
    private final Sync sync;
    // 构造方法初始话锁数量
    // 默认采用非公平策略
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    // 构造方法,带一个布尔参数,true表示采用公平策略,false表示采用非公平策略
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
}

3.5.1 acquire() 方法解析

// Semaphore
public void acquire() throws InterruptedException {
        // 调用sync的acquireSharedInterruptibly,即响应中断的获取
        // 因为sync继承AbstractQueuedSynchronizer
        // 即调用AQS的acquireSharedInterruptibly
        sync.acquireSharedInterruptibly(1);
    }
// 进入AQS
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 如果线程被中断,则响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 否则调用tryAcquireShared,如果获取的锁小于0即获取锁失败则调用doAcquireSharedInterruptibly方法,进入同步队列排队
        // 如果获取锁成功则不排队,走业务逻辑
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
// Semaphore 中tryAcquireShared的实现
// 公平策略
protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
// 非公平策略
protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
// 前面已经解析过,不在赘述
 

3.5.2 release() 方法解析

// Semaphore
public void release() {
        sync.releaseShared(1);
    }
// 进入AQS
public final boolean releaseShared(int arg) {
        // 尝试释放锁, 如果释放锁成功
        if (tryReleaseShared(arg)) {
            // 线程出同步队列,返回true
            doReleaseShared();
            return true;
        }
        // 否则返回false
        return false;
    }
// Semaphore 中tryReleaseShared实现
protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

3.5.3 其他方法

方法说明调用
acquire(int permits)获取信号量,指定获取许可的个数,响应中断sync.acquireSharedInterruptibly(permits)
acquireUninterruptibly()获取信号量,默认获取1个许可,不响应中断sync.acquireShared(1)
acquireUninterruptibly(int permits)获取信号量,指定获取许可的个数,不响应中断sync.acquireShared(permits)
release(int permits)释放信号量,指定释放许可的个数sync.releaseShared(permits);
tryAcquire()尝试获取许可,如果获取成功返回true,否则返回false,不会阻塞线程,而且不响应中断sync.nonfairTryAcquireShared(1)
tryAcquire(int permits)同上,可以指定获取许可的个数sync.nonfairTryAcquireShared(permits)
tryAcquire(long timeout, TimeUnit unit)共享式超时获取sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
tryAcquire(int permits, long timeout, TimeUnit unit)同上,可以指定获取许可的个数sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout))
availablePermits()获取可用许可数sync.getPermits()
drainPermits()将剩下的信号量一次性消耗光,并且返回所消耗的信号量sync.drainPermits()
reducePermits(int reduction)减少信号量的总数,不会导致任何线程阻塞,调用该方法可能会导致信号量最终为负数sync.reducePermits(reduction)
isFair()是否采用公平策略
hasQueuedThreads()是否是已排队的线程
getQueueLength()获取排队线程的长度
getQueuedThreads()获取排队线程

4. 总结

Semaphore是一个有效的流量控制工具,它基于AQS共享锁实现。我们常常用它来控制对有限资源的访问。使用步骤

每次使用资源前,先申请一个信号量,如果资源数不够,就会阻塞等待;每次释放资源后,就释放一个信号量。

到此这篇关于Java中的Semaphore原理解析的文章就介绍到这了,更多相关Semaphore原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 抽象类使用Jackson序列化问题

    抽象类使用Jackson序列化问题

    这篇文章主要介绍了抽象类使用Jackson序列化问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-01-01
  • jdbc连接数据库实例详解

    jdbc连接数据库实例详解

    在本篇内容里小编给大家分享了关于jdbc如何连接数据库的相关知识点内容,需要的朋友们学习下。
    2019-02-02
  • Java 多线程同步 锁机制与synchronized深入解析

    Java 多线程同步 锁机制与synchronized深入解析

    从尺寸上讲,同步代码块比同步方法小。你可以把同步代码块看成是没上锁房间里的一块用带锁的屏风隔开的空间
    2013-09-09
  • java为移动端写接口开发实例

    java为移动端写接口开发实例

    本篇文章主要介绍了java如何为移动端写接口,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • java并发之Lock接口的深入讲解

    java并发之Lock接口的深入讲解

    从Java 5之后,在java.util.concurrent.locks包下提供了另外一种方式来实现同步访问.那就是Lock,这篇文章主要给大家介绍了关于java并发之Lock接口的相关资料,需要的朋友可以参考下
    2021-08-08
  • Java的枚举,注解和反射(一)

    Java的枚举,注解和反射(一)

    今天小编就为大家分享一篇关于Java枚举,注解与反射原理说明,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2021-07-07
  • JAVA常用API总结与说明

    JAVA常用API总结与说明

    这篇文章主要介绍了JAVA常用API总结与说明,包括JAVA线程常用API,JAVA队列常用API,JAVA泛型集合算法常用API,JAVA并发常用API需要的朋友可以参考下
    2022-12-12
  • JAVA大作业之图书管理系统实现全解

    JAVA大作业之图书管理系统实现全解

    随着网络技术的高速发展,计算机应用的普及,利用计算机对图书馆的日常工作进行管理势在必行,本篇文章手把手带你用Java实现一个图书管理系统,大家可以在过程中查缺补漏,提升水平
    2022-01-01
  • Java中@DS+@Transactional注解切换数据源失效解决方案

    Java中@DS+@Transactional注解切换数据源失效解决方案

    本文主要介绍了@DS+@Transactional注解切换数据源失效解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06
  • java判断对象中某个属性是否为空方法代码

    java判断对象中某个属性是否为空方法代码

    这篇文章主要给大家介绍了关于java判断对象中某个属性是否为空的相关资料,最近遇到后台接收值的时候,需要对接收对象进行非空校验,需要的朋友可以参考下
    2023-07-07

最新评论