文档章节

flume spooldir 定期采集日期目录

你我他有个梦
 你我他有个梦
发布于 2017/08/16 22:41
字数 508
阅读 73
收藏 0

这里以cdh5-1.6.0_5.10.2为例。

flume源码下载地址:https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2,SpoolDirectorySource在https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2/flume-ng-core项目下,下载之后找到org/apache/flume/source/SpoolDirectorySource修改源码如下:

@Override
public synchronized void start() {
  //添加解析日期目录方法
  spoolDirectory = directory(spoolDirectory);
  logger.info("SpoolDirectorySource source starting with directory: {}",
      spoolDirectory);

  executor = Executors.newSingleThreadScheduledExecutor();

  File directory = new File(spoolDirectory);
  try {
    reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(directory)
        .completedSuffix(completedSuffix)
        .ignorePattern(ignorePattern)
        .trackerDirPath(trackerDirPath)
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .annotateBaseName(basenameHeader)
        .baseNameHeader(basenameHeaderKey)
        .deserializerType(deserializerType)
        .deserializerContext(deserializerContext)
        .deletePolicy(deletePolicy)
        .inputCharset(inputCharset)
        .decodeErrorPolicy(decodeErrorPolicy)
        .consumeOrder(consumeOrder)
        .recursiveDirectorySearch(recursiveDirectorySearch)
        .build();
  } catch (IOException ioe) {
    throw new FlumeException("Error instantiating spooling event parser",
        ioe);
  }

  Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
  executor.scheduleWithFixedDelay(
      runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

  super.start();
  logger.debug("SpoolDirectorySource source started");
  sourceCounter.start();
}

/**
 * 解析时间
 * @param pattern
 * @return
 */
public static String getTime(String pattern) {
    SimpleDateFormat sdf = null;
    try{
        sdf = new SimpleDateFormat(pattern);
    }catch (Exception e){
        return "";
    }
    return sdf.format(new Date(System.currentTimeMillis()));
}

/**
 * 解析时间
 *
 * @param spoolDirectory
 * @return
 */
public static String spoolTimeDirectory(String spoolDirectory) {
    String spool = spoolDirectory.substring(spoolDirectory.lastIndexOf("/") + 1, spoolDirectory.length());
    String time = getTime(spool);
    if (StringUtils.isNotBlank(time)) {
        return time;
    }
    return spool;
}

/**
 * 拼装目录
 *
 * @param spoolDirectory
 * @return
 */
public static String directory(String spoolDirectory) {
    String spoolDir = spoolDirectory.substring(0, spoolDirectory.lastIndexOf("/") + 1);
    return spoolDir + spoolTimeDirectory(spoolDirectory);
}

按照如上简单修改之后,编译之后倒入到jar包,替换cdh集群中的flume即可。配置文件如下:

app.sources=r1
app.sinks=s1
app.channels=c1

app.sources.r1.type=spooldir
app.sources.r1.spoolDir=/data/log/yyyy-MM-dd
app.sources.r1.channels=c1
app.sources.r1.fileHeader=false
#一行读取默认最大限制为2048,这里重新设置最大限制
app.sources.r1.deserializer.maxLineLength =1048576

#app.sources.r1.interceptors =i1
#app.sources.r1.interceptors.i1.type = timestamp

app.sinks.s1.type = hdfs
app.sinks.s1.hdfs.path = hdfs://hadoop1:8020/home/data/avatar-log/data-log/%Y-%m-%d
#文件前缀和后缀
app.sinks.s1.hdfs.filePrefix = gdapp_log
app.sinks.s1.hdfs.fileSuffix = .log
#通过设置 hdfs.inUsePrefix,例如设置为 .时,hdfs 会把该文件当做隐藏文件,以避免在 mr 过程中读到这些临时文件,引起一些错误
app.sinks.s1.hdfs.inUsePrefix = .
#同时打开的最大文件数目
app.sinks.s1.hdfs.maxOpenFiles = 5000
app.sinks.s1.hdfs.batchSize= 1000
app.sinks.s1.hdfs.fileType = DataStream
app.sinks.s1.hdfs.writeFormat =Text
#128M为一个采集后的存储文件大小
app.sinks.s1.hdfs.rollSize = 134217728
app.sinks.s1.hdfs.rollCount = 0
app.sinks.s1.hdfs.rollInterval = 300
app.sinks.s1.hdfs.useLocalTimeStamp = true
app.sinks.s1.channel = c1

#app.channels.c1.type=file
#app.channels.c1.checkpointDir=./file_channel/checkpoint
#app.channels.c1.dataDirs=./file_channel/data
app.channels.c1.type = memory
app.channels.c1.capacity = 10000
app.channels.c1.transactionCapacity = 1000


 

© 著作权归作者所有

共有 人打赏支持
你我他有个梦

你我他有个梦

粉丝 96
博文 110
码字总数 98858
作品 0
昌平
程序员
私信 提问
带你看懂大数据采集引擎之Flume&采集目录中的日志

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一、Flume的介绍: Flume由C...

李金泽
03/04
0
0
Flume 日志收集系统 Spooldir-Source HDFS-sink

日志即log,记录发生的事件。以Nginx为例,有errorlog和accesslog 2个日志。access_log是访问日志,每条访问记录会产生几百字节的数据,随着访问量增加,日志文件会越来越大,必须定期清理日...

白头雁
08/23
0
0
解决Flume采集数据时在HDFS上产生大量小文件的问题

问题:flume指定HDFS类型的Sink时,采集数据至HDFS指定目录,会产生大量小文件。 问题重现: 1、创建flume配置文件flume-env.sh,: flume配置文件如下(根据自身需要修改): 因为flume可以...

舒运
06/10
0
0
Flume框架简单介绍(34)

Flume是一个分布式的海量数据收集框架. Flume框架流程图 Channel是缓存的数据,如果Sink传送给了HDFS,Channel中缓存的数据就会删除,如果没有传送成功,Channel相当于做了备份,Sink重复从C...

肖鋭
2014/04/06
0
0
Kafka实战-Flume到Kafka

1.概述   前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据。下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览   下面开...

smartloli
2015/07/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Linux学习-1031(rsync同步工具 上)

10.28 rsync工具介绍 10.29/10.30 rsync常用选项 10.31 rsync通过ssh同步 一、 rsync工具介绍 rsync是一个同步工具,在日常的运维中常会用到。它可以本地同步,也实现可以远程两台机器同步。...

wxy丶
1分钟前
0
0
python实战一期:第一天

1. 为什么学习python 1.1 为什么要学Python? Python第一是个非常牛B的脚本语言,能满足绝大部分自动化运维的需求,又能做后端C/S架构,又能用WEB框架快速开发出高大上的Web界面,只有当你自...

laoba
3分钟前
0
0
Java并发编程学习三:线程同步的关键字以及理解

上篇文章中介绍了Java线程的带来的问题与内存模型中介绍了线程可能会引发的问题以及对应Java的内存模型,顺带介绍了Volatile和Sychronized关键字。今天对Java中涉及到的常见的关键类和关键字...

JerryLin123
10分钟前
0
0
我用代码来给你们分析一个赚钱的技巧

赚钱是个俗气的话题,但又是人人都绕不开的事情。我今天来“科学”地触碰下这个话题。 谈赚钱,就会谈到理财、投资,谈到炒股。有这样一个笑话: 问:如何成为百万富翁? 答:带一千万进入股...

crossin
10分钟前
0
0
spring MatchingBean应用

1、编写接口FactoryList import java.util.List;public interface FactoryList<E extends MatchingBean<K>, K> extends List<E> { E getBean(K factor); List<E> getBeanLi......

重城重楼
23分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部