文档章节

Flume日志收集之Logger和HDFS数据传输方式

海岸线的曙光
 海岸线的曙光
发布于 2018/06/25 10:41
字数 2101
阅读 570
收藏 13

    Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力。在搭建环境和使用前,请大家自行了解一下Flume,主要是它的核心组件:Source、Channel、Sink,下面将说下常见的几种使用方式下环境的搭建。

    主环境:虚拟机slave01,slave02,slave03,基于之前已搭建好的环境,包括JDK、Zookeeper、Hadoop,详见之前的博客介绍。

    下载地址:http://flume.apache.org/download.html

    一、基于netcat的source+channel(memory)+sink(logger)的数据传输过程

    (1)配置环境变量FLUME_HOME

    将解压的文件移目录下,如 /usr/local/,通过命令 vim /etc/profile 添加环境变量:

JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
SCALA_HOME=/usr/local/scala
HADOOP_HOME=/usr/local/hadoop
SPARK_HOME=/usr/local/spark
ZOOKEEPER_HOME=/usr/local/zookeeper
HBASE_HOME=/usr/local/hbase
KAFKA_HOME=/usr/local/kafka
HIVE_HOME=/usr/local/hive
DERBY_HOME=/usr/local/derby
FLUME_HOME=/usr/local/flume
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$SCALA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$ZOOKEEPER_HOME/bin:$HBASE_HOME/bin:$KAFKA_HOME/bin:$HIVE_HOME/bin:$DERBY_HOME/bin:$FLUME_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$DERBY_HOME/lib/derby.jar:$DERBY_HOME/lib/derbyclient.jar:$DERBY_HOME/lib/derbytools.jar:$DERBY_HOME/lib/derbynet.jar
export JAVA_HOME JRE_HOME SCALA_HOME HADOOP_HOME SPARK_HOME ZOOKEEPER_HOME HBASE_HOME KAFKA_HOME HIVE_HOME DERBY_HOME FLUME_HOME PATH CLASSPATH

    运行命令 source /etc/profile 使变量生效。

    验证是否配置成功,cd到 flume/bin 目录下,执行命令:flume-ng version

[hadoop@slave01 bin]$ flume-ng version
错误: 找不到或无法加载主类 org.apache.flume.tools.GetJavaProperty
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d

    验证成功,但出现了报错信息,大都说的是JDK版本(改为1.8以下)或HBase配置(注释掉hbase/conf/hbase-env.sh中的 export HBASE_CLASSPATH)问题,不过,不影响的话就不用在意这个了。

    (2)修改 conf/目录下配置文件    

    复制一份 flume-env.sh.template ,重命名为 flume-env.sh

[hadoop@slave01 conf]$ cp flume-env.sh.template flume-env.sh

    添加JAVA_HOME配置,内容如下:

export JAVA_HOME=/usr/java/jdk1.8.0_161

    复制一份 flume-conf.properties.template ,重命名为 flume-conf.properties

[hadoop@slave01 conf]$ cp flume-conf.properties.template flume-conf.properties

    修改为一个最简单基本的配置,如下:

a1.sources = so1
a1.channels = c1
a1.sinks = s1

# For each one of the sources, the type is defined
a1.sources.so1.type = netcat
a1.sources.so1.bind = slave01
a1.sources.so1.port = 8888

# The channel can be defined as follows.
a1.sources.so1.channels = c1

# Each sink's type must be defined
a1.sinks.s1.type = logger

#Specify the channel the sink should use
a1.sinks.s1.channel = c1

# Each channel's type is defined.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 100 

    其中:

    a1表示名为 a1 的agent,在下面启动命令中 --name a1用到

    a1.sinks.s1.type=logger,表示收集到的日志直接在flume的logger中打印出来,其他类型的大家可以自行尝试。

    a1.sources.so1.type = netcat,表示组件Source使用netcat,

    a1.sources.so1.bind = slave01,表示日志需要发送到的主机名或者Ip地址,该主机运行着netcat类型的source在监听,

    a1.sources.so1.port = 8888,表示日志需要发送到的端口号,该端口号要有netcat类型的source在监听,

    其他的配置参数,大家自行查看。

    (3)启动测试

    在bin目录下运行,启动flume agent a1 服务端,其中flume-conf.properties为加载的配置文件,注意其所在目录:

flume-ng agent --conf conf --conf-file ../conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

    命令参数大家自行百度查看,显示如下:

Info: Including Hadoop libraries found via (/usr/local/hadoop/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/usr/local/hbase/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/usr/local/hive) for Hive access
.....
.....
.....
(中间省略部分)
.....
.....
.....
18/06/22 16:20:20 INFO node.Application: Starting new configuration:{ sourceRunners:{so1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:so1,state:IDLE} }} sinkRunners:{s1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@7060583 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/06/22 16:20:20 INFO node.Application: Starting Channel c1
18/06/22 16:20:20 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
18/06/22 16:20:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/06/22 16:20:21 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/06/22 16:20:21 INFO node.Application: Starting Sink s1
18/06/22 16:20:21 INFO node.Application: Starting Source so1
18/06/22 16:20:21 INFO source.NetcatSource: Source starting
18/06/22 16:20:21 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:8888]

    到此,flume启动成功,下面是测试:

    安装telnet,若已安装则跳过,检查是否安装命令:rpm -qa | grep telnet,若无返回值则表示未安装,安装命令:yum install telnet,安装后同样检查是否安装成功。可以参考如下链接:https://blog.csdn.net/typa01_kk/article/details/46604967

    打开另一个终端,输入如下命令,用于连接端口,并输入任意字符串,如 "hello":

[hadoop@slave01 conf]$ telnet slave01 8888
Trying 127.0.0.1...
Connected to slave01.
Escape character is '^]'.
hello
OK

    可以在第一个终端看到输出如下:

18/06/22 16:20:36 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }

    到此,这种基于netcat的source+channel(memory)+logger数据传输的方式算是完成了,当然这只是单机Flume的数据传输,也可以多个节点进行数据传输,大家可以试试。

    二、基于netcat的source+channel(file)+sink(hdfs)的数据传输过程

    (1)环境变量同上保持不变

    (2)修改conf/目录下配置文件

      同上,只修改配置文件 flume-conf.properties,如下:

a.sources = r1
a.sinks = k1
a.channels = c1

# Describe/configure the source
a.sources.r1.type = netcat
a.sources.r1.bind = slave01
a.sources.r1.port = 8889

# Describe the sink
a.sinks.k1.type = hdfs
#指定hdfs地址中的输出目录
a.sinks.k1.hdfs.path = hdfs://slave01:9000/output
a.sinks.k1.hdfs.writeFormat = Text
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.rollInterval = 10
a.sinks.k1.hdfs.rollSize = 0
a.sinks.k1.hdfs.rollCount = 0
a.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a.channels.c1.type = file
a.channels.c1.checkpointDir = /usr/local/flume/checkpoint
a.channels.c1.dataDirs = /usr/local/flume/data

# Bind the source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

      其中:

    a.sinks.k1.type = hdfs,表示在HDFS中查看flume收集到的日志数据,两个文件夹checkpoint和data可手动创建;

    hdfs地址中的输出目录是必须的,这是接下来启动Hadoop访问文件的目录。

    (3)启动测试

    在这之前先启动Hadoop集群,成功启动后在进程中会出现DataNode,一定要保证,不然后面会报错,可参考之前的关于Hadoop集群的博客:

    集群四部曲(二):完美的Hadoop集群搭建

    运行命令启动flume agent a 服务端:

flume-ng agent --conf conf --conf-file ../conf/flume-conf.properties --name a -Dflume.root.logger=INFO,console

    再通过telnet命令连接端口,输入字符串:

[hadoop@slave01 sbin]$ telnet slave01 8889
Trying 127.0.0.1...
Connected to slave01.
Escape character is '^]'.
qwer1234
OK

    在刚才启动的服务端界面可以看到输出为:

18/06/25 10:17:51 INFO hdfs.HDFSEventSink: Writer callback called.
18/06/25 10:17:51 INFO hdfs.BucketWriter: Closing hdfs://slave01:9000/output/2018-06-25-10-17-41.1529893061596.tmp
18/06/25 10:17:51 INFO hdfs.BucketWriter: Renaming hdfs://slave01:9000/output/2018-06-25-10-17-41.1529893061596.tmp to hdfs://slave01:9000/output/2018-06-25-10-17-41.1529893061596
18/06/25 10:17:51 INFO hdfs.HDFSEventSink: Writer callback called.
18/06/25 10:17:52 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/06/25 10:17:52 INFO hdfs.BucketWriter: Creating hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686.tmp
18/06/25 10:18:02 INFO hdfs.BucketWriter: Closing hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686.tmp
18/06/25 10:18:02 INFO hdfs.BucketWriter: Renaming hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686.tmp to hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686
18/06/25 10:18:02 INFO hdfs.HDFSEventSink: Writer callback called.
18/06/25 10:18:08 INFO file.EventQueueBackingStoreFile: Start checkpoint for /usr/local/flume/checkpoint/checkpoint, elements to sync = 3
18/06/25 10:18:08 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1529893058855, queueSize: 0, queueHead: 0
18/06/25 10:18:08 INFO file.Log: Updated checkpoint for file: /usr/local/flume/data/log-8 position: 325 logWriteOrderID: 1529893058855

    这时,访问slave01:50070,打开文件菜单并输入存放路径:

    在hdfs的output目录中可以看到目录中多出了以时间戳命名的文件,文件中写入了你的测试数据(qwer1234),点击上面的文件名并下载保存到本地:

    在目录中查看文件内容,可以看出文件中保存的正是你写入的数据:

[hadoop@slave01 下载]$ cat 2018-06-25-10-17-52.1529893072686 
qwer1234
[hadoop@slave01 下载]$ 

    以上是比较常用且常见的两种数据传输方式:控制台打印和HDFS,在日志收集中都是很方便,特别是HDFS,在基于Hadoop集群状态下,应用很是广泛。

© 著作权归作者所有

海岸线的曙光

海岸线的曙光

粉丝 47
博文 54
码字总数 59221
作品 0
朝阳
程序员
私信 提问
Flume---大数据协作框架

flume是什么 Apache Flume是一个分布式的、可靠的、易用的系统,可以有效地将来自很多不同源系统的大量日志数据收集、汇总或者转移到一个数据中心存储。 Apache Flume的作用不仅限于日志汇总...

简心
2018/05/06
115
0
02. Spark Streaming实时流处理学习——分布式日志收集框架Flume

2. 分布式日志收集框架Flume 2.1 业务现状分析 如上图,大量的系统和各种服务的日志数据持续生成。用户有了很好的商业创意想要充分利用这些系统日志信息。比如用户行为分析,轨迹跟踪等等。 ...

牦牛sheriff
2018/09/02
0
0
Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,...

大数据之路
2014/07/08
55.9K
9
大数据系统数据采集产品的架构分析

任何完整的大数据平台,一般包括以下的几个过程: 数据采集 数据存储 数据处理 数据展现(可视化,报表和监控) 其中,数据采集是所有数据系统必不可少的,随着大数据越来越被重视,数据采集...

naughty
2015/10/31
4.5K
1
Flume日志收集分层架构应用实践

Flume作为一个日志收集工具,非常轻量级,基于一个个Flume Agent,能够构建一个很复杂很强大的日志收集系统,它的灵活性和优势,主要体现在如下几点: 1)模块化设计:在其Flume Agent内部可...

workming
2018/06/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
今天
5
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
今天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
今天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
今天
10
0
再见 Spring Boot 1.X,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维...

Java技术剑
今天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部