文档章节

Flume-ng 高级功能配置

China_OS
 China_OS
发布于 2015/05/12 10:45
字数 1635
阅读 537
收藏 0
看看flume的高级功能:

1    flume channel selectors

      如果没有特殊说明,则默认是replicating模式。 还有Multiplexing、Custom模式可以选择。

        1    Replicating Channel Selector

              需要设置以下的属性:
               
              selector.type    默认值是replicating,用来设置该组件的名称

            selector.optional    设置selector的channel

a1.sources = r1
a1.channels = c1 c2 c3
a1.source.r1.selector.type = replicating
a1.source.r1.channels = c1 c2 c3
a1.source.r1.selector.optional = c3
在上面的配置中,c3是optional的channel,如果向c3写入失败,则会被忽略,c1和c2没有被标记为optional,如果向c2和c1写入失败则会导致事件失败。


        2    Multiplexing Channel Selector

                需要设置以下属性:

                selector.type    默认值是replicating,可以设置为multiplexing
                selector.header    默认值是flume.selector.header
                selector.default

             selector.mapping.*

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

2    flume sink processors

        sink groups允许给一个实体设置多个sinks,sink processors可以使在sink group中所有sink具有负载均衡的能力,或者在一个sink失效后切换到另一个sink的fail over模式。

        需要设置以下属性:

        sinks    空格分隔的一系列sinks列表

       processor.type    该组件的名称,可以使default、failover、load_balance

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
        默认的sink processor只接受一个sink,用户不一定非得创建sink group。

        1    failover sink processor

                failover机制维护一个sink优先级列表,保证有效事件可以被处理掉。

                需要设置以下属性:
                sinks    在同一组sinks中的以空格分隔的sink列表
                processor.type    默认值是default,在此可以设置为failover
                processor.priority.<sinkName>    sink在该sink group中的优先级

             processor.maxpenalty    默认值3000毫秒,故障转移的默认时间

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
        2    load balancing sink processor

                load balancing sink processor给多个sinks之间提供负载均衡,它维护一个可用sink索引,它支持通过round_robin和random两种方法进行负载分配,默认的选择方式是round_type类型的,也可以通过配置文件进行更改。当被选择器被调用的时候,它不会屏蔽故障的sink,继续尝试访问每一个可用的sink,如果所有的sink都故障了,选择器则无法给sink传播数据。如果backoff被开启,则sink processor会屏蔽故障的sink,选择器会在一个给定的超时时间内移除它们,当超时时间完毕后,sink还是无法访问,则超时时间以指数方式增长。

                需要设置以下属性:
                processor.sinks    在同一组sinks中的以空格分隔的sink列表
                processor.type    默认是default,在此可以设置成load_balance
                processor.backoff    默认是false
                processor.selector    默认是round_robin,还可以选择random

             processor.selector.maxTimeOut    默认是30000毫秒,屏蔽故障sink的时间

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
3    Event Serializers
    
        file_roll sink和hdfs sink都支持EventSerializer接口,    

4    Body Text Serializer

        别名是text,该拦截器会把事件的body输出,不做任何改变,而事件的头部会被忽略。

        可以配置以下参数:

       appendNewline    默认值是true,在每个事件的后面追加一个新行。

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

5    Avro Event Serializer

        别名是avro_event,该拦截器会把flume的事件插入一个avro容器文件中,

        可设置的参数如下:
        syncIntervalBytes    默认值是2048000,avro的同步间隔,单位是bytes

       compressionCodec    默认值是null,avro的压缩codec。

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

6    monitor flume
    
        1    ganglia report

                flume可以报告他的metrics到ganglia3,只要你在启动flume agent的时候设置一些参数即可,也可以把这些参数设置在flume-env.sh配置文件中。需要设置的参数如下,这些参数的前缀如下flume.monitoring.:
                type    该组件的名称,这里设置为ganglia
                hosts    逗号分隔的ganglia服务器列表,hostname:port
                pollFrequency    默认值是60秒,flume向ganglia报告metrics的时间间隔
                isGanglia3    默认是false,ganglia server的版本在3以上,flume 发送的是ganglia3.1的数据格式

                启动flume agent
                flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

        2    JSON report

                flume也可以报告JSON格式的report,为了开启JSON report,在flume机器上启动了一个web server。需要在客户端启动时设置以下参数:
                type    该组件的名称,这里设置为http
                port    该服务监听的端口,默认是41414

                启动flume agent
                flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
                然后通过http://<hostname>:<port>/metrics来查看值

7    Flume Interceptors

        flume拦截器可以修改或者删除事件,flume还支持连接器链,事件可以经过一系列拦截器。多个拦截器在配置文件中以空格分隔,拦截器的顺序就是事件处理的顺序,只有一个拦截器通过之后才会进行到下一个拦截器。

        1    Timestamp Interceptor

                该拦截器会插入到事件头中,会在事件头中插入一个key是timestamp的KV对,value的值是相关的timestamp。该拦截器可以保护相关的已经存在的timestamp。可以设置以下参数:
                type    该组件的名称,此处是timestamp

             preserveExisting    默认值是false,如果timestamp已经存在,应该被保护

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

        2    Host Interceptor

                该拦截器会插入当前agent运行机器的hostname或者ip,插入KV对,key名称是host,可以设置以下值:
                type    该组件的名称,此处是host
                preserveExisting    默认值是false,如果在头部已经存在host,则应该被保护
                useIP    默认值是true,使用ip,而非hostname

             hostHeader        默认值是host,头部的key名称

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = hostname

        3    Static Interceptor

                该拦截器允许用户追加静态头部在所有事件中,可以设置以下参数:
                type    该组件的名称,此处是static
                preserveExisting    默认值是true,如果头部已经存在,则应该被保护
                key    默认值是key,头部创建的key名称

             value    默认值是value,静态value值

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK






© 著作权归作者所有

共有 人打赏支持
China_OS
粉丝 422
博文 460
码字总数 513987
作品 0
静安
技术主管
私信 提问
Flume OG和Flume NG的区别

应用场景 Flume作为Hadoop中的日志采集工具,非常的好用,但是在安装Flume的时候,查阅很多资料,发现形形色色,有的说安装Flume很简单,有的说安装Flume很复杂,需要依赖zookeeper,所以一方...

wsc449
2017/11/23
0
0
【Flume】- 收集 Log4j 日志上送Kafka

Flume 收集 Log4j 日志上送Kafka存储 --- 环境准备 下载Flume: http://flume.apache.org/ 安装: 解压下载包到自定义路径 配置agent 解释: sources: avro 用于接收Log4j日志 interceptors: ...

ZeroneLove
04/10
0
0
flume 1.7 源码导入eclipse windows

安装maven,设置MAVEN_HOME等配置 下载flume源码 eclipse-oxygen,设置eclipse 使用外部maven,并配置settings.xml 遇到问题: 如果顺利,已将所需jar都下载下来了。 导入后遇到如下问题 fl...

柯里昂
2017/10/31
0
0
Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,...

大数据之路
2014/07/08
0
9
log4j直接输出日志到flume

log4j直接输出日志到flume 此jar是由Cloudera的CDH发行版提供的一个工具类,通过配置,可以将log4j的日志直接输出到flume,方便日志的采集。 在CDH5.3.0版本中是:flume-ng-log4jappender-1....

cloud-coder
2015/07/06
0
4

没有更多内容

加载失败,请刷新页面

加载更多

sass和less的优缺点

简述 sass和less都是css的预编译处理语言,他们引入了mixins,参数,嵌套规则,运算,颜色,名字空间,作用域,JavaScript赋值等 加快了css开发效率,当然这两者都可以配合gulp和grunt等前端构...

莫西摩西
42分钟前
0
0
信号量与PV操作

在计算机操作系统中,PV操作是进程管理中的难点。 首先应弄清PV操作的含义:PV操作由P操作原语和V操作原语组成(原语是不可中断的过程),对信号量进行操作,具体定义如下: P(S):①将信号...

shzwork
55分钟前
0
0
重新认识网络通信协议

OSI网络分层 应用层 http, smtp,pop3这些都属于应用层协议 为用户的应用程序提供服务 表示层 确保一个系统的应用层发送的信息被另一个系统的应用层接收到 会话层 通过传输层建立数据传输的通...

最胖的瘦子
今天
2
0
【转】分布式数据流的轻量级异步快照

本篇翻译自论文:Lightweight Asynchronous Snapshots for Distributed Dataflows,Flink的容错快照模型即来源于该论文。原文地址:https://arxiv.org/pdf/1506.08603.pdf 分布式数据流的轻量...

yiduwangkai
今天
1
0
java使用反射机制设置私有成员变量的值

写一个方法:public void setProperty(Objectobj, String propertyName, Object value){}, 此方法可将obj对象中名为propertyName的属性的值设置为value。(这里不知道obj对象的propertyNam...

群星纪元
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部