文档章节

flume收集日志,先本地缓存,再写到hadoop

午火
 午火
发布于 2014/06/09 15:36
字数 695
阅读 672
收藏 3

flume自带写hdfs的组建 hdfs sink,功能和性能都不错,就是有些缺点不好克服。

1,收集的日志一直在写hadoop,虽然可以订一个规则间断写hadoop,例如设置batchSize等,但在大压力下,几乎可以认为是每时每刻都在写hafs.

2,容错性差,日志收集的过程中,hadoop出现错误(例如,hdfs丢块)等问题,就会崩溃。

3,还有4,5等等,就是hdfs sink 有些问题了,不写了。

这里写一个先收集日志到本地,形成文件,然后把这个文件上载到远程的hadoop上。

这样做有好处,

1,日志收聚到本地文件。在收集日志过程中出现的hadoop错误、异常等等文件,在收集日志成文件的工程中不存在。

2,上传到hadoop采用文件的方式,使用hadoop自己的fs API,可以有很好的效率。

3,很好的容错机制,上传文件的工程中出现hadoop问题,导致文件上传失败,没关系,下一个工作任务再上传就好了,只有上传成功才删除本地文件。

4,还有好多了,这里不写了。

代码位置:http://git.oschina.net/atuchow/flume-additional/blob/master/src/main/java/com/fone/flumeExt/sink/L2HSink/LocalToHDFSSink.java

大致的架构

flume_local

配置文件:http://git.oschina.net/atuchow/flume-additional/blob/master/src/test/resources/local2hdfs.conf

这里做假单的解释:

LocalToHDFSSink.java 就是flume sink的启动类,在配置文件中要做配置的,a1.sinks.k1.type = com.fone.flume.sink.localFile.LocalToHDFSSink

其中如下代码:

@Overridepublic void start() {
......


    CronTriggerFileHDFS cronTriggerFileHDFS = new CronTriggerFileHDFS();
        LOG.info("定时器设置,cron expression :{} .", cronExpression);try {
            cronTriggerFileHDFS.run(filePath, hdfsPath, cronExpression, isKeep);
        } catch (Exception e) {
            LOG.warn("向hdfs写文件的定时器错误,错误:{}.", e);
        }
        sinkCounter.start();

......
}
@Overridepublic void stop() {
......if (cronTriggerFileHDFS != null) {try {
                cronTriggerFileHDFS.shutdown();
            } catch (Exception e) {// TODO Auto-generated catch block                e.printStackTrace();
            }
        }


......
}

开关定时器。

定时器设定:a1.sinks.k1.local.cronExpression = 0 */15  * * * ? 按照quartZ的设置要求进行设置,不了解者去看quartZ cronTigger设置。

LocalToHDFSSink的文件存储是这样的,本地文件给定一个初始的目录a1.sinks.k1.local.directory,日志在这个初始的目录存储,动态的目录结构通过a1.sinks.k1.local.middleDir 设置。

远程的hadoop给定一个初始的目录a1.sinks.k1.hdfs.directory ,其它的目录结构和文件与本地的设置完全相同,也就是把a1.sinks.k1.local.directory目录下的所有内容,复制到a1.sinks.k1.hdfs.directory 完成工作。

日志收集文件在本地产生,没有完成的时候,带文件名后缀.tmp,完成后去掉.tmp,以此作为是否现在执行复制到hadoop的标志。

其余的看代码吧。

© 著作权归作者所有

午火
粉丝 1
博文 3
码字总数 994
作品 0
通州
高级程序员
私信 提问
加载中

评论(1)

J
J_Stone
你好,既然从本地上传到hdfs,为什么还要依赖flume,继承AbstractSink类呢,谢谢。
Flume日志收集分层架构应用实践

Flume作为一个日志收集工具,非常轻量级,基于一个个Flume Agent,能够构建一个很复杂很强大的日志收集系统,它的灵活性和优势,主要体现在如下几点: 1)模块化设计:在其Flume Agent内部可...

workming
2018/06/29
0
0
flume,kafka区别、协同与详解

简介 socket模式 简单数据处理 开发公司 Flume 日志采集系统 (管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.) 可编写Interceptor,对数据进行拦截,对密码进行MD5加密...

flash胜龙
2019/06/06
1.4K
0
Kafka实战-Flume到Kafka

1.概述   前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据。下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览   下面开...

smartloli
2015/07/02
0
0
Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,...

大数据之路
2014/07/08
5.7W
9
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习
2018/04/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

MBTI助你成功,让你更了解你自己

MBTI助你成功,让你更了解你自己 生活总是一个七日接着又一个七日,相信看过第七日的小伙伴,很熟悉这段开场白,人生是一个测试接着又一个测试,上学的时候测试,是为了证明你的智力,可谓从...

蛤蟆丸子
40分钟前
49
0
Android实现App版本自动更新

现在很多的App中都会有一个检查版本的功能。例如斗鱼TV App的设置界面下: 当我们点击检查更新的时候,就会向服务器发起版本检测的请求。一般的处理方式是:服务器返回的App版本与当前手机安...

shzwork
昨天
63
0
npm 发布webpack插件 webpack-html-cdn-plugin

初始化一个项目 npm init 切换到npm源 淘宝 npm config set registry https://registry.npm.taobao.org npm npm config set registry http://registry.npmjs.org 登录 npm login 登录状态......

阿豪boy
昨天
87
0
java基础(16)递归

一.说明 递归:方法内调用自己 public static void run1(){ //递归 run1(); } 二.入门: 三.执行流程: 四.无限循环:经常用 无限递归不要轻易使用,无限递归的终点是:栈内存溢出错误 五.递...

煌sir
昨天
63
0
REST接口设计规范总结

URI格式规范 URI中尽量使用连字符”-“代替下划线”_”的使用 URI中统一使用小写字母 URI中不要包含文件(脚本)的扩展名 URI命名规范 文档(Document)类型的资源用名词(短语)单数命名 集合(Co...

Treize
昨天
69
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部