文档章节

Java连接kafka,并将数据写入hdfs

G
 GygesM
发布于 2017/04/27 16:17
字数 123
阅读 264
收藏 0
 public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        //set brokers
        Gson gson = new Gson();
        props.put("bootstrap.servers", "139.198.XXX.XXX:9092,139.198.XXX.XXX:9093,139.198.XXX.XXX:9094");
        props.put("group.id", "bitautoHDFS");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));
        BitautoEntity bitautoEntity = null;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                try {
//                    写为entity
                    bitautoEntity = gson.fromJson(record.value(), BitautoEntity.class);
//                    将Entity转换为json
                    String result = gson.toJson(bitautoEntity);
//                    写入大数据平台
                    WriteContent.write2hdfs(result, hdfs_path);
                    System.out.println("推送数据成功!");
                } catch (Exception ex) {
                    System.out.print(ex.getMessage());
                }
            }
        }
    }

 

© 著作权归作者所有

下一篇: 解决跨域问题
G

GygesM

粉丝 0
博文 17
码字总数 1027
作品 0
私信 提问
阿里年薪50WJAVA工程师转大数据学习路线!

大数据有两个方向,一个是偏计算机的,另一个是偏经济的。你学过Java,所以你可以偏将计算机的。 Java程序员想转大数据可行吗?Java是全世界使用人数最多的编程语言。不少程序员选择Java做为...

JAVA丶学习
2018/04/25
0
0
Kafka对Java程序员有多重要?连阿里都在用它处理亿万级数据统计

一.了解淘宝Kafka架构 在ActiveMQ、RabbitMQ、RocketMQ、Kafka消息中间件之间,我们为什么要选择Kafka?下面详细介绍一下,2012年9月份我在支付宝做余额宝研发,2013年6月支付宝正式推出余额...

Java架构
02/17
0
0
Spark实战(一)SparkStreaming集成Kafka

Spark Streaming + Kafka集成指南 Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用。请选择正确的包, 请注意,0.8集成与后来的0.9和0.1...

FrankDeng
2018/07/15
0
0
phoenix存储sparkstreaming数据报java.lang.NullPointerException错误

版本环境 Spark: Spark version 2.2.0 Phoenix: APACHEPHOENIX-4.14.0-cdh5.14.2 HBase: Version 1.2.0-cdh5.14.4 JDK: java version "1.8.074" Java(TM) SE Runtime Environment (build 1.8......

lninic
04/30
0
0
Kafka连接器深度解读之JDBC源连接器

在现实业务中,Kafka经常会遇到的一个集成场景就是,从数据库获取数据,因为关系数据库是一个非常丰富的事件源。数据库中的现有数据以及对该数据的任何更改都可以流式传输到Kafka主题中,在这...

李玉珏
03/12
1K
4

没有更多内容

加载失败,请刷新页面

加载更多

Taro 兼容 h5 踩坑指南

最近一周在改造 公羊阅读🐏 Taro 版本适配 h5 端,过程中改改补补,好不酸爽。 本文记录📝遇到的问题,希望为有相同需求的哥们👬节约点时间。 Taro 版本:1.3.9。 client_mobile_taro...

dkvirus
今天
4
0
Spring boot 静态资源访问

0. 两个配置 spring.mvc.static-path-patternspring.resources.static-locations 1. application中需要先行的两个配置项 1.1 spring.mvc.static-path-pattern 这个配置项是告诉springboo......

moon888
今天
3
0
hash slot(虚拟桶)

在分布式集群中,如何保证相同请求落到相同的机器上,并且后面的集群机器可以尽可能的均分请求,并且当扩容或down机的情况下能对原有集群影响最小。 round robin算法:是把数据mod后直接映射...

李朝强
今天
4
0
Kafka 原理和实战

本文首发于 vivo互联网技术 微信公众号 https://mp.weixin.qq.com/s/bV8AhqAjQp4a_iXRfobkCQ 作者简介:郑志彬,毕业于华南理工大学计算机科学与技术(双语班)。先后从事过电子商务、开放平...

vivo互联网技术
今天
19
0
java数据类型

基本类型: 整型:Byte,short,int,long 浮点型:float,double 字符型:char 布尔型:boolean 引用类型: 类类型: 接口类型: 数组类型: Byte 1字节 八位 -128 -------- 127 short 2字节...

audience_1
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部