打开IntelliJ IDEA软件,新建maven项目,具体目录如下:
pom.xml文件配置如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.allengao.spark</groupId> <artifactId>hello-spark</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.10.6</scala.version> <spark.version>1.6.1</spark.version> <hadoop.version>2.6.4</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-make:transitive</arg> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
编写SparkWordCount程序:
package cn.allengao.hellospark import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /* 参数设置: Name:SparkWC Name class:cn.allengao.hellospark.SparkWC arguments: hdfs://hadoop001:9000/user/hadoop/input/wc hdfs://hadoop001:9000/output/wc */ object SparkWC { def main(args: Array[String]): Unit = { //配置信息类 /* 关于“local”参数的说明: 1、"local",最简单的本地模式,这种本地模式下,任务的失败重试次数为1,即失败不重试。 2、local[*]、local[N],指定线程个数的本地模式,指定方式及最终的线程数如下: 1)local[*]:当前处理器个数。 2)local[N]:指定的N。 这种本地模式下,任务的失败重试次数为1,即失败不重试。 3、local[*, M]、local[N, M] 指定线程个数以及失败重试次数的本地模式,仅比上一种本地模式多了一个失败重试次数的设置,对应为M。 4、local-cluster[numSlaves, coresPerSlave, memoryPerSlave] 本地伪分布式集群,由于本地模式下没有集群,因此需要构建一个用于模拟集群的实例:localCluster = new LocalSparkCluster。 对应的三个参数: numSlaves:模拟集群的Slave节点个数。 coresPerSlave:模拟集群的各个Slave节点上的内核数。 memoryPerSlave:模拟集群的各个Slave节点上的内存大小。 */ val conf :SparkConf = new SparkConf().setAppName("SparkWC").setMaster("local[*]") //上下文对象 val sc: SparkContext = new SparkContext(conf) //读取数据 val lines = sc.textFile(args(0)) //处理数据 val words: RDD[String] = lines.flatMap(_.split(" ")) val paired: RDD[(String, Int)] = words.map((_, 1)) //把相同key的value值聚合到一起。 val reduced: RDD[(String, Int)] = paired.reduceByKey(_+_) //以value作为排序方式,false表示倒序排列,true表示正序排列 val res: RDD[(String, Int)] = reduced.sortBy(_._2, false) //保存 // res.saveAsTextFile(args(1)) // toBuffer转换成可变数组,就可以打印出来了。 println(res.collect().toBuffer) //结束任务 sc.stop() } }
启动hadoop集群,我这里是用三台机器创建的集群,分别为:
hadoop001 : 192.168.119.51 hadoop002 : 192.168.119.52 hadoop003 : 192.168.119.53
在IDEA运行配置参数中输入读取HDFS的地址、文件参数:hdfs://hadoop001:9000/user/hadoop/input/wc
我在HDFS存储信息如下:
-rw-r–r– 2 hadoop supergroup 56 2018-01-23 17:15 input/wc/word1.log
-rw-r–r– 2 hadoop supergroup 44 2018-01-23 17:15 input/wc/word2.log
右键点击运行SparkWC程序,看到如下结果:
至此,读取HDFS资源,在本地运行的spark入门小程序wordcount讲解完毕。下一步通过spark集群运行该程序。