文档章节

基于Java+SparkStreaming整合kafka编程

四叶草666
 四叶草666
发布于 2017/07/19 10:07
字数 1505
阅读 11
收藏 0
点赞 0
评论 0

一、下载依赖jar包

具体可以参考:SparkStreaming整合kafka编程

二、创建Java工程

太简单,略。

三、实际例子

spark的安装包里面有好多例子,具体路径:spark-2.1.1-bin-hadoop2.7\examples。

JavaDirectKafkaWordCount.java

 
  1. package com.spark.test;
  2.  
  3. import java.util.HashMap;
  4. import java.util.HashSet;
  5. import java.util.Arrays;
  6. import java.util.Iterator;
  7. import java.util.Map;
  8. import java.util.Set;
  9. import java.util.regex.Pattern;
  10.  
  11. import scala.Tuple2;
  12.  
  13. import kafka.serializer.StringDecoder;
  14.  
  15. import org.apache.spark.SparkConf;
  16. import org.apache.spark.api.java.function.*;
  17. import org.apache.spark.streaming.api.java.*;
  18. import org.apache.spark.streaming.kafka.KafkaUtils;
  19. import org.apache.spark.streaming.Durations;
  20.  
  21. public class JavaDirectKafkaWordCount {
  22.  
  23. public static void main(String[] args) throws Exception {
  24. //String brokers = args[0];
  25.    // String topics = args[1];
  26.  
  27.     // Create context with a 2 seconds batch interval
  28. /**
  29.  * setMaster("local[2]"),至少要指定两个线程,一条用于用于接收消息,一条线程用于处理消息
  30.  */
  31.     SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
  32.     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  33.  
  34.     Set<String> topicsSet = new HashSet<>(Arrays.asList("test"));
  35.     Map<String, String> kafkaParams = new HashMap<>();
  36.     kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");
  37.  
  38.     // Create direct kafka stream with brokers and topics
  39.     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
  40.         jssc,
  41.         String.class,
  42.         String.class,
  43.         StringDecoder.class,
  44.         StringDecoder.class,
  45.         kafkaParams,
  46.         topicsSet
  47.     );
  48.  
  49.     // Get the lines, split them into words, count the words and print
  50.     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
  51.       @Override
  52.       public String call(Tuple2<String, String> tuple2) {
  53.         return tuple2._2();
  54.       }
  55.     });
  56.     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  57.       @Override
  58.       public Iterator<String> call(String line) {
  59.         return Arrays.asList(line.split(" ")).iterator();
  60.       }
  61.     });
  62.     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
  63.       new PairFunction<String, String, Integer>() {
  64.         @Override
  65.         public Tuple2<String, Integer> call(String s) {
  66.           return new Tuple2<>(s, 1);
  67.         }
  68.       }).reduceByKey(
  69.         new Function2<Integer, Integer, Integer>() {
  70.         @Override
  71.         public Integer call(Integer i1, Integer i2) {
  72.           return i1 + i2;
  73.         }
  74.       });
  75.     wordCounts.print();
  76.  
  77.     // Start the computation
  78.     jssc.start();
  79.     jssc.awaitTermination();
  80. }
  81.  
  82. }

JavaKafkaWordCount.java

 

 
  1. package com.spark.test;
  2.  
  3. import java.util.Arrays;
  4. import java.util.Iterator;
  5. import java.util.Map;
  6. import java.util.HashMap;
  7. import java.util.regex.Pattern;
  8.  
  9. import scala.Tuple2;
  10.  
  11. import org.apache.spark.SparkConf;
  12. import org.apache.spark.api.java.function.FlatMapFunction;
  13. import org.apache.spark.api.java.function.Function;
  14. import org.apache.spark.api.java.function.Function2;
  15. import org.apache.spark.api.java.function.PairFunction;
  16. import org.apache.spark.streaming.Duration;
  17. import org.apache.spark.streaming.api.java.JavaDStream;
  18. import org.apache.spark.streaming.api.java.JavaPairDStream;
  19. import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
  20. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  21. import org.apache.spark.streaming.kafka.KafkaUtils;
  22.  
  23. public class JavaKafkaWordCount{
  24.  
  25. public static void main(String[] args) throws InterruptedException {
  26. SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[2]");
  27. // Create the context with 2 seconds batch size
  28. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
  29.  
  30. int numThreads = Integer.parseInt("2");
  31. Map<String, Integer> topicMap = new HashMap<>();
  32. String[] topics = "test".split(",");
  33. for (String topic: topics) {
  34. topicMap.put(topic, numThreads);
  35. }
  36.  
  37. JavaPairReceiverInputDStream<String, String> messages =
  38. KafkaUtils.createStream(jssc, "192.168.168.200:2181", "test-group", topicMap);
  39.  
  40. JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
  41. @Override
  42. public String call(Tuple2<String, String> tuple2) {
  43. return tuple2._2();
  44. }
  45. });
  46.  
  47. JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  48. @Override
  49. public Iterator<String> call(String line) {
  50. return Arrays.asList(line.split(" ")).iterator();
  51. }
  52. });
  53.  
  54. JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
  55. new PairFunction<String, String, Integer>() {
  56. @Override
  57. public Tuple2<String, Integer> call(String s) {
  58. return new Tuple2<>(s, 1);
  59. }
  60. }).reduceByKey(new Function2<Integer, Integer, Integer>() {
  61. @Override
  62. public Integer call(Integer i1, Integer i2) {
  63. return i1 + i2;
  64. }
  65. });
  66.  
  67. wordCounts.print();
  68. jssc.start();
  69. jssc.awaitTermination();
  70. }
  71.  
  72. }

 

JavaLocalWordCount.java

 

 
  1. package com.spark.test;
  2.  
  3. import java.util.Arrays;
  4. import java.util.Iterator;
  5.  
  6. import org.apache.spark.SparkConf;
  7. import org.apache.spark.api.java.JavaPairRDD;
  8. import org.apache.spark.api.java.JavaRDD;
  9. import org.apache.spark.api.java.JavaSparkContext;
  10. import org.apache.spark.api.java.function.FlatMapFunction;
  11. import org.apache.spark.api.java.function.Function2;
  12. import org.apache.spark.api.java.function.PairFunction;
  13. import org.apache.spark.api.java.function.VoidFunction;
  14.  
  15. import scala.Tuple2;
  16.  
  17. public class JavaLocalWordCount {
  18.  
  19. public static void main(String[] args) {
  20. /**
  21. * 第一步,创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
  22. * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,
  23. * 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置较差的情况
  24. */
  25. SparkConf sparkConf = new SparkConf().setAppName("LocalWordCountByJava").setMaster("local");
  26. /**
  27. * 第二步,创建SparkContext对象
  28. * SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala,java,python,R等都
  29. * 必须有一个SparkContext(不同语言具体类名称不同,如果是Java的话,则为JavaSparkContext)
  30. * 同时还会负责Spark程序在Master注册程序等
  31. * SparkContext是整个Spark应用程序至关重要的一个对象
  32. */
  33. JavaSparkContext jsc = new JavaSparkContext(sparkConf);//其底层实际上是Scala的SparkContext
  34. /**
  35. * 第三步,根据具体的数据来源(HDFS,HBase,Local,FS,DB,S3等),通过JavaSparkContext来创建JavaRDD
  36. * JavaRDD的创建方式有三种:根据外部数据来源(例如HDFS),
  37. * 根据Scala集合,由其他的RDD操作数据会将RDD划分成一系列Partition,
  38. * 分配到每个Partition的数据属于一个Task处理范畴
  39. */
  40. JavaRDD<String> lines = jsc.textFile("words.txt");
  41.  
  42. //如果是Scala,由于SAM转化,所以可以写成val words=lines.flatMap{line =>line.split(" ")}
  43. JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  44. @Override
  45. public Iterator<String> call(String line) {
  46. return Arrays.asList(line.split(" ")).iterator();
  47. }
  48. });
  49.  
  50. /**
  51. * 第4步:对初始的JavaRDD进行Transformation级别的处理,例如map,filter等高阶函数等的编程,来进行具体的数据计算
  52. * 第4.1步:在单词拆分的基础上对每个单词实例进行计数为1,也就是word =>(word,1)
  53. */
  54. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  55. public Tuple2<String, Integer> call(String word) throws Exception{
  56. return new Tuple2<String, Integer>(word, 1);
  57. }
  58. });
  59. /**
  60. * 统计总次数
  61. */
  62. JavaPairRDD<String,Integer> wordCount=pairs.reduceByKey(new Function2<Integer,Integer,Integer>()
  63. {
  64. public Integer call(Integer v1,Integer v2)throws Exception
  65. {
  66. return v1+v2;
  67.  
  68. }
  69. });
  70.  
  71. wordCount.foreach(new VoidFunction<Tuple2<String,Integer>>(){
  72. public void call(Tuple2<String,Integer> pairs) throws Exception {
  73. System.out.println(pairs._1()+":"+pairs._2());
  74. }
  75. });
  76.  
  77. jsc.close();
  78. }
  79.  
  80. }

 

JavaClusterWordCount.java

 

 
  1. package com.spark.test;
  2.  
  3. import java.util.Arrays;
  4. import java.util.Iterator;
  5.  
  6. import org.apache.spark.SparkConf;
  7. import org.apache.spark.api.java.JavaPairRDD;
  8. import org.apache.spark.api.java.JavaRDD;
  9. import org.apache.spark.api.java.JavaSparkContext;
  10. import org.apache.spark.api.java.function.FlatMapFunction;
  11. import org.apache.spark.api.java.function.Function2;
  12. import org.apache.spark.api.java.function.PairFunction;
  13. import org.apache.spark.api.java.function.VoidFunction;
  14.  
  15. import scala.Tuple2;
  16.  
  17. public class JavaClusterWordCount {
  18.  
  19. public static void main(String[] args) {
  20. /**
  21. * 第一步,创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
  22. * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,
  23. * 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置较差的情况
  24. */
  25. SparkConf sparkConf = new SparkConf().setAppName("LocalWordCountByJava").setMaster("local");
  26. /**
  27. * 第二步,创建SparkContext对象
  28. * SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala,java,python,R等都
  29. * 必须有一个SparkContext(不同语言具体类名称不同,如果是Java的话,则为JavaSparkContext)
  30. * 同时还会负责Spark程序在Master注册程序等
  31. * SparkContext是整个Spark应用程序至关重要的一个对象
  32. */
  33. JavaSparkContext jsc = new JavaSparkContext(sparkConf);//其底层实际上是Scala的SparkContext
  34. /**
  35. * 第三步,根据具体的数据来源(HDFS,HBase,Local,FS,DB,S3等),通过JavaSparkContext来创建JavaRDD
  36. * JavaRDD的创建方式有三种:根据外部数据来源(例如HDFS),
  37. * 根据Scala集合,由其他的RDD操作数据会将RDD划分成一系列Partition,
  38. * 分配到每个Partition的数据属于一个Task处理范畴
  39. */
  40. JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/input/words.txt");
  41.  
  42. //如果是Scala,由于SAM转化,所以可以写成val words=lines.flatMap{line =>line.split(" ")}
  43. JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  44. @Override
  45. public Iterator<String> call(String line) {
  46. return Arrays.asList(line.split(" ")).iterator();
  47. }
  48. });
  49.  
  50. /**
  51. * 第4步:对初始的JavaRDD进行Transformation级别的处理,例如map,filter等高阶函数等的编程,来进行具体的数据计算
  52. * 第4.1步:在单词拆分的基础上对每个单词实例进行计数为1,也就是word =>(word,1)
  53. */
  54. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  55. public Tuple2<String, Integer> call(String word) throws Exception{
  56. return new Tuple2<String, Integer>(word, 1);
  57. }
  58. });
  59. /**
  60. * 统计总次数
  61. */
  62. JavaPairRDD<String,Integer> wordCount=pairs.reduceByKey(new Function2<Integer,Integer,Integer>()
  63. {
  64. public Integer call(Integer v1,Integer v2)throws Exception
  65. {
  66. return v1+v2;
  67.  
  68. }
  69. });
  70.  
  71. wordCount.foreach(new VoidFunction<Tuple2<String,Integer>>(){
  72. public void call(Tuple2<String,Integer> pairs) throws Exception {
  73. System.out.println(pairs._1()+":"+pairs._2());
  74. }
  75. });
  76.  
  77. jsc.close();
  78. }
  79.  
  80. }

 

© 著作权归作者所有

共有 人打赏支持
四叶草666
粉丝 0
博文 51
码字总数 50778
作品 0
深圳
程序员
flume+kafka+storm运行实例

概述 在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive或者mr来实现统计分析,但是对于实时的需求Hive和mr就不合适了。实时应用场景可...

张欢19933
2016/02/04
879
0
Flume+Kafka+Storm+Redis构建大数据实时处理系统

一、大数据处理的常用方法 之前在《采集→清洗→处理:基于MapReduce的离线数据分析》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通...

技术小能手
07/09
0
0
Kafka+Storm+HDFS整合实践

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它...

HI曲奇饼干
2016/06/04
90
1
Kafka 状态信息监视器--burrowx

burrowx 是一个简单、轻量的 Kafka 状态信息监视器,目前使用 influxdb 存储指标。burrowx 受到 Burrow 的启发,但比它更快、更干净和更稳定。burrowx 是 influxdb 和 grafana 的良好整合。 ...

sundy-li
2017/08/08
292
0
大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

转自:http://www.cnblogs.com/zhengah/p/4729304.html 个人观点:大数据我们都知道hadoop,但并不都是hadoop.我们该如何构建大数据库项目。对于离线处理,hadoop还是比较适合的,但是对于实...

xiangxizhishi
2017/11/03
0
0
Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中《大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是...

xpleaf
04/16
0
0
基于Kafka+SparkStreaming+HBase实时点击流案例

前言 最近在专注Spark开发,记录下自己的工作和学习路程,希望能跟大家互相交流成长 本文章更倾向于实战案例,涉及框架原理及基本应用还请读者自行阅读相关文章,相关在本文章最后参考资料中...

舒运
07/13
0
0
Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像SparkStreaming、SparkSQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁...

openthings
2016/03/11
379
0
12月中旬值得一读的10本技术新书(Go Web编程、Linux、TensorFlow等)!文末有福利!

12月12日,阿里云云栖社区机构号 联合人邮异步社区为大家带来十本技术书籍(Go Web编程、Linux、TensorFlow等)。以下为书籍详情,文末还有福利哦! 书籍名称:《Go Web编程》 内容简介 本书...

阿里云云栖社区
2017/12/12
0
0
[转]flume-ng+Kafka+Storm+HDFS 实时系统搭建

一直以来都想接触Storm实时计算这块的东西,之前在弄这个的时候,跟群里的一些人讨论过,有的人说,直接用storm不就可以做实时处理了,用不着那么麻烦;其实不然,做软件开发的都知道模块化思...

HI曲奇饼干
2016/08/10
232
1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

回想过往,分析当下,着眼未来

好久没有真正的在纸质笔记本上写过东西了,感觉都快不会写字了,笔画都不知道怎么写了。接下来就说说咱们的正事。 2018年7月22日,我做了一个决定,那就是去参加安全培训(可能是我职业生涯中...

yeahlife
41分钟前
4
0
关于工作中的人际交往

关于工作中的人际交往 Intro 写了篇发泄情绪的博客,但不会发布出来。 大概就是,要么忍,要么滚。 以及一些不那么符合社会主义核心价值观,不满于大资本家与小资本家剥削的废话。

uniqptr
46分钟前
0
0
springMVC的流程

1.用户发送请求至前端控制器DispatcherServlet 2.DispatcherServlet收到请求调用HandlerMapping处理器映射器。 3.处理器映射器根据请求url找到具体的处理器,生成处理器对象及处理器拦截器(...

JavaSon712
今天
0
0
大数据教程(3.2):Linux系统软件安装之自动化脚本

博主前面文章有介绍过软件的安装,可以帮助IT人员顺利的完成功能软件安装;但是,对于我们运维人员或者需要管理软件安装的项目经理来说,有些应用一次行需要搭建很多台相同的软件环境(如tom...

em_aaron
今天
0
1
Spring Boot 2.0.3 JDBC整合Oracle 12

整合步骤 1. Oracle驱动引入 Oracle驱动一般不能通过maven仓库直接下载得到,需自行下载并导入到项目的lib目录下,建议通过如下pom依赖引入下载的Oracle驱动 <!-- Oracle 驱动 -->...

OSC_fly
今天
0
0
java 8 并行流 - 1

下面创建一个并行流,与顺序流 //顺序流Stream.iterate(0L, i -> i + 1) .limit(Integer.MAX_VALUE) .reduce(0L, Long::sum);//并行流Stream.iterate(0L, i -> i......

Canaan_
今天
0
0
数据结构与算法5

二分法采用向下取整的方法 使用有序数组的好处是查找的速度比无序数组快的多,不好的方面是因为要将所有靠后的数据移开,所以速度较慢,有序数组和无序数组的删除操作都很慢。 有序数组在查找...

沉迷于编程的小菜菜
昨天
1
1
SpringBoot | 第十一章:Redis的集成和简单使用

前言 上几节讲了利用Mybatis-Plus这个第三方的ORM框架进行数据库访问,在实际工作中,在存储一些非结构化或者缓存一些临时数据及热点数据时,一般上都会用上mongodb和redis进行这方面的需求。...

oKong
昨天
5
0
对基于深度神经网络的Auto Encoder用于异常检测的一些思考

一、前言 现实中,大部分数据都是无标签的,人和动物多数情况下都是通过无监督学习获取概念,故而无监督学习拥有广阔的业务场景。举几个场景:网络流量是正常流量还是攻击流量、视频中的人的...

冷血狂魔
昨天
0
0
并发设计之A系统调用B系统

A-->B A在发送请求之前,用乐观锁,减少对B的重复调用,这样一定程度上是幂等性。 比如A系统支付功能,要调用B系统进行支付操作,但是前端对"支付"按钮不进行控制,即用户会不断多次点击支付...

汉斯-冯-拉特
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部