package cn.allengao.exercise import org.apache.spark.{SparkConf, SparkContext} object SparkRDDtest3 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SparkRDDTest3").setMaster("local") val sc = new SparkContext(conf) /* combineByKey : 和reduceByKey是相同的效果 ###第一个参数x:原封不动取出来, 第二个参数:是函数, 局部运算, 第三个:是函数, 对局部运算后的结果再做运算 ###每个分区中每个key中value中的第一个值, (hello,1)(hello,1)(good,1)-->(hello(1,1),good(1))-->x就相当于hello的第一个1, good中的1 */ val rdd1 = sc.parallelize(List(("hello", 1), ("hello", 1), ("good", 1))) val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) //运行结果:ArrayBuffer((hello,1), (hello,1), (good,1)) // println(rdd1.collect.toBuffer) //运行结果:ArrayBuffer((hello,2), (good,1)) // println(rdd2.collect.toBuffer) // ###当第一个参数变成 x+10 相当于hello的初始计算值是10+1,即11,good的初始计算值是10+1,也是11。 val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) //运行结果:ArrayBuffer((hello,12), (good,11)) // println(rdd3.collect.toBuffer) val rdd4 = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"), 3) val rdd5 = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2), 3) //拉链操作,形成map val rdd6 = rdd5.zip(rdd4) //将数量是 1 的放在一起,将数量是 2 的放在一起。 val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n) //运行结果:ArrayBuffer((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee)) // println(rdd6.collect().toBuffer) //运行结果:ArrayBuffer((1,List(dog, cat, turkey)), (2,List(gnu, salmon, rabbit, wolf, bear, bee))) // println(rdd7.collect().toBuffer) //repartition val rdd8 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2) val func1 = (index: Int, iter: Iterator[(Int)]) => { iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator } val res1 = rdd8.mapPartitionsWithIndex(func1) /* 运行结果:ArrayBuffer([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9]) */ // println(res1.collect().toBuffer) val rdd9 = rdd8.repartition(3) //运行结果:3 ,分区数变为3。 // println(rdd9.partitions.length) val res2 = rdd9.mapPartitionsWithIndex(func1) /* 运行结果:ArrayBuffer([partID:0, val: 3], [partID:0, val: 7], [partID:1, val: 1], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 8], [partID:2, val: 2], [partID:2, val: 6], [partID:2, val: 9]) */ // println(res2.collect().toBuffer) //coalesce,默认数据不进行shuffle,则分区数量不变,true表示进行shuffle操作,分区数量根据参数改变 val rdd10 = rdd8.coalesce(3, true) //运行结果:3 // println(rdd10.partitions.length) val res3 = rdd10.mapPartitionsWithIndex(func1) /* 运行结果:ArrayBuffer([partID:0, val: 3], [partID:0, val: 7], [partID:1, val: 1], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 8], [partID:2, val: 2], [partID:2, val: 6], [partID:2, val: 9]) */ // println(res3.collect().toBuffer) //collectAsMap : Map(b -> 2, a -> 1) val rdd11 = sc.parallelize(List(("a", 1), ("b", 2))) val res4 = rdd11.collectAsMap //运行结果:ArrayBuffer((a,1), (b,2)) // println(rdd11.collect().toBuffer) //运行结果:Map(b -> 2, a -> 1) // println(res4) //countByKey 计算key的数量 val rdd12 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1))) val res5 = rdd12.countByKey //countByValue 计算(key,value)的数量 val res6 = rdd12.countByValue //Map(a -> 1, b -> 2, c -> 2) // println(res5) //Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1) // println(res6) //filterByRange 范围过滤 val rdd13 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1), ("b", 6))) val res7 = rdd13.filterByRange("b", "d").collect() //运行结果:ArrayBuffer((c,3), (d,4), (c,2), (b,6)) // println(res7.toBuffer) // flatMapValues : val rdd14 = sc.parallelize(List(("a", "1 2"), ("b", "3 4"))) val res8 = rdd14.flatMapValues(_.split(" ")).collect() //运行结果:ArrayBuffer((a,1), (a,2), (b,3), (b,4)) // println(res8.toBuffer) // foldByKey val rdd15 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2) val rdd16 = rdd15.map(x => (x.length, x)) // _+_ 表示字符串的拼接 val rdd17 = rdd16.foldByKey(" ")(_ + _) //运行结果:ArrayBuffer((3,dog), (4,wolf), (3,cat), (4,bear)) // println(rdd16.collect().toBuffer) //运行结果:ArrayBuffer((4, wolf bear), (3, dog cat)) // println(rdd17.collect().toBuffer) // foreachPartition 不会生成一个新的RDD val rdd18 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) val res9 = rdd18.foreachPartition(x => println(x.reduce(_ + _))) //运行结果: 6 15 24 // print(res9) //keyBy : 以传入的参数做key val rdd19 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) //以单词的长度作为key val res10 = rdd19.keyBy(_.length).collect() //以第一个字母作为key val res11 = rdd19.keyBy(_ (0)).collect() //运行结果:ArrayBuffer((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant)) // println(res10.toBuffer) //运行结果:ArrayBuffer((d,dog), (s,salmon), (s,salmon), (r,rat), (e,elephant)) // println(res11.toBuffer) //keys values val rdd20 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val rdd21 = rdd20.map(x => (x.length, x)) val res12 = rdd21.keys.collect val res13 = rdd21.values.collect //运行结果:ArrayBuffer(3, 5, 4, 3, 7, 5) println(res12.toBuffer) //运行结果:ArrayBuffer(dog, tiger, lion, cat, panther, eagle) println(res13.toBuffer) } }