文档章节

基于Java+SparkStreaming整合kafka编程

四叶草666
 四叶草666
发布于 2017/07/19 10:07
字数 1505
阅读 14
收藏 0

一、下载依赖jar包

具体可以参考:SparkStreaming整合kafka编程

二、创建Java工程

太简单,略。

三、实际例子

spark的安装包里面有好多例子,具体路径:spark-2.1.1-bin-hadoop2.7\examples。

JavaDirectKafkaWordCount.java

 
  1. package com.spark.test;
  2.  
  3. import java.util.HashMap;
  4. import java.util.HashSet;
  5. import java.util.Arrays;
  6. import java.util.Iterator;
  7. import java.util.Map;
  8. import java.util.Set;
  9. import java.util.regex.Pattern;
  10.  
  11. import scala.Tuple2;
  12.  
  13. import kafka.serializer.StringDecoder;
  14.  
  15. import org.apache.spark.SparkConf;
  16. import org.apache.spark.api.java.function.*;
  17. import org.apache.spark.streaming.api.java.*;
  18. import org.apache.spark.streaming.kafka.KafkaUtils;
  19. import org.apache.spark.streaming.Durations;
  20.  
  21. public class JavaDirectKafkaWordCount {
  22.  
  23. public static void main(String[] args) throws Exception {
  24. //String brokers = args[0];
  25.    // String topics = args[1];
  26.  
  27.     // Create context with a 2 seconds batch interval
  28. /**
  29.  * setMaster("local[2]"),至少要指定两个线程,一条用于用于接收消息,一条线程用于处理消息
  30.  */
  31.     SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
  32.     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  33.  
  34.     Set<String> topicsSet = new HashSet<>(Arrays.asList("test"));
  35.     Map<String, String> kafkaParams = new HashMap<>();
  36.     kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");
  37.  
  38.     // Create direct kafka stream with brokers and topics
  39.     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
  40.         jssc,
  41.         String.class,
  42.         String.class,
  43.         StringDecoder.class,
  44.         StringDecoder.class,
  45.         kafkaParams,
  46.         topicsSet
  47.     );
  48.  
  49.     // Get the lines, split them into words, count the words and print
  50.     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
  51.       @Override
  52.       public String call(Tuple2<String, String> tuple2) {
  53.         return tuple2._2();
  54.       }
  55.     });
  56.     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  57.       @Override
  58.       public Iterator<String> call(String line) {
  59.         return Arrays.asList(line.split(" ")).iterator();
  60.       }
  61.     });
  62.     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
  63.       new PairFunction<String, String, Integer>() {
  64.         @Override
  65.         public Tuple2<String, Integer> call(String s) {
  66.           return new Tuple2<>(s, 1);
  67.         }
  68.       }).reduceByKey(
  69.         new Function2<Integer, Integer, Integer>() {
  70.         @Override
  71.         public Integer call(Integer i1, Integer i2) {
  72.           return i1 + i2;
  73.         }
  74.       });
  75.     wordCounts.print();
  76.  
  77.     // Start the computation
  78.     jssc.start();
  79.     jssc.awaitTermination();
  80. }
  81.  
  82. }

JavaKafkaWordCount.java

 

 
  1. package com.spark.test;
  2.  
  3. import java.util.Arrays;
  4. import java.util.Iterator;
  5. import java.util.Map;
  6. import java.util.HashMap;
  7. import java.util.regex.Pattern;
  8.  
  9. import scala.Tuple2;
  10.  
  11. import org.apache.spark.SparkConf;
  12. import org.apache.spark.api.java.function.FlatMapFunction;
  13. import org.apache.spark.api.java.function.Function;
  14. import org.apache.spark.api.java.function.Function2;
  15. import org.apache.spark.api.java.function.PairFunction;
  16. import org.apache.spark.streaming.Duration;
  17. import org.apache.spark.streaming.api.java.JavaDStream;
  18. import org.apache.spark.streaming.api.java.JavaPairDStream;
  19. import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
  20. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  21. import org.apache.spark.streaming.kafka.KafkaUtils;
  22.  
  23. public class JavaKafkaWordCount{
  24.  
  25. public static void main(String[] args) throws InterruptedException {
  26. SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[2]");
  27. // Create the context with 2 seconds batch size
  28. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
  29.  
  30. int numThreads = Integer.parseInt("2");
  31. Map<String, Integer> topicMap = new HashMap<>();
  32. String[] topics = "test".split(",");
  33. for (String topic: topics) {
  34. topicMap.put(topic, numThreads);
  35. }
  36.  
  37. JavaPairReceiverInputDStream<String, String> messages =
  38. KafkaUtils.createStream(jssc, "192.168.168.200:2181", "test-group", topicMap);
  39.  
  40. JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
  41. @Override
  42. public String call(Tuple2<String, String> tuple2) {
  43. return tuple2._2();
  44. }
  45. });
  46.  
  47. JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  48. @Override
  49. public Iterator<String> call(String line) {
  50. return Arrays.asList(line.split(" ")).iterator();
  51. }
  52. });
  53.  
  54. JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
  55. new PairFunction<String, String, Integer>() {
  56. @Override
  57. public Tuple2<String, Integer> call(String s) {
  58. return new Tuple2<>(s, 1);
  59. }
  60. }).reduceByKey(new Function2<Integer, Integer, Integer>() {
  61. @Override
  62. public Integer call(Integer i1, Integer i2) {
  63. return i1 + i2;
  64. }
  65. });
  66.  
  67. wordCounts.print();
  68. jssc.start();
  69. jssc.awaitTermination();
  70. }
  71.  
  72. }

 

JavaLocalWordCount.java

 

 
  1. package com.spark.test;
  2.  
  3. import java.util.Arrays;
  4. import java.util.Iterator;
  5.  
  6. import org.apache.spark.SparkConf;
  7. import org.apache.spark.api.java.JavaPairRDD;
  8. import org.apache.spark.api.java.JavaRDD;
  9. import org.apache.spark.api.java.JavaSparkContext;
  10. import org.apache.spark.api.java.function.FlatMapFunction;
  11. import org.apache.spark.api.java.function.Function2;
  12. import org.apache.spark.api.java.function.PairFunction;
  13. import org.apache.spark.api.java.function.VoidFunction;
  14.  
  15. import scala.Tuple2;
  16.  
  17. public class JavaLocalWordCount {
  18.  
  19. public static void main(String[] args) {
  20. /**
  21. * 第一步,创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
  22. * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,
  23. * 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置较差的情况
  24. */
  25. SparkConf sparkConf = new SparkConf().setAppName("LocalWordCountByJava").setMaster("local");
  26. /**
  27. * 第二步,创建SparkContext对象
  28. * SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala,java,python,R等都
  29. * 必须有一个SparkContext(不同语言具体类名称不同,如果是Java的话,则为JavaSparkContext)
  30. * 同时还会负责Spark程序在Master注册程序等
  31. * SparkContext是整个Spark应用程序至关重要的一个对象
  32. */
  33. JavaSparkContext jsc = new JavaSparkContext(sparkConf);//其底层实际上是Scala的SparkContext
  34. /**
  35. * 第三步,根据具体的数据来源(HDFS,HBase,Local,FS,DB,S3等),通过JavaSparkContext来创建JavaRDD
  36. * JavaRDD的创建方式有三种:根据外部数据来源(例如HDFS),
  37. * 根据Scala集合,由其他的RDD操作数据会将RDD划分成一系列Partition,
  38. * 分配到每个Partition的数据属于一个Task处理范畴
  39. */
  40. JavaRDD<String> lines = jsc.textFile("words.txt");
  41.  
  42. //如果是Scala,由于SAM转化,所以可以写成val words=lines.flatMap{line =>line.split(" ")}
  43. JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  44. @Override
  45. public Iterator<String> call(String line) {
  46. return Arrays.asList(line.split(" ")).iterator();
  47. }
  48. });
  49.  
  50. /**
  51. * 第4步:对初始的JavaRDD进行Transformation级别的处理,例如map,filter等高阶函数等的编程,来进行具体的数据计算
  52. * 第4.1步:在单词拆分的基础上对每个单词实例进行计数为1,也就是word =>(word,1)
  53. */
  54. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  55. public Tuple2<String, Integer> call(String word) throws Exception{
  56. return new Tuple2<String, Integer>(word, 1);
  57. }
  58. });
  59. /**
  60. * 统计总次数
  61. */
  62. JavaPairRDD<String,Integer> wordCount=pairs.reduceByKey(new Function2<Integer,Integer,Integer>()
  63. {
  64. public Integer call(Integer v1,Integer v2)throws Exception
  65. {
  66. return v1+v2;
  67.  
  68. }
  69. });
  70.  
  71. wordCount.foreach(new VoidFunction<Tuple2<String,Integer>>(){
  72. public void call(Tuple2<String,Integer> pairs) throws Exception {
  73. System.out.println(pairs._1()+":"+pairs._2());
  74. }
  75. });
  76.  
  77. jsc.close();
  78. }
  79.  
  80. }

 

JavaClusterWordCount.java

 

 
  1. package com.spark.test;
  2.  
  3. import java.util.Arrays;
  4. import java.util.Iterator;
  5.  
  6. import org.apache.spark.SparkConf;
  7. import org.apache.spark.api.java.JavaPairRDD;
  8. import org.apache.spark.api.java.JavaRDD;
  9. import org.apache.spark.api.java.JavaSparkContext;
  10. import org.apache.spark.api.java.function.FlatMapFunction;
  11. import org.apache.spark.api.java.function.Function2;
  12. import org.apache.spark.api.java.function.PairFunction;
  13. import org.apache.spark.api.java.function.VoidFunction;
  14.  
  15. import scala.Tuple2;
  16.  
  17. public class JavaClusterWordCount {
  18.  
  19. public static void main(String[] args) {
  20. /**
  21. * 第一步,创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
  22. * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,
  23. * 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置较差的情况
  24. */
  25. SparkConf sparkConf = new SparkConf().setAppName("LocalWordCountByJava").setMaster("local");
  26. /**
  27. * 第二步,创建SparkContext对象
  28. * SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala,java,python,R等都
  29. * 必须有一个SparkContext(不同语言具体类名称不同,如果是Java的话,则为JavaSparkContext)
  30. * 同时还会负责Spark程序在Master注册程序等
  31. * SparkContext是整个Spark应用程序至关重要的一个对象
  32. */
  33. JavaSparkContext jsc = new JavaSparkContext(sparkConf);//其底层实际上是Scala的SparkContext
  34. /**
  35. * 第三步,根据具体的数据来源(HDFS,HBase,Local,FS,DB,S3等),通过JavaSparkContext来创建JavaRDD
  36. * JavaRDD的创建方式有三种:根据外部数据来源(例如HDFS),
  37. * 根据Scala集合,由其他的RDD操作数据会将RDD划分成一系列Partition,
  38. * 分配到每个Partition的数据属于一个Task处理范畴
  39. */
  40. JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/input/words.txt");
  41.  
  42. //如果是Scala,由于SAM转化,所以可以写成val words=lines.flatMap{line =>line.split(" ")}
  43. JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  44. @Override
  45. public Iterator<String> call(String line) {
  46. return Arrays.asList(line.split(" ")).iterator();
  47. }
  48. });
  49.  
  50. /**
  51. * 第4步:对初始的JavaRDD进行Transformation级别的处理,例如map,filter等高阶函数等的编程,来进行具体的数据计算
  52. * 第4.1步:在单词拆分的基础上对每个单词实例进行计数为1,也就是word =>(word,1)
  53. */
  54. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  55. public Tuple2<String, Integer> call(String word) throws Exception{
  56. return new Tuple2<String, Integer>(word, 1);
  57. }
  58. });
  59. /**
  60. * 统计总次数
  61. */
  62. JavaPairRDD<String,Integer> wordCount=pairs.reduceByKey(new Function2<Integer,Integer,Integer>()
  63. {
  64. public Integer call(Integer v1,Integer v2)throws Exception
  65. {
  66. return v1+v2;
  67.  
  68. }
  69. });
  70.  
  71. wordCount.foreach(new VoidFunction<Tuple2<String,Integer>>(){
  72. public void call(Tuple2<String,Integer> pairs) throws Exception {
  73. System.out.println(pairs._1()+":"+pairs._2());
  74. }
  75. });
  76.  
  77. jsc.close();
  78. }
  79.  
  80. }

 

© 著作权归作者所有

共有 人打赏支持
四叶草666
粉丝 0
博文 51
码字总数 50778
作品 0
深圳
程序员
flume+kafka+storm运行实例

概述 在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive或者mr来实现统计分析,但是对于实时的需求Hive和mr就不合适了。实时应用场景可...

张欢19933
2016/02/04
879
0
Spark Streaming实时流处理学习

目录 1.初识实时流处理 2.分布式日志收集框架Flume 3.分布式发布订阅消息系统Kafka 4.实战环境搭建 5.Spark Streaming入门 6.Spark Streaming核心概念与编程 7.Spark Streaming进阶与案例实战...

牦牛sheriff
09/02
0
0
Flume+Kafka+Storm+Redis构建大数据实时处理系统

一、大数据处理的常用方法 之前在《采集→清洗→处理:基于MapReduce的离线数据分析》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通...

技术小能手
07/09
0
0
Kafka+Storm+HDFS整合实践

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它...

HI曲奇饼干
2016/06/04
90
1
Kafka 状态信息监视器--burrowx

burrowx 是一个简单、轻量的 Kafka 状态信息监视器,目前使用 influxdb 存储指标。burrowx 受到 Burrow 的启发,但比它更快、更干净和更稳定。burrowx 是 influxdb 和 grafana 的良好整合。 ...

sundy-li
2017/08/08
292
0

没有更多内容

加载失败,请刷新页面

加载更多

Qt那些事0.0.7

在帮助文档(Overview - QML and C++ Integration)中随缘遇到一张图,是关于C++对象与QML整合介绍的,值得标记下来,虽然大部分功能也有所涉猎,但是还是留个记号,万一哪天我失忆了还想写Q...

Ev4n
7分钟前
0
0
快速幂运算

题:求一个数 data 的 n 次幂,要求时间复杂度为log(n) 1:递归算法: /** * x^3=(x^2)*x;x^7=(x^3)^2 * x * * 递归算法 * @param data 底数 * @param n 次...

偶尔诗文
12分钟前
0
0
Google 宣布将会关闭消费者版本 Google+

Google 家的社交平台 Google+ 原来曾经在今年 3 月发生了一次严重的用户资料外泄事故,但这科网巨擘却一直保密,直至今天华尔街日报把事件披露之后才确认事件。Google 在重申问题已经即时解决...

问题终结者
25分钟前
0
0
腾讯三大运维开源项目齐聚“OSCAR开源先锋日”

10月20日,腾讯开源三大运维开源项目——TARS、蓝鲸和织云Metis首次集结,参与了由中国信息通信研究院主办、云计算标准与开源推进委员会承办的 “OSCAR开源先锋日”。会上,腾讯开源团队与前...

腾讯开源
30分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部