文档章节

flume spooldir 定期采集日期目录

你我他有个梦
 你我他有个梦
发布于 2017/08/16 22:41
字数 508
阅读 491
收藏 0

精选30+云产品,助力企业轻松上云!>>>

这里以cdh5-1.6.0_5.10.2为例。

flume源码下载地址:https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2,SpoolDirectorySource在https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2/flume-ng-core项目下,下载之后找到org/apache/flume/source/SpoolDirectorySource修改源码如下:

@Override
public synchronized void start() {
  //添加解析日期目录方法
  spoolDirectory = directory(spoolDirectory);
  logger.info("SpoolDirectorySource source starting with directory: {}",
      spoolDirectory);

  executor = Executors.newSingleThreadScheduledExecutor();

  File directory = new File(spoolDirectory);
  try {
    reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(directory)
        .completedSuffix(completedSuffix)
        .ignorePattern(ignorePattern)
        .trackerDirPath(trackerDirPath)
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .annotateBaseName(basenameHeader)
        .baseNameHeader(basenameHeaderKey)
        .deserializerType(deserializerType)
        .deserializerContext(deserializerContext)
        .deletePolicy(deletePolicy)
        .inputCharset(inputCharset)
        .decodeErrorPolicy(decodeErrorPolicy)
        .consumeOrder(consumeOrder)
        .recursiveDirectorySearch(recursiveDirectorySearch)
        .build();
  } catch (IOException ioe) {
    throw new FlumeException("Error instantiating spooling event parser",
        ioe);
  }

  Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
  executor.scheduleWithFixedDelay(
      runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

  super.start();
  logger.debug("SpoolDirectorySource source started");
  sourceCounter.start();
}

/**
 * 解析时间
 * @param pattern
 * @return
 */
public static String getTime(String pattern) {
    SimpleDateFormat sdf = null;
    try{
        sdf = new SimpleDateFormat(pattern);
    }catch (Exception e){
        return "";
    }
    return sdf.format(new Date(System.currentTimeMillis()));
}

/**
 * 解析时间
 *
 * @param spoolDirectory
 * @return
 */
public static String spoolTimeDirectory(String spoolDirectory) {
    String spool = spoolDirectory.substring(spoolDirectory.lastIndexOf("/") + 1, spoolDirectory.length());
    String time = getTime(spool);
    if (StringUtils.isNotBlank(time)) {
        return time;
    }
    return spool;
}

/**
 * 拼装目录
 *
 * @param spoolDirectory
 * @return
 */
public static String directory(String spoolDirectory) {
    String spoolDir = spoolDirectory.substring(0, spoolDirectory.lastIndexOf("/") + 1);
    return spoolDir + spoolTimeDirectory(spoolDirectory);
}

按照如上简单修改之后,编译之后倒入到jar包,替换cdh集群中的flume即可。配置文件如下:

app.sources=r1
app.sinks=s1
app.channels=c1

app.sources.r1.type=spooldir
app.sources.r1.spoolDir=/data/log/yyyy-MM-dd
app.sources.r1.channels=c1
app.sources.r1.fileHeader=false
#一行读取默认最大限制为2048,这里重新设置最大限制
app.sources.r1.deserializer.maxLineLength =1048576

#app.sources.r1.interceptors =i1
#app.sources.r1.interceptors.i1.type = timestamp

app.sinks.s1.type = hdfs
app.sinks.s1.hdfs.path = hdfs://hadoop1:8020/home/data/avatar-log/data-log/%Y-%m-%d
#文件前缀和后缀
app.sinks.s1.hdfs.filePrefix = gdapp_log
app.sinks.s1.hdfs.fileSuffix = .log
#通过设置 hdfs.inUsePrefix,例如设置为 .时,hdfs 会把该文件当做隐藏文件,以避免在 mr 过程中读到这些临时文件,引起一些错误
app.sinks.s1.hdfs.inUsePrefix = .
#同时打开的最大文件数目
app.sinks.s1.hdfs.maxOpenFiles = 5000
app.sinks.s1.hdfs.batchSize= 1000
app.sinks.s1.hdfs.fileType = DataStream
app.sinks.s1.hdfs.writeFormat =Text
#128M为一个采集后的存储文件大小
app.sinks.s1.hdfs.rollSize = 134217728
app.sinks.s1.hdfs.rollCount = 0
app.sinks.s1.hdfs.rollInterval = 300
app.sinks.s1.hdfs.useLocalTimeStamp = true
app.sinks.s1.channel = c1

#app.channels.c1.type=file
#app.channels.c1.checkpointDir=./file_channel/checkpoint
#app.channels.c1.dataDirs=./file_channel/data
app.channels.c1.type = memory
app.channels.c1.capacity = 10000
app.channels.c1.transactionCapacity = 1000


 

你我他有个梦

你我他有个梦

粉丝 98
博文 130
码字总数 109760
作品 0
通州
程序员
私信 提问
加载中
请先登录后再评论。
Apache Flume的介绍安装及简单案例

概述   Flume 是 一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件。Flume 的核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定的目的地(sink)。为了保证输...

osc_xv7zq561
2018/03/22
8
0
日志采集flume配置备忘

运行在日志源结点的flume agent: source: Spooling Directory Source channel: Memory Channel sink: Avro Sink 说明: 只能显示ip,所以特地加了 用于后级日志汇总结点的日志数据分离,必不...

超爱fitnesse
2015/06/29
433
0
第14章 关于Flume

简介 设计Flume的宗旨是向Hadoop批量导入基于事件的海量数据。 安装 tar -zxvf apache-flume-1.9.0-bin.tar.gz 设置环境变量 示例: spool-to-logger.properties 配置文件: 事务和可靠性 Fl...

lhsqjoe
2019/09/17
2
0
kafka sink

flume版本1.6 kafka版本0.8.2 创建配置目录 创建采集数据目录 创建配置文件/opt/beh/core/flume/conf.d/kafka-agent.conf flume agent启动命令...

Yulong_
2017/04/21
42
0
大数据入门第十二天——flume入门

一、概述   1.什么是flume     官网的介绍:http://flume.apache.org/   Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and ......

osc_q2cisxou
2018/02/26
1
0

没有更多内容

加载失败,请刷新页面

加载更多

主机“ xxx.xx.xxx.xxx”不允许连接到该MySQL服务器

问题: This should be dead simple, but I cannot get it to work for the life of me. 这本来应该很简单,但是我无法让它在我的一生中发挥作用。 I'm just trying to connect remotely to......

技术盛宴
今天
14
0
Cocoa Autolayout:内容拥抱与内容压缩阻力优先

问题: I can't find a clear answer on Apple documentation regarding Cocoa Autolayout about the difference between content hugging and compression resistance. 关于Cocoa Autolayou......

javail
今天
24
0
OSChina 周二乱弹 —— 附近居民接连失踪,你们有什么头绪吗

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @薛定谔的兄弟 :分享洛神有语创建的歌单「我喜欢的音乐」: 《伤离别(原版)》- 黄霑 手机党少年们想听歌,请使劲儿戳(这里) @巴拉迪维 :睡...

小小编辑
今天
44
1
IntelliJ IDEA 默认快捷键大全

Remember these Shortcuts 常用 功能 快捷键 备注 ● Smart code completion Ctrl + Shift + Space - ● Search everywhere Double Shift - ● Show intention actions and quick-fixes Alt......

巨輪
今天
30
0
Hacker News 简讯 2020-07-14

更新时间: 2020-07-14 04:01 Bitcoin is more like ham radio than the early internet - (jpkoning.blogspot.com) 比特币更像是火腿收音机,而不是早期的互联网 得分:159 | 评论:140 Chipma......

FalconChen
今天
136
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部