java动态线程池的简单实现思路

 更新时间:2023年06月05日 08:31:49   作者:耶耶耶耶yeyeye  
本文主要介绍了java 动态线程池的简单实现思路,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

什么是动态线程池?

在线程池日常实践中我们常常会遇到以下问题:

  • 代码中创建了一个线程池却不知道核心参数设置多少比较合适。
  • 参数设置好后,上线发现需要调整,改代码重启服务非常麻烦。
  • 线程池相对于开发人员来说是个黑箱,运行情况在出现问题 前很难被感知。

因此,动态可监控线程池一种针对以上痛点开发的线程池管理工具。主要可实现功能有:提供对 Spring 应用内线程池实例的全局管控、应用运行时动态变更线程池参数以及线程池数据采集和监控阈值报警。

已经实现的优秀开源动态线程池

hippo4j、dynamic-tp.....

实现思路

核心管理类

  • 需要能实现对线程池的
  • 服务注册
  • 获取已经注册好的线程池

以及对注册号线程池参数的刷新。

对于每一个线程池,我们使用一个线程池名字作为标识每个线程池的唯一ID。

伪代码实现

public class DtpRegistry {
    /**
     * 储存线程池
     */
    private static final Map<String, Executor> EXECUTOR_MAP = new ConcurrentHashMap<>();
    /**
     * 获取线程池
     * @param executorName 线程池名字
     */
    public static Executor getExecutor(String executorName) {
        return EXECUTOR_MAP.get(executorName);
    }
    /**
     * 线程池注册
     * @param executorName 线程池名字
     */
    public static void registry(String executorName, Executor executor) {
        //注册
        EXECUTOR_MAP.put(executorName, executorWrapper);
    }
    /**
     * 刷新线程池参数
     * @param executorName 线程池名字
     * @param properties 线程池参数
     */
    public static void refresh(String executorName, ThreadPoolProperties properties) {
        Executor executor = EXECUTOR_MAP.get(executorName)
        //刷新参数
        //.......
    }
}

如何创建线程池?

STEP 1. 我们可以使用yml配置文件的方式配置一个线程池,将线程池实例的创建交由Spring容器。

相关配置

public class DtpProperties {
    private List<ThreadPoolProperties> executors;
}
public class ThreadPoolProperties {
    /**
     * 标识每个线程池的唯一名字
     */
    private String poolName;
    private String poolType = "common";
    /**
     * 是否为守护线程
     */
    private boolean isDaemon = false;
    /**
     * 以下都是核心参数
     */
    private int corePoolSize = 1;
    private int maximumPoolSize = 1;
    private long keepAliveTime;
    private TimeUnit timeUnit = TimeUnit.SECONDS;
    private String queueType = "arrayBlockingQueue";
    private int queueSize = 5;
    private String threadFactoryPrefix = "-td-";
    private String RejectedExecutionHandler;
}

yml example:

spring:
  dtp:
    executors:
      # 线程池1
      - poolName: dtpExecutor1
        corePoolSize: 5
        maximumPoolSize: 10
      # 线程池2
      - poolName: dtpExecutor2
        corePoolSize: 2
        maximumPoolSize: 15

STEP 2 根据配置信息添加线程池的BeanDefinition

关键类

@Slf4j
public class DtpImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {
    private Environment environment;
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        log.info("注册");
        //绑定资源
        DtpProperties dtpProperties = new DtpProperties();
        ResourceBundlerUtil.bind(environment, dtpProperties);
        List<ThreadPoolProperties> executors = dtpProperties.getExecutors();
        if (Objects.isNull(executors)) {
            log.info("未检测本地到配置文件线程池");
            return;
        }
        //注册beanDefinition
        executors.forEach((executorProp) -> {
            BeanUtil.registerIfAbsent(registry, executorProp);
        });
    }
    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}
/**
 *
 * 工具类
 *
 */
public class BeanUtil{
    public static void registerIfAbsent(BeanDefinitionRegistry registry, ThreadPoolProperties executorProp) {
        register(registry, executorProp.getPoolName(), executorProp);
    }
    public static void register(BeanDefinitionRegistry registry, String beanName, ThreadPoolProperties executorProp) {
        Class<? extends Executor> executorType = ExecutorType.getClazz(executorProp.getPoolType());
        Object[] args = assembleArgs(executorProp);
        register(registry, beanName, executorType, args);
    }
    public static void register(BeanDefinitionRegistry registry, String beanName, Class<?> clazz, Object[] args) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
        for (Object arg : args) {
            builder.addConstructorArgValue(arg);
        }
        registry.registerBeanDefinition(beanName, builder.getBeanDefinition());
    }
    private static Object[] assembleArgs(ThreadPoolProperties executorProp) {
        return new Object[]{
                executorProp.getCorePoolSize(),
                executorProp.getMaximumPoolSize(),
                executorProp.getKeepAliveTime(),
                executorProp.getTimeUnit(),
                QueueType.getInstance(executorProp.getQueueType(), executorProp.getQueueSize()),
                new NamedThreadFactory(
                        executorProp.getPoolName() + executorProp.getThreadFactoryPrefix(),
                        executorProp.isDaemon()
                ),
                //先默认不做设置
                RejectPolicy.ABORT.getValue()
        };
    }
}

下面解释一下这个类的作用,environment实例中储存着spring启动时解析的yml配置,所以我们spring提供的Binder将配置绑定到我们前面定义的DtpProperties类中,方便后续使用。接下来的比较简单,就是将线程池的BeanDefinition注册到IOC容器中,让spring去帮我们实例化这个bean。

STEP 3. 将已经实例化的线程池注册到核心类 DtpRegistry 中

我们注册了 beanDefinition 后,spring会帮我们实例化出来, 在这之后我们可以根据需要将这个bean进行进一步的处理,spring也提供了很多机制让我们对bean的生命周期管理进行更多的扩展。对应到这里我们就是将实例化出来的线程池注册到核心类 DtpRegistry 中进行管理。

这里我们使用 BeanPostProcessor 进行处理。

@Slf4j
public class DtpBeanPostProcessor implements BeanPostProcessor {
    private DefaultListableBeanFactory beanFactory;
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DtpExecutor) {
            //直接纳入管理
            DtpRegistry.registry(beanName, (DtpExecutor) bean);
        }
        return bean;
    }
}

这里的逻辑很简单, 就是判断一下这个bean是不是线程池,是就统一管理起来。

STEP 4. 启用 BeanDefinitionRegistrar 和 BeanPostProcessor

在springboot程序中,只要加一个@MapperScan注解就能启用mybatis的功能,我们可以学习其在spring中的启用方式,自定义一个注解:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DtpImportSelector.class)
public @interface EnableDynamicThreadPool {
}

其中,比较关键的是@Import注解,spring会导入注解中的类DtpImportSelector

而DtpImportSelector这个类实现了:

public class DtpImportSelector implements DeferredImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[]{
                DtpImportBeanDefinitionRegistrar.class.getName(),
                DtpBeanPostProcessor.class.getName()
        };
    }
}

这样,只要我们再启动类或者配置类上加上@EnableDynamicThreadPool这个注解,spring就会将DtpImportBeanDefinitionRegistrar和DtpBeanPostProcessor这两个类加入spring容器管理,从而实现我们的线程池的注册。

@SpringBootApplication
@EnableDynamicThreadPool
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

如何实现线程池配置的动态刷新

首先明确一点,对于线程池的实现类,例如:ThreadPoolExecutor等,都有提供核心参数对应的 set 方法,让我们实现参数修改。因此,在核心类DtpRegistry中的refresh方法,我们可以这样写:

public class DtpRegistry {
    /**
     * 储存线程池
     */
    private static final Map<String, ThreadPoolExecutor> EXECUTOR_MAP = new ConcurrentHashMap<>();
    /**
     * 刷新线程池参数
     * @param executorName 线程池名字
     * @param properties 线程池参数
     */
    public static void refresh(String executorName, ThreadPoolProperties properties) {
        ThreadPoolExecutor executor = EXECUTOR_MAP.get(executorName)
        //设置参数
        executor.setCorePoolSize(...);
        executor.setMaximumPoolSize(...);
        ......
    }
}

而这些新参数怎么来呢?我们可以引入Nacos、Apollo等配置中心,实现他们的监听器方法,在监听器方法里调用DtpRegistry的refresh方法刷新即可。

到此这篇关于java动态线程池的简单实现思路的文章就介绍到这了,更多相关java动态线程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 详解Java设计模式编程中的访问者模式

    详解Java设计模式编程中的访问者模式

    这篇文章主要介绍了Java设计模式编程中的访问者模式,访问者模式的合理利用可以避免项目中出现大量重复的代码,需要的朋友可以参考下
    2016-02-02
  • Scala常用List列表操作方法示例

    Scala常用List列表操作方法示例

    这篇文章主要介绍了Scala常用List列表操作方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-06-06
  • 从dubbo源码分析qos-server端口冲突问题及解决

    从dubbo源码分析qos-server端口冲突问题及解决

    这篇文章主要介绍了从dubbo源码分析qos-server端口冲突问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02
  • Maven发布Jar包中文乱码解决方法

    Maven发布Jar包中文乱码解决方法

    这篇文章主要介绍了Maven发布Jar包中文乱码解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-03-03
  • Java9的一些新特性介绍

    Java9的一些新特性介绍

    这篇文章主要介绍了Java9的一些新特性介绍,Java随着其开源的特点和甲骨文的推动正在不断改进中,需要的朋友可以参考下
    2015-07-07
  • 解决阿里云OSS使用URL无法访问图片的两种方法

    解决阿里云OSS使用URL无法访问图片的两种方法

    这篇文章主要介绍了解决阿里云OSS使用URL无法访问图片的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • Java延迟队列DelayQueue原理详解

    Java延迟队列DelayQueue原理详解

    这篇文章主要介绍了Java延迟队列DelayQueue原理详解,DelayQueue 是一个通过PriorityBlockingQueue实现延迟获取元素的无界队列无界阻塞队列,其中添加进该队列的元素必须实现Delayed接口,而且只有在延迟期满后才能从中提取元素,需要的朋友可以参考下
    2023-12-12
  • SpringMVC高级开发功能实现过程解析

    SpringMVC高级开发功能实现过程解析

    这篇文章主要介绍了SpringMVC高级开发功能实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-06-06
  • 浅谈Spring boot cache使用和原理

    浅谈Spring boot cache使用和原理

    这篇文章主要介绍了浅谈Spring boot cache使用和原理,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-09-09
  • Java常用API类之Math System tostring用法详解

    Java常用API类之Math System tostring用法详解

    System类代表系统,系统级的很多属性和控制方法都放置在该类的内部。该类位于java.lang包,Java 的 Math 包含了用于执行基本数学运算的属性和方法,如初等指数、对数、平方根和三角函数,toString() 方法用于返回以一个字符串表示的 Number 对象值
    2021-10-10

最新评论