
flume-ng custom sink: rolling files into variable directories

chunhei2008 · published 2015/03/17 11:39
Use case: nginx logs from several servers need to be collected in real time onto one machine. The collected output must be bucketed into one directory per day and one file per 5 minutes; for example, a log line from 12:26 on 2012-12-29 should go into the file under /data/log/20121229/ whose name starts with log-1225-. To do this I implemented a file sink modeled on the hdfs-sink of flume-og/flume-ng.

The configuration looks like this:
agent.sources = source
agent.channels = channel
agent.sinks = sink

agent.sources.source.type = avro
agent.sources.source.bind = 192.168.0.100
agent.sources.source.port = 44444
agent.sources.source.channels = channel

agent.sinks.sink.type = org.apache.flume.sink.FileSink
agent.sinks.sink.file.path = /data/log/%{dayStr}
agent.sinks.sink.file.filePrefix = log-%{hourStr}%{minStr}-
agent.sinks.sink.file.txnEventMax = 10000
agent.sinks.sink.file.maxOpenFiles = 5
agent.sinks.sink.channel = channel

agent.channels.channel.type = memory
agent.channels.channel.capacity = 100000
agent.channels.channel.transactionCapacity = 100000
agent.channels.channel.keep-alive = 60
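
Note that %{dayStr}, %{hourStr} and %{minStr} are not built-in escape sequences: the sink resolves them from event headers via BucketPath.escapeString(), so whatever sends events to the avro source has to attach those headers (and round the minute down to the 5-minute boundary itself). Below is a minimal client sketch using the flume-ng-sdk RPC API; the class name, the host/port (taken from the source configuration above) and the header formatting are illustrative and not part of the original post.

import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class NginxLogClient {
    public static void main(String[] args) throws EventDeliveryException {
        // host/port must match the avro source configured above
        RpcClient client = RpcClientFactory.getDefaultInstance("192.168.0.100", 44444);
        try {
            Calendar now = Calendar.getInstance();
            int minute = now.get(Calendar.MINUTE) / 5 * 5; // round down to the 5-minute bucket

            Map<String, String> headers = new HashMap<String, String>();
            headers.put("dayStr", String.format("%1$tY%1$tm%1$td", now)); // e.g. 20121229
            headers.put("hourStr", String.format("%1$tH", now));          // e.g. 12
            headers.put("minStr", String.format("%02d", minute));         // e.g. 25

            Event event = EventBuilder.withBody("one nginx access-log line".getBytes(), headers);
            client.append(event); // the sink buckets this event by its headers
        } finally {
            client.close();
        }
    }
}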


Required jars:
jakarta-oro-2.0.1.jar
flume-ng-core-1.3.0-SNAPSHOT.jar
flume-ng-sdk-1.3.0-SNAPSHOT.jar
flume-ng-configuration-1.3.0-SNAPSHOT.jar
slf4j-log4j12-1.6.1.jar
slf4j-api-1.6.1.jar
guava-10.0.1.jar

The code:
FileSink.java
package org.apache.flume.sink;

import java.io.IOException;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.serialization.EventSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class FileSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(FileSink.class);

    private static final String defaultFileName = "FlumeData";
    private static final int defaultMaxOpenFiles = 50;

    private String path;

    /**
     * Maximum number of events taken from the channel in a single transaction
     * (file.txnEventMax).
     */
    private long txnEventMax;

    private FileWriterLinkedHashMap sfWriters;

    private String serializerType;
    private Context serializerContext;

    private boolean needRounding = false;
    private int roundUnit = Calendar.SECOND;
    private int roundValue = 1;
    private SinkCounter sinkCounter;

    private int maxOpenFiles;

    private ScheduledExecutorService timedRollerPool;

    private long rollInterval;

    @Override
    public void configure(Context context) {
        String directory = Preconditions.checkNotNull(
                context.getString("file.path"), "file.path is required");
        String fileName = context.getString("file.filePrefix", defaultFileName);
        this.path = directory + "/" + fileName;

        maxOpenFiles = context.getInteger("file.maxOpenFiles", defaultMaxOpenFiles);

        serializerType = context.getString("sink.serializer", "TEXT");
        serializerContext = new Context(
                context.getSubProperties(EventSerializer.CTX_PREFIX));
        txnEventMax = context.getLong("file.txnEventMax", 1l);
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }

        rollInterval = context.getLong("file.rollInterval", 30l);
        String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
        timedRollerPool = Executors.newScheduledThreadPool(maxOpenFiles,
                new ThreadFactoryBuilder().setNameFormat(rollerName).build());
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<BucketFileWriter> writers = Lists.newArrayList();
        transaction.begin();
        try {
            Event event = null;
            int txnEventCount = 0;
            for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
                event = channel.take();
                if (event == null) {
                    break;
                }

                // reconstruct the path name by substituting place holders
                String realPath = BucketPath.escapeString(path,
                        event.getHeaders(), needRounding, roundUnit, roundValue);
                BucketFileWriter bucketFileWriter = sfWriters.get(realPath);

                // we haven't seen this file yet, so open it and cache the handle
                if (bucketFileWriter == null) {
                    bucketFileWriter = new BucketFileWriter();
                    bucketFileWriter.open(realPath, serializerType,
                            serializerContext, rollInterval, timedRollerPool,
                            sfWriters);
                    sfWriters.put(realPath, bucketFileWriter);
                }

                // track the buckets getting written in this transaction
                if (!writers.contains(bucketFileWriter)) {
                    writers.add(bucketFileWriter);
                }

                // write the data to file
                bucketFileWriter.append(event);
            }

            if (txnEventCount == 0) {
                sinkCounter.incrementBatchEmptyCount();
            } else if (txnEventCount == txnEventMax) {
                sinkCounter.incrementBatchCompleteCount();
            } else {
                sinkCounter.incrementBatchUnderflowCount();
            }

            // flush all pending buckets before committing the transaction
            for (BucketFileWriter bucketFileWriter : writers) {
                if (!bucketFileWriter.isBatchComplete()) {
                    flush(bucketFileWriter);
                }
            }
            transaction.commit();
            if (txnEventCount > 0) {
                sinkCounter.addToEventDrainSuccessCount(txnEventCount);
            }

            if (event == null) {
                return Status.BACKOFF;
            }
            return Status.READY;
        } catch (IOException eIO) {
            transaction.rollback();
            logger.warn("File IO error", eIO);
            return Status.BACKOFF;
        } catch (Throwable th) {
            transaction.rollback();
            logger.error("process failed", th);
            if (th instanceof Error) {
                throw (Error) th;
            } else {
                throw new EventDeliveryException(th);
            }
        } finally {
            transaction.close();
        }
    }

    private void flush(BucketFileWriter bucketFileWriter) throws IOException {
        bucketFileWriter.flush();
    }

    @Override
    public synchronized void start() {
        super.start();
        this.sfWriters = new FileWriterLinkedHashMap(maxOpenFiles);
        sinkCounter.start();
    }
}
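
The sink overrides start() but, as posted, has no matching stop(): on agent shutdown any open .tmp files stay behind until their roll timer fires. Below is a possible stop() override, not part of the original code, that could be added to FileSink; it only uses the sfWriters, timedRollerPool, sinkCounter and logger fields defined above, plus BucketFileWriter.close() shown further down.

    @Override
    public synchronized void stop() {
        // close any writers still cached so their .tmp files get renamed
        for (BucketFileWriter writer : sfWriters.values()) {
            try {
                writer.close();
            } catch (IOException e) {
                logger.warn("failed to close writer on stop", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        sfWriters.clear();
        timedRollerPool.shutdown();
        sinkCounter.stop();
        super.stop();
    }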

BucketFileWriter.java
package org.apache.flume.sink;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketFileWriter {

    private static final Logger logger = LoggerFactory
            .getLogger(BucketFileWriter.class);
    private static final String IN_USE_EXT = ".tmp";

    /**
     * Timestamp suffix appended to the file name so that successive files for
     * the same bucket path do not collide.
     */
    private final AtomicLong fileExtensionCounter;
    private OutputStream outputStream;

    private EventSerializer serializer;

    private String filePath;

    public BucketFileWriter() {
        fileExtensionCounter = new AtomicLong(System.currentTimeMillis());
    }

    public void open(final String filePath, String serializerType,
            Context serializerContext, final long rollInterval,
            final ScheduledExecutorService timedRollerPool,
            final FileWriterLinkedHashMap sfWriters) throws IOException {
        this.filePath = filePath;
        File file = new File(filePath + fileExtensionCounter + IN_USE_EXT);
        file.getParentFile().mkdirs();
        outputStream = new BufferedOutputStream(new FileOutputStream(file));
        logger.info("filename = " + file.getAbsolutePath());
        serializer = EventSerializerFactory.getInstance(serializerType,
                serializerContext, outputStream);
        serializer.afterCreate();
        if (rollInterval > 0) {
            Callable<Void> action = new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    logger.debug(
                            "Rolling file ({}): Roll scheduled after {} sec elapsed.",
                            filePath + fileExtensionCounter + IN_USE_EXT,
                            rollInterval);
                    if (sfWriters.containsKey(filePath)) {
                        sfWriters.remove(filePath);
                    }
                    close();
                    return null;
                }
            };
            timedRollerPool.schedule(action, rollInterval, TimeUnit.SECONDS);
        }
    }

    public void append(Event event) throws IOException {
        serializer.write(event);
    }

    public boolean isBatchComplete() {
        return true;
    }

    public void flush() throws IOException {
        serializer.flush();
        outputStream.flush();
    }

    /**
     * Rename bucketPath file from .tmp to permanent location.
     */
    private void renameBucket() {
        File srcPath = new File(filePath + fileExtensionCounter + IN_USE_EXT);
        File dstPath = new File(filePath + fileExtensionCounter);
        if (srcPath.exists()) {
            srcPath.renameTo(dstPath);
            logger.info("Renaming " + srcPath + " to " + dstPath);
        }
    }

    /**
     * Close the file handle and rename the temp file to the permanent
     * filename. Safe to call multiple times.
     *
     * @throws IOException
     *             On failure to rename if temp file exists.
     */
    public synchronized void close() throws IOException, InterruptedException {
        if (outputStream != null) {
            outputStream.flush();
            outputStream.close();
        }
        renameBucket();
    }
}
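
In practice the on-disk name is the resolved bucket path plus the writer's creation time in milliseconds, for example /data/log/20121229/log-1225-1356755160000.tmp while the file is open (the exact number is whatever System.currentTimeMillis() returned when the writer was created); renameBucket() drops the .tmp suffix when the writer is closed, either by the roll timer or by eviction from the writer map below.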

FileWriterLinkedHashMap.java
package org.apache.flume.sink;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileWriterLinkedHashMap extends
        LinkedHashMap<String, BucketFileWriter> {

    private static final Logger logger = LoggerFactory
            .getLogger(FileWriterLinkedHashMap.class);

    private static final long serialVersionUID = -7860596835613215998L;
    private final int maxOpenFiles;

    public FileWriterLinkedHashMap(int maxOpenFiles) {
        super(16, 0.75f, true); // default capacity and load factor, access-ordered
        this.maxOpenFiles = maxOpenFiles;
    }

    @Override
    protected boolean removeEldestEntry(Entry<String, BucketFileWriter> eldest) {
        if (size() > maxOpenFiles) {
            // If we have more than maxOpenFiles open, close the eldest writer
            // and return true so the map evicts it.
            try {
                eldest.getValue().close();
            } catch (IOException e) {
                logger.warn(eldest.getKey().toString(), e);
            } catch (InterruptedException e) {
                logger.warn(eldest.getKey().toString(), e);
                Thread.currentThread().interrupt();
            }
            return true;
        } else {
            return false;
        }
    }
}
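
Because the map is constructed with accessOrder = true, it behaves as an LRU cache: with file.maxOpenFiles = 5 as configured above, opening a sixth distinct bucket path closes and evicts the least-recently-used writer, which also triggers its rename from .tmp to the final name.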

Reprinted from: http://blog.csdn.net/tswisdom/article/details/41483471
