
Consuming Kafka in Parallel and Storing to HDFS

刀锋
Published on 2017/07/20 11:33

KafkaConsumer  

import com.alibaba.fastjson.JSONObject;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
 
public class KafkaConsumer implements Runnable{
    public static FileSystem fs;
    public static Properties properties() throws IOException {
        Properties properties_kafkainfo=new Properties();
        InputStream in = new FileInputStream("kafkainfo.properties");
        properties_kafkainfo.load(in);
        return properties_kafkainfo;
    }
    public static Properties properties_topic() throws IOException {
        Properties properties_kafkatopic=new Properties();
        InputStream in = new FileInputStream("topic.properties");
        properties_kafkatopic.load(in);
        return properties_kafkatopic;
    }
    private final kafka.javaapi.consumer.ConsumerConnector consumer;
    public KafkaConsumer() throws IOException {
        Properties props = new Properties();
        //props.put("zookeeper.connect", "10.20.30.91:2181,10.20.30.92:2181,10.20.30.93:2181");
        props.put("zookeeper.connect", properties().getProperty("zookeeper_connect"));
        props.put("group.id", properties().getProperty("group"));
        props.put("zookeeper.session.timeout.ms", properties().getProperty("session_timeout_ms"));
        props.put("zookeeper.sync.time.ms", properties().getProperty("zookeeper_sync_time_ms"));
        props.put("auto.commit.interval.ms", properties().getProperty("auto_commit_interval_ms"));
        props.put("auto.commit.enable",properties().getProperty("auto_commit_enable"));
        props.put("auto.offset.reset", properties().getProperty("auto_offset_reset")); //largest smallest
        props.put("serializer.class", properties().getProperty("serializer_class"));
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }
    public void run() {
 
 
        final int numThreads = 6;
        final Iterator<String> topic;
        try {
            topic = properties_topic().stringPropertyNames().iterator();
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            List<String> topicList = new ArrayList<String>();
 
            while(topic.hasNext()){
                final String key_topic = topic.next();
                topicList.add(key_topic);
                topicCountMap.put(key_topic, new Integer(numThreads));
            }
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            final Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            //Loop over the topics read from the config file: one worker thread is started per topic, so topics are consumed in parallel
            for (int i = 0; i < topicList.size(); i++) {
                final String key_topic1 = topicList.get(i);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        //Each topic gets numThreads streams, each handled by its own KafkaConsumerThread
                        List<KafkaStream<String, String>> stream = consumerMap.get(key_topic1);
                        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
                        int threadNumber = 0;
                        for (final KafkaStream streams : stream) {
                            executor.submit(new KafkaConsumerThread(streams, threadNumber, key_topic1));
                            threadNumber++;
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) {
        try {
            Thread t = new Thread(new KafkaConsumer());
            t.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
 
    }
}

KafkaConsumerThread  

import com.alibaba.fastjson.JSONObject;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
 
 
public class KafkaConsumerThread implements Runnable{
 
    private KafkaStream stream;
    private int topicNum;
    private String topic;
    FSDataOutputStream outputStream = null;
    public KafkaConsumerThread(KafkaStream stream, int topicNum,String topic) {
        this.stream = stream;
        this.topicNum = topicNum;
        this.topic = topic;
 
    }
    //Messages consumed from the stream are written to HDFS
    public void run() {
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()){
            Path path = null;
            MessageAndMetadata<String, String> m = it.next();
            JSONObject jsStr = JSONObject.parseObject(m.message());
            //createTime is expected to be a timestamp (epoch milliseconds or a Date) inside the JSON message
            String dateTime = new SimpleDateFormat("yyyyMMddHH").format(jsStr.get("createTime"));
            String hdfs ="hdfs://10.20.30.91:8020/kafka/";
            String name = dateTime + ".log";
            String filename = hdfs+topic+name;
            FileWrapper writer = getFileByName(filename,path);
            System.out.println("consumed: "+jsStr.toString());
            writer.write(jsStr.toString());
        }
    }
    private static class FileWrapper {
        Configuration conf = new Configuration();
        private String filename;
        Path filePath;
        FileSystem hdfs= null;
        private AtomicInteger calc = new AtomicInteger();
        private long starttime=new Date().getTime();
        private FSDataOutputStream outputStream;
        public FileWrapper(String filename,Path filePath) {
            conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
            conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
            conf.setBoolean("dfs.support.append",true);
            this.filename = filename;
            this.filePath=filePath;
            try {
                hdfs = FileSystem.get(new URI("hdfs://10.20.30.91:8020"),conf);
                outputStream = hdfs.create(filePath);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
        }
        //FSDataOutputStream outputStream = hdfs.create(filedfs);
        public void write (String txt){
            try {
                String txt2=txt+"\n";
                outputStream.write(txt2.getBytes("UTF-8"));
                System.out.println("txt:========"+txt2.getBytes("UTF-8"));
                outputStream.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public void close(){
            try {
                outputStream.close();
                hdfs.close();
            } catch (IOException e) {
            }
        }
 
    }
    static Map<String, FileWrapper> fileMap = new ConcurrentHashMap<>();
    FileWrapper getFileByName(String filename,Path filePath){
        FileWrapper bufferedWriter = fileMap.get(filename);
        if(bufferedWriter!=null){
            return bufferedWriter;
        }
        synchronized (fileMap){
            bufferedWriter=fileMap.get(filename);
            if(bufferedWriter!=null){
                return bufferedWriter;
            }
            filePath = new Path(filename);
            System.out.println("file.name===========" + filename);
            FileWrapper result = new FileWrapper(filename, filePath);
            fileMap.put(filename, result);
            return result;
        }
 
    }
    static void flushData(){
        long nowtime = new Date().getTime();
        synchronized (fileMap){
            for (FileWrapper fileWrapper : fileMap.values()) {
                try {
                    fileWrapper.outputStream.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if(nowtime-fileWrapper.starttime>2*60*60*1000){
                        fileMap.remove(fileWrapper.filename);
                        fileWrapper.close();
                    }
 
            }
        }
    }
 
    static {
        Timer timer = new Timer("flush-task", false);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                flushData();
            }
        },60000,60000);
    }
 
 
}
 
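A minimal producer sketch, assuming the message body is a JSON object whose createTime field holds an epoch-millisecond timestamp (which is what KafkaConsumerThread formats with SimpleDateFormat); the broker address and the business fields below are placeholders. A message like this sent to app_send_welfare_ana ends up appended to hdfs://10.20.30.91:8020/kafka/app_send_welfare_ana<yyyyMMddHH>.log, since the topic name and hour are concatenated directly in the consumer code above.

import com.alibaba.fastjson.JSONObject;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // placeholder broker list; replace with the real Kafka broker addresses
        props.put("metadata.broker.list", "ttye_kafka-001:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        JSONObject msg = new JSONObject();
        msg.put("createTime", System.currentTimeMillis()); // required by KafkaConsumerThread
        msg.put("userId", "u001");                          // placeholder business field
        msg.put("amount", 10);                              // placeholder business field
        producer.send(new KeyedMessage<String, String>("app_send_welfare_ana", msg.toJSONString()));
        producer.close();
    }
}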

Kafka configuration (kafkainfo.properties); the underscore-style keys are mapped to the corresponding dotted consumer settings in the KafkaConsumer constructor.

zookeeper_connect=ttye_kafka-001:2181,ttye_kafka-002:2181,ttye_kafka-003:2181
group=tvm_hdfs_01
session_timeout_ms=4000
zookeeper_sync_time_ms=200
auto_commit_interval_ms=1000
auto_commit_enable=true
auto_offset_reset=smallest
serializer_class=kafka.serializer.StringEncoder

Topic configuration (topic.properties). Each property key is treated as a topic name to subscribe to (the values are unused); with numThreads = 6 in KafkaConsumer, each topic gets its own thread backed by a pool of six KafkaConsumerThread workers.

app_send_welfare_ana=app_send_welfare_ana
app_balance_welfare_ana=app_balance_welfare_ana

Gradle build script (build.gradle)

buildscript {
    repositories {
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:1.2.4"
    }
}
group 'kafkatohdfs'
version '1.0-SNAPSHOT'
 
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'application'
 
[compileJava, compileTestJava, javadoc]*.options*.encoding = 'UTF-8'
sourceCompatibility = 1.7
targetCompatibility=1.7
mainClassName = "com.tvm.KafkaConsumer"
repositories {
    mavenLocal()
    mavenCentral()
    jcenter()
    maven {
        url = 'http://115.159.154.56:8081/nexus/content/groups/public'
    }
}
 
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    compile "org.apache.kafka:kafka_2.10:0.8.2.2"
    compile group: 'com.alibaba', name: 'fastjson', version: '1.2.21'
    compile "org.projectlombok:lombok:1.16.6"
    compile "org.apache.logging.log4j:log4j-api:2.3"
    compile "org.apache.logging.log4j:log4j-core:2.3"
    //hadoop
    compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'
 
    compile fileTree(dir: 'lib', include: '*.jar')
}

Package the project with gradle shadowJar.

Run on Linux, in the background. Note that kafkainfo.properties and topic.properties are loaded via relative paths, so start the jar from the directory that contains both files:

nohup java -jar KafkaConsumer_log-1.0-SNAPSHOT.jar > /dev/null 2>&1 &

Reposted from: http://blog.csdn.net/qq_28767763/article/details/65445021
