Java的信号量semaphore讲解

 更新时间:2023年12月22日 09:47:39   作者:正经人z.  
这篇文章主要介绍了Java的信号量semaphore讲解,Semaphore底层是基于AbstractQueuedSynchronizer来实现的,Semaphore称为计数信号量,它允许n个任务同时访问某个资源,需要的朋友可以参考下

一、简介

1、Semaphore底层是基于AbstractQueuedSynchronizer来实现的。Semaphore称为计数信号量,它允许n个任务同时访问某个资源,可以将信号量看做是在向外分发使用资源的许可证,只有成功获取许可证,才能使用资源。

2、它也是通过内部类Sync继承了AQS,它的方法几乎都是用的AQS的,少部分重写了AQS的方法,同时,它的信号量机制是通过AQS的state属性来操作的,变更state属性使用到了CAS操作保证变量的数据一致性。

3、它的方法几乎是通过Sync类,也就是AQS实现的。

4、该类如果单独使用,不涉及到条件队列condition,使用到条件队列需要调用await方法、notify方法,notifyAll方法等一系列线程通信的方法,但是,semaphore类中不涉及到这些方法。

5、注意其中一个方法:release,这个方法是释放信号量到semaphore中,默认是释放一个。源码中作者这么说的:不要求一个线程在调用release方法释放许可前必须通过acquire方法获取到许可。正确的使用semaphore建立在应用程序里面,对许可证的使用,由程序员掌握!semaphore只是提供了一个信号量的机制供其使用。

6、核心方法都在Sync类中,外部方法都是调用的AQS和Sync中重写的AQS的方法

7、注意其中一个方法:acquire,这个方法是获取信号量从semaphore中,默认是获取一个。源码中作者这么说的:如果没有可用的许可证,则当前线程变为出于线程调度目的而禁用,并处于休眠状态,直到发生两件事之一:

a、一些其他的线程调用了release方法释放了许可,同时semaphore中的许可满足当前线程,且当前线程是下一个被分配许可证的线程

b、一些其他的线程打断了当前线程且当前线程是通过acquire方式获取许可的

二、源码分析

三个内部类

Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
    //构造方法设置许可数
        Sync(int permits) {
            setState(permits);
        }
    //获取许可
        final int getPermits() {
            return getState();
        }
    //共享模式下非公平策略获取许可数
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {   //无限循环
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))  //通过CAS修改许可数
                    return remaining;
            }
        }
    //共享模式下的公平策略释放许可
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))   //通过CAS修改许可数
                    return true;
            }
        }
    //减去许可数
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
// 获取并返回立即可用的所有许可
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

Sync类继承了AQS类,重写了nonfairTryAcquireShared、tryReleaseShared方法

NonfairSync和FairSync

//非公平策略	
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        NonfairSync(int permits) {
            super(permits);
        }
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);  //调用的父类Sync的方法
        }
    }
//公平策略
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        FairSync(int permits) {
            super(permits);
        }
        //公平策略下子类重写了方法
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

构造方法

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

常用方法

//默认获取一个许可
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
//默认释放一个许可
public void release() {sync.releaseShared(1); }
//获取指定的许可数    
public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
//尝试获取指定的许可数    
public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
//释放指定的许可数    
public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

没啥好说的,都是套娃子模式,真正的核心方法在Sync类中以及AQS类

示例

package com.hust.grid.leesf.semaphore;
import java.util.concurrent.Semaphore;
class MyThread extends Thread {
    private Semaphore semaphore;
    public MyThread(String name, Semaphore semaphore) {
        super(name);
        this.semaphore = semaphore;
    }
    public void run() {        
        int count = 3;
        System.out.println(Thread.currentThread().getName() + " trying to acquire");
        try {
            semaphore.acquire(count);
            System.out.println(Thread.currentThread().getName() + " acquire successfully");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release(count);
            System.out.println(Thread.currentThread().getName() + " release successfully");
        }
    }
}
public class SemaphoreDemo {
    public final static int SEM_SIZE = 10;
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(SEM_SIZE);
        MyThread t1 = new MyThread("t1", semaphore);
        MyThread t2 = new MyThread("t2", semaphore);
        t1.start();
        t2.start();
        int permits = 5;
        System.out.println(Thread.currentThread().getName() + " trying to acquire");
        try {
            semaphore.acquire(permits);
            System.out.println(Thread.currentThread().getName() + " acquire successfully");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
            System.out.println(Thread.currentThread().getName() + " release successfully");
        }
    }
}

说明:首先,生成一个信号量,信号量有10个许可,然后,main,t1,t2三个线程获取许可运行

首先,main线程执行acquire操作,并且成功获得许可,之后t1线程执行acquire操作,成功获得许可,之后t2执行acquire操作,由于此时许可数量不够,t2线程将会阻塞,直到许可可用。之后t1线程释放许可,main线程释放许可,此时的许可数量可以满足t2线程的要求,所以,此时t2线程会成功获得许可运行,t2运行完成后释放许可。

三、总结

1、semaphore类的核心是Sync内部类,它继承了AQS类,适当重写了一些方法,其他的方法都调用的这个Sync中的方法,包括Sync类的两个子类:FairSync和NonFairSync。

2、semaphore类中实现了公平锁FairSync和非公平锁NonFairSync。默认使用的是非公平锁。

3、对于公平锁和非公平锁,主要体现在获取锁上面是否公平。非公平锁不会判断当前线程是否为同步队列中的第一个节点,而是直接操作state属性;公平锁会判断当前线程是否为同步队列中的第一个节点,具体方法是hasQueuedPredecessors,如果不是,则返回-1,如果是,则执行下面步骤。

4、semaphore的信号量机制使用的是AQS类的state属性,默认每次获取或释放信号量都是1,除非你指定要使用的信号量或释放的信号量数。

5、对state属性的添加和释放都必须保证是原子性的,所以,semaphore类中使用的是unsafe类的compareAndSetState方法配合for(; ;)无限循环,语义为CAS自旋,来保证对state属性的操作是原子性的。

到此这篇关于Java的信号量semaphore讲解的文章就介绍到这了,更多相关信号量semaphore内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java使用apache commons连接ftp修改ftp文件名失败原因

    java使用apache commons连接ftp修改ftp文件名失败原因

    这篇文章主要介绍了java使用apache commons连接ftp修改ftp文件名失败原因解析,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-08-08
  • Nacos服务实例的权重设置方式(以及设置为0时的作用与场景)

    Nacos服务实例的权重设置方式(以及设置为0时的作用与场景)

    这篇文章主要介绍了Nacos服务实例的权重设置方式(以及设置为0时的作用与场景),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-07-07
  • SpringMVC框架使用jackson封装数据过程中遇到的问题及解决

    SpringMVC框架使用jackson封装数据过程中遇到的问题及解决

    这篇文章主要介绍了SpringMVC框架使用jackson封装数据过程中遇到的问题及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-07-07
  • 基于SpringBoot实现自动装配返回属性的设计思路

    基于SpringBoot实现自动装配返回属性的设计思路

    这篇文章主要介绍了基于SpringBoot实现自动装配返回属性,这里涉及到的技术知识点有注解解析器,为什么用ResponseBodyAdvice这里解析?不在Filter,Interceptors,本文结合示例代码给大家介绍的非常详细,需要的朋友参考下吧
    2022-03-03
  • IDEA未配置SQL方言:无法使用SQL提示解决方法

    IDEA未配置SQL方言:无法使用SQL提示解决方法

    在使用IDEA进行SQL开发时,如果未配置SQL方言可能会导致一些问题,如无法正确识别数据库中的关键字、数据类型等,这篇文章主要给大家介绍了关于IDEA未配置SQL方言,无法使用SQL提示解决方法的相关资料,需要的朋友可以参考下
    2024-07-07
  • MyBatis之关于动态SQL解读

    MyBatis之关于动态SQL解读

    这篇文章主要介绍了MyBatis之关于动态SQL解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • Java异常处理机制深入理解

    Java异常处理机制深入理解

    如果某个方法不能按照正常的途径完成任务,就可以通过另一种路径退出方法。在这种情况下会抛出一个封装了错误信息的对象。此时,这个方法会立刻退出同时不返回任何值。另外,调用这个方法的其他代码也无法继续执行,异常处理机制会将代码执行交给异常处理器
    2022-01-01
  • Java中Reactor的反应器模式详解

    Java中Reactor的反应器模式详解

    这篇文章主要介绍了Java中Reactor的反应器模式详解,Reactor反应器模式有点儿类似事件驱动模式,当有事件触发时,事件源会将事件dispatch分发到handler处理器进行事件处理,反应器模式中的反应器角色类似于事件驱动模式中的dispatcher事件分发器角色,需要的朋友可以参考下
    2023-12-12
  • Java获取e.printStackTrace()打印的信息方式

    Java获取e.printStackTrace()打印的信息方式

    这篇文章主要介绍了Java获取e.printStackTrace()打印的信息方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • 解析Mybatis的insert方法返回数字-2147482646的解决

    解析Mybatis的insert方法返回数字-2147482646的解决

    这篇文章主要介绍了解析Mybatis的insert方法返回数字-2147482646的解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-04-04

最新评论