SpringBoot用多线程批量导入数据库实现方法

 更新时间:2023年02月03日 10:24:29   作者:愿做无知一猿  
这篇文章主要介绍了SpringBoot用多线程批量导入数据库实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧

环境

springboot、mybatisPlus、mysql8

mysql8(部署在1核2G的服务器上,很卡,所以下面的数据条数用5000,太大怕不是要等到花儿都谢了 0.0)

原始的for循环入库

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
            //在循环中入库
            baseMapper.insert(entity);
        }
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        return end - start;
    }
}

共耗时:180121 ms

批量保存操作

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }
      	//mybatisPlus提供的批量保存方法,数字代表每几条数据提交一次事务,默认1000
        saveBatch(entityList, 1000);
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        return end - start;
    }
}

耗时时间:87217ms

在批量插入的基础上使用多线程

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() throws InterruptedException {
        long start = System.currentTimeMillis();
        //手动创建线程池,注意你 数据库连接池的 允许连接数量,别超过了就行。
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                5,
                5,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                //isDaemon 设置线程是否是守护线程,true的话,主线程结束,new的线程就不会继续工作
                new NamedThreadFactory("执行线程", false),
                (r, executor) -> System.out.println("拒绝" + r));
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }
        //拆分list,将其拆分成5份,然后上面线程池创建也是5个核心线程,刚好执行
        List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 1000);
        //使用CountDownLatch保证所有线程都执行完成
        CountDownLatch latch = new CountDownLatch(5);
        partition.forEach(item -> {
            poolExecutor.execute(() -> {
                saveBatch(item, 1000);
                latch.countDown();
            });
        });
        latch.await();
        // 也可以这么写,设定超时时间
        //latch.await(100,TimeUnit.SECONDS);
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        //关闭线程池
        poolExecutor.shutdown();
        return end - start;
    }
}

耗时时间: 28235

可见时间从180秒,缩短到了28秒,但是@Transactional对于多线程是控制不了所有的事务的。

Spring实现事务的原理是通过ThreadLocal把数据库连接绑定到当前线程中,同一个事务中数据库操作使用同一个jdbc connection,新开启的线程获取不到当前jdbc connection。

如下代码:

partition.forEach(item -> {
            poolExecutor.execute(() -> {
                saveBatch(item, 1000);
                latch.countDown();
                //让每个都报错
                int i = 1/0;
            });
        });

控制台打印:

Exception in thread "执行线程5" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程2" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程4" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程1" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程3" 30179
java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

可见5个线程都报错了,但是去查询数据库,却可以查询到5000条数据,这是不应该出现的情况。

处理多线程入库的事务问题

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Resource
    private DataSourceTransactionManager dataSourceTransactionManager;
    @Resource
    private TransactionDefinition transactionDefinition;
    @Override
    //此处手动管理事务的提交后,这个注解就可以去掉了
    //    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();
        //手动创建线程池,注意你 数据库连接池的 允许连接数量,别超过了就行。
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                5,
                5,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                //isDaemon 设置线程是否是守护线程,true的话,主线程结束,new的线程就不会继续工作
                new NamedThreadFactory("执行线程", false),
                (r, executor) -> System.out.println("拒绝" + r));
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }
        //拆分list,将其拆分成5份,然后上面线程池创建也是5个核心线程,刚好执行
        List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 10);
        //使用CountDownLatch保证所有线程都执行完成
        CountDownLatch sonLatch = new CountDownLatch(5);
        //主线程的 肯定为1
        CountDownLatch mainLatch = new CountDownLatch(1);
        AtomicBoolean hasError = new AtomicBoolean(false);
        partition.forEach(item -> {
            poolExecutor.execute(() -> {
                doSave(item, sonLatch, hasError, mainLatch);
            });
        });
        try {
            //此处应该是用try catch 包裹着主线程的所有业务代码,以此保证主线程中任何一处报错都可以通知子线程
            //这里加一个是为了调试主线程中的数据入库操作
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) 99999);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            save(entity);
            //主线程报错
            int i = 10 / 0;
            sonLatch.await();
        } catch (InterruptedException e) {
            hasError.set(true);
            e.printStackTrace();
        }
        mainLatch.countDown();
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        //关闭线程池
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
        return end - start;
    }
    /**
     * 包装后的子线程的保存代码
     *
     * @param entityList 要保存的集合
     * @param sonLatch   子线程 CountDownLatch
     * @param hasError   是否发生错误
     * @param mainLatch  主线程 CountDownLatch
     */
    private void doSave(List<MoreTestEntity> entityList,
                        CountDownLatch sonLatch,
                        AtomicBoolean hasError,
                        CountDownLatch mainLatch) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            //            //子线程报错
            //            int i = 10 / 0;
            saveBatch(entityList);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
            hasError.set(true);
        } finally {
            //这是必须的,每个子线程走完,要让主线程继续走,然后再回到子线程的每个任务,决定是提交还是回滚
            sonLatch.countDown();
        }
        try {
            //等待主线程的执行结束
            mainLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            hasError.set(true);
        }
        //事务操作
        if (hasError.get()) {
            dataSourceTransactionManager.rollback(transactionStatus);
        } else {
            dataSourceTransactionManager.commit(transactionStatus);
        }
    }
}

分别放开子线程报错和主线程报错,会发现事务都可以正常回滚,达到了预期的效果。

主要思路就是通过子线程CountDownLatch和主线程CountDownLatch,控制线程好代码的执行顺序即可。

最后补充几点:

  • 上述代码中的countDown()一旦出现不执行的情况那会导致线程堵塞堆积,所以建议给await()增加超时时间
  • 这样操作可能还会出现问题,比如主线程通知子线程可以进行实务操作了,但是各个子线程之间非透明,所以还是有几率存在某个子线程事务回滚失败的情况。

到此这篇关于SpringBoot用多线程批量导入数据库实现方法的文章就介绍到这了,更多相关SpringBoot多线程导入数据库内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java实现猜拳游戏

    java实现猜拳游戏

    这篇文章主要为大家详细介绍了java实现猜拳游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-08-08
  • Java中finally关键字对返回值的影响详解

    Java中finally关键字对返回值的影响详解

    这篇文章主要介绍了Java中finally关键字对返回值的影响详解,执行完try catch里面内容准备return时,如果还有finally需要执行这是编译器会为我们增加一个全局变量去暂存return 的值,等到finally执行完成去return这个全局变量,需要的朋友可以参考下
    2024-01-01
  • SpringBoot数据层处理方案精讲

    SpringBoot数据层处理方案精讲

    这篇文章主要介绍了SpringBoot数据层技术的解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2022-10-10
  • 详解Java中wait和sleep的区别

    详解Java中wait和sleep的区别

    这篇文章主要介绍了Java中wait和sleep的区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • 使用JSONObject.toJSONString 过滤掉值为空的key

    使用JSONObject.toJSONString 过滤掉值为空的key

    这篇文章主要介绍了使用JSONObject.toJSONString 过滤掉值为空的key,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • Java实现发送短信验证码功能

    Java实现发送短信验证码功能

    这篇文章主要为大家详细介绍了Java实现发送短信验证码功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-11-11
  • java hashtable实现代码

    java hashtable实现代码

    这篇文章介绍了java hashtable实现代码,有需要的朋友可以参考一下
    2013-10-10
  • 详述IntelliJ IDEA远程调试Tomcat的方法(图文)

    详述IntelliJ IDEA远程调试Tomcat的方法(图文)

    本篇文章主要介绍了详述IntelliJ IDEA远程调试Tomcat的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-12-12
  • Java面试题冲刺第十二天--数据库(2)

    Java面试题冲刺第十二天--数据库(2)

    这篇文章主要为大家分享了最有价值的三道数据库面试题,涵盖内容全面,包括数据结构和算法相关的题目、经典面试编程题等,感兴趣的小伙伴们可以参考一下
    2021-07-07
  • java实现二叉树的创建及5种遍历方法(总结)

    java实现二叉树的创建及5种遍历方法(总结)

    下面小编就为大家带来一篇java实现二叉树的创建及5种遍历方法(总结)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-04-04

最新评论