文档章节

flume spooldir 定期采集日期目录

你我他有个梦
 你我他有个梦
发布于 2017/08/16 22:41
字数 508
阅读 86
收藏 0

这里以cdh5-1.6.0_5.10.2为例。

flume源码下载地址:https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2,SpoolDirectorySource在https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2/flume-ng-core项目下,下载之后找到org/apache/flume/source/SpoolDirectorySource修改源码如下:

@Override
public synchronized void start() {
  //添加解析日期目录方法
  spoolDirectory = directory(spoolDirectory);
  logger.info("SpoolDirectorySource source starting with directory: {}",
      spoolDirectory);

  executor = Executors.newSingleThreadScheduledExecutor();

  File directory = new File(spoolDirectory);
  try {
    reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(directory)
        .completedSuffix(completedSuffix)
        .ignorePattern(ignorePattern)
        .trackerDirPath(trackerDirPath)
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .annotateBaseName(basenameHeader)
        .baseNameHeader(basenameHeaderKey)
        .deserializerType(deserializerType)
        .deserializerContext(deserializerContext)
        .deletePolicy(deletePolicy)
        .inputCharset(inputCharset)
        .decodeErrorPolicy(decodeErrorPolicy)
        .consumeOrder(consumeOrder)
        .recursiveDirectorySearch(recursiveDirectorySearch)
        .build();
  } catch (IOException ioe) {
    throw new FlumeException("Error instantiating spooling event parser",
        ioe);
  }

  Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
  executor.scheduleWithFixedDelay(
      runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

  super.start();
  logger.debug("SpoolDirectorySource source started");
  sourceCounter.start();
}

/**
 * 解析时间
 * @param pattern
 * @return
 */
public static String getTime(String pattern) {
    SimpleDateFormat sdf = null;
    try{
        sdf = new SimpleDateFormat(pattern);
    }catch (Exception e){
        return "";
    }
    return sdf.format(new Date(System.currentTimeMillis()));
}

/**
 * 解析时间
 *
 * @param spoolDirectory
 * @return
 */
public static String spoolTimeDirectory(String spoolDirectory) {
    String spool = spoolDirectory.substring(spoolDirectory.lastIndexOf("/") + 1, spoolDirectory.length());
    String time = getTime(spool);
    if (StringUtils.isNotBlank(time)) {
        return time;
    }
    return spool;
}

/**
 * 拼装目录
 *
 * @param spoolDirectory
 * @return
 */
public static String directory(String spoolDirectory) {
    String spoolDir = spoolDirectory.substring(0, spoolDirectory.lastIndexOf("/") + 1);
    return spoolDir + spoolTimeDirectory(spoolDirectory);
}

按照如上简单修改之后,编译之后倒入到jar包,替换cdh集群中的flume即可。配置文件如下:

app.sources=r1
app.sinks=s1
app.channels=c1

app.sources.r1.type=spooldir
app.sources.r1.spoolDir=/data/log/yyyy-MM-dd
app.sources.r1.channels=c1
app.sources.r1.fileHeader=false
#一行读取默认最大限制为2048,这里重新设置最大限制
app.sources.r1.deserializer.maxLineLength =1048576

#app.sources.r1.interceptors =i1
#app.sources.r1.interceptors.i1.type = timestamp

app.sinks.s1.type = hdfs
app.sinks.s1.hdfs.path = hdfs://hadoop1:8020/home/data/avatar-log/data-log/%Y-%m-%d
#文件前缀和后缀
app.sinks.s1.hdfs.filePrefix = gdapp_log
app.sinks.s1.hdfs.fileSuffix = .log
#通过设置 hdfs.inUsePrefix,例如设置为 .时,hdfs 会把该文件当做隐藏文件,以避免在 mr 过程中读到这些临时文件,引起一些错误
app.sinks.s1.hdfs.inUsePrefix = .
#同时打开的最大文件数目
app.sinks.s1.hdfs.maxOpenFiles = 5000
app.sinks.s1.hdfs.batchSize= 1000
app.sinks.s1.hdfs.fileType = DataStream
app.sinks.s1.hdfs.writeFormat =Text
#128M为一个采集后的存储文件大小
app.sinks.s1.hdfs.rollSize = 134217728
app.sinks.s1.hdfs.rollCount = 0
app.sinks.s1.hdfs.rollInterval = 300
app.sinks.s1.hdfs.useLocalTimeStamp = true
app.sinks.s1.channel = c1

#app.channels.c1.type=file
#app.channels.c1.checkpointDir=./file_channel/checkpoint
#app.channels.c1.dataDirs=./file_channel/data
app.channels.c1.type = memory
app.channels.c1.capacity = 10000
app.channels.c1.transactionCapacity = 1000


 

© 著作权归作者所有

共有 人打赏支持
你我他有个梦

你我他有个梦

粉丝 95
博文 110
码字总数 98858
作品 0
昌平
程序员
私信 提问
带你看懂大数据采集引擎之Flume&采集目录中的日志

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一、Flume的介绍: Flume由C...

李金泽
2018/03/04
0
0
Flume 日志收集系统 Spooldir-Source HDFS-sink

日志即log,记录发生的事件。以Nginx为例,有errorlog和accesslog 2个日志。access_log是访问日志,每条访问记录会产生几百字节的数据,随着访问量增加,日志文件会越来越大,必须定期清理日...

白头雁
2018/08/23
0
0
解决Flume采集数据时在HDFS上产生大量小文件的问题

问题:flume指定HDFS类型的Sink时,采集数据至HDFS指定目录,会产生大量小文件。 问题重现: 1、创建flume配置文件flume-env.sh,: flume配置文件如下(根据自身需要修改): 因为flume可以...

舒运
2018/06/10
0
0
Apache Flume 1.7.0 发布,日志服务器

Apache Flume 1.7.0 发布了,Flume 是一个分布式、可靠和高可用的服务,用于收集、聚合以及移动大量日志数据,使用一个简单灵活的架构,就流数据模型。这是一个可靠、容错的服务。 本次更新如...

局长
2016/10/19
2K
3
Flume框架简单介绍(34)

Flume是一个分布式的海量数据收集框架. Flume框架流程图 Channel是缓存的数据,如果Sink传送给了HDFS,Channel中缓存的数据就会删除,如果没有传送成功,Channel相当于做了备份,Sink重复从C...

肖鋭
2014/04/06
0
0

没有更多内容

加载失败,请刷新页面

加载更多

rabbitMQ 在spring 的使用

一、准备工作 maven依赖 <dependency>  <groupId>com.rabbitmq</groupId>  <artifactId>amqp-client</artifactId>  <version>4.0.2</version></dependency> <dependency......

狼王黄师傅
昨天
1
0
Android JNI总结

0x01 JNI介绍 JNI是Java Native Interface的缩写,JNI不是Android专有的东西,它是从Java继承而来,但是在Android中,JNI的作用和重要性大大增强。 JNI在Android中起着连接Java和C/C++层的作...

天王盖地虎626
昨天
1
0
大数据教程(11.8)Hive1.2.2简介&初体验

上一篇文章分析了Hive1.2.2的安装,本节博主将分享Hive的体验&Hive服务端和客户端的使用方法。 一、Hive与hadoop直接的关系 Hive利用HDFS存储数据,利用MapReduce查询数据。 二、Hive与传统数...

em_aaron
昨天
3
0
跟我学Spring Cloud(Finchley版)-15-Hystrix监控详解

Hystrix提供了监控Hystrix Command的能力,本节来详细探讨。 监控端点与数据 应用整合Hystrix,同时应用包含spring-boot-starter-actuator 依赖,就会存在一个/actuator/hystrix.stream 端点...

周立_ITMuch
昨天
6
0
day26:shell题

1、 判断当前主机的CPU生产商,其信息在/proc/cpuinfo文件中vendor id一行中。 如果其生产商为AuthenticAMD,就显示其为AMD公司; 如果其生产商为GenuineIntel,就显示其为Intel公司; 否则,...

芬野de博客
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部