Spark aggregate 例子

// 先上个最简单的例子看下

scala> val l = List(1,2,3,4,5,6,7,8,9,10)
l: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// aggregate (默认值) ( seqOp, comOp )

scala> l.aggregate(0)((x, y) => x + y, (x, y) => x + y)
res7: Int = 55

// 一般的介绍全都是 x, y 看的可难受了。。看了一会资料之后我感觉还是换个名字看着舒服些:

l.aggregate(0)((res, next) => res + next, (chunkA, chunkB) => chunkA + chunkB)
res9: Int = 55

// seqOp 中参数 res 是上一次计算的结果,默认值是在第一阶的函数中传的 0, next 是遍历的下一个元素
// comOp 中参数 chunA, chunkB 则是 scala 并发计算时不同线程算出来的结果,所以不同的 chunk 直接相加即可
// 理解了这个之后就很简单了

// 求平均值
scala> val tmp = l.aggregate(0, 0)((res, next) => (res._1 + next, res._2 + 1), (chunkA, chunkB) => (chunkA._1 + chunkB._1, chunkA._2 + chunkB._2))
res10: (Int, Int) = (55,10)

scala> val avg = tmp._1 / tmp._2.toDouble
avg: Double = 5.5
Advertisements