文档章节

SpringCloud实践分享-日志收集Kafka-ELK

杨子敬的瞎扯时刻
 杨子敬的瞎扯时刻
发布于 06/11 09:53
字数 1216
阅读 852
收藏 26
点赞 3
评论 0

微服务应用在容器化后,日志的查询就会变成困难的问题,虽说有portainer这类的容器管理工具,能够方便的查询每个容器中的日志,但容器到达一定数量后,尤其是应用有多个实例时候,查询就成了头疼的问题。所以我们采用了Kafka-Logstash-Elasticsearch-Kibana的方案来处理日志。

首先说说我的日志收集思路:

  1. 应用将日志传入kafka集群。在集群建立相应topic,并传入日志。
  2. logstash在kafka上消费(读取)日志内容,写入elasticsearch。
  3. kibana读elasticsearch,做对应的展示。

这样的好处,1)几乎不用做特别大的修改,只需做一定的配置工作即可完成日志收集;2)日志内容输入kafka几乎没有什么瓶颈,另外kafka的扩展性能很好,也很简单;3)收集的日志几乎是实时的;4)整体的扩展性很好,很容易消除瓶颈,例如elasticsearch分片、扩展都很容易。

应用侧配置

在应用中,我们只需配置log4j的相应配置,将其日志输出到kafka即可。以下为配置示例,配置中包含将日志输入出命令行和kafka部分中。注意,在输出到kafka中,需要有一个appender类kafka.producer.KafkaLog4jAppender一般是没有的,则我在本地自己写上该类(KafkaLog4jAppender.java),并加入相应的文件路径。

log4j.rootLogger=INFO,console,kafka

# 输出到kafka
log4j.appender.kafka=com.yang.config.KafkaLog4jAppender
  #kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=api-admin
log4j.appender.kafka.brokerList=192.0.0.2:9092,192.0.0.3:9092,192.0.0.4:9092 # 这里填写kafka的ip
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=true
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n

# 输出到Console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n

KafkaLog4jAppender.java

public class KafkaLog4jAppender extends AppenderSkeleton {

    private static final String BOOTSTRAP_SERVERS_CONFIG = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
    private static final String COMPRESSION_TYPE_CONFIG = ProducerConfig.COMPRESSION_TYPE_CONFIG;
    private static final String ACKS_CONFIG = ProducerConfig.ACKS_CONFIG;
    private static final String RETRIES_CONFIG = ProducerConfig.RETRIES_CONFIG;
    private static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
    private static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
    private static final String SECURITY_PROTOCOL = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
    private static final String SSL_TRUSTSTORE_LOCATION = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
    private static final String SSL_TRUSTSTORE_PASSWORD = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
    private static final String SSL_KEYSTORE_TYPE = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
    private static final String SSL_KEYSTORE_LOCATION = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
    private static final String SSL_KEYSTORE_PASSWORD = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;

    private String brokerList = null;
    private String topic = null;
    private String compressionType = null;
    private String securityProtocol = null;
    private String sslTruststoreLocation = null;
    private String sslTruststorePassword = null;
    private String sslKeystoreType = null;
    private String sslKeystoreLocation = null;
    private String sslKeystorePassword = null;

    private int retries = 0;
    private int requiredNumAcks = Integer.MAX_VALUE;
    private boolean syncSend = false;
    private Producer<byte[], byte[]> producer = null;

    public Producer<byte[], byte[]> getProducer() {
        return producer;
    }

    public String getBrokerList() {
        return brokerList;
    }

    public void setBrokerList(String brokerList) {
        this.brokerList = brokerList;
    }

    public int getRequiredNumAcks() {
        return requiredNumAcks;
    }

    public void setRequiredNumAcks(int requiredNumAcks) {
        this.requiredNumAcks = requiredNumAcks;
    }

    public int getRetries() {
        return retries;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }

    public String getCompressionType() {
        return compressionType;
    }

    public void setCompressionType(String compressionType) {
        this.compressionType = compressionType;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public boolean getSyncSend() {
        return syncSend;
    }

    public void setSyncSend(boolean syncSend) {
        this.syncSend = syncSend;
    }

    public String getSslTruststorePassword() {
        return sslTruststorePassword;
    }

    public String getSslTruststoreLocation() {
        return sslTruststoreLocation;
    }

    public String getSecurityProtocol() {
        return securityProtocol;
    }

    public void setSecurityProtocol(String securityProtocol) {
        this.securityProtocol = securityProtocol;
    }

    public void setSslTruststoreLocation(String sslTruststoreLocation) {
        this.sslTruststoreLocation = sslTruststoreLocation;
    }

    public void setSslTruststorePassword(String sslTruststorePassword) {
        this.sslTruststorePassword = sslTruststorePassword;
    }

    public void setSslKeystorePassword(String sslKeystorePassword) {
        this.sslKeystorePassword = sslKeystorePassword;
    }

    public void setSslKeystoreType(String sslKeystoreType) {
        this.sslKeystoreType = sslKeystoreType;
    }

    public void setSslKeystoreLocation(String sslKeystoreLocation) {
        this.sslKeystoreLocation = sslKeystoreLocation;
    }

    public String getSslKeystoreLocation() {
        return sslKeystoreLocation;
    }

    public String getSslKeystoreType() {
        return sslKeystoreType;
    }

    public String getSslKeystorePassword() {
        return sslKeystorePassword;
    }

    @Override
    public void activateOptions() {
        // check for config parameter validity
        Properties props = new Properties();
        if (brokerList != null)
            props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
        if (props.isEmpty())
            throw new ConfigException("The bootstrap servers property should be specified");
        if (topic == null)
            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
        if (compressionType != null)
            props.put(COMPRESSION_TYPE_CONFIG, compressionType);
        if (requiredNumAcks != Integer.MAX_VALUE)
            props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
        if (retries > 0)
            props.put(RETRIES_CONFIG, retries);
        if (securityProtocol != null && sslTruststoreLocation != null &&
                sslTruststorePassword != null) {
            props.put(SECURITY_PROTOCOL, securityProtocol);
            props.put(SSL_TRUSTSTORE_LOCATION, sslTruststoreLocation);
            props.put(SSL_TRUSTSTORE_PASSWORD, sslTruststorePassword);

            if (sslKeystoreType != null && sslKeystoreLocation != null &&
                    sslKeystorePassword != null) {
                props.put(SSL_KEYSTORE_TYPE, sslKeystoreType);
                props.put(SSL_KEYSTORE_LOCATION, sslKeystoreLocation);
                props.put(SSL_KEYSTORE_PASSWORD, sslKeystorePassword);
            }
        }

        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producer = getKafkaProducer(props);
        LogLog.debug("Kafka producer connected to " + brokerList);
        LogLog.debug("Logging for topic: " + topic);
    }

    protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
        return new KafkaProducer<byte[], byte[]>(props);
    }

    @Override
    protected void append(LoggingEvent event) {
        String message = subAppend(event);
        LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
        Future<RecordMetadata> response = producer.send(new ProducerRecord<byte[], byte[]>(topic, message.getBytes()));
        if (syncSend) {
            try {
                response.get();
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            } catch (ExecutionException ex) {
                throw new RuntimeException(ex);
            }
        }
    }

    private String subAppend(LoggingEvent event) {
        return (this.layout == null) ? event.getRenderedMessage() : this.layout.format(event);
    }

    public void close() {
        if (!this.closed) {
            this.closed = true;
            producer.close();
        }
    }

    public boolean requiresLayout() {
        return true;
    }

}

logstash配置

logstash可能会有快速启动实例的需求,我们就采用docker部署,能够快速启动、扩展等功能。

镜像就直接logstash官方镜像logstash docker镜像,我们选择了一种最简单启动方式做演示,具体还有多种docker部署方法,可以参考以上链接。

docker启动命令,input输入中,指定kafka集群地址和对应topic;输出,则是elasticsearch集群地址、索引,以及elastisearch配置参数。

#启动命令
docker run -it -d logstash -e 'input { kafka { 
	bootstrap_servers => "kafkaIp1:9092,kafkaIp2:9092" 
	topics => ["api-admin"] } } 
output { elasticsearch { 
	hosts => ["elasticsearch1:9200","elasticsearch2:9200","elasticsearch3:9200"] 
	index => "api-admin-%{+YYYY.MM.dd}" 
	flush_size => 20000 
	idle_flush_time => 10 
	template_overwrite => true 
} }'

在marathon中启动在Command选项中加入参数logstash -e 'input {} output {}'即可。另外说一句, 如果在容器编排系统(mesos/marathon、kubernetes)中,可能会有健康检查要求,其端口为9600。

© 著作权归作者所有

共有 人打赏支持
杨子敬的瞎扯时刻
粉丝 10
博文 11
码字总数 8785
作品 0
西城
程序员
springcloud+sleuth+zipkin+kafka+es

前面已经完成了Springcloud+sleuth+zipkin的入门,以及kafka的安装。至于ES这里就不在说明了,网上安装使用资料挺多的,这里仅仅是将其作为持久化工具使用。 环境说明 jdk1.8 server 64位 in...

xiaomin0322
04/23
0
0
基于 Spring Cloud 的企业级认证与授权 - pig-cloud

基于Spring Cloud、Spring Security Oauth2.0开发企业级认证与授权,提供常见服务监控、链路追踪、日志分析、缓存管理、任务调度等实现。 特点 业务模块不涉及oauth2.0,认证鉴权全部在网关模...

冷冷gg
01/07
0
3
Spring Cloud-Honghu Cloud分布式微服务云系统—技术点

鸿鹄Cloud是基于springcloud的,spring cloud本身提供的组件就很多,但我们需要按照企业的业务模式来定制企业所需要的通用架构,那我们现在需要考虑使用哪些技术呢? 下面我针对于spring cl...

itcloud
04/25
0
0
Uber jaeger--一个基于Go的分布式追踪系统

Jaeger-Uber开源的一个基于Go的分布式追踪系统 最近因工作需要在研究traing系统,最后选了jaeger,下面是一些总结,同时摘抄了网上的一些资料,并结合自己实践过程中遇到的一些什么问题,欢迎...

北极之北
05/30
0
0
使用elasticsearch作为存储引擎部署jaeger

使用elasticsearch作为存储引擎部署jaeger 由于网上大部分文章部署jaeger都是使用Cassandra,使用elasticsearch较少,而我主要是使用elasticsearch来部署的,下面是我的一些使用总结,欢迎指针...

北极之北
05/30
0
0
Pig 1.0-BETA 发布,完善的 Spring Cloud 开发脚手框架

Hi 大家好! 我是pig开发团队的冷冷,经过4月迭代,400次提交,关闭issue 35 ,N个内测版本,我们将这个完善的微服务开发脚手架框架正式开放公测。 PIG是一个后端基于Spring Cloud、oAuth2....

冷冷gg
04/24
0
0
基于ELK实时日志分析的最佳实践

在2018云栖大会深圳峰会大数据分析与可视化专场上,由阿里巴巴搜索引擎事业部开放搜索团队的吴迪带来了“基于ELK实时日志分析的最佳实践”的主题分享。介绍了传统的日志分析、ELK的概念和ELK...

smile小太阳
05/06
0
0
论代码所需要的环境、版本的重要性

学员们在参与“基于Spring Boot的博客系统实战”课程的时候,可能没有太注意版本的问题。其实,版本是一个非常重要也是一个非常容易忽略的问题。 版本不一致会导致各种奇怪的问题,比如: 应...

老卫
06/26
0
0
Grafana、elasticsearch、kafka、logstash和pinpoint结合

一、Grafana 1)下载安装 wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.2.0-1.x8664.rpm sudo yum localinstall grafana-4.2.0-1.x86_64.rpm 2)启动 serv......

半船水
2017/10/25
0
0
部署 elk 日志系统 elasticsearch、logstash、 kibana

安装elk 安装Java 1.8环境 解压源码安装包: tar xf jdk-8u121-linux-x64.tar.gz ll mkdir /work/opt -p mv jdk1.8.0_121 /work/opt/jdk ll /work/opt/jdk/ chown -R root.root /work/opt v......

lvnian2009
06/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

自定义OkHttp的UA

背景 上次的问题很明显 由于我们的ua清一色OkHttp导致快速定位到内部应用。 既然如此我们是否考虑可以在UA上做点手脚。 自定义我们的UA呢??? 分析 首先UA在 均为okhttp/3.2.0 大概率是由于...

Mr_Qi
8分钟前
0
0
【scikit-learn】01:使用案例对sklearn库进行简单介绍

sklearn学习笔记:Quick Start 源地址:http://scikit-learn.org/stable/tutorial/basic/tutorial.html # -*-coding:utf-8-*-''' Author:kevinelstri Datetime:2017.2.16'''......

wangxuwei
11分钟前
0
0
Linux Kernel 4.16 系列停止维护,用户应升级至 4.17

知名 Linux 内核维护人员兼开发人员 Greg Kroah-Hartman 近日在发布 4.16.18 版本的同时,宣布这是 4.16 系列的最后一个维护版本,强烈建议用户立即升级至 4.17 系列。 Linux 4.16 于 2018 年...

问题终结者
13分钟前
0
0
Apache配置时.htaccess失效不起作用的原因分析

.htaccess 失效的原因 1. 重写规则有问题,检查自己的重写规则 2.Apache配置问题,配置中没有配置启用 rewrite a2enmod rewrite 3.网站配置文件没有启用配置需要配置 000-default.conf <Dire...

TU-DESGIN
33分钟前
1
0
两个求最大公约数C/C++算法实现

#include<stdio.h> #include<time.h> #include <iostream>using namespace std;//求最大公约数 LCD(Largest Common Division)//短除法 //m=8251, n=6105; int LCD_ShortDiv(int m, ......

失落的艺术
39分钟前
1
0
QueryPerformanceCounter

windows的Sleep函数,睡眠线程指定毫秒数,可以用来做毫秒延时。 对于微秒延时,没有一个现成的函数,但是可以通过 QueryPerformanceFrequency QueryPerformanceCounter 来间接实现。原理就是...

开飞色
58分钟前
1
0
log4j2使用AsyncRoot不显示行号问题处理

<AsyncRoot level="info" includeLocation="true"> <AppenderRef ref="File"/></AsyncRoot><!--1.异步logger,还需要在pom.xml中添加disruptor的依赖。2.includeLocation结合异......

小翔
今天
3
0
安卓手机上 K 歌,声音延迟怎么解决?

这篇文章可以为你提供一个解决录音和播放同步问题的思路,而且解决了声音从手机传输到耳机上有延时的问题。 初识音频 在开始之前,我先简单介绍一下音频相关的基础知识,方便下文理解。 我们...

编辑部的故事
今天
1
0
使用token实现在有效期内APP自动登录功能

使用token实现在有效期内APP自动登录功能 http://sevennight.cc/2016/07/19/auto_login_impl.html

风云海滩
今天
2
0
Spring Boot集成RabbitMQ发送接收JSON

默认情况下RabbitMQ发送的消息是转换为字节码,这里介绍一下如何发送JSON数据。 ObjectMapper 最简单发送JSON数据的方式是把对象使用ObjectMapper等JSON工具类把对象转换为JSON格式,然后发送...

小致dad
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部