Spark项目练习(计算用户访问学科子网页的top3)

项目说明:附件为要计算数据的demo。

learn

利用spark的缓存机制,读取需要筛选的数据,自定义一个分区器,将不同的学科数据分别放到一个分区器中,并且根据指定的学科,取出点击量前三的数据,并写入文件。

具体程序如下:

1、项目主程序:

package cn.allengao.Location

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
/**
 * class_name:
 * package:
 * describe: 缓存机制,自定义一个分区器,根据指定的学科, 取出点击量前三的,按照每种学科数据放到不同的分区器里
 * creat_user: Allen Gao
 * creat_date: 2018/1/30
 * creat_time: 11:21
 **/

object AdvUrlCount {

  def main(args: Array[String]) {

    //从数据库中加载规则
    //    val arr = Array("java.learn.com", "php.learn.com", "net.learn.com")

    val conf = new SparkConf().setAppName("AdvUrlCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    //获取数据
    val file = sc.textFile("j://information/learn.log")
    //提取出url并生成一个元祖,rdd1将数据切分,元组中放的是(URL, 1)
    val urlAndOne = file.map(line => {
      val fields = line.split("\t")
      val url = fields(1)
      (url, 1)
    })
    //把相同的url进行聚合
    val sumedUrl = urlAndOne.reduceByKey(_ + _)

    //获取学科信息缓存,提高运行效率
    val cachedProject = sumedUrl.map(x => {
      val url = x._1
      val project = new URL(url).getHost
      val count = x._2
      (project, (url, count))
    }).cache()

    //调用Spark自带的分区器此时会发生哈希碰撞,会有数据倾斜问题产生,需要自定义分区器
    //    val res = cachedProject.partitionBy(new HashPartitioner(3))
    //    res.saveAsTextFile("j://information//out")

    //得到所有学科
    val projects = cachedProject.keys.distinct().collect()
    //调用自定义分区器并得到分区号
    val partitioner = new ProjectPartitioner(projects)

    //分区
    val partitioned: RDD[(String, (String, Int))] = cachedProject.partitionBy(partitioner)

    //对每个分区的数据进行排序并取top3
    val res = partitioned.mapPartitions(it => {
      it.toList.sortBy(_._2._2).reverse.take(3).iterator
    })
    res.saveAsTextFile("j://information//out1")

    sc.stop()
  }
}

2、自定义分区器:

package cn.allengao.Location

import org.apache.spark.Partitioner

import scala.collection.mutable

class ProjectPartitioner(projects: Array[String]) extends Partitioner {
  //用来存放学科和分区号
  private val projectsAndPartNum = new mutable.HashMap[String,Int]()
  //计数器,用于指定分区号
  var n = 0

  for(pro<-projects){
    projectsAndPartNum += (pro -> n)
    n += 1
  }
  //得到分区数
  override def numPartitions = projects.length
  //得到分区号
  override def getPartition(key: Any) = {
    projectsAndPartNum.getOrElse(key.toString,0)
  }

}

运行结果:

Author: allengao

发表评论

电子邮件地址不会被公开。 必填项已用*标注