(1) Start ZooKeeper
bin/zkServer.sh start
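To confirm ZooKeeper is actually running before starting Kafka, the same script offers a status check (standard ZooKeeper CLI, run from the same directory):
bin/zkServer.sh status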
(2) Start Kafka
bin/kafka-server-start.sh -daemon config/server.properties
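A quick way to verify the broker came up: jps (shipped with the JDK) should list a JVM process named Kafka:
jps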
(3) Create the topic
bin/kafka-topics.sh --create --topic kafka_streaming_topic --zookeeper hadoop:2181/kafka08 --partitions 1 --replication-factor 1
Check:
bin/kafka-topics.sh --list --zookeeper hadoop:2181/kafka08
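The same tool can also print partition and replica details for the new topic, using --describe instead of --list:
bin/kafka-topics.sh --describe --topic kafka_streaming_topic --zookeeper hadoop:2181/kafka08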
(4) Verify that Kafka can receive produced messages and that a consumer can read them
Producer:
bin/kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka_streaming_topic
Consumer:
bin/kafka-console-consumer.sh --topic kafka_streaming_topic --zookeeper hadoop:2181/kafka08
(Tested: success!)
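As a smoke test, type a line such as "hello world hello" into the producer terminal; the console consumer should echo the same line almost immediately. (The sample input here is illustrative.)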
(1) pom dependency
(Reference: http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html)
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
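The project also needs the Spark Streaming core artifact itself. A sketch of the additional entry, assuming the same Spark and Scala versions as above (often marked provided when the cluster supplies Spark):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>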
(2) Code
package Spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Receiver-based word count over a Kafka topic (production version).
 */
object KafkaReceiverWordCount_product {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount_product <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args

    // Master and app name are intentionally not set here: this is the
    // production version, so spark-submit supplies them instead.
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // How Spark Streaming connects to Kafka: see the createStream source.
    val messages: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // Each record is a (key, message) pair; take the second element (the message body).
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
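For quick local runs (as opposed to the production path above, where spark-submit supplies these settings), the master and app name can be set directly on the SparkConf. A minimal sketch of that variant:

val sparkConf = new SparkConf()
  .setMaster("local[2]")  // at least 2 threads: one for the receiver, one for processing
  .setAppName("KafkaReceiverWordCount_product")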
(1) Place the jar at
/opt/datas/lib/scalaProjectMaven.jar
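A typical way to build the jar, assuming a standard Maven layout (the artifact name scalaProjectMaven.jar comes from this project's own pom settings):
mvn clean package -DskipTests
Then copy the jar from target/ to /opt/datas/lib/ on the server.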
(2) Start HDFS
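On a typical Hadoop 2.x installation this means running, from $HADOOP_HOME:
sbin/start-dfs.sh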
(3) Submit the Spark job
bin/spark-submit \
--class Spark.KafkaReceiverWordCount_product \
--master local[2] \
--name KafkaReceiverWordCount_product \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \
/opt/datas/lib/scalaProjectMaven.jar hadoop:2181/kafka08 test kafka_streaming_topic 1
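The four trailing arguments map to the program's <zkQuorum> <group> <topics> <numThreads> parameters. Once the job is running, type words into the console producer started earlier; each 5-second batch should print (word, count) pairs in the driver's output, e.g. (hello,2) for a batch containing "hello" twice. (The sample output is illustrative, not captured from a real run.)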