项目说明:附件为要计算数据的demo。
其中bs_log文件夹数据格式为(手机号,时间戳,基站ID,连接状态(“1”为连接,“0”为断开))
lac_info.txt 文件数据格式为(基站ID,经度,纬度,信号辐射类型)
程序思路:
1, 先根据”手机号,基站ID”构成一个元祖,做为唯一标识, 和时间戳构成新的数据结构->(手机号, 站点, 时间戳)
2、(手机号,基站ID)作为key,通过reduceByKey算子进行聚合,计算出在基站的停留时间,构成新的数据结构,以便和坐标数据进行join,->(基站ID,(手机号,停留时间))
3、将基站坐标数据信息通过map,构建成数据类型 ->(基站ID,(经度,纬度))
4、将2、3进行join操作,构成新的数据类型 ->(手机号,基站ID,停留时间,经度,纬度)
5、按手机号进行分组。->(手机号,(手机号,基站ID,停留时间,经度,纬度))
6、取出停留时间最长的两个基站ID。
具体程序如下:
package cn.allengao.Location import org.apache.spark.{SparkConf, SparkContext} /** * class_name: * package: * describe: 基站信息查询 * creat_user: Allen Gao * creat_date: 2018/1/29 * creat_time: 10:03 **/ /* * 说明: * 1, 先根据"手机号,基站ID"构成一个元祖,做为唯一标识, 和时间戳构成新的数据结构->(手机号, 站点, 时间戳) * 2、(手机号,基站ID)作为key,通过reduceByKey算子进行聚合,计算出在基站的停留时间,构成新的数据结构, * 以便和坐标数据进行join,->(基站ID,(手机号,停留时间)) * 3、将基站坐标数据信息通过map,构建成数据类型 ->(基站ID,(经度,纬度)) * 4、将2、3进行join操作,构成新的数据类型 ->(手机号,基站ID,停留时间,经度,纬度) * 5、按手机号进行分组。->(手机号,(手机号,基站ID,停留时间,经度,纬度)) * 6、取出停留时间最长的两个基站ID。 * */ object UserLocation { def main(args: Array[String]): Unit = { //创建Spark配置信息 val conf = new SparkConf().setAppName("UserLocation").setMaster("local[*]") //建立Spark上下文,并将配置信息导入 val sc = new SparkContext(conf) /* 基站连接手机号,连接时间戳,基站站点ID信息,“1”表示连接,“0”表示断开连接。 18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1 */ //从log文件拿到数据,并按行采集。 //sc.textFile("c://information//bs_log").map(_.split(",")).map(x => (x(0), x(1), x(2), x(3))) val rdd_Info = sc.textFile("j://information//bs_log").map(line => { //通过“,”将数据进行切分field(0)手机号,field(1)时间戳,field(2)基站ID信息,field(3)事件类型 val fields = line.split(",") //事件类型,“1”表示连接,“0”表示断开。 val eventType = fields(3) val time = fields(1) //连接基站将时间戳至为“-”,断开基站将时间戳至为“+”,以便后面进行计算。 val timeLong = if(eventType == "1") -time.toLong else time.toLong //构成一个数据类型(手机号,基站ID信息,带符号的时间戳) ((fields(0),fields(2)),timeLong) }) val rdd_lacInfo = rdd_Info.reduceByKey(_+_).map(t=>{ val mobile = t._1._1 val lac = t._1._2 val time = t._2 (lac, (mobile, time)) }) val rdd_coordinate = sc.textFile("j://information//lac_info.txt").map(line =>{ val f = line.split(",") //(基站ID, (经度, 纬度)) (f(0),(f(1), f(2))) }) //rdd1.join(rdd2)-->(CC0710CC94ECC657A8561DE549D940E0,((18688888888,1300),(116.303955,40.041935))) val rdd_all = rdd_lacInfo.join(rdd_coordinate).map(t =>{ val lac = t._1 val mobile = t._2._1._1 val time = t._2._1._2 val x = t._2._2._1 val y = t._2._2._2 (mobile, lac, time, x, y) }) //按照手机号进行分组 val rdd_mobile = rdd_all.groupBy(_._1) //取出停留时间最长的前两个基站 val rdd_topTwo= rdd_mobile.mapValues(it =>{ it.toList.sortBy(_._3).reverse.take(2) }) // println(rdd_Info.collect().toBuffer) // println(rdd_lacInfo.collect().toBuffer) // println(rdd_coordinate.collect().toBuffer) // println(rdd_all.collect().toBuffer) // println(rdd_mobile.collect().toBuffer) // println(rdd_topTwo.collect().toBuffer) rdd_topTwo.saveAsTextFile("j://information//out") sc.stop() } }