SparkAPI Java版】JavaPairRDD——aggregateByKey(二)

网友投稿 832 2025-04-01

Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U. Parameters: zeroValue - (undocumented) seqFunc - (undocumented) combFunc - (undocumented) Returns: (undocumented)

aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。

// Scala def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)     (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)     (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U)     (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] // java public JavaPairRDD aggregateByKey(U zeroValue, Partitioner partitioner, Function2 seqFunc, Function2 combFunc) public JavaPairRDD aggregateByKey(U zeroValue, int numPartitions, Function2 seqFunc, Function2 combFunc) public JavaPairRDD aggregateByKey(U zeroValue, Function2 seqFunc, Function2 combFunc)

【SparkAPI JAVA版】JavaPairRDD——aggregateByKey(二)

第一个aggregateByKey函数我们可以自定义Partitioner。除了这个参数之外,其函数声明和aggregate很类似;其他的aggregateByKey函数实现最终都是调用这个。

第二个aggregateByKey函数可以设置分区的个数(numPartitions),最终用的是HashPartitioner。

最后一个aggregateByKey实现先会判断当前RDD是否定义了分区函数,如果定义了则用当前RDD的分区;如果当前RDD并未定义分区 ,则使用HashPartitioner。

public class AggregateByKey { public static void main(String[] args) { System.setProperty("hadoop.home.dir","F:\\hadoop-2.7.1"); SparkConf conf = new SparkConf().setMaster("local").setAppName("TestSpark"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD javaPairRDD = sc.parallelizePairs(Lists.>newArrayList( new Tuple2("cat",3), new Tuple2("dog",33), new Tuple2("cat",16), new Tuple2("tiger",66)), 2); // 打印样例数据 javaPairRDD.foreach(new VoidFunction>() { public void call(Tuple2 stringIntegerTuple2) throws Exception { System.out.println("样例数据>>>>>>>" + stringIntegerTuple2); } }); JavaPairRDD javaPairRDD1 = javaPairRDD.aggregateByKey(14, new Function2() { public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("seqOp>>>>> 参数One:"+v1+"--参数Two:"+v2); return Math.max(v1,v2); } }, new Function2() { public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("combOp>>>>> 参数One:"+v1+"--参数Two:"+v2); return v1+v2; } }); // 打印结果数据 javaPairRDD1.foreach(new VoidFunction>() { public void call(Tuple2 stringIntegerTuple2) throws Exception { System.out.println("结果数据>>>>>>>" + stringIntegerTuple2); } }); } }

// 打印样例数据 这里的分区是两个 其中分区内都有一个相同key值 19/03/03 22:16:07 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 样例数据>>>>>>>(cat,3) 样例数据>>>>>>>(dog,33) 19/03/03 22:16:07 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) 样例数据>>>>>>>(cat,16) 样例数据>>>>>>>(tiger,66) 19/03/03 22:16:07 INFO Executor: Running task 0.0 in stage 1.0 (TID 2) // 第一个分区比较大小 14 3 => 14(cat) , 14 33 => 33(dog) seqOp>>>>> 参数One:14--参数Two:3 seqOp>>>>> 参数One:14--参数Two:33 19/03/03 22:16:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms 19/03/03 22:16:07 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 76 ms on localhost (executor driver) (1/2) // 第二个分区比较 14 16 => 16(cat) ,14 66 => 66(tiger) seqOp>>>>> 参数One:14--参数Two:16 seqOp>>>>> 参数One:14--参数Two:66 19/03/03 22:16:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms // 这个就是combOp阶段 在不同分区内 相同key的值做聚合操作 也就是(cat)14 + (cat)16 = 30 combOp>>>>> 参数One:14--参数Two:16 // 最后结果 结果数据>>>>>>>(dog,33) 结果数据>>>>>>>(cat,30) 19/03/03 22:16:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 结果数据>>>>>>>(tiger,66)

一定要记住: combOp 是聚合的不同分区相同key的值

从上述过程中,我们就能明白流程是什么了。

seqOp

开始我们的数据是:

分片1:(cat,3) (dog,33)

分片2:(cat,16) (tiger,66)

// 这里只有两个分片 所以写两个过程 第一个分片开始seqOp过程: 14(zeroValue) 和 3(cat) 比较 = 14(结果1), 14(zeroValue) 和 33(dog) 比较 = 14(结果2) 第二个分片开始元素聚合过程: 14(zeroValue) 和 16(cat) 比较 = 14(结果3), 14(zeroValue) 和 66(tiger) 比较 = 14(结果4)

combOp(不同分区相同key值)

开始分片combOp过程:cat在不同分区有相同key值 结果1 + 结果3 = 30(结果5) 最终得到的结果2 ,结果4,结果5 结果数据>>>>>>>(dog,33) 结果数据>>>>>>>(cat,30) 结果数据>>>>>>>(tiger,66)

如果有什么不明白的评论留言即可。

EI企业智能 Java spark 可信智能计算服务 TICS 智能数据

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:库存管理系统的来龙去脉
下一篇:“比较和合并工作簿”灰色选项不可用?90%的人不知道这个魔法技能的用法!
相关文章