Receiving data in Spark Streaming from Flume (single machine, push and poll modes)

Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting large volumes of log data. It supports customizable data senders for collecting logs, can perform simple in-flight processing, and writes the data to a variety of (customizable) receivers.

1. Method one: receiving data via push.

First, install Flume 1.8.0 on a virtual machine. Edit /etc/profile (vi /etc/profile) and add the following:

# flume-1.8.0 config
export FLUME_HOME=/home/hadoop/app/flume

export PATH=$FLUME_HOME/bin:$PATH

In Flume's conf directory, run cp flume-env.sh.template flume-env.sh and add:

export JAVA_HOME=/home/hadoop/app/jdk

Create a new file flume-push.conf with the following configuration:

# Name the components on this agent  
a1.sources = r1  
a1.sinks = k1  
a1.channels = c1  
  
# source  
a1.sources.r1.type = spooldir  
a1.sources.r1.spoolDir = /home/hadoop/data/flume  
a1.sources.r1.fileHeader = true  
  
# Describe the sink  
a1.sinks.k1.type = avro  
# the receiver: the machine running the Spark application
a1.sinks.k1.hostname = 192.168.119.1  
a1.sinks.k1.port = 8888  
  
# Use a channel which buffers events in memory  
a1.channels.c1.type = memory  
a1.channels.c1.capacity = 1000  
a1.channels.c1.transactionCapacity = 100  
  
# Bind the source and sink to the channel  
a1.sources.r1.channels = c1  
a1.sinks.k1.channel = c1  

Write the Spark program:

package cn.allengao.stream  
  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.flume.FlumeUtils  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
  
/**
 * class_name:
 * package:
 * describe: reads data via Flume push
 * create_user: Allen Gao
 * create_date: 2018/2/12
 * create_time: 11:38
 **/
object FlumePushWordCount {  
  
  def main(args: Array[String]) {  
//    val host = args(0)  
  //  val port = args(1).toInt  
    LoggerLevels.setStreamingLogLevels()  
    val conf = new SparkConf().setAppName("FlumeWordCount").setMaster("local[2]")  
    val ssc = new StreamingContext(conf, Seconds(5))  
    //push mode: Flume sends data to Spark
//    val flumeStream = FlumeUtils.createStream(ssc, host, port)  
    val flumeStream = FlumeUtils.createStream(ssc, "192.168.119.1", 8888)  
    //the actual content of a Flume event must be read via event.getBody()
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))  
  
    val results = words.reduceByKey(_ + _)  
    results.print()  
    ssc.start()  
    ssc.awaitTermination()  
  }  
}  
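The easiest line to get wrong above is the one that turns each Flume event into text: getBody() returns a ByteBuffer, which must be unwrapped into a String before splitting. A minimal sketch of that decoding step on a plain ByteBuffer, with no Spark or Flume needed (DecodeCheck is a name invented here for illustration):

```scala
import java.nio.ByteBuffer

object DecodeCheck {
  // Mirrors the decoding in the stream: ByteBuffer -> String -> words
  def decode(body: ByteBuffer): Array[String] =
    new String(body.array(), "UTF-8").split(" ")

  def main(args: Array[String]): Unit = {
    val body = ByteBuffer.wrap("hello tom hello jerry".getBytes("UTF-8"))
    println(decode(body).mkString(","))
  }
}
```

Note that .array() only works on array-backed buffers (as produced by ByteBuffer.wrap), which matches how the streaming code above reads event bodies.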

In the Flume directory, run bin/flume-ng agent -n a1 -c conf -f conf/flume-push.conf to start the Flume agent, and run the Spark program. Note that in push mode the Spark application should be running first, so that its Avro receiver is already listening when the Flume sink starts pushing; otherwise the sink will fail to connect and keep retrying.

Go into the directory Flume watches, create a file aaa.log, and add the content:

hello tom hello jerry

If the connection succeeds, you will see output like the following in IDEA:

-------------------------------------------
Time: 1518400235000 ms
-------------------------------------------
(tom,1)
(hello,2)
(jerry,1)
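The flatMap/map/reduceByKey pipeline in the program is ordinary word counting, so it can be sanity-checked on a plain Scala collection without a cluster (WordCountCheck is a name invented here for illustration; groupBy plus a sum stands in for reduceByKey):

```scala
object WordCountCheck {
  // Same shape as the DStream pipeline: split into words, pair with 1, sum by key
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1)
      .map { case (w, pairs) => (w, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    wordCount(Seq("hello tom hello jerry")).foreach(println)
}
```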

2. Method two: receiving data via poll.

In Flume's conf directory, create a new file flume-poll.conf with the following configuration. Note that the SparkSink class used below is not bundled with Flume: the spark-streaming-flume-sink jar (with a matching scala-library jar and commons-lang3) must be placed in Flume's lib directory first, as described in the Spark Streaming + Flume integration guide.

# Name the components on this agent  
a1.sources = r1  
a1.sinks = k1  
a1.channels = c1  
  
# source  
a1.sources.r1.type = spooldir  
a1.sources.r1.spoolDir = /home/hadoop/data/flume  
a1.sources.r1.fileHeader = true  
  
# Describe the sink  
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink  
a1.sinks.k1.hostname = 192.168.119.51  
a1.sinks.k1.port = 8888  
  
# Use a channel which buffers events in memory  
a1.channels.c1.type = memory  
a1.channels.c1.capacity = 1000  
a1.channels.c1.transactionCapacity = 100  
  
# Bind the source and sink to the channel  
a1.sources.r1.channels = c1  
a1.sinks.k1.channel = c1  

Write the Spark program:

package cn.allengao.stream  
  
import java.net.InetSocketAddress  
  
import org.apache.spark.SparkConf  
import org.apache.spark.storage.StorageLevel  
import org.apache.spark.streaming.flume.FlumeUtils  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
/**
 * class_name:
 * package:
 * describe: reads data via Flume poll; this is the more commonly used of the two modes.
 * create_user: Allen Gao
 * create_date: 2018/2/12
 * create_time: 11:41
 **/
object FlumePollWordCount {  
  def main(args: Array[String]) {  
    val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")  
    val ssc = new StreamingContext(conf, Seconds(5))  
    //pull data from Flume (agent address); multiple Flume agent addresses can be given
    val address = Seq(new InetSocketAddress("192.168.119.51", 8888))  
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)  
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_,1))  
    val results = words.reduceByKey(_+_)  
    results.print()  
    ssc.start()  
    ssc.awaitTermination()  
  }  
}  
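As the comment in the program notes, createPollingStream accepts a sequence of agent addresses, so one stream can pull from several Flume agents at once. A minimal sketch of building such an address list (AddressCheck and the second host, 192.168.119.52, are invented here for illustration):

```scala
import java.net.InetSocketAddress

object AddressCheck {
  // One entry per Flume agent that runs a SparkSink
  def agentAddresses(hosts: Seq[(String, Int)]): Seq[InetSocketAddress] =
    hosts.map { case (host, port) => new InetSocketAddress(host, port) }

  def main(args: Array[String]): Unit = {
    val addresses =
      agentAddresses(Seq(("192.168.119.51", 8888), ("192.168.119.52", 8888)))
    addresses.foreach(println)
  }
}
```

The resulting Seq can be passed directly as the second argument of FlumeUtils.createPollingStream in place of the single-address Seq used above.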

In the Flume directory, run bin/flume-ng agent -n a1 -c conf -f conf/flume-poll.conf to start the Flume agent, then run the Spark program.

Copy a file into the directory Flume watches, for example: cp /home/hadoop/files/wc/word1.log /home/hadoop/data/flume/a.txt.

If the connection succeeds, you will see output like the following in IDEA:

-------------------------------------------
Time: 1518405530000 ms
-------------------------------------------
(scala,1)
(tom,2)
(hello,5)
(java,1)
(jerry,1)

One thing to watch out for: it can happen that Flume runs normally, the data file is picked up normally, and the Spark program runs without error, yet no data appears in the IDEA output. A likely cause is incorrect permissions on the spooling directory. Mine are set as follows:

drwxrwxr-x. 3 hadoop hadoop 163 Feb 12 11:05 flume

Adjust this to match your own machine's configuration.

