文档章节

Flume通用配置

DanierWei
 DanierWei
发布于 2017/07/07 16:48
字数 1467
阅读 58
收藏 0

一、前言

  • 因Flume通过配置的方式去采集日志,所以存在不灵活的情况,但Flume日志模型中除了日志body部分,还提供header,以及一些通配符的操作让配置更加灵活。
  • 此篇文档讲介绍通用配置。

二、场景

  1. 配置一个通用的服务端,用于接收各个客户端agent日志,按照客户端不同日志类型,存储至hdfs不同路径,以及不同的kafka topic。

  2. 配置一个通用的服务端,用于接收各个客户端agent日志,按照客户端不同日志类型,存储至hdfs不同路径。

三、样例

  • 场景1服务端配置

    # Name the  components on this agent

    general_log_2_hdfs_kafka.sources  = r1

    general_log_2_hdfs_kafka.sinks =   k1 k2

    general_log_2_hdfs_kafka.channels  =  c1 c2

     

    #  Describe/configure the source

    general_log_2_hdfs_kafka.sources.r1.type  = avro

    general_log_2_hdfs_kafka.sources.r1.bind  = xxxx

    general_log_2_hdfs_kafka.sources.r1.port  = xxxx

    general_log_2_hdfs_kafka.sources.r1.selector.type=replicating

    general_log_2_hdfs_kafka.sources.r1.channels  = c1 c2

     

    # to hdfs 文件6小时回滚一次

    general_log_2_hdfs_kafka.sinks.k1.type = hdfs

    general_log_2_hdfs_kafka.sinks.k1.channel  = c1    

    general_log_2_hdfs_kafka.sinks.k1.hdfs.path=xxxx/%{file_key}/%Y-%m-%d       

    general_log_2_hdfs_kafka.sinks.k1.hdfs.writeFormat = Text

    general_log_2_hdfs_kafka.sinks.k1.hdfs.fileType = DataStream

    general_log_2_hdfs_kafka.sinks.k1.hdfs.useLocalTimeStamp = true

    general_log_2_hdfs_kafka.sinks.k1.hdfs.filePrefix=%{file_key}

    general_log_2_hdfs_kafka.sinks.k1.hdfs.fileSuffix=.log

    general_log_2_hdfs_kafka.sinks.k1.hdfs.rollSize = 0

    general_log_2_hdfs_kafka.sinks.k1.hdfs.rollCount = 0

    general_log_2_hdfs_kafka.sinks.k1.hdfs.rollInterval = 21600

    general_log_2_hdfs_kafka.sinks.k1.hdfs.round = true

    general_log_2_hdfs_kafka.sinks.k1.hdfs.roundValue = 6

    general_log_2_hdfs_kafka.sinks.k1.hdfs.roundUnit = hour

     

    # to kafka

    general_log_2_hdfs_kafka.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink

    general_log_2_hdfs_kafka.sinks.k2.channel  = c2

    general_log_2_hdfs_kafka.sinks.k2.brokerList = xxxx:xxxx,xxxx:xxxx,xxxx:xxxx

    general_log_2_hdfs_kafka.sinks.k2.requiredAcks = 1

    general_log_2_hdfs_kafka.sinks.k2.batchSize = 10

    # 可不配置,默认topic为default-flume-topic,如果event的header中包含topic字段则以它为准。

    general_log_2_hdfs_kafka.sinks.k2.topic = xxxx

     

    # Use a  channel which buffers events in memory

    general_log_2_hdfs_kafka.channels.c1.checkpointDir = /home/flume/data/flume_data/checkpoint

    general_log_2_hdfs_kafka.channels.c1.dataDirs = /home/flume/data/flume_data/data

    general_log_2_hdfs_kafka.channels.c1.type file

    general_log_2_hdfs_kafka.channels.c1.maxFileSize=1073741824

    general_log_2_hdfs_kafka.channels.c1.capacity=10000000

    general_log_2_hdfs_kafka.channels.c2.checkpointDir = /home/flume/data/flume_kafka/checkpoint

    general_log_2_hdfs_kafka.channels.c2.dataDirs = /home/flume/data/flume_kafka/data

    general_log_2_hdfs_kafka.channels.c2.type file

    general_log_2_hdfs_kafka.channels.c2.maxFileSize=1073741824

    general_log_2_hdfs_kafka.channels.c2.capacity=10000000

  • 发送至kafka时,topic可不配置,需要数据来源的数据中将topic加入header中。参考

    Note Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will used by Kafka to partition the data between the topic partitions. Events with same key will be sent to the same partition. If the key is null, events will be sent to random partitions.

     

  • 场景2服务端配置

    # Name the  components on this agent

    general_log_2_hdfs.sources  = r1

    general_log_2_hdfs.sinks =   k1

    general_log_2_hdfs.channels  =  c1

     

    #  Describe/configure the source

    general_log_2_hdfs.sources.r1.type  = avro

    general_log_2_hdfs.sources.r1.bind  = xxxx

    general_log_2_hdfs.sources.r1.port  = xxxx

    general_log_2_hdfs.sources.r1.channels  = c1

     

    # to hdfs 文件6小时回滚一次

    general_log_2_hdfs.sinks.k1.type = hdfs

    general_log_2_hdfs.sinks.k1.channel  = c1

    general_log_2_hdfs.sinks.k1.hdfs.path=xxxx/%{file_key}/%Y-%m-%d

    general_log_2_hdfs.sinks.k1.hdfs.writeFormat = Text

    general_log_2_hdfs.sinks.k1.hdfs.fileType = DataStream

    general_log_2_hdfs.sinks.k1.hdfs.useLocalTimeStamp = true

    general_log_2_hdfs.sinks.k1.hdfs.filePrefix=%{file_key}

    general_log_2_hdfs.sinks.k1.hdfs.fileSuffix=.log

    general_log_2_hdfs.sinks.k1.hdfs.rollSize = 0

    general_log_2_hdfs.sinks.k1.hdfs.rollCount = 0

    general_log_2_hdfs.sinks.k1.hdfs.rollInterval = 21600

    general_log_2_hdfs.sinks.k1.hdfs.round = true

    general_log_2_hdfs.sinks.k1.hdfs.roundValue = 6

    general_log_2_hdfs.sinks.k1.hdfs.roundUnit = hour

     

     

    # Use a  channel which buffers events in memory

    general_log_2_hdfs.channels.c1.checkpointDir = /home/flume/data/flume_data/checkpoint

    general_log_2_hdfs.channels.c1.dataDirs = /home/flume/data/flume_dat/data

    general_log_2_hdfs.channels.c1.type file

    general_log_2_hdfs.channels.c1.maxFileSize=1073741824

    general_log_2_hdfs.channels.c1.capacity=10000000

四、客户端agent配置样例

  • 针对场景1的配置

    # demo.conf: A single-node Flume configuration

     

    # Name the components on this agent

    demo.sources = r1

    demo.channels = c1

    # sink配置方式为负载均衡

    demo.sinks = k1 k2

    demo.sinkgroups = g1

    demo.sinkgroups.g1.sinks = k1 k2

    demo.sinkgroups.g1.processor.type = load_balance

    demo.sinkgroups.g1.processor.backoff = true

    demo.sinkgroups.g1.processor.selector = random

     

    # Describe/configure the source

    demo.sources.r1.type exec

    demo.sources.r1.command tail -F xxxxx

    demo.sources.r1.interceptors = i1 i2

    # 此配置表示采集的数据需要存储至hdfs的文件名称配置

    demo.sources.r1.interceptors.i1.type = static

    demo.sources.r1.interceptors.i1.preserveExisting = true

    demo.sources.r1.interceptors.i1.key = file_key

    demo.sources.r1.interceptors.i1.value = xxxx

    # 此配置表示采集的数据需要发送到topic配置

    demo.sources.r1.interceptors.i2.type = static

    demo.sources.r1.interceptors.i2.preserveExisting = true

    demo.sources.r1.interceptors.i2.key = topic

    demo.sources.r1.interceptors.i2.value = xxxx

     

    # Describe the sink 此处配置表示服务端配置有两台做负载均衡,可按照你的实际情况进行配置。

    demo.sinks.k1.type = avro

    demo.sinks.k1.hostname=xxxx

    demo.sinks.k1.port=xxxx

    demo.sinks.k2.type = avro

    demo.sinks.k2.hostname=xxxx

    demo.sinks.k2.port=xxxx

     

    # Use a channel which buffers events in file

    demo.channels.c1.type file

    demo.channels.c1.checkpointDir = demo/checkpoint

    demo.channels.c1.dataDirs = demo/data

    demo.channels.c1.maxFileSize=134217728

     

    # Bind the source and sink to the channel

    demo.sources.r1.channels = c1

    demo.sinks.k1.channel = c1

    demo.sinks.k2.channel = c1

  • 针对场景2的配置

    # demo.conf: A single-node Flume configuration

     

    # Name the components on this agent

    demo.sources = r1

    demo.channels = c1

    # sink配置方式为负载均衡

    demo.sinks = k1 k2

    demo.sinkgroups = g1

    demo.sinkgroups.g1.sinks = k1 k2

    demo.sinkgroups.g1.processor.type = load_balance

    demo.sinkgroups.g1.processor.backoff = true

    demo.sinkgroups.g1.processor.selector = random

     

    # Describe/configure the source

    demo.sources.r1.type exec

    demo.sources.r1.command tail -F xxxxx

    demo.sources.r1.interceptors = i1

    # 此配置表示采集的数据需要存储至hdfs的文件名称配置

    demo.sources.r1.interceptors.i1.type = static

    demo.sources.r1.interceptors.i1.preserveExisting = true

    demo.sources.r1.interceptors.i1.key = file_key

    demo.sources.r1.interceptors.i1.value = xxxx

     

    # Describe the sink 此处配置表示服务端配置有两台做负载均衡,可按照你的实际情况进行配置。

    demo.sinks.k1.type = avro

    demo.sinks.k1.hostname=xxxx

    demo.sinks.k1.port=xxxx

    demo.sinks.k2.type = avro

    demo.sinks.k2.hostname=xxxx

    demo.sinks.k2.port=xxxx

     

    # Use a channel which buffers events in file

    demo.channels.c1.type file

    demo.channels.c1.checkpointDir = demo/checkpoint

    demo.channels.c1.dataDirs = demo/data

    demo.channels.c1.maxFileSize=134217728

     

    # Bind the source and sink to the channel

    demo.sources.r1.channels = c1

    demo.sinks.k1.channel = c1

    demo.sinks.k2.channel = c1

  • 对于大多数场景下,日志文件基本都是按天存储,Flume提供了Taildir Source、Spooling Directory Source、以及Exec Source。此处以Exec Source为例

    # demo.conf: A single-node Flume configuration

     

    # Name the components on this agent

    demo.sources = r1

    demo.channels = c1

    # sink配置方式为负载均衡

    demo.sinks = k1 k2

    demo.sinkgroups = g1

    demo.sinkgroups.g1.sinks = k1 k2

    demo.sinkgroups.g1.processor.type = load_balance

    demo.sinkgroups.g1.processor.backoff = true

    demo.sinkgroups.g1.processor.selector = random

     

    # Describe/configure the source

    demo.sources.r1.type exec

    demo.sources.r1.shell = /bin/bash -c

    demo.sources.r1.command tail -F xxxxx/`date +%Y-%m-%d`/xxxx.log

    demo.sources.r1.interceptors = i1

    # 此配置表示采集的数据需要存储至hdfs的文件名称配置

    demo.sources.r1.interceptors.i1.type = static

    demo.sources.r1.interceptors.i1.preserveExisting = true

    demo.sources.r1.interceptors.i1.key = file_key

    demo.sources.r1.interceptors.i1.value = xxxx

     

    # Describe the sink 此处配置表示服务端配置有两台做负载均衡,可按照你的实际情况进行配置。

    demo.sinks.k1.type = avro

    demo.sinks.k1.hostname=xxxx

    demo.sinks.k1.port=xxxx

    demo.sinks.k2.type = avro

    demo.sinks.k2.hostname=xxxx

    demo.sinks.k2.port=xxxx

     

    # Use a channel which buffers events in file

    demo.channels.c1.type file

    demo.channels.c1.checkpointDir = demo/checkpoint

    demo.channels.c1.dataDirs = demo/data

    demo.channels.c1.maxFileSize=134217728

     

    # Bind the source and sink to the channel

    demo.sources.r1.channels = c1

    demo.sinks.k1.channel = c1

    demo.sinks.k2.channel = c1

五、自定义source、channel、sink

  • 基于我们公司的现状,消息使用阿里云的ONS,写了一个自定义的sink,采集的数据发送至ONS,源码地址
  • 未完待续~

© 著作权归作者所有

DanierWei
粉丝 1
博文 9
码字总数 5765
作品 0
杭州
私信 提问
Flume Installation

下载flume二进制安装包:apache-flume-1.6.0-bin.tar.gz http://www.apache.org/dist/flume/1.6.0/ http://www.apache.org/dist/flume/1.7.0/ 把apache-flume-1.6.0-bin.tar.gz文件存放在/op......

Yulong_
2017/08/10
19
0
SparkStreaming整合flume

SparkStreaming整合flume 在实际开发中push会丢数据,因为push是由flume将数据发给程序,程序出错,丢失数据。所以不会使用不做讲解,这里讲解poll,拉去flume的数据,保证数据不丢失。 1.首...

强行快乐~
07/22
0
0
Apache Flume 1.6.0 发布,日志服务器

Apache Flume 1.6.0 发布,此版本现已提供下载: http://flume.apache.org/download.html 更新内容: ** Bug 修复 [FLUME-1793] - Unit test TestElasticSearchLogStashEventSerializer fail......

oschina
2015/06/03
3.1K
2
Cloudera Flume简介

Flume是Cloudera提供的日志收集系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。 Flume是一个分布式...

Javafans
2012/03/12
780
0
Apache Flume 1.5.0 发布,日志服务器

Apache Flume 1.5.0 发布,Flume 是一个分布式、可靠和高可用的服务,用于收集、聚合以及移动大量日志数据,使用一个简单灵活的架构,就流数据模型。这是一个可靠、容错的服务。 改进内容包括...

oschina
2014/05/22
2.8K
4

没有更多内容

加载失败,请刷新页面

加载更多

CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
昨天
5
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
昨天
8
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
昨天
10
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
昨天
6
0
PHP+Ajax微信手机端九宫格抽奖实例

PHP+Ajax结合lottery.js制作的一款微信手机端九宫格抽奖实例,抽奖完成后有收货地址添加表单出现。支持可以设置中奖概率等。 奖品列表 <div class="lottery_list clearfix" id="lottery"> ......

ymkjs1990
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部