
Consuming Kafka in Parallel and Storing to HDFS

刀锋 · Published 2017/07/20 11:33

This post consumes multiple Kafka topics in parallel with the Kafka 0.8 high-level consumer and writes every message into hourly, per-topic log files on HDFS: one thread pool per topic, plus a background timer that flushes the open files and rotates them after two hours.

KafkaConsumer

The entry point: it loads the two properties files, builds the high-level consumer, creates one set of message streams per topic, and hands each stream to a KafkaConsumerThread in a per-topic thread pool.

import com.alibaba.fastjson.JSONObject;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
 
public class KafkaConsumer implements Runnable{
    public static FileSystem fs ;
    public static Properties properties() throws IOException {
        Properties properties_kafkainfo=new Properties();
        InputStream in = new FileInputStream("kafkainfo.properties");
        properties_kafkainfo.load(in);
        return properties_kafkainfo;
    }
    public static Properties properties_topic() throws IOException {
        Properties properties_kafkatopic=new Properties();
        InputStream in = new FileInputStream("topic.properties");
        properties_kafkatopic.load(in);
        return properties_kafkatopic;
    }
    private final kafka.javaapi.consumer.ConsumerConnector consumer;
    public KafkaConsumer() throws IOException {
        Properties props = new Properties();
        //props.put("zookeeper.connect", "10.20.30.91:2181,10.20.30.92:2181,10.20.30.93:2181");
        props.put("zookeeper.connect", properties().getProperty("zookeeper_connect"));
        props.put("group.id", properties().getProperty("group"));
        props.put("zookeeper.session.timeout.ms", properties().getProperty("session_timeout_ms"));
        props.put("zookeeper.sync.time.ms", properties().getProperty("zookeeper_sync_time_ms"));
        props.put("auto.commit.interval.ms", properties().getProperty("auto_commit_interval_ms"));
        props.put("auto.commit.enable",properties().getProperty("auto_commit_enable"));
        props.put("auto.offset.reset", properties().getProperty("auto_offset_reset")); //largest smallest
        props.put("serializer.class", properties().getProperty("serializer_class"));
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }
    public void run() {
 
 
        final int numThreads = 6;
        final Iterator<String> topic;
        try {
            topic = properties_topic().stringPropertyNames().iterator();
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            List<String> topicList = new ArrayList<String>();
 
            while(topic.hasNext()){
                final String key_topic = topic.next();
                topicList.add(key_topic);
                topicCountMap.put(key_topic, new Integer(numThreads));
            }
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            final Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
            // Loop over the topics from the config file: one parallel consumer (thread + pool) per topic
            for (int i = 0; i < topicList.size(); i++) {
                final String key_topic1 = topicList.get(i);
                // One thread per topic; inside it, a fixed pool of numThreads workers for that topic's streams
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        List<KafkaStream<String, String>> stream = consumerMap.get(key_topic1);
                        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
                        int threadNumber = 0;
                        for (final KafkaStream<String, String> streams : stream) {
                            executor.submit(new KafkaConsumerThread(streams, threadNumber, key_topic1));
                            threadNumber++;
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) {
        try {
            Thread t = new Thread(new KafkaConsumer());
            t.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
 
    }
}

KafkaConsumerThread

The worker: each thread iterates its KafkaStream, parses every message as JSON, and appends it to an hourly HDFS file named after the topic and the message's createTime.

import com.alibaba.fastjson.JSONObject;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
 
 
public class KafkaConsumerThread implements Runnable{
 
    private KafkaStream stream;
    private int topicNum;
    private String topic;
    FSDataOutputStream outputStream = null;
    public KafkaConsumerThread(KafkaStream stream, int topicNum,String topic) {
        this.stream = stream;
        this.topicNum = topicNum;
        this.topic = topic;
 
    }
    // Consume messages from the stream and write them to HDFS
    public void run() {
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()){
            Path path = null;
            MessageAndMetadata<String, String> m = it.next();
            JSONObject jsStr = JSONObject.parseObject(m.message());
            // createTime picks the hourly (yyyyMMddHH) file this record is appended to
            String dateTime = new SimpleDateFormat("yyyyMMddHH").format(jsStr.get("createTime"));
            String hdfs ="hdfs://10.20.30.91:8020/kafka/";
            String name = dateTime + ".log";
            String filename = hdfs+topic+name;
            FileWrapper writer = getFileByName(filename,path);
            System.out.println("Consumed: "+jsStr.toString());
            writer.write(jsStr.toString());
        }
    }
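    // Wraps one HDFS output file: the stream is created in the constructor, and starttime is
    // recorded so flushData() can close and evict files that have been open for over two hours.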
    private static class FileWrapper {
        Configuration conf = new Configuration();
        private String filename;
        Path filePath;
        FileSystem hdfs= null;
        private AtomicInteger calc = new AtomicInteger();
        private long starttime=new Date().getTime();
        private FSDataOutputStream outputStream;
        public FileWrapper(String filename,Path filePath) {
            conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
            conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
            conf.setBoolean("dfs.support.append",true);
            this.filename = filename;
            this.filePath=filePath;
            try {
                hdfs = FileSystem.get(new URI("hdfs://10.20.30.91:8020"),conf);
                outputStream = hdfs.create(filePath);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
        }
        //FSDataOutputStream outputStream = hdfs.create(filedfs);
        public void write (String txt){
            try {
                String txt2=txt+"\n";
                outputStream.write(txt2.getBytes("UTF-8"));
                System.out.println("txt:========"+txt2.getBytes("UTF-8"));
                outputStream.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public void close(){
            try {
                outputStream.close();
                hdfs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
 
    }
    // Cache of open output files keyed by the full HDFS file name; writes go through the cached FileWrapper
    static Map<String, FileWrapper> fileMap = new ConcurrentHashMap<>();
    FileWrapper getFileByName(String filename, Path filePath){
        FileWrapper bufferedWriter = fileMap.get(filename);
        if(bufferedWriter!=null){
            return bufferedWriter;
        }
        synchronized (fileMap){
            bufferedWriter=fileMap.get(filename);
            if(bufferedWriter!=null){
                return bufferedWriter;
            }
            filePath = new Path(filename);
            FSDataOutputStream outputStream = null;
            System.out.println("file.name==========="+filename);
            FileWrapper result = new FileWrapper(filename,filePath);
            if (result == null) {
                throw new RuntimeException("result is null!");
            }
            fileMap.put(filename,result);
            return result;
        }
 
    }
    // Flush every open file; close and evict files that have been open for more than two hours
    static void flushData(){
        long nowtime = new Date().getTime();
        synchronized (fileMap){
            for (FileWrapper fileWrapper : fileMap.values()) {
                try {
                    fileWrapper.outputStream.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (nowtime - fileWrapper.starttime > 2 * 60 * 60 * 1000) {
                    fileMap.remove(fileWrapper.filename);
                    fileWrapper.close();
                }
            }
        }
    }
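    // flushData() is driven by the timer below: first run 60 seconds after the class loads, then once a minute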
 
    static {
        Timer timer = new Timer("flush-task", false);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                flushData();
            }
        },60000,60000);
    }
 
 
}
 
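Each record the consumer reads must be a JSON string with a createTime field: run() parses the message with fastjson and formats createTime with SimpleDateFormat("yyyyMMddHH") to pick the hourly output file. Below is a minimal test producer sketch against the same 0.8.x client used in the build file; the broker address and the extra payload field are assumptions for illustration, not part of the original post.

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");   // assumed broker address
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // The consumer expects a JSON body with a createTime field; epoch millis work,
        // since SimpleDateFormat.format(Object) accepts a Number, and it selects the hourly file.
        JSONObject msg = new JSONObject();
        msg.put("createTime", System.currentTimeMillis());
        msg.put("data", "hello");   // hypothetical business field

        producer.send(new KeyedMessage<String, String>("app_send_welfare_ana", msg.toJSONString()));
        producer.close();
    }
}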

Kafka configuration (kafkainfo.properties)

These are the keys read in the KafkaConsumer constructor; auto_offset_reset accepts smallest or largest.

zookeeper_connect=ttye_kafka-001:2181,ttye_kafka-002:2181,ttye_kafka-003:2181
group=tvm_hdfs_01
session_timeout_ms=4000
zookeeper_sync_time_ms=200
auto_commit_interval_ms=1000
auto_commit_enable=true
auto_offset_reset=smallest
serializer_class=kafka.serializer.StringEncoder

Topic list (topic.properties)

Only the property keys are used (stringPropertyNames() in KafkaConsumer.run()), so each line simply names a topic to consume; the value is ignored.

app_send_welfare_ana=app_send_welfare_ana
app_balance_welfare_ana=app_balance_welfare_ana

Gradle build script (build.gradle)

buildscript {
    repositories {
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:1.2.4"
    }
}
group 'kafkatohdfs'
version '1.0-SNAPSHOT'
 
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'java'
apply plugin: 'idea'
apply plugin:'application'
 
[compileJava, compileTestJava, javadoc]*.options*.encoding = 'UTF-8'
sourceCompatibility = 1.7
targetCompatibility=1.7
mainClassName = "com.tvm.KafkaConsumer"
repositories {
    mavenLocal()
    mavenCentral()
    jcenter()
    maven {
        url = 'http://115.159.154.56:8081/nexus/content/groups/public'
    }
}
 
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    compile "org.apache.kafka:kafka_2.10:0.8.2.2"
    compile group: 'com.alibaba', name: 'fastjson', version: '1.2.21'
    compile "org.projectlombok:lombok:1.16.6"
    compile "org.apache.logging.log4j:log4j-api:2.3"
    compile "org.apache.logging.log4j:log4j-core:2.3"
    //hadoop
    compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'
 
    compile fileTree(dir: 'lib', include: '*.jar')
}

Package the fat jar with gradle shadowJar.

Run on Linux (kafkainfo.properties and topic.properties are loaded with relative paths, so they must sit in the working directory):

Run in the background:

nohup java -jar KafkaConsumer_log-1.0-SNAPSHOT.jar > /dev/null 2>&1 &
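
To spot-check that records are landing in HDFS, here is a minimal read-back sketch. It reuses the namenode address hard-coded in FileWrapper; the file name is a hypothetical example of the <topic><yyyyMMddHH>.log pattern the consumer produces under /kafka/.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadBack {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://10.20.30.91:8020"), conf);

        // Hypothetical file written by the consumer: topic name concatenated with the hour
        Path path = new Path("/kafka/app_send_welfare_ana2017072011.log");
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path), "UTF-8"));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
        fs.close();
    }
}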

Reposted from: http://blog.csdn.net/qq_28767763/article/details/65445021
