Spark aggregate 例子

// 先上个最简单的例子看下

scala> val l = List(1,2,3,4,5,6,7,8,9,10)
l: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// aggregate (默认值) ( seqOp, comOp )

scala> l.aggregate(0)((x, y) => x + y, (x, y) => x + y)
res7: Int = 55

// 一般的介绍全都是 x, y 看的可难受了。。看了一会资料之后我感觉还是换个名字看着舒服些:

l.aggregate(0)((res, next) => res + next, (chunkA, chunkB) => chunkA + chunkB)
res9: Int = 55

// seqOp 中参数 res 是上一次计算的结果,默认值是在第一阶的函数中传的 0, next 是遍历的下一个元素
// comOp 中参数 chunA, chunkB 则是 scala 并发计算时不同线程算出来的结果,所以不同的 chunk 直接相加即可
// 理解了这个之后就很简单了

// 求平均值
scala> val tmp = l.aggregate(0, 0)((res, next) => (res._1 + next, res._2 + 1), (chunkA, chunkB) => (chunkA._1 + chunkB._1, chunkA._2 + chunkB._2))
res10: (Int, Int) = (55,10)

scala> val avg = tmp._1 / tmp._2.toDouble
avg: Double = 5.5
Advertisements

Mac 安装 scala & spark

# 确保已安装 java 7+

# spark 需要 scala 2.11.x (目前最新 2.12.x)
wget https://downloads.lightbend.com/scala/2.11.11/scala-2.11.11.tgz
tar zxvf scala-2.11.11.tgz
cd scala-2.11.11

# 配置环境变量
# export PATH="$PATH:`pwd`/bin"

# 检查安装是否成功
scala -version

# 本地模式设置 ip (for spark)
sudo hostname -s 127.0.0.1

# 安装 spark
wget https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz
tar zxvf spark-2.1.1-bin-hadoop2.7.tgz
cd spark-2.1.1-bin-hadoop2.7

# 启动 spark
./bin/spark-shell
# 没有报错即成功

然后就可以 hello world 了