package cn.allengao.exercise

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark RDD operator exercises: transformations (map, flatMap, filter, sortBy,
 * union, intersection, distinct, join, outer joins, cogroup, groupByKey,
 * reduceByKey, cartesian) and actions (collect, reduce).
 *
 * creat_user: Allen Gao
 * creat_date: 2018/1/25
 * creat_time: 10:04
 */
object SparkRDDTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkRDDTest").setMaster("local")
    val sc = new SparkContext(conf)
    // FIX: the original never called sc.stop(), leaking the SparkContext.
    // try/finally guarantees the context is released even on failure.
    try {
      // Create an RDD by parallelizing a local collection.
      val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
      // Multiply each element by 2, then sort (ascending = true, descending = false).
      val res1 = rdd1.map(_ * 2).sortBy(x => x, ascending = true)
      // Keep only elements >= 10.
      val res2 = res1.filter(_ >= 10)

      val rdd2 = sc.parallelize(Array("a b c", "d e f", "h i j"))
      // Split each element on spaces, then flatten.
      val res3 = rdd2.flatMap(_.split(" "))

      val rdd3 = sc.parallelize(List(List("a b c", "a b b"), List("e f g", "a f g"), List("h i j", "a a b")))
      // Nested lists: flatten twice (outer flatMap over lists, inner flatMap over splits).
      val res4 = rdd3.flatMap(_.flatMap(_.split(" ")))

      val rdd4 = sc.parallelize(List(5, 6, 4, 3))
      val rdd5 = sc.parallelize(List(1, 2, 3, 4))
      // Union (keeps duplicates).
      val res5 = rdd4.union(rdd5)
      // Intersection.
      val res6 = rdd4.intersection(rdd5)
      // Union followed by deduplication.
      val res7 = rdd4.union(rdd5).distinct()

      val rdd6 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
      val rdd7 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
      // Inner join on key.
      val res8 = rdd6.join(rdd7)
      // Left and right outer joins (missing side becomes None).
      val res9 = rdd6.leftOuterJoin(rdd7)
      val res10 = rdd6.rightOuterJoin(rdd7)
      // Union again; note the infix form (without ".") also works.
      val res11 = rdd6 union (rdd7)
      // Group values by key.
      val res12 = res11.groupByKey()

      // Word count two ways — note the difference between groupByKey and reduceByKey.
      // groupByKey variant:
      val res13 = res11.groupByKey().mapValues(_.sum)
      /* groupByKey: collects every value for a key into one sequence (Iterator)
         on the reduce side, so ALL values are shuffled across the network —
         wasteful, and with very large data it can cause OutOfMemoryError. */
      // reduceByKey variant: aggregates locally first, then globally.
      // (FIX: the original comment mislabeled this "reduceByValue".)
      val res14 = res11.reduceByKey(_ + _)
      /* reduceByKey: merges values per key on each mapper before the shuffle
         (similar to a MapReduce combiner), greatly reducing the data
         transferred and letting the reduce side finish faster. */

      val rdd8 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
      val rdd9 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
      // cogroup: note the difference from groupByKey — values stay grouped per source RDD.
      val res15 = rdd8.cogroup(rdd9)

      val rdd10 = sc.parallelize(List(1, 2, 3, 4, 5))
      // reduce is an action: sums all elements.
      val res16 = rdd10.reduce(_ + _)

      val rdd11 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
      val rdd12 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
      val rdd13 = rdd11.union(rdd12)
      // Aggregate by key.
      val res17 = rdd13.reduceByKey(_ + _)
      // Sort by value descending: swap to (value, key), sortByKey, swap back.
      val res18 = rdd13.reduceByKey(_ + _).map(t => (t._2, t._1)).sortByKey(ascending = false).map(t => (t._2, t._1))

      /* Cartesian product: all ordered pairs (x, y) with x from the first RDD
         and y from the second. E.g. A = {a, b}, B = {0, 1, 2} gives
         {(a,0), (a,1), (a,2), (b,0), (b,1), (b,2)}. */
      val res19 = rdd11.cartesian(rdd12)

      // An action (e.g. collect) is required to materialize results. toBuffer
      // makes the collected Array print its contents instead of a reference.
      // Result: ArrayBuffer(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
      // println(res1.collect().toBuffer)
      // Result: ArrayBuffer(10, 12, 14, 16, 18, 20)
      // println(res2.collect().toBuffer)
      // Result: ArrayBuffer(a, b, c, d, e, f, h, i, j)
      // println(res3.collect().toBuffer)
      // Result: ArrayBuffer(a, b, c, a, b, b, e, f, g, a, f, g, h, i, j, a, a, b)
      // println(res4.collect().toBuffer)
      // Result: ArrayBuffer(5, 6, 4, 3, 1, 2, 3, 4)
      // println(res5.collect().toBuffer)
      // Result: ArrayBuffer(4, 3)
      // println(res6.collect().toBuffer)
      // Result: ArrayBuffer(4, 6, 2, 1, 3, 5)
      // println(res7.collect().toBuffer)
      // Result: ArrayBuffer((tom,(1,1)), (jerry,(3,2)))
      // println(res8.collect().toBuffer)
      // Result: ArrayBuffer((tom,(1,Some(1))), (jerry,(3,Some(2))), (kitty,(2,None)))
      // println(res9.collect().toBuffer)
      // Result: ArrayBuffer((tom,(Some(1),1)), (jerry,(Some(3),2)), (shuke,(None,2)))
      // println(res10.collect().toBuffer)
      // Result: ArrayBuffer((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,1), (shuke,2))
      // println(res11.collect().toBuffer)
      // Result: ArrayBuffer((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(3, 2)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))
      // println(res12.collect().toBuffer)
      // Result: ArrayBuffer((tom,2), (jerry,5), (shuke,2), (kitty,2))
      // println(res13.collect().toBuffer)
      // Result: ArrayBuffer((tom,2), (jerry,5), (shuke,2), (kitty,2))
      // println(res14.collect().toBuffer)
      // Result: ArrayBuffer((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
      // println(res15.collect().toBuffer)
      // Result: 15
      // println(res16)
      // Result: ArrayBuffer((tom,4), (jerry,5), (shuke,3), (kitty,7))
      // println(res17.collect().toBuffer)
      // Result: ArrayBuffer((kitty,7), (jerry,5), (tom,4), (shuke,3))
      // println(res18.collect().toBuffer)
      /* Result: the 16 ordered pairs of rdd11 x rdd12:
         ArrayBuffer(((tom,1),(jerry,2)), ((tom,1),(tom,3)), ((tom,1),(shuke,2)),
         ((tom,1),(kitty,5)), ((jerry,3),(jerry,2)), ((jerry,3),(tom,3)),
         ((jerry,3),(shuke,2)), ((jerry,3),(kitty,5)), ((kitty,2),(jerry,2)),
         ((kitty,2),(tom,3)), ((kitty,2),(shuke,2)), ((kitty,2),(kitty,5)),
         ((shuke,1),(jerry,2)), ((shuke,1),(tom,3)), ((shuke,1),(shuke,2)),
         ((shuke,1),(kitty,5))) */
      println(res19.collect().toBuffer)
    } finally {
      // FIX: release the local Spark context (was missing in the original).
      sc.stop()
    }
  }
}