Logstash 与Elasticsearch整合使用示例
Logstash 与Elasticsearch整合使用示例
孤岛旭日 发表于3年前
Logstash 与Elasticsearch整合使用示例
  • 发表于 3年前
  • 阅读 1366
  • 收藏 12
  • 点赞 0
  • 评论 0

移动开发云端新模式探索实践 >>>   

摘要: 本文简单介绍了Logstash如何从kafka、log4j、logback及file等数据源获取数据并输出到Elasticearch的完整示例。

不多废话了,直接上操作步骤:

环境准备

这里使用Docker搭建,我Docker宿主机的IP是192.168.4.99

Zookeeper

docker run -d --name monitor-zk -p 2181:2181 -p 2888:2888 -p 3888:3888 jplock/zookeeper

Kafka

吐嘈一下,docker hub的有不少 kafka ,但好用的太少了

docker run -d \
 --name monitor-kafka \
 -p 9092:9092 \
 -e KAFKA_ADVERTISED_HOST_NAME=192.168.4.99 \
 -e ZOOKEEPER_IP=192.168.4.99 \
 ches/kafka

# Kafka Test(可以忽略)
docker run --rm --interactive ches/kafka  kafka-console-producer.sh --topic m1 --broker-list 192.168.4.99:9092
docker run --rm ches/kafka kafka-console-consumer.sh --topic test --from-beginning --zookeeper 192.168.4.99:2181

参考:

Docker:https://hub.docker.com/r/ches/kafka/

HTTP插件:https://github.com/confluentinc/kafka-rest

SSL加密:https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka

Elasticsearch

这里用2.x版本

docker run -d \
 --name monitor-es \
 -p 9200:9200 -p 9300:9300 \
 -v /opt/monitor/es/data:/usr/share/elasticsearch/data \
 -v /opt/monitor/es/plugins:/usr/share/elasticsearch/plugins \
 elasticsearch:2.0.0 \
 -Des.node.name="Node01" \
 -Des.network.host=::0

参考:

SQL化查询插件:https://github.com/NLPchina/elasticsearch-sql/wiki

Logstash

同样是2.x版本

docker run -d \
 --name monitor-logstash \
 -p 7001-7005:7001-7005 \
 -v /opt/monitor/logstash/config/:/config-dir \
 -v /opt/monitor/logstash/input_test/:/input_test \ # 用于测试File输入的目录映射
 logstash:2.0.0 \
 logstash -f /config-dir/config.conf # 配置文件,后面会介绍

参考:

Kafka输入插件:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

CSV过滤插件:https://www.elastic.co/guide/en/logstash/current/plugins-filters-csv.html

ES输出插件:https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html

Logback输入插件:https://github.com/logstash/logstash-logback-encoder

Log4j输入插件:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-log4j.html

Websocket输出插件:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-websocket.html

Logstash 配置

在上面环境准备的最后一步我们启动了Logstash,其中引用到了一个config.conf的配置文件,它的格式如下:

input { 
  kafka { # 这个用于kafka输入
    type => 'key_trace_log'
    topic_id => 'format_key_trace_topic'
    zk_connect => '192.168.4.99:2181'
  } 
  log4j { # 这个用于Log4j输入
    type => 'tp_log_test1'
    mode => 'server'
    host => '0.0.0.0'
    port => 7001
  } 
  tcp { # 这个用于Logback输入
    type => 'tp_log_test2'
    host => '0.0.0.0'
    port => 7002
    codec => 'json_lines'
  }
  file { # 这个用于File输入
    type => 'tp_log_test3'
    path => ['/input_test/logs/*.log'] # 这是文件所在的目录
    start_position => 'beginning'
  }
}
filter {
  if [type] == 'key_trace_log' {
    csv { # Kafka输入的个message,这里假设是用tab分隔的,对应的字段如下
      separator => '	'
      columns => ['level','time','job_code','task_code','clue_value','node_code','stage','message']
    }
  }
  if [type] == 'tp_log_test3' {
    grok { # 对于File输入的数据需要做正则解析,下面正则解析的内容类似:2015-11-24 11:28:58  [ main:4234 ] - [ DEBUG ]  logstash slf4j test loop 109
      pattern => '(?<datetime>.{19}) *\[ (?<class>.*):(?<line>\d*) \] - \[ (?<level>.*) \] (?<msg>.*)'
    }
  }
}
output {
  if [type] == 'key_trace_log' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'monitor_key_trace_log'
      document_type => 'default'
    }
    websocket {
      host => '0.0.0.0'
      port => 7005
    }
  }
  if [type] == 'tp_log_test1' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'tp_log'
      document_type => 'test1'
    }
  }
  if [type] == 'tp_log_test2' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'tp_log'
      document_type => 'test2'
    }
  }
  if [type] == 'tp_log_test3' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'tp_log'
      document_type => 'test3'
    }
  }
  stdout { codec => rubydebug }
}

几点说明:

Logback 需要有专门的插件

在目标系统中加入依赖:

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>4.5.1</version>
</dependency>

修改logback.xml,加入:

<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
    <destination>192.168.4.99:7002</destination>
    <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
    <keepAliveDuration>5 minutes</keepAliveDuration>
</appender>

<root level="TRACE">
    <appender-ref ref="logstash"/>
</root>

Log4j 需要加上Socket适配

log4j.appender.SOCKET=org.apache.log4j.net.SocketAppender
log4j.appender.SOCKET.port=7001
log4j.appender.SOCKET.remoteHost=192.168.4.99

grok测试

http://grokdebug.herokuapp.com/

  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 36
博文 58
码字总数 30599
作品 3
×
孤岛旭日
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: