Java多线程编程实战之模拟大量数据同步

 更新时间:2019年02月14日 10:20:01   作者:沉静  
这篇文章主要介绍了Java多线程编程实战之模拟大量数据同步,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

背景

最近对于 Java 多线程做了一段时间的学习,笔者一直认为,学习东西就是要应用到实际的业务需求中的。否则要么无法深入理解,要么硬生生地套用技术只是达到炫技的效果。

不过笔者仍旧认为自己对于多线程掌握不够熟练,不敢轻易应用到生产代码中。这就按照平时工作中遇到的实际问题,脑补了一个很可能存在的业务场景:

已知某公司管理着 1000 个微信服务号,每个服务号有 1w ~ 50w 粉丝不等。假设该公司每天都需要将所有微信服务号的粉丝数据通过调用微信 API 的方式更新到本地数据库。

需求分析

对此需求进行分析,主要存在以下问题:

  • 单个服务号获取粉丝 id,只能每次 1w 按顺序拉取
  • 微信的 API 对于服务商的并发请求数量有限制

单个服务号获取粉丝 id,只能每次 1w 按顺序拉取。这个问题决定了单个公众号在拉取粉丝 id 上,无法分配给多个线程执行。

微信的 API 对于服务商的并发请求数量有限制。这点最容易被忽略,如果我们同时有过多的请求,则会导致接口被封禁。这里可以通过信号量来控制同时执行的线程数量。

为了尽快完成数据同步,根据实际情况:整个数据同步可分为读数据和写数据两个部分。读数据是通过 API 获取,走网络 IO,速度较慢;写数据是写到数据库,速度较快。所以得出结论:需要分配较多的线程进行读数据,较少的线程进行写数据。

设计要点

首先,我们需要确定开启多少个线程(在生产中往往是使用线程池),线程数量需要根据服务器性能来决定,这里我们定为 40 个读取数据线程(将 1000 个公众号分为 40 份,分别在 40 个线程中执行),1个写入数据线程。(具体开多少个线程,取决于线程池的容量,以及可以分配给此业务的数量。具体的数字需要根据实际情况测试得出,比服务器阈值低一些较好。当然,配置允许范围内越大越好)

其次,考虑到微信对于 API 并发请求的限制,需要限制同时执行的线程数,使用java.util.concurrent.Semaphore进行控制,这里我们限制为 20 个(具体的信号量凭证数,取决于同一时间能够执行的线程,跟 API 限制,服务器性能有关)。

然后,我们需要知道数据何时读取、写入完毕,以控制程序逻辑以及终止程序,这里我们使用java.util.concurrent.CountDownLatch进行控制。

最后,我们需要一个数据结构,用来在多个线程共享处理的数据,此处同步数据的场景非常适合使用队列,这里我们使用线程安全的java.util.concurrent.ConcurrentLinkedQueue来进行处理。(需要注意的是,在实际开发中,队列不能够无限制地增长,这将会很快消耗掉内存,我们需要根据实际情况对队列长度做控制。例如,可以通过控制读取线程数和写入线程数的比例来控制队列的长度)

模拟代码

由于本文重点关注多线程的使用,模拟代码只体现多线程操作的方法。代码里添加了大量的注释,方便各位读者阅读理解。

JDK:1.8

import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * N个线程向队列添加数据
 * 一个线程消费队列数据
 */
public class QueueTest {
  private static List<String> data = Arrays.asList("a", "b", "c", "d", "e");

  private static final int OFFER_COUNT = 40; // 开启的线程数量

  private static Semaphore semaphore = new Semaphore(20); // 同一时间执行的线程数量(大多用于控制API调用次数或数据库查询连接数)

  public static void main(String[] args) throws InterruptedException {
    Queue<String> queue = new ConcurrentLinkedQueue<>(); // 处理队列,需要处理的数据,放置到此队列中

    CountDownLatch offerLatch = new CountDownLatch(OFFER_COUNT); // offer线程latch,每完成一个,latch减一,lacth的count为0时表示offer处理完毕
    CountDownLatch pollLatch = new CountDownLatch(1); // poll线程latch,latch的count为0时,表示poll处理完毕

    Runnable offerRunnable = () -> {
      try {
        semaphore.acquire(); // 信号量控制
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

      try {
        for (String datum : data) {
          queue.offer(datum);
          TimeUnit.SECONDS.sleep(2); // 模拟取数据很慢的情况
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 在finally中执行latch.countDown()以及信号量释放,避免因异常导致没有正常释放
        offerLatch.countDown();
        semaphore.release();
      }
    };

    Runnable pollRunnable = () -> {
      int count = 0;
      try {
        while (offerLatch.getCount() > 0 || queue.size() > 0) { // 只要offer的latch未执行完,或queue仍旧有数据,则继续循环
          String poll = queue.poll();
          if (poll != null) {
            System.out.println(poll);
            count++;
          }
          // 无论是否poll到数据,均暂停一小段时间,可降低CPU消耗
          TimeUnit.MILLISECONDS.sleep(100);
        }
        System.out.println("total count:" + count);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 在finally中执行latch.countDown(),避免因异常导致没有正常释放
        pollLatch.countDown();
      }
    };

    // 启动线程(生产环境中建议使用线程池)
    new Thread(pollRunnable).start(); // 启动一个poll线程
    for (int i = 0; i < OFFER_COUNT; i++) {
      new Thread(offerRunnable).start();
    } // 模拟取数据很慢,需要开启40个线程处理

    // latch等待,会block主线程直到latch的count为0
    offerLatch.await();
    pollLatch.await();

    System.out.println("===the end===");
  }
}

到这里,本文结束。以上是笔者脑补的一个常见需求的解决方案。

注意:多线程编程对实际环境和需求有很大的依赖,需要根据实际的需求情况对各个参数做调整。实际在使用中,需要尽量模拟生产环境的数据情况来进行测试,对服务器执行期间的并发数,CPU、内存、网络 IO、磁盘 IO 做好观察。并适当地调低并发数,以给服务器留有处理其他请求的余量。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • MyBatis拦截器分表实践分享

    MyBatis拦截器分表实践分享

    部门内有一些亿级别核心业务表增速非常快,增量日均100W,但线上业务只依赖近一周的数据,随着数据量的迅速增长,慢SQL频发,数据库性能下降,系统稳定性受到严重影响,本篇文章,将分享如何使用MyBatis拦截器低成本的提升数据库稳定性,需要的朋友可以参考下
    2024-01-01
  • 基于SpringBoot实现图片上传及图片回显

    基于SpringBoot实现图片上传及图片回显

    本篇文章主要介绍了SpringBoot如何实现图片上传及图片回显,文中通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2022-08-08
  • OpenFeign实现远程调用

    OpenFeign实现远程调用

    这篇文章主要为大家详细介绍了OpenFeign实现远程调用,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-08-08
  • JDK完全卸载超详细步骤

    JDK完全卸载超详细步骤

    这篇文章主要给大家介绍了关于JDK完全卸载超详细步骤的相关资料,在安装JDK之前,最好将原来可能安装过的JDK卸载掉,以免影响到新JDK的使用,需要的朋友可以参考下
    2023-08-08
  • SpringBoot+aop实现主从数据库的读写分离操作

    SpringBoot+aop实现主从数据库的读写分离操作

    读写分离的作用是为了缓解写库,也就是主库的压力,但一定要基于数据一致性的原则,就是保证主从库之间的数据一定要一致,这篇文章给大家介绍SpringBoot+aop实现主从数据库的读写分离操作,感兴趣的朋友跟随小编一起看看吧
    2024-03-03
  • 详解Java深拷贝,浅拷贝和Cloneable接口

    详解Java深拷贝,浅拷贝和Cloneable接口

    这篇文章主要为大家详细介绍了Java中Cloneable接口以及深拷贝与浅拷贝的相关知识,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-08-08
  • java如何实现判断文件的真实类型

    java如何实现判断文件的真实类型

    本篇文章主要介绍了java如何实现判断文件的真实类型,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • 详解Java实现缓存(LRU,FIFO)

    详解Java实现缓存(LRU,FIFO)

    本篇文章主要介绍了详解Java实现缓存(LRU,FIFO) ,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-04-04
  • Java concurrency之互斥锁_动力节点Java学院整理

    Java concurrency之互斥锁_动力节点Java学院整理

    本文通过示例代码给大家介绍了Java concurrency之互斥锁的相关知识,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2017-06-06
  • Java Socket实现聊天室附1500行源代码

    Java Socket实现聊天室附1500行源代码

    Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。本篇文章手把手带你通过Java Socket来实现自己的聊天室,大家可以在过程中查缺补漏,温故而知新
    2021-10-10

最新评论