Spark操作之aggregate、aggregateByKey详解

 更新时间:2019年06月17日 10:43:47   作者:午夜阳光psb  
这篇文章主要介绍了Spark操作之aggregate、aggregateByKey详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1. aggregate函数

将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

seqOp操作会聚合各分区中的元素,然后combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是zeroValue.   seqOp的操作是遍历分区中的所有元素(T),第一个T跟zeroValue做操作,结果再作为与第二个T做操作的zeroValue,直到遍历完整个分区。combOp操作是把各分区聚合的结果,再聚合。aggregate函数返回一个跟RDD不同类型的值。因此,需要一个操作seqOp来把分区中的元素T合并成一个U,另外一个操作combOp把所有U聚合。

例子程序:

scala> val rdd = List(1,2,3,4,5,6,7,8,9)
rdd: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> rdd.par.aggregate((0,0))(
(acc,number) => (acc._1 + number, acc._2 + 1),
(par1,par2) => (par1._1 + par2._1, par1._2 + par2._2)
)
res0: (Int, Int) = (45,9)

scala> res0._1 / res0._2
res1: Int = 5

过程大概这样:

首先,初始值是(0,0),这个值在后面2步会用到。
然后,(acc,number) => (acc._1 + number, acc._2 + 1),number即是函数定义中的T,这里即是List中的元素。所以acc._1 + number,acc._2 + 1的过程如下。

1.  0+1,  0+1
2.  1+2,  1+1
3.  3+3,  2+1
4.  6+4,  3+1
5.  10+5,  4+1
6.  15+6,  5+1
7.  21+7,  6+1
8.  28+8,  7+1
9.  36+9,  8+1

结果即是(45,9)。这里演示的是单线程计算过程,实际Spark执行中是分布式计算,可能会把List分成多个分区,假如3个,p1(1,2,3,4),p2(5,6,7,8),p3(9),经过计算各分区的的结果(10,4),(26,4),(9,1),这样,执行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2)就是(10+26+9,4+4+1)即(45,9),再求平均值就简单了。

2. aggregateByKey函数:

对PairRDD中相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey'函数最终返回的类型还是PairRDD,对应的结果是Key和聚合后的值,而aggregate函数直接返回的是非RDD的结果。

例子程序:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object AggregateByKeyOp {
 def main(args:Array[String]){
   val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey").setMaster("local")
  val sc: SparkContext = new SparkContext(sparkConf)
   
   val data=List((1,3),(1,2),(1,4),(2,3))
   val rdd=sc.parallelize(data, 2)
   
   //合并不同partition中的值,a,b得数据类型为zeroValue的数据类型
   def combOp(a:String,b:String):String={
    println("combOp: "+a+"\t"+b)
    a+b
   }
   //合并在同一个partition中的值,a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型
   def seqOp(a:String,b:Int):String={
    println("SeqOp:"+a+"\t"+b)
    a+b
   }
   rdd.foreach(println)
   //zeroValue:中立值,定义返回value的类型,并参与运算
   //seqOp:用来在同一个partition中合并值
   //combOp:用来在不同partiton中合并值
   val aggregateByKeyRDD=rdd.aggregateByKey("100")(seqOp, combOp)
   sc.stop()
 }
}

运行结果:

将数据拆分成两个分区

//分区一数据
(1,3)
(1,2)
//分区二数据
(1,4)
(2,3)

//分区一相同key的数据进行合并
seq: 100     3   //(1,3)开始和中立值进行合并  合并结果为 1003
seq: 1003     2   //(1,2)再次合并 结果为 10032

//分区二相同key的数据进行合并
seq: 100     4  //(1,4) 开始和中立值进行合并 1004
seq: 100     3  //(2,3) 开始和中立值进行合并 1003

将两个分区的结果进行合并
//key为2的,只在一个分区存在,不需要合并 (2,1003)
(2,1003)

//key为1的, 在两个分区存在,并且数据类型一致,合并
comb: 10032     1004
(1,100321004)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • SpringBoot实现日志链路追踪的项目实践

    SpringBoot实现日志链路追踪的项目实践

    在分布式系统中,由于请求的处理过程可能会跨越多个服务,因此,对请求的追踪变得尤为重要,本文主要介绍了SpringBoot实现日志链路追踪的项目实践,感兴趣的可以了解一下
    2024-03-03
  • Java实现Floyd算法的示例代码

    Java实现Floyd算法的示例代码

    Floyd算法又称为插点法,是一种利用动态规划的思想寻找给定的加权图中多源点之间最短路径的算法。本文将用Java语言实现Floyd算法,需要的可以参考一下
    2022-07-07
  • Java实现深度优先搜索(DFS)和广度优先搜索(BFS)算法

    Java实现深度优先搜索(DFS)和广度优先搜索(BFS)算法

    深度优先搜索(DFS)和广度优先搜索(BFS)是两种基本的图搜索算法,可用于图的遍历、路径搜索等问题。DFS采用栈结构实现,从起点开始往深处遍历,直到找到目标节点或遍历完整个图;BFS采用队列结构实现,从起点开始往广处遍历,直到找到目标节点或遍历完整个图
    2023-04-04
  • 基于从request获取各种路径的方法介绍

    基于从request获取各种路径的方法介绍

    下面小编就为大家分享一篇基于从request获取各种路径的方法介绍,具有很好的参考价值,希望对大家有所帮助
    2017-11-11
  • java基础理论Stream管道流Map操作示例

    java基础理论Stream管道流Map操作示例

    这篇文章主要未大家介绍了java基础理论Stream管道流Map操作方法示例解析,有需要的朋友可以借鉴参考下希望能够有所帮助,祝大家多多进步
    2022-03-03
  • Java实现在线预览的示例代码(openOffice实现)

    Java实现在线预览的示例代码(openOffice实现)

    本篇文章主要介绍了Java实现在线预览的示例代码(openOffice实现),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-11-11
  • 关于Java中@SuppressWarnings的正确使用方法

    关于Java中@SuppressWarnings的正确使用方法

    这篇文章主要介绍了关于Java中@SuppressWarnings的正确使用方法,@SuppressWarnings注解主要用在取消一些编译器产生的警告对代码左侧行列的遮挡,有时候这会挡住我们断点调试时打的断点,需要的朋友可以参考下
    2023-05-05
  • Spring Boot整合FTPClient线程池的实现示例

    Spring Boot整合FTPClient线程池的实现示例

    这篇文章主要介绍了Spring Boot整合FTPClient线程池的实现示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-12-12
  • java并发中DelayQueue延迟队列原理剖析

    java并发中DelayQueue延迟队列原理剖析

    DelayQueue队列是一个延迟队列,本文将结合实例代码,详细的介绍DelayQueue延迟队列的源码分析,感兴趣的小伙伴们可以参考一下
    2021-06-06
  • mybatis多表查询的实现(xml方式)

    mybatis多表查询的实现(xml方式)

    本文主要介绍了mybatis多表查询的实现(xml方式),文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-03-03

最新评论