Java 多线程并发编程提高数据处理效率的详细过程

 更新时间:2023年04月04日 15:02:50   作者:ReadThroughLife  
这篇文章主要介绍了Java 多线程并发编程提高数据处理效率,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

🎉工作场景中遇到这样一个需求:根据主机的 IP 地址联动更新其他模型的相关信息。需求很简单,只涉及一般的数据库联动查询以及更新操作,然而在编码实现过程中发现,由于主机的数量很多,导致循环遍历查询、更新时花费很长的时间,调用一次接口大概需要 30-40 min 时间才能完成操作。

💡因此,为了有效缩短接口方法的执行时间,便考虑使用多线程并发编程方法,利用多核处理器并行执行的能力,通过异步处理数据的方式,便可以大大缩短执行时间,提高执行效率。

📍这里使用可重用固定线程数的线程池 FixedThreadPool,并使用 CountDownLatch 并发工具类提供的并发流程控制工具作为配合使用,保证多线程并发编程过程中的正常运行:

  • 首先,通过 Runtime.getRuntime().availableProcessors() 方法获取运行机器的 CPU 线程数,用于后续设置固定线程池的线程数量。
  • 其次,判断任务的特性,如果为计算密集型任务则设置线程数为 CPU 线程数+1,如果为 IO 密集型任务则设置线程数为 2 * CPU 线程数,由于在方法中需要与数据库进行频繁的交互,因此属于 IO 密集型任务。
  • 之后,对数据进行分组切割,每个线程处理一个分组的数据,分组的组数与线程数保持一致,并且还要创建计数器对象 CountDownLatch,调用构造函数,初始化参数值为线程数个数,保证主线程等待所有子线程运行结束后,再进行后续的操作。
  • 然后,调用 executorService.execute() 方法,重写 run 方法编写业务逻辑与数据处理代码,执行完当前线程后记得将计数器减1操作。
  • 最后,当所有子线程执行完成后,关闭线程池。

✨在省略工作场景中的业务逻辑代码后,通用的处理方法示例如下所示:

public ResponseData updateHostDept() {
		// ...
		List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
        // split the hostMapList for the following multi-threads task
        // return the number of logical CPUs
        int processorsNum = Runtime.getRuntime().availableProcessors();
        // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
        // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
        int threadNum = processorsNum * 2;  
        // the number of each group data 
        int eachGroupNum = hostMapList.size() / threadNum; 
        List<List<Map>> groupList = new ArrayList<>();
        for (int i = 0; i < threadNum; i++) {
            int start = i * eachGroupNum;
            if (i == threadNum - 1) {
                int end = mapList.size();
                groupList.add(hostMapList.subList(start, end));
            } else {
                int end = (i+1) * eachGroupNum;
                groupList.add(hostMapList.subList(start, end));
            }
        }
        // update data by using multi-threads asynchronously
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2);
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (List<Map> group : groupList) {
            executorService.execute(()->{
                try {
                    for (Map map : group) {
                    	// update the data in mongodb
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                	// let counter minus one 
                    countDownLatch.countDown();  
                }
            });
        }
        try {
        	// main thread donnot execute until all child threads finish
            countDownLatch.await();  
        } catch (Exception e) {
            e.printStackTrace();
        }
        // remember to shutdown the threadPool
        executorService.shutdown();  
        return ResponseData.success();
}

🎉那么在使用多线程异步更新的策略后,从当初调用接口所需的大概时间为 30-40 min 下降到了 8-10 min,大大提高了执行效率。

💡需要注意的是,这里使用的 newFixedThreadPool 创建线程池,它有一个缺陷就是,它的阻塞队列默认是一个无界队列,默认值为 Integer.MAX_VALUE 极有可能会造成 OOM 问题。因此,一般可以使用 ThreadPoolExecutor 来创建线程池,自己可以指定等待队列中的线程个数,避免产生 OOM 问题。

public ResponseData updateHostDept() {
		// ...
		List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
        // split the hostMapList for the following multi-threads task
        // return the number of logical CPUs
        int processorsNum = Runtime.getRuntime().availableProcessors();
        // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
        // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
        int threadNum = processorsNum * 2;  
        // the number of each group data 
        int eachGroupNum = hostMapList.size() / threadNum; 
        List<List<Map>> groupList = new ArrayList<>();
        for (int i = 0; i < threadNum; i++) {
            int start = i * eachGroupNum;
            if (i == threadNum - 1) {
                int end = mapList.size();
                groupList.add(hostMapList.subList(start, end));
            } else {
                int end = (i+1) * eachGroupNum;
                groupList.add(hostMapList.subList(start, end));
            }
        }
        // update data by using multi-threads asynchronously
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(100));
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (List<Map> group : groupList) {
            executor.execute(()->{
                try {
                    for (Map map : group) {
                    	// update the data in mongodb
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                	// let counter minus one 
                    countDownLatch.countDown();  
                }
            });
        }
        try {
        	// main thread donnot execute until all child threads finish
            countDownLatch.await();  
        } catch (Exception e) {
            e.printStackTrace();
        }
        // remember to shutdown the threadPool
        executor.shutdown();  
        return ResponseData.success();
}

在上述的代码中,核心线程数和最大线程数分别为 5 和 8,并没有设置的很大的值,因为如果如果设置的很大,线程间频繁的上下文切换也会增加时间消耗,反而不能最大程度上发挥多线程的优势。至于如何选择合适的参数,需要根据机器的参数以及任务的类型综合考虑决定。

🎉最后补充一点,如果想要通过非编码的方式获取机器的 CPU 线程个数也很简单,windows 系统通过任务管理器,选择 “性能”,便可以查看 CPU 线程个数的情况,如下图所示:

🎉从上图可以看到,我的机器中内核是八个 CPU,但是通过超线程技术一个物理的 CPU 核心可以模拟成两个逻辑 CPU 线程,因此我的机器是支持8核16线程的。

到此这篇关于Java 多线程并发编程提高数据处理效率的文章就介绍到这了,更多相关Java 多线程提高数据处理效率内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 深入理解Java序列化与反序列化

    深入理解Java序列化与反序列化

    今天教大家深入理解Java的序列化与反序列化,文中介绍的非常详细,有很多代码示例,对正在学习Java的小伙伴们很有帮助,需要的朋友可以参考下
    2021-05-05
  • SpringMVC九大组件之HandlerMapping详解

    SpringMVC九大组件之HandlerMapping详解

    这篇文章主要介绍了SpringMVC九大组件之HandlerMapping详解,HandlerMapping 叫做处理器映射器,它的作用就是根据当前 request 找到对应的 Handler 和 Interceptor,然后封装成一个 HandlerExecutionChain 对象返回,需要的朋友可以参考下
    2023-09-09
  • Java中数字黑洞实现代码

    Java中数字黑洞实现代码

    这篇文章主要介绍了Java编程中如何实现数字黑洞算法游戏,其中涉及到了数组、scanner、if语句等Java编程的基础知识,需要的朋友可以参考下
    2017-09-09
  • SpringBoot集成quartz实现定时任务详解

    SpringBoot集成quartz实现定时任务详解

    最为常用定时任务框架是Quartz,并且Spring也集成了Quartz的框架,Quartz不仅支持单实例方式还支持分布式方式。本文主要介绍Quartz,基础的Quartz的集成案例本,以及实现基于数据库的分布式任务管理和控制job生命周期
    2022-08-08
  • Java图片裁剪和生成缩略图的实例方法

    Java图片裁剪和生成缩略图的实例方法

    这篇文章主要介绍了Java图片裁剪和生成缩略图的实例方法,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2017-01-01
  • Java_Spring之基于注解的 AOP 配置

    Java_Spring之基于注解的 AOP 配置

    这篇文章主要介绍了Java_Spring中基于注解的AOP配置,我们要先进行环境的搭建,在进行注解配置,感兴趣的同学可以参考阅读
    2023-04-04
  • SpringBoot实现配置文件的替换

    SpringBoot实现配置文件的替换

    这篇文章主要介绍了SpringBoot实现配置文件的替换,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • java 实现黄金分割数的示例详解

    java 实现黄金分割数的示例详解

    这篇文章主要介绍了java 实现黄金分割数的示例详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02
  • SpringBoot整合Redis的实现示例

    SpringBoot整合Redis的实现示例

    本文主要介绍了SpringBoot整合Redis的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-01-01
  • Spring中的10种事务失效的常见场景

    Spring中的10种事务失效的常见场景

    这篇文章主要介绍了Spring中的10种事务失效的常见场景,Spring的声明式事务功能更是提供了极其方便的事务配置方式,配合Spring Boot的自动配置,大多数Spring Boot项目只需要在方法上标记@Transactional注解,即可一键开启方法的事务性配置,需要的朋友可以参考下
    2023-11-11

最新评论