文档章节

Flume + Solr + log4j搭建web日志采集系统

OrangeJoke
 OrangeJoke
发布于 2017/07/19 11:24
字数 1869
阅读 4444
收藏 254
点赞 1
评论 4

前言

很多web应用会选择ELK来做日志采集系统,这里选用Flume,一方面是因为熟悉整个Hadoop框架,另一方面,Flume也有很多的优点。

关于Apache Hadoop Ecosystem 请点击这里。

Cloudera 官方的教程也是基于这个例子开始的,get-started-with-hadoop-tutorial

并且假设我们已经了解Flume(agent, Source, Channel, Sink) , Morphline (ETL), Solr (全文检索),如果都没有了解,请自行百度。

Scenario (需求)

首先我们有多个web 应用,每个web应用每天都有不断的日志产生,这些日志文件现在以文件的形式存储在服务器当中,我们需要收集这些日志,并对日志进行查询。

所以整个流程就是,Flume agent 收集日志 -> Morphline 进行过滤 -> 对结果进行索引然后在Solr中进行搜索。

Flume 收集日志

  1. 使用 Spooling Directory Source
    就是监视指定目录是否有新文件移入,如果有,就会读取这些Event, 但是文件一旦被移到该目录之后,就不应该被写入,目录下的文件名也不可重复,这样的情况就是需要定期将文件移动到指定的目录,不能实现实时的读取。
  2. 使用 Exec Source
    就是通过下面的命令行产生的结果作为源,在agent 死亡或或者机器重启的过程可能会存在数据丢失
agent.sources.execSrc.type = exec
agent.sources.execSrc.shell=/bin/bash -c
agent.sources.execSrc.command= tail -F /var/log/flume/flume.log | grep "error: "

  1. 使用消息中间件JMS或者KAFKA
    请参考: 基于Flume+Log4j+Kafka的日志采集架构方案
    客户端直接发送至kafaka queue , 用 log4j KafkaAppender

  2. 使用Flume Appender
    对于Java web 应用,我们就最简单直接采取这种方式。 Flume Appender 我们这里就直接采用log4j2 , 关于日志框架的这些说明,请看另一片博客 spring boot use log4j log4j 关于flume Appender 的配置

The Flume Appender supports three modes of operation.

  1. It can act as a remote Flume client which sends Flume events via Avro to a Flume Agent configured with an Avro Source.(同步,Avro协议)
  2. It can act as an embedded Flume Agent where Flume events pass directly into Flume for processing.(异步,需要维护客户端flume)
  3. It can persist events to a local BerkeleyDB data store and then asynchronously send the events to Flume, similar to the embedded Flume Agent but without most of the Flume dependencies.(先写数据库,再异步发送)

Usage as an embedded agent will cause the messages to be directly passed to the Flume Channel and then control will be immediately returned to the application. All interaction with remote agents will occur asynchronously. Setting the "type" attribute to "Embedded" will force the use of the embedded agent. In addition, configuring agent properties in the appender configuration will also cause the embedded agent to be used.

我们下面就简单的用第一种方式

客户端配置

log4j.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn" name="MyApp" packages="">
  <Appenders>
    <Flume name="eventLogger" compress="true">
      <Agent host="192.168.10.101" port="8800"/>
      <Agent host="192.168.10.102" port="8800"/>
      <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
    </Flume>
  </Appenders>
  <Loggers>
    <Root level="error">
      <AppenderRef ref="eventLogger"/>
    </Root>
  </Loggers>
</Configuration>

服务端配置

参考:flume log4j appender config

下载 flume, 在conf 目录下,配置example.conf :

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.clients.log4jappender.Log4jAppender
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动 flume

bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

查看日志,是否成功。

Solr 配置

关于solr的介绍

这里 solr的数据也是需要存储到 hdfs中的,另外solr 是通过zookeeper 来管理的

以下配置,这里用的cloudera manager 安装,所以自动配好了,但是需要检验,如果是手动安装也有对应的文档可以直接查看, 另外这里省略Solr Authentication

####配置 zookeeper service

$ cat /etc/solr/conf/solr-env.sh
export SOLR_ZK_ENSEMBLE=zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr

####配置 solr use hdfs

$ cat /etc/default/solr
//地址nn01.example.com:8020 是hdfs name node的地址
SOLR_HDFS_HOME=hdfs://nn01.example.com:8020/solr

//To create the /solr directory in HDFS,需要创建/solr hdfs目录:
$ sudo -u hdfs hdfs dfs -mkdir /solr
$ sudo -u hdfs hdfs dfs -chown solr /solr

initializing the ZooKeeper Namespace

$ sudo service solr-server restart

启动solr

$ sudo service solr-server restart

solr collection 配置

solr 通过 collection 来组织逻辑数据,所以你需要创建collection,每个collection有自己的配置,文档上已经讲的比较清楚了,而且也不多,这里不再赘述

Generating Collection Configuration

下面是的collection是用来存储上面收集到的日志:

// 使用默认模版创建instancedir
$ solrctl instancedir --generate $HOME/weblogs_config

// upload instancedir to zookeeper,上传配置
$ solrctl instancedir --create weblogs_config $HOME/weblogs_config

//verify instance  
$ solrctl instancedir --list

// create collection  -s shard_count, collection 和config 关联
$ solrctl collection --create weblogs_collection -s 2 -c weblogs_config

A SolrCloud collection is the top-level object for indexing documents and providing a query interface. Each collection must be associated with an instance directory. Different collections can use the same instance directory. Each collection is typically replicated among several SolrCloud instances. Each replica is called a core and is assigned to an individual Solr service. The assignment process is managed automatically, although you can apply fine-grained control over each individual core using the solrctl core command . 这是 collection 和instance之间关系的介绍

成功创建之后如何修改和扩展,请参考这里solectl usage

Morphline (ETL)

创建好 collection 之后,我们就需要将日志解析存储到solr里,方便检索。Morphline 就是这个中间过程的ETL工具(extracting, transforming and loading data), Flume 提供了Morphlion Solr Sink, 从 log flume的source中读取event,经过ETL导入到solr中。

输入图片说明

配置flume

继续上面的flume,中 example.conf 的配置

a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4444
 
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.morphlineFile=morphlines.conf
a1.sinks.k1.morphlineId = morphline_log4j2
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

配置Morphline

我们的日志格式文件如下:

[INFO ] 2017-07-14 11:40:51.556 [main] RequestMappingHandlerAdapter - Detected ResponseBodyAdvice bean in apiResponseAdvice

需要解析成:

level: INFO
create_time: 2017-07-14 11:40:51.556
thread: main
class:  RequestMappingHandlerAdapter
 -    //这里有个短横线
message: Detected ResponseBodyAdvice bean in apiResponseAdvice

所以我们使用grok
在线调试工具online tools


 # grok get data from unstructured line 
     { 
        grok {
          dictionaryFiles : [grok-dictionary.conf]
          expressions : {
            message : """\[%{LOGLEVEL:level} \] %{SC_LOGDATETIME:create_time} \[%{DATA:thread}\] %{WORD:class} [-] %{GREEDYDATA:message}"""
          }
        }
      
      }

      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # convert timestamp field to native Solr timestamp format 
      # e.g.  2017-07-14 11:40:52.512 to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : create_time
          inputFormats : ["yyyy-MM-dd HH:mm:ss.SSS", "yyyy-MM-dd"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : UTC
        }
      }


配置 schema.xml

在上一节,配置solr的时候,我们生成了默认的模版,我们需要根据实际的需求修改schema.xml,在$HOME/weblogs/conf 下

The schema.xml file contains all of the details about which fields your documents can contain, and how those fields should be dealt with when adding documents to the index, or when querying those fields.

schema.xml
solrconfig.xml

   <field name="level" type="text_general" indexed="true" stored="true" multiValued="true"/>
   <field name="create_time" type="date" indexed="true" stored="true"/>
   <field name="thread" type="text_general" indexed="true" stored="true"/>
   <field name="class" type="text_general" indexed="true" stored="true"/>
   <field name="message" type="text_general" indexed="true" stored="true"/>

重新上传配置到zookeeper

$ solrctl instancedir --update weblogs_config $HOME/weblogs_config
$ solrctl collection --reload weblogs_collection

###总结 到此为止,我们完成了日志的收集,解析,索引,你可以通过 Hue来进行搜索和查询了,或者自己定义UI。这个教程比较基础也相对简单,但是可以完成基本的需求,也把日志处理流程走了一遍,剩下的大家自定义即可。下一篇将对其它组建也做一个类似的例子。

© 著作权归作者所有

共有 人打赏支持
OrangeJoke
粉丝 34
博文 34
码字总数 26793
作品 0
江北
高级程序员
加载中

评论(4)

OrangeJoke
OrangeJoke

引用来自“喵小扁”的评论

elk比较成熟了啊

回复@喵小扁 : 是的,我这里用这个的话,主要是和Haoop 生态圈结合到一起,换句话说就是都用apache
OrangeJoke
OrangeJoke

引用来自“Leo_”的评论

1.这个应该是Log4j 2吧
2.不管是用kafka还是flume都是作为一个中间件,当然也可以同时使用。
3.如果我没记错Solr是一个搜索服务,这样真的好吗?没有可视化的方案或者其他的方案吗?

回复@Leo_ : 对的,solr 做搜索,可视化可以自己做,也可以用hue
喵小扁
喵小扁
elk比较成熟了啊
TGVvbmFyZA
TGVvbmFyZA
1.这个应该是Log4j 2吧
2.不管是用kafka还是flume都是作为一个中间件,当然也可以同时使用。
3.如果我没记错Solr是一个搜索服务,这样真的好吗?没有可视化的方案或者其他的方案吗?
其他消息中间件及场景应用(下3)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51516329 目...

yunlielai ⋅ 04/15 ⋅ 0

阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习 ⋅ 04/14 ⋅ 0

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中《大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是...

xpleaf ⋅ 04/16 ⋅ 0

解决Flume采集数据时在HDFS上产生大量小文件的问题

问题:flume指定HDFS类型的Sink时,采集数据至HDFS指定目录,会产生大量小文件。 问题重现: 1、创建flume配置文件flume-env.sh,: flume配置文件如下(根据自身需要修改): 因为flume可以...

舒运 ⋅ 06/10 ⋅ 0

大数据经典学习路线(及供参考)之 二

2.1 数据仓库增强 2.1.1 数据仓库及数据模型入门 什么是数据仓库、数据仓库的意义、数据仓库核心概念、数据仓库的体系结构 2.1.2 数据仓库设计 建立数据仓库的步骤、数据的抽取、数据的转换、...

柯西带你学编程 ⋅ 05/22 ⋅ 0

Kafka实战-Flume到Kafka

1.概述   前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据。下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览   下面开...

smartloli ⋅ 2015/07/02 ⋅ 0

Flume---大数据协作框架

flume是什么 Apache Flume是一个分布式的、可靠的、易用的系统,可以有效地将来自很多不同源系统的大量日志数据收集、汇总或者转移到一个数据中心存储。 Apache Flume的作用不仅限于日志汇总...

简心 ⋅ 05/06 ⋅ 0

flume 总结--flume入门介绍

flume介绍 flume被设计为一个灵活的分布式系统,可以很容易的扩展,而且是高度可定制化的,一个配置正确的Flume Agent和由互相连接的Agent创建的Agent管道,保证不会丢失数据,提供持久的cha...

u013362353 ⋅ 05/28 ⋅ 0

阿里云大数据利器之-使用sql实现流计算做实时展现业务( flume故障转移版 )

实时业务处理的需求越来越多,也有各种处理方案,比如storm,spark等都可以。那以数据流的方向可以总结成数据源-数据搜集-缓存队列-实时处理计算-数据展现。本文就用阿里云产品简单实现了一个...

上单 ⋅ 2017/07/25 ⋅ 0

【Strom篇】Flume+Kafaka+Strom整合完成信息记录

第一部分:流程分析 由flume收集客户端发送的信息,经过kafka集群消费者收集,然后给storm集群数据处理(数据清洗),最后再发给kafka集群收集。 第二部分:集群搭建 flume+kafka 一、配合f...

chenshi_2753 ⋅ 05/06 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

OSChina 周日乱弹 —— 这么好的姑娘都不要了啊

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @TigaPile :分享曾惜的单曲《讲真的》 《讲真的》- 曾惜 手机党少年们想听歌,请使劲儿戳(这里) @首席搬砖工程师 :怎样约女孩子出来吃饭,...

小小编辑 ⋅ 25分钟前 ⋅ 1

Jenkins实践3 之脚本

#!/bin/sh# export PROJ_PATH=项目路径# export TOMCAT_PATH=tomcat路径killTomcat(){pid=`ps -ef | grep tomcat | grep java|awk '{print $2}'`echo "tom...

晨猫 ⋅ 今天 ⋅ 0

Spring Bean的生命周期

前言 Spring Bean 的生命周期在整个 Spring 中占有很重要的位置,掌握这些可以加深对 Spring 的理解。 首先看下生命周期图: 再谈生命周期之前有一点需要先明确: Spring 只帮我们管理单例模...

素雷 ⋅ 今天 ⋅ 0

zblog2.3版本的asp系统是否可以超越卢松松博客的流量[图]

最近访问zblog官网,发现zlbog-asp2.3版本已经进入测试阶段了,虽然正式版还没有发布,想必也不久了。那么作为aps纵横江湖十多年的今天,blog2.2版本应该已经成熟了,为什么还要发布这个2.3...

原创小博客 ⋅ 今天 ⋅ 0

聊聊spring cloud的HystrixCircuitBreakerConfiguration

序 本文主要研究一下spring cloud的HystrixCircuitBreakerConfiguration HystrixCircuitBreakerConfiguration spring-cloud-netflix-core-2.0.0.RELEASE-sources.jar!/org/springframework/......

go4it ⋅ 今天 ⋅ 0

二分查找

二分查找,也称折半查找、二分搜索,是一种在有序数组中查找某一特定元素的搜索算法。搜素过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜素过程结束;如果某一特定元素大于...

人觉非常君 ⋅ 今天 ⋅ 0

VS中使用X64汇编

需要注意的是,在X86项目中,可以使用__asm{}来嵌入汇编代码,但是在X64项目中,再也不能使用__asm{}来编写嵌入式汇编程序了,必须使用专门的.asm汇编文件来编写相应的汇编代码,然后在其它地...

simpower ⋅ 今天 ⋅ 0

ThreadPoolExecutor

ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, ......

4rnold ⋅ 昨天 ⋅ 0

Java正无穷大、负无穷大以及NaN

问题来源:用Java代码写了一个计算公式,包含除法和对数和取反,在页面上出现了-infinity,不知道这是什么问题,网上找答案才明白意思是负的无穷大。 思考:为什么会出现这种情况呢?这是哪里...

young_chen ⋅ 昨天 ⋅ 0

前台对中文编码,后台解码

前台:encodeURI(sbzt) 后台:String param = URLDecoder.decode(sbzt,"UTF-8");

west_coast ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部