博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkStreaming(12):高级数据源kafka Receiver方式(生产)
阅读量:4280 次
发布时间:2019-05-27

本文共 2090 字,大约阅读时间需要 6 分钟。

1.准备环境

(1) 启动zk

    bin/zkServer.sh start

(2)启动kafka

    bin/kafka-server-start.sh -daemon config/server.properties 

(3)创建topic

bin/kafka-topics.sh --create --topic kafka_streaming_topic --zookeeper hadoop:2181/kafka08 --partitions 1 --replication-factor 1

查看

bin/kafka-topics.sh --list --zookeeper hadoop:2181/kafka08

(4)测试kafka可以正常接收产生的消息,并且消费

  生产者

bin/kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka_streaming_topic

  消费:

bin/kafka-console-consumer.sh --topic kafka_streaming_topic --zookeeper hadoop:2181/kafka08    

(经测试,成功!)

2.开发代码

(1)pom依赖

【参考:http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html】

    
   
org.apache.spark
   
spark-streaming-kafka-0-8_2.11
   
2.1.0
   

(2)代码

package Sparkimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.ReceiverInputDStreamimport org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}/**  */object KafkaReceiverWordCount_product {  def main(args: Array[String]): Unit = {    if(args.length!=4){      System.err.println("Usage: KafkaReceiverWordCount 
") } val Array(zkQuorum,group,topics,numThreads)=args //因为这个是生产环境,所以注释 val sparkConf=new SparkConf() val ssc=new StreamingContext(sparkConf,Seconds(5)) val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap //TODO: Spark streaming如何对接kafka //参考源码createStream val messages: ReceiverInputDStream[(String, String)] =KafkaUtils.createStream(ssc,zkQuorum,group,topicMap) //取第2个 messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() }}

3.测试

(1)jar包放入

/opt/datas/lib/scalaProjectMaven.jar

(2)打开hdfs

(3)提交spark任务

bin/spark-submit \--class Spark.KafkaReceiverWordCount_product \--master local[2] \--name KafkaReceiverWordCount_product \--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \/opt/datas/lib/scalaProjectMaven.jar  hadoop:2181/kafka08 test kafka_streaming_topic 1

(经测试成功!)

转载地址:http://ytygi.baihongyu.com/

你可能感兴趣的文章
Linux电源管理(6)_Generic PM之Suspend功能
查看>>
Linux电源管理(7)_Wakeup events framework
查看>>
Linux电源管理(8)_Wakeup count功能
查看>>
Linux电源管理(9)_wakelocks
查看>>
Linux电源管理(10)_autosleep
查看>>
Linux电源管理(11)_Runtime PM之功能描述
查看>>
linux下,如何debug Suspend and resume
查看>>
Linux时间子系统之(一):时间的基本概念
查看>>
Linux时间子系统之(二):软件架构
查看>>
objdump命令的使用
查看>>
linux下如何打开core dump
查看>>
Linux时间子系统之(三):用户空间接口函数
查看>>
Linux时间子系统之(四):timekeeping
查看>>
How to modify linux time
查看>>
repo 的使用
查看>>
cmake 命令 安装,用法简介
查看>>
Intel Edison C++ 开发之I2C-使用MRAA库进行C/C++开发
查看>>
Intel Edison C++ 开发之I2C-深入MRAA开发
查看>>
android中wifi的上下层的连接、命令发送
查看>>
Android平台开发-WIFI function porting-WIFI功能移植
查看>>