1、通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
2、高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
3、支持通过Kafka服务器和消费机集群来分区消息。
4、支持Hadoop并行数据加载。
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker[5]
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker
Consumer
消息消费者,向Kafka broker读取消息的客户端。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
JDK1.8
3台服务器:(CentOS 7.3)
192.168.119.51 hadoop001
192.168.119.52 hadoop002
192.168.119.53 hadoop003
1、安装zookeeper3.4.10
下载zookeeper,
wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.10.tar.gz
解压,tar -zxvf zookeeper-3.4.10.tar.gz
mv命令重命名文件夹,改名为zookeeper。
修改配置文件
进入到解压好的目录里面的conf目录中,cd /home/hadoop/app/zookeeper/conf
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/home/hadoop/data/zookeeper/zkdata dataLogDir=/home/hadoop/data/zookeeper/zkdatalog clientPort=2181 server.1=192.168.119.51:2888:3888 server.2=192.168.119.52:2888:3888 server.3=192.168.119.53:2888:3888
创建myid文件
#hadoop001
echo “1” > /home/hadoop/data/zookeeper/zkdata/myid
#hadoop002
echo “2” > /home/hadoop/data/zookeeper/zkdata/myid
#hadoop003
echo “3” > /home/hadoop/data/zookeeper/zkdata/myid
重要配置说明
1、myid文件和server.myid 在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。
2、zoo.cfg 文件是zookeeper配置文件 在conf目录里。
3、log4j.properties文件是zk的日志输出文件 在conf目录里用java写的程序基本上有个共同点日志都用log4j,来进行管理。
4、zkEnv.sh和zkServer.sh文件
zkServer.sh 主的管理程序文件
zkEnv.sh 是主要配置,zookeeper集群启动时配置环境变量的文件
5、还有一个需要注意
ZooKeeper server will not remove old snapshots and log files when using the default configuration (see autopurge below), this is the responsibility of the operator
zookeeper不会主动的清除旧的快照和日志文件,这个是操作者的责任。
启动zookeeper服务并查看
1、启动服务
#进入到Zookeeper的bin目录下
/zookeeper/bin
#启动服务(3台都需要操作)
./zkServer.sh start
2、检查服务状态
#检查服务器状态
./zkServer.sh status
启动成功后jps查看进程
[hadoop@hadoop002 conf]$ jps
1988 Jps
1287 QuorumPeerMain
说明zookeeper启动成功。
2、安装kafka0.11
下载压缩包,解压,重命名文件夹,进入文件夹的config目录
修改server.properties配置文件,三台机器配置相应修改。
broker.id分别修改为 broker.id=0,broker.id=1,broker.id=2,
host.name 为各机器的IP地址,分别改为192.168.119.51,192.168.119.52,192.168.119.53
分别在后台启动kafka的三个节点:
/home/hadoop/app/kafka/bin/kafka-server-start.sh /home/hadoop/app/kafka/config/server.properties >/dev/null 2>&1 &
jps查看进程启动情况:
[hadoop@hadoop003 conf]$ jps
1360 Kafka
1988 Jps
1287 QuorumPeerMain
有kafka进程,说明启动成功。
3、编写程序,从kafka采集数据
pom.xml文件中添加依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.1</version> </dependency>
编写程序:
package cn.allengao.stream import org.apache.log4j.{Logger, Level} import org.apache.spark.Logging object LoggerLevels extends Logging { def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) { logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.") Logger.getRootLogger.setLevel(Level.WARN) } } }
package cn.allengao.stream import org.apache.spark.storage.StorageLevel import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /** * class_name: * package: * describe: sparkstreaming从kafka读取数据 * creat_user: Allen Gao * creat_date: 2018/2/12 * creat_time: 17:20 **/ object KafkaWordCount { val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => { //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x))) iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) } } def main(args: Array[String]) { LoggerLevels.setStreamingLogLevels() val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(5)) ssc.checkpoint("j://ck2") //"alog-2018-02-10,alog-2016-02-11,alog-2018-02-12" //"Array((alog-2018-02-10, 2), (alog-2016-02-11, 2), (alog-2018-02-12, 2))" val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER) val words = data.map(_._2).flatMap(_.split(" ")) val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true) wordCounts.print() ssc.start() ssc.awaitTermination() } }
配置运行参数:
这样,就创建了一个名为test03的topic。
在任一台机器运行命令:
bin/kafka-console-producer.sh –broker-list 192.168.119.51:9092 –topic test03
生产者开始生产数据
>hello tom hello jerry hello java hello
在idea中可以看到如下结果:
——————————————-
Time: 1518424550000 ms
——————————————-
(tom,1)
(hello,4)
(java,1)
(jerry,1)
说明SparkStreaming和kafka的连接通讯成功。