Spark调优多线程并行处理任务实现方式

 更新时间:2020年08月06日 10:21:41   作者:lshan  
这篇文章主要介绍了Spark调优多线程并行处理任务实现方式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

方式1:

1. 明确 Spark中Job 与 Streaming中 Job 的区别

1.1 Spark Core

一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作)

一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算

Job在spark里应用里是一个被调度的单位

1.2 Streaming

一个 batch 的数据对应一个 DStreamGraph

而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作

每一个输出对应于一个Job,一个 DStreamGraph 对应一个JobSet,里面包含一个或多个Job

2. Streaming Job的并行度

Job的并行度由两个配置决定:

spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs

一个 Batch 可能会有多个 Action 执行,比如注册了多个 Kafka 数据流,每个Action都会产生一个Job

所以一个 Batch 有可能是一批 Job,也就是 JobSet 的概念

这些 Job 由 jobExecutor 依次提交执行

而 JobExecutor 是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job

这里说的池子,大小就是由spark.streaming.concurrentJobs 控制的

concurrentJobs 决定了向 Spark Core 提交Job的并行度

提交一个Job,必须等这个执行完了,才会提交第二个

假设我们把它设置为2,则会并发的把 Job 提交给 Spark Core

Spark 有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)

默认是 FIFO,也就是先进先出,把 concurrentJobs 设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job

虽然如此,如果资源够两个job运行,还是会并行运行两个Job

Spark Streaming 不同Batch任务可以并行计算么 https://developer.aliyun.com/article/73004

conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行对
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))

你会发现,不同batch的job其实也可以并行运行的,这里需要有几个条件:

有延时发生了,batch无法在本batch完成

concurrentJobs > 1

如果scheduler mode 是FIFO则需要某个Job无法一直消耗掉所有资源

Mode是FAIR则尽力保证你的Job是并行运行的,毫无疑问是可以并行的。

方式2:

场景1:

程序每次处理的数据量是波动的,比如周末比工作日多很多,晚八点比凌晨四点多很多。

一个spark程序处理的时间在1-2小时波动是OK的。而spark streaming程序不可以,如果每次处理的时间是1-10分钟,就很蛋疼。
设置10分钟吧,实际上10分钟的也就那一段高峰时间,如果设置每次是1分钟,很多时候会出现程序处理不过来,排队过多的任务延迟更久,还可能出现程序崩溃的可能。

场景2:

  • 程序需要处理的相似job数随着业务的增长越来越多
  • 我们知道spark的api里无相互依赖的stage是并行处理的,但是job之间是串行处理的。
  • spark程序通常是离线处理,比如T+1之类的延迟,时间变长是可以容忍的。而spark streaming是准实时的,如果业务增长导致延迟增加就很不合理。

spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。如何在一个SparkContext中提交多个任务

DStream.foreachRDD{
   rdd =>
    //创建线程池
    val executors=Executors.newFixedThreadPool(rules.length)
    //将规则放入线程池
    for( ru <- rules){
     val task= executors.submit(new Callable[String] {
      override def call(): String ={
       //执行规则
       runRule(ru,spark)
      }
     })
    }
    //每次创建的线程池执行完所有规则后shutdown
    executors.shutdown()
  }

注意点

1.最后需要executors.shutdown()。

  • 如果是executors.shutdownNow()会发生未执行完的task强制关闭线程。
  • 如果使用executors.awaitTermination()则会发生阻塞,不是我们想要的结果。
  • 如果没有这个shutdowm操作,程序会正常执行,但是长时间会产生大量无用的线程池,因为每次foreachRDD都会创建一个线程池。

2.可不可以将创建线程池放到foreachRDD外面?

不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。

3.线程池executor崩溃了就会导致数据丢失

原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • 解析ConcurrentHashMap: put方法源码分析

    解析ConcurrentHashMap: put方法源码分析

    ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment的结构和HashMap类似,是一种数组和链表结构,今天给大家普及java面试常见问题---ConcurrentHashMap知识,一起看看吧
    2021-06-06
  • Spring数据源及配置文件数据加密实现过程详解

    Spring数据源及配置文件数据加密实现过程详解

    这篇文章主要介绍了Spring数据源及配置文件数据加密实现过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • 浅谈多线程中的锁的几种用法总结(必看)

    浅谈多线程中的锁的几种用法总结(必看)

    下面小编就为大家带来一篇浅谈多线程中的锁的几种用法总结(必看)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-05-05
  • IDEA加载项目没有src目录的问题及解决

    IDEA加载项目没有src目录的问题及解决

    这篇文章主要介绍了IDEA加载项目没有src目录的问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • Spring Security认证器实现过程详解

    Spring Security认证器实现过程详解

    一些权限框架一般都包含认证器和决策器,前者处理登陆验证,后者处理访问资源的控制,这篇文章主要介绍了Spring Security认证器实现过程,需要的朋友可以参考下
    2022-06-06
  • 分布式调度器之Spring Task 的使用详解

    分布式调度器之Spring Task 的使用详解

    SpringTask是Spring框架中用于任务调度的组件,通过简单的注解就能实现定时任务的创建和调度,可以通过配置线程池来实现,本文给大家介绍分布式调度器之Spring Task 的使用,感兴趣的朋友跟随小编一起看看吧
    2024-10-10
  • jmap执行失败如何获取heapdump详解

    jmap执行失败如何获取heapdump详解

    这篇文章主要为大家介绍了jmap执行失败如何获取heapdump详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-04-04
  • Java代码实现酒店管理系统

    Java代码实现酒店管理系统

    这篇文章主要为大家详细介绍了Java代码实现酒店管理系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05
  • 新手初学Java-Map

    新手初学Java-Map

    Map简介:将键映射到值的对象。一个映射不能包含重复的键;每个键最多只能映射到一个值。此接口取代 Dictionary 类,后者完全是一个抽象类,而不是一个接口
    2021-07-07
  • Spring Boot指定外部配置文件简单示例

    Spring Boot指定外部配置文件简单示例

    Spring Boot可以让你将配置外部化,这样你就可以在不同的环境中使用相同的应用程序代码,这篇文章主要给大家介绍了关于Spring Boot指定外部配置文件的相关资料,需要的朋友可以参考下
    2024-01-01

最新评论