
Consuming Kafka in Parallel and Writing to HDFS

qingzhongli
Published 2017/07/20 11:33

The consumer subscribes to every topic listed in topic.properties, starts one thread per topic, and reads each topic with a fixed pool of six consumer threads; each message is parsed as JSON and appended to an hourly log file on HDFS.

KafkaConsumer

import com.alibaba.fastjson.JSONObject;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
 
public class KafkaConsumer implements Runnable{
    public static FileSystem fs ;
    public static Properties properties() throws IOException {
        Properties properties_kafkainfo=new Properties();
        InputStream in = new FileInputStream("kafkainfo.properties");
        properties_kafkainfo.load(in);
        return properties_kafkainfo;
    }
    public static Properties properties_topic() throws IOException {
        Properties properties_kafkatopic=new Properties();
        InputStream in = new FileInputStream("topic.properties");
        properties_kafkatopic.load(in);
        return properties_kafkatopic;
    }
    private final kafka.javaapi.consumer.ConsumerConnector consumer;
    public KafkaConsumer() throws IOException {
        Properties props = new Properties();
        //props.put("zookeeper.connect", "10.20.30.91:2181,10.20.30.92:2181,10.20.30.93:2181");
        props.put("zookeeper.connect", properties().getProperty("zookeeper_connect"));
        props.put("group.id", properties().getProperty("group"));
        props.put("zookeeper.session.timeout.ms", properties().getProperty("session_timeout_ms"));
        props.put("zookeeper.sync.time.ms", properties().getProperty("zookeeper_sync_time_ms"));
        props.put("auto.commit.interval.ms", properties().getProperty("auto_commit_interval_ms"));
        props.put("auto.commit.enable",properties().getProperty("auto_commit_enable"));
        props.put("auto.offset.reset", properties().getProperty("auto_offset_reset")); //largest smallest
        props.put("serializer.class", properties().getProperty("serializer_class"));
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }
    public void run() {
 
 
        final int numThreads = 6;
        final Iterator<String> topic;
        try {
            topic = properties_topic().stringPropertyNames().iterator();
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            List<String> topicList = new ArrayList<String>();
 
            while(topic.hasNext()){
                final String key_topic = topic.next();
                topicList.add(key_topic);
                topicCountMap.put(key_topic, new Integer(numThreads));
            }
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            final Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            // iterate over the topics from the config file: one worker thread (with its own thread pool) per topic
            for (int i = 0; i < topicList.size(); i++) {
                final String key_topic1 = topicList.get(i);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        List<KafkaStream<String, String>> stream = consumerMap.get(key_topic1);
                        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
                        int threadNumber = 0;
                        for (final KafkaStream streams : stream) {
                            executor.submit(new KafkaConsumerThread(streams, threadNumber, key_topic1));
                            threadNumber++;
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) {
        try {
            Thread t = new Thread(new KafkaConsumer());
            t.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
 
    }
}
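The threads started above expect every Kafka message to be a JSON string carrying a createTime timestamp in milliseconds, which KafkaConsumerThread uses to pick the hourly output file. Below is a minimal sketch of a producer that emits messages in that shape with the old 0.8.x producer API; the broker address and the userId field are assumptions, not part of the original post.

import com.alibaba.fastjson.JSONObject;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class SampleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker address; the original post only lists the ZooKeeper quorum.
        props.put("metadata.broker.list", "ttye_kafka-001:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        JSONObject msg = new JSONObject();
        msg.put("createTime", System.currentTimeMillis()); // read by KafkaConsumerThread to build the hourly file name
        msg.put("userId", "10001");                        // hypothetical payload field
        producer.send(new KeyedMessage<String, String>("app_send_welfare_ana", msg.toJSONString()));
        producer.close();
    }
}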

KafkaConsumerThread  

import com.alibaba.fastjson.JSONObject;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
 
 
public class KafkaConsumerThread implements Runnable{
 
    private KafkaStream stream;
    private int topicNum;
    private String topic;
    FSDataOutputStream outputStream = null;
    public KafkaConsumerThread(KafkaStream stream, int topicNum,String topic) {
        this.stream = stream;
        this.topicNum = topicNum;
        this.topic = topic;
 
    }
    // consume messages from the stream and write them to HDFS
    public void run() {
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()){
            Path path = null;
            MessageAndMetadata<String, String> m = it.next();
            JSONObject jsStr = JSONObject.parseObject(m.message());
            String dateTime = new SimpleDateFormat("yyyyMMddHH").format(jsStr.get("createTime"));
            String hdfs ="hdfs://10.20.30.91:8020/kafka/";
            String name = dateTime + ".log";
            String filename = hdfs+topic+name;
            FileWrapper writer = getFileByName(filename,path);
            System.out.println("consumed: " + jsStr.toString());
            writer.write(jsStr.toString());
        }
    }
    private static class FileWrapper {
        Configuration conf = new Configuration();
        private String filename;
        Path filePath;
        FileSystem hdfs= null;
        private AtomicInteger calc = new AtomicInteger();
        private long starttime=new Date().getTime();
        private FSDataOutputStream outputStream;
        public FileWrapper(String filename,Path filePath) {
            conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
            conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
            conf.setBoolean("dfs.support.append",true);
            this.filename = filename;
            this.filePath=filePath;
            try {
                hdfs = FileSystem.get(new URI("hdfs://10.20.30.91:8020"),conf);
                outputStream = hdfs.create(filePath);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
        }
        //FSDataOutputStream outputStream = hdfs.create(filedfs);
        public void write (String txt){
            try {
                String txt2=txt+"\n";
                outputStream.write(txt2.getBytes("UTF-8"));
                System.out.println("txt:========" + txt2);
                outputStream.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public void close(){
            try {
                outputStream.close();
                hdfs.close();
            } catch (IOException e) {
            }
        }
 
    }
    static Map<String, FileWrapper> fileMap = new ConcurrentHashMap<>();
    FileWrapper getFileByName(String filename,Path filePath){
        FileWrapper bufferedWriter = fileMap.get(filename);
        if(bufferedWriter!=null){
            return bufferedWriter;
        }
        synchronized (fileMap){
            bufferedWriter=fileMap.get(filename);
            if(bufferedWriter!=null){
                return bufferedWriter;
            }
            filePath = new Path(filename);
            System.out.println("file.name===========" + filename);
            FileWrapper result = new FileWrapper(filename, filePath);
            fileMap.put(filename, result);
            return result;
        }
 
    }
    static void flushData(){
        long nowtime = new Date().getTime();
        synchronized (fileMap){
            for (FileWrapper fileWrapper : fileMap.values()) {
                try {
                    fileWrapper.outputStream.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (nowtime - fileWrapper.starttime > 2 * 60 * 60 * 1000) {
                    fileMap.remove(fileWrapper.filename);
                    fileWrapper.close();
                }
 
            }
        }
    }
 
    static {
        Timer timer = new Timer("flush-task", false);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                flushData();
            }
        },60000,60000);
    }
 
 
}
 
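To spot-check what the consumer wrote, here is a small sketch that reads one hourly file back from HDFS. The namenode address and the /kafka/ directory come from the values hard-coded above; the concrete file name (topic name followed immediately by the yyyyMMddHH hour, with no separator) is an assumed example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

public class ReadBack {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://10.20.30.91:8020"), conf);
        // KafkaConsumerThread writes to /kafka/<topic><yyyyMMddHH>.log, e.g. for the
        // app_send_welfare_ana topic and the hour 2017-07-20 11:00 (assumed example):
        Path path = new Path("/kafka/app_send_welfare_ana2017072011.log");
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path), "UTF-8"));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line); // one JSON record per line
        }
        reader.close();
        fs.close();
    }
}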

Kafka configuration (kafkainfo.properties)

zookeeper_connect=ttye_kafka-001:2181,ttye_kafka-002:2181,ttye_kafka-003:2181
group=tvm_hdfs_01
session_timeout_ms=4000
zookeeper_sync_time_ms=200
auto_commit_interval_ms=1000
auto_commit_enable=true
auto_offset_reset=smallest
serializer_class=kafka.serializer.StringEncoder

Topic list (topic.properties); the consumer only reads the keys, each of which is a topic name

app_send_welfare_ana=app_send_welfare_ana
app_balance_welfare_ana=app_balance_welfare_ana
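Both properties files are opened with a plain relative FileInputStream, so they must sit in the working directory the jar is started from. If that is too fragile, here is a sketch of a fallback loader that also checks the classpath; the ConfigLoader class and the getConfigStream helper are mine, not part of the original code. With it in place, properties() could call ConfigLoader.getConfigStream("kafkainfo.properties") instead of new FileInputStream(...).

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

public final class ConfigLoader {
    // Hypothetical helper: prefer a file in the working directory, fall back to the classpath.
    public static InputStream getConfigStream(String name) throws IOException {
        File file = new File(name);
        if (file.exists()) {
            return new FileInputStream(file);
        }
        InputStream in = ConfigLoader.class.getClassLoader().getResourceAsStream(name);
        if (in == null) {
            throw new FileNotFoundException(name + " not found in the working directory or on the classpath");
        }
        return in;
    }
}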

Gradle build script (build.gradle)

buildscript {
    repositories {
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:1.2.4"
    }
}
group 'kafkatohdfs'
version '1.0-SNAPSHOT'
 
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'java'
apply plugin: 'idea'
apply plugin:'application'
 
[compileJava, compileTestJava, javadoc]*.options*.encoding = 'UTF-8'
sourceCompatibility = 1.7
targetCompatibility=1.7
mainClassName = "com.tvm.KafkaConsumer"
repositories {
    mavenLocal()
    mavenCentral()
    jcenter()
    maven {
        url = 'http://115.159.154.56:8081/nexus/content/groups/public'
    }
}
 
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    compile "org.apache.kafka:kafka_2.10:0.8.2.2"
    compile group: 'com.alibaba', name: 'fastjson', version: '1.2.21'
    compile "org.projectlombok:lombok:1.16.6"
    compile "org.apache.logging.log4j:log4j-api:2.3"
    compile "org.apache.logging.log4j:log4j-core:2.3"
    //hadoop
    compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'
 
    compile fileTree(dir: 'lib', include: ['*.jar'])
}

Package the project with gradle shadowJar.

Run on Linux, in the background:

nohup java -jar KafkaConsumer_log-1.0-SNAPSHOT.jar > /dev/null 2>&1 &

Reposted from: http://blog.csdn.net/qq_28767763/article/details/65445021
