Logstash日志收集

原创
2021/06/02 19:00
阅读数 557

首先先下载我们需要的安装文件,wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz

这里因为我们的elasticsearch用的是6.2.4,所以我们logstash使用相同的版本。

  1. Logstash是一个数据收集处理引擎。
  2. ETL工具

它整个的处理流程如下

这里input{}的stdin{}是标准输入,再到filter{},filter{}可以有多个,首先是grok{},它是利用了正则匹配的方法,将信息中的内容匹配出一些独立的字段;date{}是专门处理日期类型的;geoip{}是处理地理位置信息的;useragent{}是把信息中的agent提取出来;output{}是把数据输出到elasticsearch中。

  • Pipeline
    • input-filter-output的3阶段处理流程
    • 队列管理
    • 插件生命周期管理
  • Logstash Event
    • 内部流转的数据表现形式
    • 原始数据在input被转换为Event,在output event被转换为目标格式数据
    • 在配置文件中可以对Event中的属性进行增删改查

原始数据从Input进来,通过Output输出符合目标数据源的数据。Input通过Codec的decode(解码)把原始数据转换成Event,Event通过Output的encode(编码)转换为目标数据源的数据格式。整体流程如下。

我们来看一个最简单的配置

这里Input的codec是line,意思就是说对每一行进行切割数据,就是把每一行都变成一个Logstash Event;ouput的codec是json,意思就是说把Event转换成json对象输出。假设有一个输出为"foo\nbar",它的解析过程如下所示

我们可以看到它最终被解析成了两个Event,一个是"foo",一个是"bar"。这两个Event再通过output变成json对象

现在我们来实现一下上面的过程,先建立一个codec.conf的文件,文件内容为

input {
        stdin {
                codec => line
        }
}

output {
        stdout {
                codec => json
        }
}

执行

[root@ config]# echo "foo
> bar
> "|../bin/logstash -f ./codec.conf

运行结果如下

Sending Logstash's logs to /home/soft/logstash-6.2.4/logs which is now configured via log4j2.properties
[2021-06-02T18:53:45,304][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/soft/logstash-6.2.4/modules/netflow/configuration"}
[2021-06-02T18:53:45,333][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/soft/logstash-6.2.4/modules/fb_apache/configuration"}
[2021-06-02T18:53:45,429][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/home/soft/logstash-6.2.4/data/queue"}
[2021-06-02T18:53:45,436][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/home/soft/logstash-6.2.4/data/dead_letter_queue"}
[2021-06-02T18:53:45,804][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-06-02T18:53:45,837][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"b63c49fd-9164-435f-9579-e01cf97cc025", :path=>"/home/soft/logstash-6.2.4/data/uuid"}
[2021-06-02T18:53:46,638][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2021-06-02T18:53:47,095][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2021-06-02T18:53:48,736][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2021-06-02T18:53:48,896][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x26912ae5 sleep>"}
[2021-06-02T18:53:49,021][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
{"@version":"1","host":"ecs-linge-hangye-1","@timestamp":"2021-06-02T10:53:48.967Z","message":"foo"}{"@version":"1","host":"ecs-linge-hangye-1","@timestamp":"2021-06-02T10:53:48.988Z","message":"bar"}{"@version":"1","host":"ecs-linge-hangye-1","@timestamp":"2021-06-02T10:53:48.989Z","message":""}[2021-06-02T18:53:49,271][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x26912ae5 run>"}

我们可以看到获得了{"@version":"1","host":"ecs-linge-hangye-1","@timestamp":"2021-06-02T10:53:48.967Z","message":"foo"}{"@version":"1","host":"ecs-linge-hangye-1","@timestamp":"2021-06-02T10:53:48.988Z","message":"bar"}两个json对象。

Event的生命周期

这里从Input进入的原始数据,经过Codec解码后,变成Event进入Queue,Queue会把进入的Event分发给不同的Pipeline(管道)中,Pipeline包含了后面的Batcher、Filter、Output。Batcher的作用是批量的从Queue中取数据。

这里每一个Pipeline都是一个线程。Batcher会收集很多的数据,当数据达到一定的数量的时候,会发送给Filter往后传递。

 

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部