Java中的Semaphore信号量使用解析

 更新时间:2023年11月21日 08:34:10   作者:dalianpai  
这篇文章主要介绍了Java中的Semaphore信号量使用解析,Semaphore 通常我们叫它信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源,需要的朋友可以参考下

1、Semaphore 是什么

Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。

2、使用场景

主要用于那些资源有明确访问数量限制的场景,常用于限流 。

比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。

public class TestPoolSemaphore {
    public static void main(String[] args) {
        Pool pool = new Pool(2);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                Connection conn = pool.borrow();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                pool.free(conn);
            }).start();
        }
    }
}
@Slf4j(topic = "c.Pool")
class Pool {
    // 1. 连接池大小
    private final int poolSize;
    // 2. 连接对象数组
    private Connection[] connections;
    // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
    private AtomicIntegerArray states;
    private Semaphore semaphore;
    // 4. 构造方法初始化
    public Pool(int poolSize) {
        this.poolSize = poolSize;
        // 让许可数与资源数一致
        this.semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new MockConnection("连接" + (i+1));
        }
    }
    // 5. 借连接
    public Connection borrow() {// t1, t2, t3
        // 获取许可
        try {
            semaphore.acquire(); // 没有许可的线程,在此等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < poolSize; i++) {
            // 获取空闲连接
            if(states.get(i) == 0) {
                if (states.compareAndSet(i, 0, 1)) {
                    log.debug("borrow {}", connections[i]);
                    return connections[i];
                }
            }
        }
        // 不会执行到这里
        return null;
    }
    // 6. 归还连接
    public void free(Connection conn) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == conn) {
                states.set(i, 0);
                log.debug("free {}", conn);
                semaphore.release();
                break;
            }
        }
    }
}
class MockConnection implements Connection {
    private String name;
    public MockConnection(String name) {
        this.name = name;
    }
    @Override
    public String toString() {
        return "MockConnection{" +
                "name='" + name + '\'' +
                '}';
    }
    @Override
    public Statement createStatement() throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return null;
    }
    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return null;
    }
    @Override
    public String nativeSQL(String sql) throws SQLException {
        return null;
    }
    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
    }
    @Override
    public boolean getAutoCommit() throws SQLException {
        return false;
    }
    @Override
    public void commit() throws SQLException {
    }
    @Override
    public void rollback() throws SQLException {
    }
    @Override
    public void close() throws SQLException {
    }
    @Override
    public boolean isClosed() throws SQLException {
        return false;
    }
    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return null;
    }
    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {
    }
    @Override
    public boolean isReadOnly() throws SQLException {
        return false;
    }
    @Override
    public void setCatalog(String catalog) throws SQLException {
    }
    @Override
    public String getCatalog() throws SQLException {
        return null;
    }
    @Override
    public void setTransactionIsolation(int level) throws SQLException {
    }
    @Override
    public int getTransactionIsolation() throws SQLException {
        return 0;
    }
    @Override
    public SQLWarning getWarnings() throws SQLException {
        return null;
    }
    @Override
    public void clearWarnings() throws SQLException {
    }
    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }
    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }
    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return null;
    }
    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
    }
    @Override
    public void setHoldability(int holdability) throws SQLException {
    }
    @Override
    public int getHoldability() throws SQLException {
        return 0;
    }
    @Override
    public Savepoint setSavepoint() throws SQLException {
        return null;
    }
    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return null;
    }
    @Override
    public void rollback(Savepoint savepoint) throws SQLException {
    }
    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
    }
    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }
    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        return null;
    }
    @Override
    public Clob createClob() throws SQLException {
        return null;
    }
    @Override
    public Blob createBlob() throws SQLException {
        return null;
    }
    @Override
    public NClob createNClob() throws SQLException {
        return null;
    }
    @Override
    public SQLXML createSQLXML() throws SQLException {
        return null;
    }
    @Override
    public boolean isValid(int timeout) throws SQLException {
        return false;
    }
    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {
    }
    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {
    }
    @Override
    public String getClientInfo(String name) throws SQLException {
        return null;
    }
    @Override
    public Properties getClientInfo() throws SQLException {
        return null;
    }
    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return null;
    }
    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return null;
    }
    @Override
    public void setSchema(String schema) throws SQLException {
    }
    @Override
    public String getSchema() throws SQLException {
        return null;
    }
    @Override
    public void abort(Executor executor) throws SQLException {
    }
    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
    }
    @Override
    public int getNetworkTimeout() throws SQLException {
        return 0;
    }
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return null;
    }
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return false;
    }
}

比如:停车场场景,车位数量有限,同时只能容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。

/**
 * @author WGR
 * @create 2020/12/27 -- 22:19
 */
public class Test1 {
    public static void main(String[] args) {
        // 1. 创建 semaphore 对象
        Semaphore semaphore = new Semaphore(3);
         // 2. 10个线程同时运行
        for (int i = 0; i < 10; i++) {
           final int x = i;
            new Thread(() -> {
           // 3. 获取许可
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println(x +"占到车位。。。");
                    Thread.sleep(1);
                    System.out.println(x +"释放车位。。。");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                // 4. 释放许可
                    semaphore.release();
                }
            }).start();
        }
    }
}

3、Semaphore实现原理

(1)、Semaphore初始化

Semaphore semaphore=new Semaphore(3);

1、当调用new Semaphore(3) 方法时,默认会创建一个非公平的锁的同步阻塞队列。

2、把初始令牌数量赋值给同步队列的state状态,state的值就代表当前所剩余的令牌数量。

(2)获取令牌

semaphore.acquire();

1、当前线程会尝试去同步队列获取一个令牌,获取令牌的过程也就是使用原子的操作去修改同步队列的state ,获取一个令牌则修改为state=state-1。

2、 当计算出来的state<0,则代表令牌数量不足,此时会创建一个Node节点加入阻塞队列,挂起当前线程。

3、当计算出来的state>=0,则代表获取令牌成功。

源码:

/**
     *  获取1个令牌
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
/**
     * 共享模式下获取令牌,获取成功则返回,失败则加入阻塞队列,挂起线程
     * @param arg
     * @throws InterruptedException
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试获取令牌,arg为获取令牌个数,当可用令牌数减当前令牌数结果小于0,则创建一个节点加入阻塞队列,挂起当前线程。
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
/**
     * 1、创建节点,加入阻塞队列,
     * 2、重双向链表的head,tail节点关系,清空无效节点
     * 3、挂起当前节点线程
     * @param arg
     * @throws InterruptedException
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //创建节点加入阻塞队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获得当前节点pre节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);//返回锁的state
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //重组双向链表,清空无效节点,挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源

image-20201227223458916

假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞

image-20201227223533126

(3)、释放令牌

semaphore.release();

当调用semaphore.release() 方法时

1、线程会尝试释放一个令牌,释放令牌的过程也就是把同步队列的state修改为state=state+1的过程

2、释放令牌成功之后,同时会唤醒同步队列的所有阻塞节共享节点线程

3、被唤醒的节点会重新尝试去修改state=state-1 的操作,如果state>=0则获取令牌成功,否则重新进入阻塞队列,挂起线程。

源码:

/**
     * 释放令牌
     */
    public void release() {
        sync.releaseShared(1);
    }
/**
     *释放共享锁,同时唤醒所有阻塞队列共享节点线程
     * @param arg
     * @return
     */
    public final boolean releaseShared(int arg) {
        //释放共享锁
        if (tryReleaseShared(arg)) {
            //唤醒所有共享节点线程
            doReleaseShared();
            return true;
        }
        return false;
    }
/**
     * 唤醒所有共享节点线程
     */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//是否需要唤醒后继节点
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始0
                        continue;
                    unparkSuccessor(h);//唤醒h.nex节点线程
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE));
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

这时 Thread-4 释放了 permits,状态如下

image-20201227223613606

接下来 Thread-0 竞争成功,state 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 state 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

image-20201227223708466

到此这篇关于Java中的Semaphore信号量使用解析的文章就介绍到这了,更多相关Semaphore信号量内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

最新评论