文档章节

flume spooldir 定期采集日期目录

你我他有个梦
 你我他有个梦
发布于 2017/08/16 22:41
字数 508
阅读 30
收藏 0
点赞 0
评论 0

这里以cdh5-1.6.0_5.10.2为例。

flume源码下载地址:https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2,SpoolDirectorySource在https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2/flume-ng-core项目下,下载之后找到org/apache/flume/source/SpoolDirectorySource修改源码如下:

@Override
public synchronized void start() {
  //添加解析日期目录方法
  spoolDirectory = directory(spoolDirectory);
  logger.info("SpoolDirectorySource source starting with directory: {}",
      spoolDirectory);

  executor = Executors.newSingleThreadScheduledExecutor();

  File directory = new File(spoolDirectory);
  try {
    reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(directory)
        .completedSuffix(completedSuffix)
        .ignorePattern(ignorePattern)
        .trackerDirPath(trackerDirPath)
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .annotateBaseName(basenameHeader)
        .baseNameHeader(basenameHeaderKey)
        .deserializerType(deserializerType)
        .deserializerContext(deserializerContext)
        .deletePolicy(deletePolicy)
        .inputCharset(inputCharset)
        .decodeErrorPolicy(decodeErrorPolicy)
        .consumeOrder(consumeOrder)
        .recursiveDirectorySearch(recursiveDirectorySearch)
        .build();
  } catch (IOException ioe) {
    throw new FlumeException("Error instantiating spooling event parser",
        ioe);
  }

  Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
  executor.scheduleWithFixedDelay(
      runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

  super.start();
  logger.debug("SpoolDirectorySource source started");
  sourceCounter.start();
}

/**
 * 解析时间
 * @param pattern
 * @return
 */
public static String getTime(String pattern) {
    SimpleDateFormat sdf = null;
    try{
        sdf = new SimpleDateFormat(pattern);
    }catch (Exception e){
        return "";
    }
    return sdf.format(new Date(System.currentTimeMillis()));
}

/**
 * 解析时间
 *
 * @param spoolDirectory
 * @return
 */
public static String spoolTimeDirectory(String spoolDirectory) {
    String spool = spoolDirectory.substring(spoolDirectory.lastIndexOf("/") + 1, spoolDirectory.length());
    String time = getTime(spool);
    if (StringUtils.isNotBlank(time)) {
        return time;
    }
    return spool;
}

/**
 * 拼装目录
 *
 * @param spoolDirectory
 * @return
 */
public static String directory(String spoolDirectory) {
    String spoolDir = spoolDirectory.substring(0, spoolDirectory.lastIndexOf("/") + 1);
    return spoolDir + spoolTimeDirectory(spoolDirectory);
}

按照如上简单修改之后,编译之后倒入到jar包,替换cdh集群中的flume即可。配置文件如下:

app.sources=r1
app.sinks=s1
app.channels=c1

app.sources.r1.type=spooldir
app.sources.r1.spoolDir=/data/log/yyyy-MM-dd
app.sources.r1.channels=c1
app.sources.r1.fileHeader=false
#一行读取默认最大限制为2048,这里重新设置最大限制
app.sources.r1.deserializer.maxLineLength =1048576

#app.sources.r1.interceptors =i1
#app.sources.r1.interceptors.i1.type = timestamp

app.sinks.s1.type = hdfs
app.sinks.s1.hdfs.path = hdfs://hadoop1:8020/home/data/avatar-log/data-log/%Y-%m-%d
#文件前缀和后缀
app.sinks.s1.hdfs.filePrefix = gdapp_log
app.sinks.s1.hdfs.fileSuffix = .log
#通过设置 hdfs.inUsePrefix,例如设置为 .时,hdfs 会把该文件当做隐藏文件,以避免在 mr 过程中读到这些临时文件,引起一些错误
app.sinks.s1.hdfs.inUsePrefix = .
#同时打开的最大文件数目
app.sinks.s1.hdfs.maxOpenFiles = 5000
app.sinks.s1.hdfs.batchSize= 1000
app.sinks.s1.hdfs.fileType = DataStream
app.sinks.s1.hdfs.writeFormat =Text
#128M为一个采集后的存储文件大小
app.sinks.s1.hdfs.rollSize = 134217728
app.sinks.s1.hdfs.rollCount = 0
app.sinks.s1.hdfs.rollInterval = 300
app.sinks.s1.hdfs.useLocalTimeStamp = true
app.sinks.s1.channel = c1

#app.channels.c1.type=file
#app.channels.c1.checkpointDir=./file_channel/checkpoint
#app.channels.c1.dataDirs=./file_channel/data
app.channels.c1.type = memory
app.channels.c1.capacity = 10000
app.channels.c1.transactionCapacity = 1000


 

© 著作权归作者所有

共有 人打赏支持
你我他有个梦

你我他有个梦

粉丝 92
博文 110
码字总数 98858
作品 0
昌平
程序员
解决Flume采集数据时在HDFS上产生大量小文件的问题

问题:flume指定HDFS类型的Sink时,采集数据至HDFS指定目录,会产生大量小文件。 问题重现: 1、创建flume配置文件flume-env.sh,: flume配置文件如下(根据自身需要修改): 因为flume可以...

舒运 ⋅ 06/10 ⋅ 0

Kafka实战-Flume到Kafka

1.概述   前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据。下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览   下面开...

smartloli ⋅ 2015/07/02 ⋅ 0

flume_kafkaChannel_kafkaSink

agent.sources = source 抽取类型为目录 agent.sources.source.type = spooldir 抽取的文件目录 agent.sources.source.spoolDir = /root/tmp/flume/data 添加一个存储绝对路径文件名的头 ag...

tanj123 ⋅ 04/17 ⋅ 0

其他消息中间件及场景应用(下3)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51516329 目...

yunlielai ⋅ 04/15 ⋅ 0

Flume---大数据协作框架

flume是什么 Apache Flume是一个分布式的、可靠的、易用的系统,可以有效地将来自很多不同源系统的大量日志数据收集、汇总或者转移到一个数据中心存储。 Apache Flume的作用不仅限于日志汇总...

简心 ⋅ 05/06 ⋅ 0

阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习 ⋅ 04/14 ⋅ 0

kafka来读取flume的数据

一、查看kafka topic ./kafka-topics.sh --list --zookeeper bigdata-test-3:2181, bigdata-test-2:2181, bigdata-test-1:2181, bigdata-test-4:2181, bigdata-test-5:2181 ./kafka-topics.s......

weixin_41876523 ⋅ 05/24 ⋅ 0

【Strom篇】Flume+Kafaka+Strom整合完成信息记录

第一部分:流程分析 由flume收集客户端发送的信息,经过kafka集群消费者收集,然后给storm集群数据处理(数据清洗),最后再发给kafka集群收集。 第二部分:集群搭建 flume+kafka 一、配合f...

chenshi_2753 ⋅ 05/06 ⋅ 0

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中《大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是...

xpleaf ⋅ 04/16 ⋅ 0

flume 总结--flume入门介绍

flume介绍 flume被设计为一个灵活的分布式系统,可以很容易的扩展,而且是高度可定制化的,一个配置正确的Flume Agent和由互相连接的Agent创建的Agent管道,保证不会丢失数据,提供持久的cha...

u013362353 ⋅ 05/28 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

十五周二次课

十五周二次课 17.1mysql主从介绍 17.2准备工作 17.3配置主 17.4配置从 17.5测试主从同步 17.1mysql主从介绍 MySQL主从介绍 MySQL主从又叫做Replication、AB复制。简单讲就是A和B两台机器做主...

河图再现 ⋅ 38分钟前 ⋅ 0

docker安装snmp rrdtool环境

以Ubuntu16:04作为基础版本 docker pull ubuntu:16.04 启动一个容器 docker run -d -i -t --name flow_mete ubuntu:16.04 bash 进入容器 docker exec -it flow_mete bash cd ~ 安装基本软件 ......

messud4312 ⋅ 今天 ⋅ 0

OSChina 周一乱弹 —— 快别开心了,你还没有女友呢。

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @莱布妮子 :分享吴彤的单曲《好春光》 《好春光》- 吴彤 手机党少年们想听歌,请使劲儿戳(这里) @clouddyy :小萝莉街上乱跑,误把我认错成...

小小编辑 ⋅ 今天 ⋅ 7

mysql in action / alter table

change character set ALTER SCHEMA `employees` DEFAULT CHARACTER SET utf8mb4 DEFAULT COLLATE utf8mb4_general_ci ;ALTER TABLE `employees`.`t2` CHARACTER SET = utf8mb4 , COLLAT......

qwfys ⋅ 今天 ⋅ 0

Java 开发者不容错过的 12 种高效工具

Java 开发者常常都会想办法如何更快地编写 Java 代码,让编程变得更加轻松。目前,市面上涌现出越来越多的高效编程工具。所以,以下总结了一系列工具列表,其中包含了大多数开发人员已经使用...

jason_kiss ⋅ 昨天 ⋅ 0

Linux下php访问远程ms sqlserver

1、安装freetds(略,安装在/opt/local/freetds 下) 2、cd /path/to/php-5.6.36/ 进入PHP源码目录 3、cd ext/mssql进入MSSQL模块源码目录 4、/opt/php/bin/phpize生成编译配置文件 5、 . ./...

wangxuwei ⋅ 昨天 ⋅ 0

如何成为技术专家

文章来源于 -- 时间的朋友 拥有良好的心态。首先要有空杯心态,用欣赏的眼光发现并学习别人的长处,包括但不限于工具的使用,工作方法,解决问题以及规划未来的能力等。向别人学习的同时要注...

长安一梦 ⋅ 昨天 ⋅ 0

Linux vmstat命令实战详解

vmstat命令是最常见的Linux/Unix监控工具,可以展现给定时间间隔的服务器的状态值,包括服务器的CPU使用率,内存使用,虚拟内存交换情况,IO读写情况。这个命令是我查看Linux/Unix最喜爱的命令...

刘祖鹏 ⋅ 昨天 ⋅ 0

MySQL

查看表相关命令 - 查看表结构    desc 表名- 查看生成表的SQL    show create table 表名- 查看索引    show index from  表名 使用索引和不使用索引 由于索引是专门用于加...

stars永恒 ⋅ 昨天 ⋅ 0

easyui学习笔记

EasyUI常用控件禁用方法 combobox $("#id").combobox({ disabled: true }); ----- $("#id").combobox({ disabled: false}); validatebox $("#id").attr("readonly", true); ----- $("#id").r......

miaojiangmin ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部