文档章节

Logstash日志采集监控方案

fym_82
 fym_82
发布于 2016/04/14 10:10
字数 1417
阅读 472
收藏 0

Logstash日志采集监控方案

概述

基于Logstash搭建日志实时采集监控系统

系统要求

优化系统内核配置

  1. ulimit

  2. logstash各组件间有版本依赖,请注意使用的版本匹配

  3. 配置好邮件服务sendmail

系统组成

Elasticsearch + Logstash + KibanaELK):

  1. Elasticsearch:日志汇总存储并创建索引提供查询服务

  2. Logstash:日志采集、过滤与监控,logstash本身提供多种input-output策略,可灵活选择

  3. Kibana:提供统一的日志查询web界面,同时可以进行分类图表统计

 

 

架构策略

在应用服务端配置logstash-agent负责采集原始日志并汇总到MQMQ选用kafka进行日志消息缓存与分发,后端部署logstash-indexer,订阅kafka topic内日志消息并写入ES文件存储,同时判断日志消息内是否包含错误或异常等信息,通过sendMail邮件服务发送报警邮件到相关邮件组,后端ES部署双节点集群做分布式搜索服务,提供http服务给kibana供用户通过web界面查询实时日志。

如图:

1 JDK安装

2 ES

注:ES最新版本要求不能以root方式启动,需创建对应用户并赋予es相关文件路径的控制权限

2.1单节点模式

  1. 下载es tar

官网:https://www.elastic.co
wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.0.tar.gz

 

 

  1. 解压并配置

tar elasticsearch-1.7.0.tar.gz

修改${es_home}/ config/ elasticsearch.yml

注:红色为基础配置项,需针对实际场景修改,更多详细配置请参阅官网

#配置es集群命名

cluster.name:

#当前节点名

node.name

#数据存储目录,默认${es_home}/data

path.data

#日志目录,默认${es_home}/logs

path.logs

#临时文件存储目录,默认${es_home}/work

path.work

#对外服务http端口,默认9200

http.port

#节点间交互端口,默认9300

transport.tcp.port

 

  1. 使用
    启动:bin/elasticsearch

后端启动:bin/elasticsearch d
PID
启动:bin/elasticsearch -d -p pid

  1. Web请求,验证服务是否成功
    http://esIp:9200/
    返回:

  2. Web查询数据接口:
    http://esIp:9200/_search?pretty

  3.  

  4.  

 

 

 

 

2.2集群模式

在其他节点按上述单节点模式安装,修改配置文件${es_home}/ config/ elasticsearch.yml:

保证:

  1. 各节点cluster.name必须一致node.name必须不同

  2. 各节点在相同网段内

启动各节点后,es会自动发现同网段内的节点组成集群

 

2.3管理插件

  1. head:查看集群几乎所有信息,还能进行简单的搜索查询,观察自动恢复的情况等

安装:./bin/plugin -install mobz/elasticsearch-head
查看:http://ip:9200/_plugin/head/

  1. Bigdesk:集群监控插件,通过该插件可以查看整个集群的资源消耗情况,cpu、内存、http链接等
    安装:./bin/plugin -install lukas-vlcek/bigdesk
    查看:http://ip:9200/_plugin/bigdesk/#nodes

  2.  

3 Logstash

下载:

https://download.elastic.co/logstash/logstash/logstash-all-plugins-2.1.0.tar.gz

Logstash-input-plugins使用文档:

https://www.elastic.co/guide/en/logstash/current/input-plugins.html

Logstash-output-plugins使用文档:

https://www.elastic.co/guide/en/logstash/current/output-plugins.html

 

解压logstash-all-plugins-2.1.0.tar.gz,配置启动文件,进行不同模式处理。

3.1   logstash-agent

功能:监控、过滤日志,也可称为shipper

配置日志采集,采集原始日志发送到kafka集群

logstash-agent.conf

input {

  #file-input

  file {

path => "${log_path}"

#log type

type => “${log_type}

  }

 

}

output {

  kafka {

                   #kafka brokers

        bootstrap_servers => "${borker_list}"

                   #send to kafka topic

        topic_id => "${topic_name}"

        codec => plain {

           format => "%{message}"

        }

      }

  #stdout { codec => rubydebug }

}

 

注:

  1. 1.       output使用kafka时,codec默认采用json格式发送,如显式配置了codecplain时不会传输“@version typetags”等input内的默认字段和add_field添加的字段,可通过format自定义传输格式:如:

codec json时传输:

{"message":"dfsd","@version":"1","@timestamp":"2015-12-07T07:18:01.124Z","type":"in","IP":"%{[ip]}","host":"testqiuli.novalocal","tags":["_grokparsefailure"]}

codecplain时传输:

2015-12-07T07:22:21.526Z testqiuli.novalocal tt

  1. 2.        

  2. 2 logstash-indexer

功能:收集日志并将日志交给ElasticSearch 做搜索

配置统一订阅kafka topic中日志,收集汇总到es

logstash-index.conf

input {

  #kafka-input

  kafka {

zk_connect => "${zk_nodes}"

type => “${log_type}

#consumer thread num,must be less then the partitions of kafka topic

consumer_threads => 2

#topic_id => "${topic_name}"

#kafka topics to consume,more topics such as :topic1,topic2

    white_list => "${topic_names}"

  }

 

}

output {

  elasticsearch { hosts => ["${es_IP}:${es_port}"] }

 

}

 

 

3.3 报警filter

配置filter,对日志中包含“error”或“exception”等关键字的异常错误进行过滤,并通过邮件报警到指定的邮件组

注:报警的output必须在output模块最后添加,否则匹配上的日志信息不会由其他output收集了

配置logstash-warn.conf

input {

  #kafka-input

  kafka {

zk_connect => "${zk_nodes}"

type => “${log_type}

#topic_id => "${topic_name}"

#kafka topics to consume,more topics such as :topic1,topic2

    white_list => "${topic_names}"

  }

}

 

#warning filter

filter {

  grok {

    match => { "message" => [ "error", "exception" ] }

    add_tag => ["tag_error"]

  }

}

output {

  elasticsearch { hosts => ["${es_IP}:${es_port}"] }

  #warning output,ps:make sure at the end of output

  if "tag_error" in [tags] {

  exec {

    command => "echo '%{@timestamp} %{source}: %{message}' | mail -s ${email_title} ${email_to}”

  }

 }

}

3.4          多个input-output

同一个conf文件中可配置多个inputoutput,完成不同功能

 

logstash-all.conf

input {

  #file-input

  file {

path => "${log_path}"

type => “${log_type}

  }

  #kafka-input

  kafka {

    zk_connect => "${zk_nodes}"

#topic_id => "${topic_name}"

#kafka topics to consume,more topics such as :topic1,topic2

    white_list => "${topic_names}"

  }

 

}

filter {

  grok {

    match => { "message" => [ "error", "exception" ] }

    add_tag => ["tag_error"]

  }

}

 

output {

  elasticsearch { hosts => ["${es_IP}:${es_port}"] }

  kafka {

                   #kafka brokers

        bootstrap_servers => "${borker_list}"

                   #send to kafka topic

        topic_id => "${topic_name}"

        codec => plain {

           format => "%{message}"

        }

      }

  #stdout { codec => rubydebug }

  #warning output,ps:make sure at the end of output

  if "tag_error" in [tags] {

  exec {

    command => "echo '%{@timestamp} %{source}: %{message}' | mail -s ${email_title} ${email_to}"

  }

 }

}

 

 

3.3          启动logstash

配置好conf文件后,使用-f指定该文件启动:

./bin/logstash -f XXX.conf

后端启动:nohup ./bin/logstash -f XXX.conf &

 

3.5 注意事项

  1. logstash使用kafka作为input时,由于logstash-plugins-inputs-kafka插件连接zk时需对本地hostname进行解析,因此需对本机配置host IP映射

  2. logstash使用kafka作为output时,注意logstash创建的topic是否成功,多个logstash-index节点时,配置kafka-outputgroup_id需一致(默认logstash

  3. 3.        报警的output必须在output模块最后添加,否则匹配上的日志信息不会由其他output收集了

4 Kibana

下载:https://download.elastic.co/kibana/kibana/kibana-4.1.3-linux-x64.tar.gz

配置:config/kibana.yml 

#配置kibana连接的es

elasticsearch.url

 

访问:http://kibanaIp:5601

 

Discover:

© 著作权归作者所有

共有 人打赏支持
上一篇: MQ介绍与选型
下一篇: MQ介绍与选型
fym_82
粉丝 0
博文 2
码字总数 5245
作品 0
东城
私信 提问
建设DevOps统一运维监控平台,先从日志监控说起

前言 随着Devops、云计算、微服务、容器等理念的逐步落地和大力发展,机器越来越多,应用越来越多,服务越来越微,应用运行基础环境越来多样化,容器、虚拟机、物理机不一而足。 面对动辄几百...

stars永恒
04/26
0
0
Linux的企业-ELK日志分析

一、简介 1、核心组成 ELK由Elasticsearch、Logstash和Kibana三部分组件组成; Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,...

一百个小排
2017/11/07
0
0
ELK日志处理之Filebeat工作原理

一、Filebeat简介 Beats是Elastic Stack技术栈中轻量级的日志采集器,Beats家族包括以下五个成员: Filebeat:轻量级的日志采集器,可用于收集文件数据。 Metricbeat:5.0版本之前名为Topbeat...

napoay
2017/04/18
0
0
ELK+Beats实现Windows服务器系统日志监控

ELK在运维监控领域使用非常广泛,日志采集通常依靠Logstash,但是通常来讲Logstash架构比较重载,一个安装包由几百MB,相比之下Elastic还提供另一种更轻量的采集工具Beats。Beats 平台集合了...

半夜菊花茶
2017/11/11
0
0
轻量型数据采集器--Beats

Beats 是一款轻量级的数据采集器,采用 Go 语言编写。它集合了多种单一用途数据采集器。这些采集器安装后可用作轻量型代理,从成百上千或成千上万台机器向 Logstash 或 Elasticsearch 发送数...

匿名
2017/11/15
2.7K
1

没有更多内容

加载失败,请刷新页面

加载更多

jquery通过id显示隐藏

var $div3 = $('#div3'); 显示 $div3.show(); 隐藏 $div3.hide();

yan_liu
今天
3
0
《乱世佳人》读书笔记及相关感悟3900字

《乱世佳人》读书笔记及相关感悟3900字: 之前一直听「荔枝」,后来不知怎的转向了「喜马拉雅」,一听就是三年。上班的时候听房产,买房了以后听装修,兴之所至时听旅行,分手后听亲密关系,...

原创小博客
今天
3
0
大数据教程(9.6)map端join实现

上一篇文章讲了mapreduce配合实现join,本节博主将讲述在map端的join实现; 一、需求 实现两个“表”的join操作,其中一个表数据量小,一个表很大,这种场景在实际中非常常见,比如“订单日志...

em_aaron
今天
3
0
cookie与session详解

session与cookie是什么? session与cookie属于一种会话控制技术.常用在身份识别,登录验证,数据传输等.举个例子,就像我们去超市买东西结账的时候,我们要拿出我们的会员卡才会获取优惠.这时...

士兵7
今天
3
0
十万个为什么之为什么大家都说dubbo

Dubbo是什么? 使用背景 dubbo为什么这么流行, 为什么大家都这么喜欢用dubbo; 通过了解分布式开发了解到, 为适应访问量暴增,业务拆分后, 子应用部署在多台服务器上,而多台服务器通过可以通过d...

尾生
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部