文档章节

Maven+Eclipse+SparkStreaming+Kafka整合

四叶草666
 四叶草666
发布于 2017/07/18 15:36
字数 1102
阅读 14
收藏 0
点赞 0
评论 0

版本号:

maven3.5.0     scala IDE for Eclipse:版本(4.6.1)    spark-2.1.1-bin-hadoop2.7    kafka_2.11-0.8.2.1   JDK1.8

基础环境:

Maven3.5.0安装与配置+Eclipse应用

Maven下载项目依赖jar包和使用方法

maven中把依赖的JAR包一起打包

MAVEN Scope使用

一、指定JDK为1.8

在pom.xml配置文件中添加以下参数即可:

 
  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <encoding>UTF-8</encoding>
  4.     <java.version>1.8</java.version>
  5.     <maven.compiler.source>1.8</maven.compiler.source>
  6.     <maven.compiler.target>1.8</maven.compiler.target>
  7. </properties>
 
  1. <plugin>  
  2.     <groupId>org.apache.maven.plugins</groupId>  
  3.     <artifactId>maven-compiler-plugin</artifactId>  
  4.     <configuration>  
  5.         <source>1.8</source>  
  6.         <target>1.8</target>  
  7.     </configuration>  
  8. </plugin>

配置之后的pom.xml文件如下:

 
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4.  
  5. <groupId>Test</groupId>
  6. <artifactId>test</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <packaging>jar</packaging>
  9.  
  10. <name>test</name>
  11. <url>http://maven.apache.org</url>
  12.  
  13. <properties>
  14. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  15. <encoding>UTF-8</encoding>
  16. <!-- 配置JDK为1.8 -->
  17.         <java.version>1.8</java.version>
  18.         <maven.compiler.source>1.8</maven.compiler.source>
  19.         <maven.compiler.target>1.8</maven.compiler.target>
  20. </properties>
  21.  
  22. <dependencies>
  23. <dependency>
  24. <groupId>junit</groupId>
  25. <artifactId>junit</artifactId>
  26. <version>3.8.1</version>
  27. <scope>test</scope>
  28. </dependency>
  29. </dependencies>
  30.  
  31. <build>
  32. <plugins>
  33. <!-- 配置JDK为1.8 -->
  34. <plugin>  
  35.                 <groupId>org.apache.maven.plugins</groupId>  
  36.                 <artifactId>maven-compiler-plugin</artifactId>  
  37.                 <configuration>  
  38.                     <source>1.8</source>  
  39.                     <target>1.8</target>  
  40.                 </configuration>  
  41.             </plugin>  
  42.         
  43.              <!-- 配置打包依赖包maven-assembly-plugin -->
  44. <plugin>
  45. <artifactId> maven-assembly-plugin </artifactId>
  46. <configuration>
  47. <descriptorRefs>
  48. <descriptorRef>jar-with-dependencies</descriptorRef>
  49. </descriptorRefs>
  50. <archive>
  51. <manifest>
  52. <mainClass></mainClass>
  53. </manifest>
  54. </archive>
  55. </configuration>
  56. <executions>
  57. <execution>
  58. <id>make-assembly</id>
  59. <phase>package</phase>
  60. <goals>
  61. <goal>assembly</goal>
  62. </goals>
  63. </execution>
  64. </executions>
  65. </plugin>
  66. </plugins>
  67. </build>
  68. </project>

二、配置Spark依赖包

查看spark-2.1.1-bin-hadoop2.7/jars目录下的jar包版本

到maven远程仓库http://mvnrepository.com中搜索对应jar包即可。

1、配置spark-core_2.11-2.1.1.jar

往pom.xml文件中添加以下配置:

 
  1. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  2. <dependency>
  3.     <groupId>org.apache.spark</groupId>
  4.     <artifactId>spark-core_2.11</artifactId>
  5.     <version>2.1.1</version>
  6.     <scope>runtime</scope>
  7. </dependency>

为了后面打包时把依赖包也一起打包,需要把<scope>provided</scope>配置成<scope>runtime</scope>

2、配置spark-streaming_2.11-2.1.1.jar

往pom.xml文件中添加以下配置:

 
  1.  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
  2. <dependency>
  3.     <groupId>org.apache.spark</groupId>
  4.     <artifactId>spark-streaming_2.11</artifactId>
  5.     <version>2.1.1</version>
  6.     <scope>runtime</scope>
  7. </dependency>

 为了后面打包时把依赖包也一起打包,需要把<scope>provided</scope>配置成<scope>runtime</scope>。

三、配置Spark+Kafka

1、配置spark-streaming-kafka-0-8_2.11-2.1.1.jar

往pom.xml文件中添加以下配置:

 
  1. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
  2. <dependency>
  3.     <groupId>org.apache.spark</groupId>
  4.     <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  5.     <version>2.1.1</version>
  6. </dependency>

四、pom.xml完整配置内容

 
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4.  
  5. <groupId>Test</groupId>
  6. <artifactId>test</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <packaging>jar</packaging>
  9.  
  10. <name>test</name>
  11. <url>http://maven.apache.org</url>
  12.  
  13. <properties>
  14. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  15. <encoding>UTF-8</encoding>
  16. <!-- 配置JDK为1.8 -->
  17.         <java.version>1.8</java.version>
  18.         <maven.compiler.source>1.8</maven.compiler.source>
  19.         <maven.compiler.target>1.8</maven.compiler.target>
  20. </properties>
  21.  
  22. <dependencies>
  23. <dependency>
  24. <groupId>junit</groupId>
  25. <artifactId>junit</artifactId>
  26. <version>3.8.1</version>
  27. <scope>test</scope>
  28. </dependency>
  29. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  30. <dependency>
  31.             <groupId>org.apache.spark</groupId>
  32.             <artifactId>spark-core_2.11</artifactId>
  33.             <version>2.1.1</version>
  34.             <scope>runtime</scope>
  35. </dependency>
  36.  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
  37. <dependency>
  38.             <groupId>org.apache.spark</groupId>
  39.             <artifactId>spark-streaming_2.11</artifactId>
  40.             <version>2.1.1</version>
  41.             <scope>runtime</scope>
  42. </dependency>
  43. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
  44. <dependency>
  45.          <groupId>org.apache.spark</groupId>
  46.          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  47.          <version>2.1.1</version>
  48. </dependency>
  49. </dependencies>
  50.  
  51. <build>
  52. <plugins>
  53. <!-- 配置JDK为1.8 -->
  54. <plugin>  
  55.                 <groupId>org.apache.maven.plugins</groupId>  
  56.                 <artifactId>maven-compiler-plugin</artifactId>  
  57.                 <configuration>  
  58.                     <source>1.8</source>  
  59.                     <target>1.8</target>  
  60.                 </configuration>  
  61.             </plugin>  
  62.         
  63.              <!-- 配置打包依赖包maven-assembly-plugin -->
  64. <plugin>
  65. <artifactId> maven-assembly-plugin </artifactId>
  66. <configuration>
  67. <descriptorRefs>
  68. <descriptorRef>jar-with-dependencies</descriptorRef>
  69. </descriptorRefs>
  70. <archive>
  71. <manifest>
  72. <mainClass></mainClass>
  73. </manifest>
  74. </archive>
  75. </configuration>
  76. <executions>
  77. <execution>
  78. <id>make-assembly</id>
  79. <phase>package</phase>
  80. <goals>
  81. <goal>assembly</goal>
  82. </goals>
  83. </execution>
  84. </executions>
  85. </plugin>
  86. </plugins>
  87. </build>
  88. </project>

五、本地开发spark代码上传spark集群服务并运行

JavaDirectKafkaCompare.java

 
  1. package com.spark.main;
  2.  
  3. import java.util.HashMap;
  4. import java.util.HashSet;
  5. import java.util.Arrays;
  6. import java.util.Iterator;
  7. import java.util.Map;
  8. import java.util.Set;
  9. import java.util.regex.Pattern;
  10.  
  11. import scala.Tuple2;
  12. import kafka.serializer.StringDecoder;
  13.  
  14. import org.apache.spark.SparkConf;
  15. import org.apache.spark.api.java.function.*;
  16. import org.apache.spark.streaming.api.java.*;
  17. import org.apache.spark.streaming.kafka.KafkaUtils;
  18. import org.apache.spark.streaming.Durations;
  19.  
  20. public class JavaDirectKafkaCompare {
  21.  
  22. public static void main(String[] args) throws Exception {
  23. /**
  24.  * setMaster("local[2]"),至少要指定两个线程,一条用于用于接收消息,一条线程用于处理消息
  25.  *  Durations.seconds(2)每两秒读取一次kafka
  26.  */
  27.     SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
  28.     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  29.     /**
  30.      * checkpoint("hdfs://192.168.168.200:9000/checkpoint")防止数据丢包
  31.      */
  32.     jssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint");
  33.     /**
  34.      * 配置连接kafka的相关参数      
  35.      */
  36.     Set<String> topicsSet = new HashSet<>(Arrays.asList("test"));
  37.     Map<String, String> kafkaParams = new HashMap<>();
  38.     kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");
  39.  
  40.     // Create direct kafka stream with brokers and topics
  41.     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
  42.         jssc,
  43.         String.class,
  44.         String.class,
  45.         StringDecoder.class,
  46.         StringDecoder.class,
  47.         kafkaParams,
  48.         topicsSet
  49.     );
  50.  
  51.     // Get the lines, split them into words, count the words and print
  52.     /**
  53.      * _2()获取第二个对象的值
  54.      */
  55.     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
  56.       @Override
  57.       public String call(Tuple2<String, String> tuple2) {
  58.         return tuple2._2();
  59.       }
  60.     });
  61.     
  62.    String sfzh = "432922196105276721";
  63.    JavaDStream<String> wordCounts = lines.filter(new Function<String, Boolean>(){
  64. @Override
  65. public Boolean call(String s) throws Exception {
  66. // TODO Auto-generated method stub
  67. /**
  68.  * 通过身份证号筛选出相关数据
  69.  */
  70. if(s.contains(sfzh)){
  71. System.out.println("比对出来的结果:" + s);
  72. return true;
  73. }
  74. return false;
  75. }
  76.    });
  77.    wordCounts.print();
  78.     // Start the computation
  79.     jssc.start();
  80.     jssc.awaitTermination();
  81. }
  82.  
  83. }

右键Run As ------>Maven install,运行成功之后,会在target目录生成一个test-0.0.1-SNAPSHOT-jar-with-dependencies.jar,把该jar包复制到LInux集群环境下的SPARK_HOME/myApp目录下:

执行命令:

 
  1. cd /usr/local/spark/spark-2.1.1-bin-hadoop2.7;
  2. bin/spark-submit --class "com.spark.main.JavaDirectKafkaCompare" --master local[4] myApp/test-0.0.1-SNAPSHOT-jar-with-dependencies.jar;

六、附上离线Maven仓库

 下载地址:  链接:http://pan.baidu.com/s/1eS7Ywme 密码:y3qz

© 著作权归作者所有

共有 人打赏支持
四叶草666
粉丝 0
博文 51
码字总数 50778
作品 0
深圳
程序员
snakerflow/snaker-springmvc

Introduction Snaker-SpringMVC项目主要是基于springMVC、spring3、hibernate3、snaker框架整合的一个最基本的流程管理模块,方便大家轻松地完成流程引擎的整合 ###整合步骤 ####1).依赖包整...

snakerflow
2014/11/30
0
0
vSphere 5.5 VM整合磁盘失败之—文件被锁定无法访问

vSphere 5.5 VM整合磁盘失败之—文件被锁定无法访问 环境:vSPhere 5.5u3,虚机使用EMC的networker备份 问题现象:在vc上发现,晚上经过networker的备份之后,虚机提示需要整合磁盘 解决前相...

Makka_Pakka
07/06
0
0
白俊遥/thinkphp-bjyadmin

创建 QQ 群及捐赠渠道 链接 博客:http://baijunyao.com github:https://github.com/baijunyao/thinkphp-bjyadmin oschina:http://git.oschina.net/shuaibai123/thinkphp-bjyadmin 简介 使......

白俊遥
2016/06/30
0
0
struts + spring + hibernate 不太理解这种搭配,请指点一下。

struts + spring + hibernate 整合这是到底什么意思嘛?struts 是一个java的web开发框架,spring也是,为什么很多时候总是struts+spring ,是不是把这两个框架整合?如何整合开发? 使用php...

hstaewg
2015/08/29
181
5
ThinkPHP 常用功能和 SDK 合集--thinkphp-bjyadmin

简介 使用 thinkphp 开发项目的过程中把一些常用的功能或者第三方 sdk 整合好;开源供亲们参考; 这些都是经过线上运营考验的;无毒害可以免费放心折腾使用;只要不会某一天找到我说因为借鉴了...

白俊遥
2017/09/01
1K
2
ZHENFENGSHISAN/perfect-ssm

Quick Start 项目简介 ssm系列 ssm-demo:Spring+SpringMVC+Mybatis+easyUI整合 perfect-ssm:RESTful API+redis缓存 ssm-cluster:前后端分离+集群部署 ssm-dubbo:dubbo服务化 ssm-micro-se......

ZHENFENGSHISAN
2017/09/18
0
0
【MyBatis框架】mybatis和spring整合

spring和mybatis整合 1.整合思路 需要spring通过单例方式管理SqlSessionFactory。 spring和mybatis整合生成代理对象,使用SqlSessionFactory创建SqlSession。(spring和mybatis整合自动完成)...

Mysoft
2015/09/21
75
0
Confluence 6 数据库整合的限制

数据库整合的限制 注意: Confluence 自带的 XML 方式导出方法并不适用于备份和整合大数据集。这里有一些第三方的数据库工具你可以使用能够帮助你对大数据集进行备份和整合。如果你在选择正确...

honeymose
06/05
0
0
Mozilla:Linux 版火狐 46.0 浏览器将集成 GTK3

稿源:cnbeta 目前Linux版本Firefox 45.0浏览器,未能整合GTK3。现在我们获悉,Mozilla计划在Firefox 46.0浏览器当中进行整合。目前,Mozilla已经在测试Firefox 46.0 beta浏览器,正式版将在...

oschina
2016/03/13
2.2K
10
SpringMVC,mybatis,spring整合

需求:使用SpringMVC和mybatis完成商品列表查询 一 整合思路 1 相关jar包(maven) 2 整合dao层 mybatis和springmvc整合,通过spring管理mapper接口 使用mapper的扫描器自动扫描mapper接口在...

Bbigbug
04/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

about git flow

  昨天元芳做了git分支管理规范的分享,为了拓展大家关于git分支的认知,这里我特意再分享这两个关于git flow的链接,大家可以看一下。 Git 工作流程 Git分支管理策略   git flow本质上是...

qwfys
今天
2
0
Linux系统日志文件

/var/log/messages linux系统总日志 /etc/logrotate.conf 日志切割配置文件 参考https://my.oschina.net/u/2000675/blog/908189 dmesg命令 dmesg’命令显示linux内核的环形缓冲区信息,我们可...

chencheng-linux
今天
1
0
MacOS下给树莓派安装Raspbian系统

下载镜像 前往 树莓派官网 下载镜像。 点击 最新版Raspbian 下载最新版镜像。 下载后请,通过 访达 双击解压,或通过 unzip 命令解压。 检查下载的文件 ls -lh -rw-r--r-- 1 dingdayu s...

dingdayu
今天
1
0
spring boot使用通用mapper(tk.mapper) ,id自增和回显等问题

最近项目使用到tk.mapper设置id自增,数据库是mysql。在使用通用mapper主键生成过程中有一些问题,在总结一下。 1、UUID生成方式-字符串主键 在主键上增加注解 @Id @GeneratedValue...

北岩
今天
2
0
告警系统邮件引擎、运行告警系统

告警系统邮件引擎 cd mail vim mail.py #!/usr/bin/env python#-*- coding: UTF-8 -*-import os,sysreload(sys)sys.setdefaultencoding('utf8')import getoptimport smtplibfr......

Zhouliang6
今天
1
0
Java工具类—随机数

Java中常用的生成随机数有Math.random()方法及java.util.Random类.但他们生成的随机数都是伪随机的. Math.radom()方法 在jdk1.8的Math类中可以看到,Math.random()方法实际上就是调用Random类...

PrivateO2
今天
3
0
关于java内存模型、并发编程的好文

Java并发编程:volatile关键字解析    volatile这个关键字可能很多朋友都听说过,或许也都用过。在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果。在...

DannyCoder
昨天
1
0
dubbo @Reference retries 重试次数 一个坑

在代码一中设置 成retries=0,也就是调用超时不用重试,结果DEBUG的时候总是重试,不是0吗,0就不用重试啊。为什么还是调用了多次呢? 结果在网上看到 这篇文章才明白 https://www.cnblogs....

奋斗的小牛
昨天
2
0
数据结构与算法3

要抓紧喽~~~~~~~放羊的孩纸回来喽 LowArray类和LowArrayApp类 程序将一个普通的Java数组封装在LowArray类中。类中的数组隐藏了起来,它是私有的,所以只有类自己的方法才能访问他。 LowArray...

沉迷于编程的小菜菜
昨天
1
0
spring boot应用测试框架介绍

一、spring boot应用测试存在的问题 官方提供的测试框架spring-boot-test-starter,虽然提供了很多功能(junit、spring test、assertj、hamcrest、mockito、jsonassert、jsonpath),但是在数...

yangjianzhou
昨天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部