利用Java多线程技术导入数据到Elasticsearch的方法步骤

 更新时间:2019年07月16日 09:47:06   作者:Wooola  
这篇文章主要介绍了利用Java多线程技术导入数据到Elasticsearch的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

前言


近期接到一个任务,需要改造现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小工具,由于之前采用单线程导入,千亿数据需要两周左右的时间才能导入完成,导入效率非常低。所以楼主花了3天的时间,利用java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入工具,单台服务器导入效率提高十几倍(合理调整线程数据,效率更高)。

关键技术栈

  • Elasticsearch
  • jdbc
  • ExecutorService\Thread
  • sql

工具说明

maven依赖

<dependency> 
 <groupId>mysql</groupId> 
 <artifactId>mysql-connector-java</artifactId> 
 <version>${mysql.version}</version> 
</dependency> 
<dependency> 
 <groupId>org.elasticsearch</groupId> 
 <artifactId>elasticsearch</artifactId> 
 <version>${elasticsearch.version}</version> 
</dependency> 
<dependency> 
 <groupId>org.elasticsearch.client</groupId> 
 <artifactId>transport</artifactId> 
 <version>${elasticsearch.version}</version> 
</dependency> 
<dependency> 
 <groupId>org.projectlombok</groupId> 
 <artifactId>lombok</artifactId> 
 <version>${lombok.version}</version> 
</dependency> 
<dependency> 
 <groupId>com.alibaba</groupId> 
 <artifactId>fastjson</artifactId> 
 <version>${fastjson.version}</version> 
</dependency> 

java线程池设置

默认线程池大小为21个,可调整。其中POR为处理流程已办数据线程池,ROR为处理流程已阅数据线程池。

private static int THREADS = 21; 
public static ExecutorService POR = Executors.newFixedThreadPool(THREADS); 
public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS); 

定义已办生产者线程/已阅生产者线程:ZlPendProducer/ZlReadProducer

public class ZlPendProducer implements Runnable { 
 ... 
 @Override 
 public void run() { 
 System.out.println(threadName + "::启动..."); 
 for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) 
 try { 
 .... 
 int size = 1000; 
 for (int i = 0; i < count; i += size) { 
 if (i + size > count) { 
 //作用为size最后没有100条数据则剩余几条newList中就装几条 
 size = count - i; 
 } 
 String sql = "select * from " + tableName + " limit " + i + ", " + size; 
 System.out.println(tableName + "::sql::" + sql); 
 rs = statement.executeQuery(sql); 
 List<HistPendingEntity> lst = new ArrayList<>(); 
 while (rs.next()) { 
 HistPendingEntity p = PendUtils.getHistPendingEntity(rs); 
 lst.add(p); 
 } 
 MteExecutor.POR.submit(new ZlPendConsumer(lst)); 
 Thread.sleep(2000); 
 } 
 .... 
 } catch (Exception e) { 
 e.printStackTrace(); 
 } 
 } 
} 
public class ZlReadProducer implements Runnable { 
 ...已阅生产者处理逻辑同已办生产者 
} 

定义已办消费者线程/已阅生产者线程:ZlPendConsumer/ZlReadConsumer

public class ZlPendConsumer implements Runnable { 
 private String threadName; 
 private List<HistPendingEntity> lst; 
 public ZlPendConsumer(List<HistPendingEntity> lst) { 
 this.lst = lst; 
 } 
 @Override 
 public void run() { 
 ... 
 lst.forEach(v -> { 
 try { 
 String json = new Gson().toJson(v); 
 EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null); 
 Const.COUNTER.LD_P.incrementAndGet(); 
 } catch (Exception e) { 
 e.printStackTrace(); 
 System.out.println("err::PendingId::" + v.getPendingId()); 
 } 
 }); 
 ... 
 } 
} 
public class ZlReadConsumer implements Runnable { 
 //已阅消费者处理逻辑同已办消费者 
} 

定义导入Elasticsearch数据监控线程:Monitor

监控线程-Monitor为了计算每分钟导入Elasticsearch的数据总条数,利用监控线程,可以调整线程池的线程数的大小,以便利用多线程更快速的导入数据。

public void monitorToES() { 
 new Thread(() -> { 
 while (true) { 
 StringBuilder sb = new StringBuilder(); 
 sb.append("已办表数::").append(Const.TBL.TBL_PEND_COUNT) 
 .append("::已办总数::").append(Const.COUNTER.LD_P_TOTAL) 
 .append("::已办入库总数::").append(Const.COUNTER.LD_P); 
 sb.append("~~~~已阅表数::").append(Const.TBL.TBL_READ_COUNT); 
 sb.append("::已阅总数::").append(Const.COUNTER.LD_R_TOTAL) 
 .append("::已阅入库总数::").append(Const.COUNTER.LD_R); 
 if (ldPrevPendCount == 0 && ldPrevReadCount == 0) { 
 ldPrevPendCount = Const.COUNTER.LD_P.get(); 
 ldPrevReadCount = Const.COUNTER.LD_R.get(); 
 start = System.currentTimeMillis(); 
 } else { 
 long end = System.currentTimeMillis(); 
 if ((end - start) / 1000 >= 60) { 
 start = end; 
 sb.append("\n#########################################\n"); 
 sb.append("已办每分钟TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + "条"); 
 sb.append("::已阅每分钟TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + "条"); 
 ldPrevPendCount = Const.COUNTER.LD_P.get(); 
 ldPrevReadCount = Const.COUNTER.LD_R.get(); 
 } 
 } 
 System.out.println(sb.toString()); 
 try { 
 Thread.sleep(3000); 
 } catch (InterruptedException e) { 
 e.printStackTrace(); 
 } 
 } 
 }).start(); 
} 

初始化Elasticsearch:EsClient

String cName = meta.get("cName");//es集群名字 
String esNodes = meta.get("esNodes");//es集群ip节点 
Settings esSetting = Settings.builder() 
 .put("cluster.name", cName) 
 .put("client.transport.sniff", true)//增加嗅探机制,找到ES集群 
 .put("thread_pool.search.size", 5)//增加线程池个数,暂时设为5 
 .build(); 
String[] nodes = esNodes.split(","); 
client = new PreBuiltTransportClient(esSetting); 
for (String node : nodes) { 
 if (node.length() > 0) { 
 String[] hostPort = node.split(":"); 
 client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1]))); 
 } 
} 

初始化数据库连接

conn = DriverManager.getConnection(url, user, password); 

启动参数

nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 & 

参数说明

ES-Cluster2019 为Elasticsearch集群名字

node1:9300,node2:9300,node3:9300为es的节点IP

130 130为已办已阅分表的数据

程序入口:MteMain

// 监控线程 
Monitor monitorService = new Monitor(); 
monitorService.monitorToES(); 
// 已办生产者线程 
Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer")); 
pendProducerThread.start(); 
// 已阅生产者线程 
Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer")); 
readProducerThread.start(); 

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • IDEA Maven Mybatis generator 自动生成代码(实例讲解)

    IDEA Maven Mybatis generator 自动生成代码(实例讲解)

    下面小编就为大家分享一篇IDEA Maven Mybatis generator 自动生成代码的实例讲解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2017-12-12
  • Java多线程——基础概念

    Java多线程——基础概念

    这篇文章主要介绍了java多线程编程实例,分享了几则多线程的实例代码,具有一定参考价值,加深多线程编程的理解还是很有帮助的,需要的朋友可以参考下,希望可以帮到你
    2021-07-07
  • resty upload无需依赖的文件上传与下载

    resty upload无需依赖的文件上传与下载

    这篇文章主要为大家介绍了resty upload中无需依赖的文件上传与下载过程,有需要的朋友可以借鉴参考下,希望能够有所帮助祝大家多多进步,早日升职加薪
    2022-03-03
  • 解决IDEA修改 .vmoptions 文件后导致无法启动的问题

    解决IDEA修改 .vmoptions 文件后导致无法启动的问题

    这篇文章主要介绍了解决IDEA修改 .vmoptions 文件后导致无法启动的问题,需要的朋友可以参考下
    2020-12-12
  • Java设计模式之外观模式示例详解

    Java设计模式之外观模式示例详解

    外观模式为多个复杂的子系统,提供了一个一致的界面,使得调用端只和这个接口发生调用,而无须关系这个子系统内部的细节。本文将通过示例详细为大家讲解一下外观模式,需要的可以参考一下
    2022-03-03
  • SpringBoot中GlobalExceptionHandler异常处理机制详细说明

    SpringBoot中GlobalExceptionHandler异常处理机制详细说明

    Spring Boot的GlobalExceptionHandler是一个全局异常处理器,用于捕获和处理应用程序中发生的所有异常,这篇文章主要给大家介绍了关于Java中GlobalExceptionHandler异常处理机制的相关资料,需要的朋友可以参考下
    2024-03-03
  • Java并发应用之任务执行分析

    Java并发应用之任务执行分析

    这篇文章主要为大家详细介绍了JavaJava并发应用编程中任务执行分析的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2023-07-07
  • 关于springboot2.4跨域配置问题

    关于springboot2.4跨域配置问题

    这篇文章主要介绍了springboot2.4跨域配置的方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-07-07
  • Java基于FFmpeg实现Mp4视频转GIF

    Java基于FFmpeg实现Mp4视频转GIF

    FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。本文主要介绍了在Java中如何基于FFmpeg进行Mp4视频到Gif动图的转换,感兴趣的小伙伴可以了解一下
    2022-11-11
  • idea 默认路径修改从C盘更改到D盘

    idea 默认路径修改从C盘更改到D盘

    本文主要介绍了idea 默认路径修改从C盘更改到D盘,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-07-07

最新评论