文档章节

SparkStreaming整合kafka编程

四叶草666
 四叶草666
发布于 2017/07/19 10:09
字数 824
阅读 22
收藏 0
点赞 0
评论 0

1、下载spark-streaming-kafka插件包

由于Linux集群环境我使用spark是spark-2.1.1-bin-hadoop2.7,kafka是kafka_2.11-0.8.2.1,所以我下载的是spark-streaming-kafka-0-8_2.11-2.1.1.jar。

官网下载地址:http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.1

百度云下载地址:链接:http://pan.baidu.com/s/1o83DOHO 密码:2dgx

2、整合spark和kafka的jar包

2.1添加spark-streaming-kafka插件包

新建一个lib目录,首先把1步骤下载的spark-streaming-kafka-0-8_2.11-2.1.1.jar放进去

如图:

2.2添加spark依赖包

找到spark-2.1.1-bin-hadoop2.7/jars目录下所有的jar包,如图:

把spark-2.1.1-bin-hadoop2.7/jars目录下所有的jar包复制到上述新建的lib目录下,如图:

2.3添加kafka依赖包

找到kafka_2.11-0.8.2.1/libs目录下所有的jar包,如图:

把kafka_2.11-0.8.2.1/libs目录下所有的jar包复制到上述新建的lib目录下,如图:

3、新建测试工程

新建scala project,引用上述lib目录下的所有jar包;新建一个KafkaWordCount.scala用于测试:

 
  1. import org.apache.spark.streaming.StreamingContext
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming.kafka.KafkaUtils
  4. import org.apache.spark.streaming.Seconds
  5. import org.apache.spark.streaming.Minutes
  6. import org.apache.spark.SparkContext
  7. import kafka.serializer.StringDecoder
  8.  
  9. object KafkaWordCount {
  10.   def main(args: Array[String]) {
  11.     val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
  12.     sparkConf.set("spark.port.maxRetries","128")
  13.     val ssc = new StreamingContext(sparkConf, Seconds(2))
  14.     ssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint")
  15.     val zkQuorum = "192.168.168.200:2181"
  16.     val group = "test-group"
  17.     val topics = "test"
  18.     val numThreads = 1
  19.     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  20.     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
  21.     val words = lines.flatMap(_.split(" "))
  22.     val wordCounts = words.map(x => (x, 1L))
  23.       .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
  24.     wordCounts.print()
  25.     ssc.start()
  26.     ssc.awaitTermination()          
  27.   }
  28. }

如图:

启动spark集群和kafka集群,默认已经开启,默认kafka有test主题,这是默认要会的,在这里不在详述。

运行成功,如图:

 
  1. SLF4J: Class path contains multiple SLF4J bindings.
  2. SLF4J: Found binding in [jar:file:/I:/001sourceCode/020SparkStreaming/%e5%a4%a7%e6%95%b0%e6%8d%ae%e5%bc%80%e5%8f%91%e6%96%b9%e6%a1%88%e8%b5%84%e6%96%99%ef%bc%88%e5%a4%a9%e7%bb%b4%e5%b0%94%ef%bc%89/%e5%bc%80%e5%8f%91%e6%89%80%e9%9c%80jar%e5%8c%85/lib/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  3. SLF4J: Found binding in [jar:file:/I:/001sourceCode/020SparkStreaming/%e5%a4%a7%e6%95%b0%e6%8d%ae%e5%bc%80%e5%8f%91%e6%96%b9%e6%a1%88%e8%b5%84%e6%96%99%ef%bc%88%e5%a4%a9%e7%bb%b4%e5%b0%94%ef%bc%89/%e5%bc%80%e5%8f%91%e6%89%80%e9%9c%80jar%e5%8c%85/lib/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  4. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  5. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  6. -------------------------------------------
  7. Time: 1499667652000 ms
  8. -------------------------------------------
  9.  
  10. -------------------------------------------
  11. Time: 1499667654000 ms
  12. -------------------------------------------
  13.  
  14. -------------------------------------------
  15. Time: 1499667656000 ms
  16. -------------------------------------------

4、接收kafka的主题消息

启动一个kafka的生产者客户端:

 
  1. [root@master ~]# kafka-console-producer.sh --broker-list 192.168.168.200:9092 --topic test
  2. test success
  3. spark
  4. kafka

运行日志如下:

 
  1. -------------------------------------------
  2. Time: 1499667830000 ms
  3. -------------------------------------------
  4.  
  5. -------------------------------------------
  6. Time: 1499667832000 ms
  7. -------------------------------------------
  8. (test,1)
  9. (success,1)
  10.  
  11. -------------------------------------------
  12. Time: 1499667834000 ms
  13. -------------------------------------------
  14. (test,1)
  15. (success,1)
  16.  
  17. -------------------------------------------
  18. Time: 1499667836000 ms
  19. -------------------------------------------
  20. (test,1)
  21. (spark,1)
  22. (success,1)
  23.  
  24. -------------------------------------------
  25. Time: 1499667838000 ms
  26. -------------------------------------------
  27. (kafka,1)
  28. (test,1)
  29. (spark,1)
  30. (success,1)

5、sparkStreaming收不到kafka主题消息

如果出现kakfa的消费者客户端可以收到消息,而spark的消费者客户端收不到消息,后台也没有报错,那么要仔细检查kafka_home/conf目录下的server.properties,有没有配置:

 
  1. ############################# Socket Server Settings #############################
  2. # The port the socket server listens on
  3. port=9092
  4. # Hostname the broker will bind to. If not set, the server will bind to all interfaces
  5. host.name=192.168.168.200

一定要配置host.name,否则只能在kafk消费客户端收到消息,不能在sparkStreaming创建的topic消息客户端收到。

6、sbtassembly打包代码并上传到spark运行

可参考以下资料:

使用SBT构建Scala项目

本地开发spark代码上传spark集群服务并运行

 

© 著作权归作者所有

共有 人打赏支持
四叶草666
粉丝 0
博文 51
码字总数 50778
作品 0
深圳
程序员
sparkStreaming与Kafka整合

createStream那几个参数折腾了我好久。。网上都是一带而过,最终才搞懂..关于sparkStreaming的还是太少,最终尝试成功。。。 首先启动zookeeper ./bin/zookeeper-server-start.sh config/zook...

雪童子
2015/11/28
0
0
Flume+Kafka+SparkStreaming 最新最全整合

1.架构 第一步,Flume和Kakfa对接,Flume抓取日志,写到Kafka中 第二部,Spark Streaming读取Kafka中的数据,进行实时分析 本文首先使用Kakfa自带的消息处理(脚本)来获取消息,走通Flume和...

ericSM
2016/06/16
1K
0
Spark Streaming结合Flume、Kafka最新最全日志分析

Spark Streaming结合Flume、Kafka最新最全日志分析 1.修改相应的配置文件 按照 http://my.oschina.net/sunmin/blog/692994 整合安装Flume+Kafka+SparkStreaming 将flume/conf/producer.conf将......

ericSM
2016/06/23
556
0
使用Flume+Kafka+SparkStreaming进行实时日志分析

每个公司想要进行数据分析或数据挖掘,收集日志、ETL都是第一步的,今天就讲一下如何实时地(准实时,每分钟分析一次)收集日志,处理日志,把处理后的记录存入Hive中,并附上完整实战代码 ...

Trigl
2017/05/24
0
0
Kafka Streams 剖析

1.概述   Kafka Streams 是一个用来处理流式数据的库,属于Java类库,它并不是一个流处理框架,和Storm,Spark Streaming这类流处理框架是明显不一样的。那这样一个库是做什么的,能应用到...

smartloli
2017/09/14
0
0
spark spark streaming + kafka direct方式消费消息

kafka + spark streaming direct方式消费消息 前提: spark 安装成功,spark 1.6.0 zookeeper 安装成功 kafka 安装成功 集群环境见:《0423_SparkStreaming数据源kafka解析和安装配置及测试实...

柯里昂
2016/04/25
959
0
Spark实战(一)SparkStreaming集成Kafka

Spark Streaming + Kafka集成指南 Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用。请选择正确的包, 请注意,0.8集成与后来的0.9和0.1...

FrankDeng
前天
0
0
Spark Streaming和Kafka整合之路(最新版本)

最近完成了Spark Streaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少的坑,记录下来,大家方便绕行。 先说一下环境: Spark 2.0.0 kafka_2.11-0.10.0.0 之前的项目当中,已经...

大胖和二胖
2016/09/19
1K
1
【源码追踪】SparkStreaming 中用 Direct 方式每次从 Kafka 拉取多少条数据(offset取值范围)

我们知道 SparkStreaming 用 Direct 的方式拉取 Kafka 数据时,是根据 kafka 中的 fromOffsets 和 untilOffsets 来进行获取数据的,而 fromOffsets 一般都是需要我们自己管理的,而每批次的 ...

lin_wj1995
04/19
0
0
在线日志分析项目解读

1,日志的采集 从flume agent 上的数据一般分到两条线上一条是kafka 集群 ,后期可以用流式处理(spark streaming 或storm 等等)一条是到hdfs,后期可以用hive处理, 业界叫lambda架构 arch...

skinglzw
2017/09/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

python3.6用的外库遇到的问题

要用到wmi库 pip install wmi 装后不能用还要安装pywin32库。 要用到crypto加密模块安装pip instal pycrypto装后不能用。要安装pycryptodome。...

oisan_
5分钟前
0
0
select, poll, epoll I/O复用介绍

什么是I/O复用? 内核监视多个文件描述符(I/O文件句柄),一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知应用程序进行相应的读写操作。 I/O多路复用又被称为“事件驱动”(比如读写事...

xiaoxlm
6分钟前
0
0
【Spring 系列 给IOC容器添加组件的几种方式总结】

给Spring 注册Bean的几种方式总结。其中使用@Import注解是Spring Boot 完成自动配置的一个核心注解。 1、Spring 中给IOC容器添加组件的几种方式 在Spring的配置文件中,配置Bean(基于XML方式...

HansonReal
7分钟前
2
0
bootstrapTable语言包设置

###方法一引入不同的语言包 <script src="bootstrap-table-zh-CN.js"></script> ###方法二引入全语言包 <script src="bootstrap-table-locale-all.min.js"></script>//然后在初始化前设计默......

momo1987
7分钟前
0
0
Saltstack 常用命令

1、拷贝文件到客户端 # salt 'slaver.test.com' cp.get_file salt://apache.sls /tmp/cp.txt 2、拷贝目录到客户端 # salt 'slaver.test.com' cp.get_dir salt://test /tmp 3、显示存活的客户......

硅谷课堂
8分钟前
0
0
致初学者-如何学好Python这门编程语言?[图]

致初学者-如何学好Python这门编程语言?[图]: 对于很多Python3初学者,往往会面临以下问题:Python2和Python3我该学习哪一个?是否要安装Linux系统学习Python?Python3有各种版本我该安装哪...

原创小博客
11分钟前
1
0
E31---setopt=protected_multilib=false

今天在云服务器上装nginx,需要先安装一些依赖库比如zlib, ,但是安装zlib时候报错。 yum install -y zlib zlib-devel 1 (-y 指的是如果需要选yes no的自动y)下面是报错 Protected multil...

侠客行之石头
15分钟前
0
0
HTTP常见面试题

Http与Https的区别: HTTP 的URL 以http:// 开头,而HTTPS 的URL 以https:// 开头 HTTP 是不安全的,而 HTTPS 是安全的 HTTP 标准端口是80 ,而 HTTPS 的标准端口是443 在OSI 网络模型中,H...

JK_OPERA
17分钟前
0
0
python爬取站长素材网页图片保存到ppt中

网站地址:http://sc.chinaz.com/tupian/index.html 直接上代码: import requestsfrom bs4 import BeautifulSoupfrom pptx import Presentationfrom pptx.util import Inchesimpor......

你为什么不吃药
18分钟前
1
0
Ubuntu 18.04 swap空间的修改

一、准备工作 执行“sudo swapon -s”命令,查看是否已经存在swap file 二、修改swap file # 如果第一步存在swapfile则需要先禁用sudo swapoff /swapfile# 修改swap 空间的大小为2Gs...

Iceberg_XTY
21分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部