文档章节

Flume和Kafka整合安装

四叶草666
 四叶草666
发布于 2017/07/20 09:21
字数 553
阅读 46
收藏 0

版本号:

RedHat6.5   JDK1.8    flume-1.6.0   kafka_2.11-0.8.2.1

1.flume安装

RedHat6.5安装单机flume1.6:http://blog.leanote.com/post/2630794313@qq.com/26781d33b435

2.kafka安装

RedHat6.5安装kafka集群 : http://blog.leanote.com/post/2630794313@qq.com/0230848f841a

3.Flume和Kafka整合

在conf目录新建flume-kafka.conf文件:

 
  1. touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-kafka.conf
  2. sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-kafka.conf

输入以下内容:

 
  1. # 指定Agent的组件名称  
  2. agent1.sources = source1  
  3. agent1.sinks = sink1  
  4. agent1.channels = channel1  
  5.  
  6. # 指定Flume source(要监听的路径)  
  7. agent1.sources.source1.type = spooldir  
  8. agent1.sources.source1.spoolDir = /usr/local/flume/logtest
  9.  
  10. # 指定Flume sink  
  11. #agent1.sinks.sink1.type = logger  
  12. agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink  
  13. agent1.sinks.sink1.topic = test  
  14. agent1.sinks.sink1.brokerList = 192.168.168.200:9092  
  15. agent1.sinks.sink1.requiredAcks = 1  
  16. agent1.sinks.sink1.batchSize = 100   
  17.  
  18. # 指定Flume channel  
  19. agent1.channels.channel1.type = memory  
  20. agent1.channels.channel1.capacity = 1000  
  21. agent1.channels.channel1.transactionCapacity = 100  
  22.  
  23. # 绑定source和sink到channel上  
  24. agent1.sources.source1.channels = channel1  
  25. agent1.sinks.sink1.channel = channel1  

agent1.sinks.sink1.topic = test   代表flume监听路径下发生变化时,会把消息发送到localhost机器上的test主题。

启动flume-kafka.conf:

 
  1. cd /usr/local/flume/apache-flume-1.6.0-bin
  2. bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name agent1 -Dflume.root.logger=INFO,console

运行成功日志如下:

 
  1. 2017-07-07 22:22:02,270 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
  2. 2017-07-07 22:22:02,270 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: sink1 started

启动kafka的消费者,监听topic主题:

 
  1. kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

testKafka.log :

在/usr/local/flume目录下面新建一个testKafka.log日志文件,写入Flume connect Kafka success! 作为测试内容:

 
  1. touch /usr/local/flume/testKafka.log
  2. sudo gedit /usr/local/flume/testKafka.log

 

然后拷贝testKafka.log到flume监听路径/usr/local/flume/logtest下:

 
  1. cp /usr/local/flume/testKafka.log /usr/local/flume/logtest

接着就可以在前一个终端看到刚刚采集的内容了,如下:

---------------------------------kafka------------------------------

 
  1. [root@master kafka_2.11-0.9.0.0]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic test  
  2. [2017-07-07 22:36:38,687] INFO [Group Metadata Manager on Broker 200]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
  3. Flume connect Kafka success!

 ---------------------------------flume------------------------------

 
  1. 2017-07-07 22:41:32,602 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /usr/local/flume/logtest/testKafka.log to /usr/local/flume/logtest/testKafka.log.COMPLETED
  2. 2017-07-07 22:41:35,669 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test)
  3. 2017-07-07 22:41:35,728 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to localhost:9092 for producing
  4. 2017-07-07 22:41:35,757 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Disconnecting from localhost:9092
  5. 2017-07-07 22:41:35,791 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to slave2:9092 for producing

© 著作权归作者所有

共有 人打赏支持
四叶草666
粉丝 0
博文 51
码字总数 50778
作品 0
深圳
程序员
【Strom篇】Flume+Kafaka+Strom整合完成信息记录

第一部分:流程分析 由flume收集客户端发送的信息,经过kafka集群消费者收集,然后给storm集群数据处理(数据清洗),最后再发给kafka集群收集。 第二部分:集群搭建 flume+kafka 一、配合f...

chenshi_2753
05/06
0
0
Flume - Kafka日志平台整合

1. Flume介绍 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行...

mantoudev
03/21
0
0
Flume+Kafka+Storm+Redis构建大数据实时处理系统

一、大数据处理的常用方法 之前在《采集→清洗→处理:基于MapReduce的离线数据分析》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通...

技术小能手
07/09
0
0
flume 1.7 源码导入eclipse windows

安装maven,设置MAVEN_HOME等配置 下载flume源码 eclipse-oxygen,设置eclipse 使用外部maven,并配置settings.xml 遇到问题: 如果顺利,已将所需jar都下载下来了。 导入后遇到如下问题 fl...

柯里昂
2017/10/31
0
0
带你看懂大数据采集引擎之Flume&采集目录中的日志

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一、Flume的介绍: Flume由C...

李金泽
03/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Java Lock接口分析之ReentantReadWriteLock

ReentantReadWriteLock读写锁,在读线程多余写线程的并发环境中能体现出优异的性能,相比于synchronized与ReentrantLock这种独占式锁的模型,ReentantReadWriteLock采用独占式写锁与共享式读...

我爱春天的毛毛雨
21分钟前
0
0
EFK (Fluentd ElasticSearch Kibana) 采集nginx日志

本文描述如何通过FEK组合集中化nginx的访问日志。本人更喜欢按顺序来命名,所以使用FEK而不是EFK. 首先在nginx服务器上执行以下操作. 安装ruby http://blog.csdn.net/chenhaifeng2016/artic...

xiaomin0322
23分钟前
1
0
一键下载:将知乎专栏导出成电子书

老是有同学问,学了 Python 基础后不知道可以做点什么来提高。今天就再用个小例子,给大家讲讲,通过 Python 和爬虫,可以完成怎样的小工具。 在知乎上,你一定关注了一些不错的专栏(比如 ...

crossin
32分钟前
1
0
synchronized 之 对象锁 和 类锁

一、synchronized(object) 如果object没有被加锁,则获取object的锁;如果object已经被加锁则等待object的锁被释放。 二、需要加锁的情景 多线程共享同一资源会引起线程安全的情况下,才需要...

MyOldTime
33分钟前
6
0
tomcat 单机/多机 部署多应用

一.单机部署多应用: 1.在 linux 下解压安装两个 tomcat:tomcat1, tomcat2; 2.修改 /etc/profile, 增加 tomcat 环境变量: path 中加上 重新加载配置文件 source /etc/profile 3.修改 tomc...

imbiao
44分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部