文档章节

flume+kafka测试(监控File)

JPblog
 JPblog
发布于 2016/09/14 14:39
字数 548
阅读 141
收藏 2

1.flume与kafka整合

1.下载插件包

    Flume和Kafka插件包下载:https://github.com/beyondj2ee/flumeng-kafka-plugin

2.复制jar包

    复制插件包中的jar包到flume/lib中

        (删掉不同版本相同jar包,需删除scala-compiler-z.9.2.jar包,否则flume启动会出现问题)

    复制kafka/libs中的jar包到flume/lib中

2.配置flume配置文件(监控file

    vi /opt/flume/conf/hw.conf

agent.sources = s1   
agent.channels = c1 
agent.sinks = k1

agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /opt/log/debug.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

#设置Kafka接收器
agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
agent.sinks.k1.brokerList=127.0.0.1:9092
#设置Kafka的Topic
agent.sinks.k1.topic=testKJ1
#设置序列化方式
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1

3.启动zookeeper(/opt/kafka)

    bin/zookeeper-server-start.sh config/zookeeper.properties

4.启动kafka(/opt/kafka)

    bin/kafka-server-start.sh config/server.properties

5.启动kafka消费者 (/opt/kafka)

    bin/kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1

6.启动flume(/opt/flume)

    bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console

7.向被监控日志插入数据

    echo "nihaoa dashagua">>/opt/log/debug.log

####################################################################################

控制文件举例

监控目录

    vi /opt/flume/conf/dir.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /opt/log/
a1.sources.s1.fileHeader = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = Dirtopic
a1.sinks.k1.brokerList = 127.0.0.1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

监控目录写入HDFS

    vi /opt/flume/conf/dirhdfs.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k2

a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /opt/log/
a1.sources.s1.fileHeader = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k2.topic = Dirtopic
a1.sinks.k2.type=hdfs
a1.sinks.k2.channel=c1
a1.sinks.k2.hdfs.path= /tmp/dirhdfs
a1.sinks.k2.hdfs.filePrefix=events-
a1.sinks.k2.hdfs.round=true
a1.sinks.k2.hdfs.roundValue=10
a1.sinks.k2.hdfs.roundUnit=minute

监控目录写入HDFS(按日分目录、按小时分文件)

    如/user/hue/logput/20170521下,event-20170521.1495378401218.log

a1.sources = s1
a1.channels = c1
a1.sinks = k2

a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /opt/log/
a1.sources.s1.fileHeader = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k2.topic = Dirtopic
a1.sinks.k2.type=hdfs
a1.sinks.k2.channel=c1
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.path= /user/hue/logput/%Y%m%d
a1.sinks.k2.hdfs.filePrefix=events-%Y%m%d
a1.sinks.k2.hdfs.fileSuffix=.log
a1.sinks.k2.hdfs.round=true
a1.sinks.k2.hdfs.roundValue=10
a1.sinks.k2.hdfs.roundUnit=minute

 

© 著作权归作者所有

下一篇: 6)flume安装
JPblog
粉丝 14
博文 65
码字总数 42633
作品 0
朝阳
程序员
私信 提问
Flume+Kafka+storm流式计算——文件分组

如题,我用Flume+Kafka监控一个文件夹下的小文件输入,把各个数据流传输到storm集群进行数据计算。 由于每个小文件中内容有业务计算逻辑,所以我在flume传输到kafka时,修改了Kafkasink代码,...

诺诺
2016/06/20
468
0
kafka和flume的区别和对比使用

(1)kafka和flume都是日志系统。kafka是分布式消息中间件,自带存储,提供push和pull存取数据功能。flume分为agent(数据采集器),collector(数据简单处理和写入),storage(存储器)三部分...

u010963948
2017/12/13
0
0
玩转Flume+Kafka原来也就那点事儿

作者介绍 程超,易宝支付架构师,10年JAVA工作经验,擅长分布式和大数据技术领域,目前主要从事金融支付类方性能分析向。 Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系...

程超
2016/09/13
0
0
flume+kafka性能问题

@luobiaoxc 你好,想跟你请教个问题:我在本机测试flume读一个200M的文件发送到kfka,kafka那边7分钟才能接收完数据,但本机2分钟左右就已经COMPLETED了,请问是flume往kafka发送数据慢还是k...

rabbitld
2015/04/03
511
0
flume+kafka 性能问题

应用场景: 在使用spoolDir作为flume source,kafka作为sink将文件内容缓存到kafka中。 问题: 发现flume + kafka在信息缓存的性能上不是很好,现在也分不清楚是谁的原因。读取一个80万的文件...

GoldenRoc
2014/09/12
9.8K
9

没有更多内容

加载失败,请刷新页面

加载更多

Java8

package com.shi.lambda;import java.util.Arrays;import java.util.List;import org.junit.Test;import com.shi.model.Employee;/** * 初始化案例 * @author xiaosh......

小小小施爷
35分钟前
1
0
c# 动态编译代码

有时候做计算一些东西时候,算法一直变更,写在程序需要一直调整,因此算法写在cs文件,然后动态调用内部的方法去计算判断,只需变更cs文件即可。 using Microsoft.CSharp; using System; us...

朝如青丝暮成雪
35分钟前
4
0
好程序员技术分享html5和JavaScript的区别

好程序员技术分享html5和JavaScript的区别,HTML5广义上讲是前端开发学科的代名词,包含HTML5、CSS3及JavaScript三个重要的部分,是运行在浏览器上应用的统称。如PC端网站、管理系统、手机网...

好程序员IT
38分钟前
2
0
tomcat 与 spring boot 设置虚拟路径

tomcat 设置虚拟路径 <Context path="/uploadDir" docBase="/data"/>path是请求访问的路径docBase是服务器存储文件的路径,Linux 根目录下 data spring boot 虚拟路径设置 registry.addRe......

kdy1994
41分钟前
1
0
var ,let ,const 的区别和共同点

一、let和var区别 1.关于变量提升,var能变量提升,let不能 // 关于var 如下所示console.log(a); //输出undefined,此时就是变量提升var a = 2; console.log(a); //2 //相当于下面...

MrBoyce
46分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部