文档章节

ELK+kafka集成

liwei2000
 liwei2000
发布于 2017/07/21 16:32
字数 435
阅读 234
收藏 1

1、因为本项目采用的log4j2,所以在log4j2中直接配置 

<Kafka name="Kafka" topic="XX_log">  
  <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss}||%p||%c{1}||XX_web||%m%n"/>  
    <Property name="bootstrap.servers">127.0.0.1:9092</Property>  
    <Property name="timeout.ms">500</Property>  
</Kafka>

PatternLayout 中格式采用了||将内容连接起来目的为了logstash进行切分,其中增加timeout.ms属性为了保证日志系统挂掉的情况不会对业务系统产生较大影响,当然kafka可以采用集群的方式,bootstrap.servers多个地址用“,”分隔。XX_web代表当前业务平台。 

 

2、搭建kafka集群这里就不多介绍了官网很全, 

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

 

3、创建logstash动态模板 

{  
    "template": "*",  
    "settings": {  
        "index.refresh_interval": "5s",  
        "number_of_replicas": "0",  
        "number_of_shards": "3"  
    },  
    "mappings": {  
        "_default_": {  
            "_all": {  
                "enabled": false  
            },  
            "dynamic_templates": [  
                {  
                    "message_field": {  
                        "match": "message",  
                        "match_mapping_type": "string",  
                        "mapping": {  
                            "type": "string",  
                            "index": "analyzed"  
                        }  
                    }  
                },  
                {  
                    "string_fields": {  
                        "match": "*",  
                        "match_mapping_type": "string",  
                        "mapping": {  
                            "type": "string",  
                            "index": "not_analyzed"  
                        }  
                    }  
                }  
            ],  
            "properties": {  
                "dateTime": {  
                    "type": "date",  
                    "format": "yyy-MM-dd HH:mm:ss"  
                },  
                "@version": {  
                    "type": "integer",  
                    "index": "not_analyzed"  
                },  
                "context": {  
                    "type": "string",  
                    "index": "analyzed"  
                },  
                "level": {  
                    "type": "string",  
                    "index": "not_analyzed"  
                },  
                "class": {  
                    "type": "string",  
                    "index": "not_analyzed"  
                },  
                "server": {  
                    "type": "string",  
                    "index": "not_analyzed"  
                }  
            }  
        }  
    }  
}

 

4、配置logstash 

input{  
       kafka {  
                zk_connect =>"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"  
                group_id =>"logstash"  
                topic_id =>"XX_log"  
                reset_beginning => false  
                consumer_threads => 5  
                decorate_events => true  
       }  
}  
filter {  
   mutate{  
        split=>["message","||"]  
        add_field => {  
             "dateTime" => "%{[message][0]}"  
        }  
        add_field => {  
              "level" => "%{[message][1]}"  
        }  
        add_field => {  
               "class" => "%{[message][2]}"  
        }  
        add_field => {  
                "server" => "%{[message][3]}"  
         }  
        add_field => {  
                "context" => "%{[message][4]}"  
         }  
         remove_field => ["message"]  
    }  
    date {  
        match => ["logdate", "yyyy-MM-dd HH:mm:ss"]  
    }  
}  
output{  
      elasticsearch {  
            hosts => ["127.0.0.1:9200"]  
            index => "XX_log-%{+YYYY-MM}"  
            codec => "json"  
            manage_template => true  
            template_overwrite => true  
            flush_size => 50000  
            idle_flush_time => 10  
            workers => 2  
            template => "E:\logstash\template\template_log.json"    
    }  
}

 

按照年月将日志保存进ES索引中index => "XX_log-%{+YYYY-MM}",logstash从kafka集群中读取日志信息。 

 

5、搭建ZK集群,这里就不多介绍了,网上资料比较多

 

6、搭建ES集群,ES集群比较简单,设置的参数不要太多就可以使用。

 

7、配置kibana 

server.port: 5601 # 服务端口  
# The host to bind the server to.  
server.host: "115.28.240.113"  
elasticsearch.url: http://127.0.0.1:9200   ES地址-集群  
kibana.index: "kibana"

 

8、版本 JKD 1.7  ES-2.4, logstash 2.4, kafka-2.10,kibana-4.6.4

本文转载自:http://www.roncoo.com/article/detail/129152

共有 人打赏支持
liwei2000
粉丝 41
博文 66
码字总数 51647
作品 0
贵阳
私信 提问
LarryKoo/jfinal-plus

JFinal-Plus 高度集成开箱即用项目基础包 v1.0.0 发布 基于JFinal 2.2 版本; 集成JFinal-Ext和JFinal-Ext2最新版本的稳定工具(感谢); 集成Beetl2支持,整合Beetl+Shiro使用; 集成Beetl2 + XS...

LarryKoo
2016/08/08
0
0
使用Gitlab和Gitlab CI做持续集成(理论篇)

持续集成是一种软件开发实践。 在持续集成中,团队成员频繁集成他们的工作成果,一般每人每天至少集成一次,也可以多次。 每次集成会经过自动构建(包括自动测试)的检验,以尽快发现集成错误...

donhui
2016/07/25
4.3K
11
“中国系统集成产业联盟”正式成立

“中国系统集成产业联盟”正式成立 2013年12月26日,“中国系统集成产业联盟”成立大会在北京市石景山区电科大厦成功举行。 “中国系统集成产业联盟”由工业和信息化部电子科学技术情报研究所...

beechnut1221
2013/12/27
111
1
J2EE快速开发框架--JDFrame

JDFrame采用基于J2EE的B/S/S三层体系架构、基于MVC设计模式、集成业界常用标签库、集成权限体系管理功能、集成待办事宜、系统公告等功能; 为达到最佳性能开发环境建议采用Jdk1.6及以上版本,...

fasake
2014/02/11
4K
0
NutzBook衍生项目--nutz-book-project

开源图书 Nutz 向导式指南 nutz-book 的衍生项目。 当前已经实现或正在实现的功能: 基本的增删改查 Dao关联关系 邮件发送 Quartz计划任务 Shiro集成及权限管理 Ehcache及DaoCache集成 Beet...

wendal
2015/07/22
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

js垃圾回收机制和引起内存泄漏的操作

JS的垃圾回收机制了解吗? Js具有自动垃圾回收机制。垃圾收集器会按照固定的时间间隔周期性的执行。 JS中最常见的垃圾回收方式是标记清除。 工作原理:是当变量进入环境时,将这个变量标记为“...

Jack088
16分钟前
1
0
大数据教程(10.1)倒排索引建立

前面博主介绍了sql中join功能的大数据实现,本节将继续为小伙伴们分享倒排索引的建立。 一、需求 在很多项目中,我们需要对我们的文档建立索引(如:论坛帖子);我们需要记录某个词在各个文...

em_aaron
32分钟前
2
0
"errcode": 41001, "errmsg": "access_token missing hint: [w.ILza05728877!]"

Postman获取微信小程序码的时候报错, errcode: 41001, errmsg: access_token missing hint 查看小程序开发api指南,原来access_token是直接当作parameter的(写在url之后),scene参数一定要...

两广总督bogang
32分钟前
6
0
MYSQL索引

索引的作用 索引类似书籍目录,查找数据,先查找目录,定位页码 性能影响 索引能大大减少查询数据时需要扫描的数据量,提高查询速度, 避免排序和使用临时表 将随机I/O变顺序I/O 降低写速度,占用磁...

关元
51分钟前
7
0
撬动世界的支点——《引爆点》读书笔记2900字优秀范文

撬动世界的支点——《引爆点》读书笔记2900字优秀范文: 作者:挽弓如月。因为加入火种协会的读书活动,最近我连续阅读了两本论述流行的大作,格拉德威尔的《引爆点》和乔纳伯杰的《疯传》。...

原创小博客
今天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部