Consuming Kafka in Parallel and Writing to HDFS
KafkaConsumer  

import com.alibaba.fastjson.JSONObject;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
 
public class KafkaConsumer implements Runnable{
    public static FileSystem fs ;
    public static Properties properties() throws IOException {
        Properties properties_kafkainfo=new Properties();
        InputStream in = new FileInputStream("kafkainfo.properties");
        properties_kafkainfo.load(in);
        return properties_kafkainfo;
    }
    public static Properties properties_topic() throws IOException {
        Properties properties_kafkatopic=new Properties();
        InputStream in = new FileInputStream("topic.properties");
        properties_kafkatopic.load(in);
        return properties_kafkatopic;
    }
    private final kafka.javaapi.consumer.ConsumerConnector consumer;
    public KafkaConsumer() throws IOException {
        Properties props = new Properties();
        //props.put("zookeeper.connect", "10.20.30.91:2181,10.20.30.92:2181,10.20.30.93:2181");
        props.put("zookeeper.connect", properties().getProperty("zookeeper_connect"));
        props.put("group.id", properties().getProperty("group"));
        props.put("zookeeper.session.timeout.ms", properties().getProperty("session_timeout_ms"));
        props.put("zookeeper.sync.time.ms", properties().getProperty("zookeeper_sync_time_ms"));
        props.put("auto.commit.interval.ms", properties().getProperty("auto_commit_interval_ms"));
        props.put("auto.commit.enable",properties().getProperty("auto_commit_enable"));
        props.put("auto.offset.reset", properties().getProperty("auto_offset_reset")); //largest smallest
        props.put("serializer.class", properties().getProperty("serializer_class"));
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }
    public void run() {
 
 
        final int numThreads = 6;
        final Iterator<String> topic;
        try {
            topic = properties_topic().stringPropertyNames().iterator();
            Map<string, integer=""> topicCountMap = new HashMap<string, integer="">();
            List<string> topicList = new ArrayList<string>();
 
            while(topic.hasNext()){
                final String key_topic = topic.next();
                topicList.add(key_topic);
                topicCountMap.put(key_topic, new Integer(numThreads));
            }
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            final Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            // One worker thread per topic from the config file: as many topics as are configured, that many parallel consumers.
            for (int i = 0; i < topicList.size(); i++) {
                final String key_topic1 = topicList.get(i);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        List<KafkaStream<String, String>> stream = consumerMap.get(key_topic1);
                        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
                        int threadNumber = 0;
                        for (final KafkaStream streams : stream) {
                            executor.submit(new KafkaConsumerThread(streams, threadNumber, key_topic1));
                            threadNumber++;
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) {
        try {
            Thread t = new Thread(new KafkaConsumer());
            t.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
 
    }
}
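
The two property files above are opened with new FileInputStream(...), so they must sit in the directory the jar is launched from. Below is a minimal sketch of a loader that falls back to the classpath when the file is missing from the working directory; the ConfigLoader class name and the fallback behaviour are additions for illustration, not part of the original code.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ConfigLoader {
    // Load a properties file from the working directory, falling back to the classpath.
    public static Properties load(String name) throws IOException {
        Properties props = new Properties();
        File file = new File(name);
        InputStream in = file.exists()
                ? new FileInputStream(file)
                : ConfigLoader.class.getClassLoader().getResourceAsStream(name);
        if (in == null) {
            throw new IOException("config file not found: " + name);
        }
        try {
            props.load(in);
        } finally {
            in.close();
        }
        return props;
    }
}

The properties() and properties_topic() methods above could then simply delegate to ConfigLoader.load("kafkainfo.properties") and ConfigLoader.load("topic.properties").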

KafkaConsumerThread  

import com.alibaba.fastjson.JSONObject;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
 
 
public class KafkaConsumerThread implements Runnable{
 
    private KafkaStream stream;
    private int topicNum;
    private String topic;
    FSDataOutputStream outputStream = null;
    public KafkaConsumerThread(KafkaStream stream, int topicNum,String topic) {
        this.stream = stream;
        this.topicNum = topicNum;
        this.topic = topic;
 
    }
    // Consume messages from the stream and write them to HDFS
    public void run() {
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()){
            Path path = null;
            MessageAndMetadata<String, String> m = it.next();
            JSONObject jsStr = JSONObject.parseObject(m.message());
            String dateTime = new SimpleDateFormat("yyyyMMddHH").format(jsStr.get("createTime"));
            String hdfs ="hdfs://10.20.30.91:8020/kafka/";
            String name = dateTime + ".log";
            String filename = hdfs+topic+name;
            FileWrapper writer = getFileByName(filename,path);
            System.out.println("消费:"+jsStr.toString());
            writer.write(jsStr.toString());
        }
    }
    private static class FileWrapper {
        Configuration conf = new Configuration();
        private String filename;
        Path filePath;
        FileSystem hdfs= null;
        private AtomicInteger calc = new AtomicInteger();
        private long starttime=new Date().getTime();
        private FSDataOutputStream outputStream;
        public FileWrapper(String filename,Path filePath) {
            conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
            conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
            conf.setBoolean("dfs.support.append",true);
            this.filename = filename;
            this.filePath=filePath;
            try {
                hdfs = FileSystem.get(new URI("hdfs://10.20.30.91:8020"),conf);
                outputStream = hdfs.create(filePath);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
        }
        //FSDataOutputStream outputStream = hdfs.create(filedfs);
        public void write (String txt){
            try {
                String txt2=txt+"\n";
                outputStream.write(txt2.getBytes("UTF-8"));
                System.out.println("txt:========"+txt2.getBytes("UTF-8"));
                outputStream.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public void close(){
            try {
                outputStream.close();
                hdfs.close();
            } catch (IOException e) {
            }
        }
 
    }
    // One open HDFS file per (topic, hour), shared by every consumer thread in this JVM.
    static Map<String, FileWrapper> fileMap = new ConcurrentHashMap<>();
    FileWrapper getFileByName(String filename,Path filePath){
        FileWrapper bufferedWriter = fileMap.get(filename);
        if(bufferedWriter!=null){
            return bufferedWriter;
        }
        synchronized (fileMap){
            bufferedWriter=fileMap.get(filename);
            if(bufferedWriter!=null){
                return bufferedWriter;
            }
            filePath = new Path(filename);
            FSDataOutputStream outputStream = null;
            System.out.println("file.name==========="+filename);
            FileWrapper result = new FileWrapper(filename,filePath);
            if (result == null) {
                throw new IllegalStateException("result is null!");
            }
            fileMap.put(filename,result);
            return result;
        }
 
    }
    // Flush every open file once a minute; close and evict files that have been open for more than two hours.
    static void flushData(){
        long nowtime = new Date().getTime();
        synchronized (fileMap){
            for (FileWrapper fileWrapper : fileMap.values()) {
                try {
                    fileWrapper.outputStream.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (nowtime - fileWrapper.starttime > 2 * 60 * 60 * 1000) {
                    fileMap.remove(fileWrapper.filename);
                    fileWrapper.close();
                }
 
            }
        }
    }
 
    static {
        Timer timer = new Timer("flush-task", false);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                flushData();
            }
        },60000,60000);
    }
 
 
}
 
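
Each Kafka message is expected to be a JSON string carrying a createTime field (epoch milliseconds); that timestamp decides which hourly file on HDFS the line is appended to. The sketch below reproduces the file-naming logic with the same topic and NameNode address used above; the userId field is just a hypothetical example of payload data.

import com.alibaba.fastjson.JSONObject;
import java.text.SimpleDateFormat;

public class MessageExample {
    public static void main(String[] args) {
        JSONObject msg = new JSONObject();
        msg.put("createTime", System.currentTimeMillis()); // field used for hourly bucketing
        msg.put("userId", "10086");                        // hypothetical payload field

        // Same file-naming logic as KafkaConsumerThread.run():
        String dateTime = new SimpleDateFormat("yyyyMMddHH").format(msg.get("createTime"));
        String topic = "app_send_welfare_ana";
        String filename = "hdfs://10.20.30.91:8020/kafka/" + topic + dateTime + ".log";
        System.out.println(filename);
        // -> hdfs://10.20.30.91:8020/kafka/app_send_welfare_ana<yyyyMMddHH>.log
    }
}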

Kafka configuration (kafkainfo.properties)

zookeeper_connect=ttye_kafka-001:2181,ttye_kafka-002:2181,ttye_kafka-003:2181
group=tvm_hdfs_01
session_timeout_ms=4000
zookeeper_sync_time_ms=200
auto_commit_interval_ms=1000
auto_commit_enable=true
auto_offset_reset=smallest
serializer_class=kafka.serializer.StringEncoder

Topic list (topic.properties)

app_send_welfare_ana=app_send_welfare_ana
app_balance_welfare_ana=app_balance_welfare_ana

Gradle build script

buildscript {
    repositories {
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:1.2.4"
    }
}
group 'kafkatohdfs'
version '1.0-SNAPSHOT'
 
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'application'
 
[compileJava, compileTestJava, javadoc]*.options*.encoding = 'UTF-8'
sourceCompatibility = 1.7
targetCompatibility=1.7
mainClassName = "com.tvm.KafkaConsumer"
repositories {
    mavenLocal()
    mavenCentral()
    jcenter()
    maven {
        url = 'http://115.159.154.56:8081/nexus/content/groups/public'
    }
}
 
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    compile "org.apache.kafka:kafka_2.10:0.8.2.2"
    compile group: 'com.alibaba', name: 'fastjson', version: '1.2.21'
    compile "org.projectlombok:lombok:1.16.6"
    compile "org.apache.logging.log4j:log4j-api:2.3"
    compile "org.apache.logging.log4j:log4j-core:2.3"
    //hadoop
    compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'
 
    compile fileTree(dir: 'lib', include: '*.jar')
}

Package the project with gradle shadowJar.

Run it on Linux in the background:

nohup java -jar KafkaConsumer_log-1.0-SNAPSHOT.jar > /dev/null 2>&1 &
