Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
1、第一种方式,通过push的方式读取数据。
首先在一台虚拟机上安装flume1.8.0,vi /etc/profile,添加配置信息
# flume-1.8.0 config
export FLUME_HOME=/home/hadoop/app/flume
export PATH=$FLUME_HOME/bin:$PATH
进入flume的conf文件夹 cp flume-env.sh.template flume-env.sh,添加配置
export JAVA_HOME=/home/hadoop/app/jdk
新建文件flume-push.conf,添加配置文件信息:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/hadoop/data/flume a1.sources.r1.fileHeader = true # Describe the sink a1.sinks.k1.type = avro #这是接收方 a1.sinks.k1.hostname = 192.168.119.1 a1.sinks.k1.port = 8888 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
编写spark程序:
package cn.allengao.stream import org.apache.spark.SparkConf import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /** * class_name: * package: * describe: Push方式读取数据 * creat_user: Allen Gao * creat_date: 2018/2/12 * creat_time: 11:38 **/ object FlumePushWordCount { def main(args: Array[String]) { // val host = args(0) // val port = args(1).toInt LoggerLevels.setStreamingLogLevels() val conf = new SparkConf().setAppName("FlumeWordCount").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(5)) //推送方式: flume向spark发送数据 // val flumeStream = FlumeUtils.createStream(ssc, host, port) val flumeStream = FlumeUtils.createStream(ssc, "192.168.119.1", 8888) //flume中的数据通过event.getBody()才能拿到真正的内容 val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1)) val results = words.reduceByKey(_ + _) results.print() ssc.start() ssc.awaitTermination() } }
进入flume,运行bin/flume-ng agent -n a1 -c conf -f conf/flume-push.conf ,启动flume程序。运行spark程序。
进入flume采集数据的文件夹,新建aaa.log文件,添加内容:
hello tom hello jerry ,如果连接成功,可以在idea中看到如下运行结果:
——————————————-
Time: 1518400235000 ms
——————————————-
(tom,1)
(hello,2)
(jerry,1)
2、第二种方式,通过poll的方式读取数据。
进入flume的conf文件夹,新建文件flume-push.conf,添加配置文件信息:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/hadoop/data/flume a1.sources.r1.fileHeader = true # Describe the sink a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.k1.hostname = 192.168.119.51 a1.sinks.k1.port = 8888 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
编写spark程序:
package cn.allengao.stream import java.net.InetSocketAddress import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /** * class_name: * package: * describe: poll的方式读取数据,使用较多。 * creat_user: Allen Gao * creat_date: 2018/2/12 * creat_time: 11:41 **/ object FlumePollWordCount { def main(args: Array[String]) { val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(5)) //从flume中拉取数据(flume的地址),可以设置多个flume数据地址。 val address = Seq(new InetSocketAddress("192.168.119.51", 8888)) val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK) val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_,1)) val results = words.reduceByKey(_+_) results.print() ssc.start() ssc.awaitTermination() } }
进入flume,运行bin/flume-ng agent -n a1 -c conf -f conf/flume-poll.conf ,启动flume程序。运行spark程序。
将一个文件导入flume采集数据的文件夹,例如 cp /home/hadoop/files/wc/word1.log /home/hadoop/data/flume/a.txt。
如果连接成功,可以在idea中看到如下运行结果:
——————————————-
Time: 1518405530000 ms
——————————————-
(scala,1)
(tom,2)
(hello,5)
(java,1)
(jerry,1)
这里要注意的是有可能发生flume正常运行,数据文件采集正常,spark程序运行正常,但是在idea运行结果中看不到数据,有可能是你的数据采集文件夹的权限设置不正确。我的权限设置是这样的:
drwxrwxr-x. 3 hadoop hadoop 163 2月 12 11:05 flume
请根据自己的机器配置进行修改。