文档章节

容器日志处理及实现

好雨云帮
 好雨云帮
发布于 2017/04/28 09:47
字数 1978
阅读 1285
收藏 30

容器日志

输出形式:

目前容器日志有两种输出形式:

  • stdout,stderr 标准输出

这种形式的日志输出我们可以直接使用docker logs查看日志, k8s 集群中同样集群可以使用kubectl logs类似的形式查看日志。

  • 日志文件记录

这种日志输出我们无法从以上方法查看日志内容,只能tail日志文件查看。

收集方式:

不论你的业务容器日志如何输出,都是可以使用统一的日志收集器收集。常见的日志收集方式:

  • k8s 集群
  1. 集群启动时会在每个机器启动一个Fluentd agent收集日志然后发送给 Elasticsearch。实现方式是每个agent挂载目录/var/lib/docker/containers使用fluentd的tail插件扫描每个容器日志文件,直接发送给Elasticsearch。
  2. Fluentd agent起在业务同一个 pod 中共享 volume 然后实现对日志文件的收集发送给Elasticsearch。
  • docker swarm 集群 docker swarm 目前暂时没有提供日志查看机制。但是docker cloud提供了与kubectrl logs类似的机制查看 stdout 的日志。目前还没有 fluentd 插件直接对服务进行日志收集,暂时考虑直接使用使用跟容器一样的机制收集。docker service create 支持--log-driver

  • docker 容器

    从 docker1.8 内置了fluentd log driver 。以如下的形式启动容器,容器 stdout/stderr 日志将发往配置的 fluentd 。如果配置后,docker logs将无法使用。另外默认模式下如果你配置得地址没有正常服务,容器无法启动。你也可以使用fluentd-async-connect形式启动, docker daemon 则能在后台尝试连接并缓存日志。

docker run --log-driver=fluentd --log-opt fluentd-address=myhost.local:24224 同样如果是日志文件,将文件暴露出来直接使用 fluentd 收集。

容器日志源码简单分析

# /container/container.go:63
type CommonContainer struct {
	StreamConfig *stream.Config
  ...
}
# /container/stream/streams.go:26
type Config struct {
	sync.WaitGroup
	stdout    *broadcaster.Unbuffered
	stderr    *broadcaster.Unbuffered
	stdin     io.ReadCloser
	stdinPipe io.WriteCloser
}

moby源码来看,每一个container实例都有几个属性stdout,stderr,stdin,以及管道stdinPipe(当容器使用-i参数启动时标准输入将被运行,daemon将能够使用此管道向容器内写入标准输入).

20170427149327482466768.png 那么针对如上的实例该如何实现日志收集转发?

# /container/container.go:312
func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) {
	c, err := logger.GetLogDriver(cfg.Type)
	if err != nil {
		return nil, fmt.Errorf("Failed to get logging factory: %v", err)
	}
	ctx := logger.Context{
		Config:              cfg.Config,
		ContainerID:         container.ID,
		ContainerName:       container.Name,
		ContainerEntrypoint: container.Path,
		ContainerArgs:       container.Args,
		ContainerImageID:    container.ImageID.String(),
		ContainerImageName:  container.Config.Image,
		ContainerCreated:    container.Created,
		ContainerEnv:        container.Config.Env,
		ContainerLabels:     container.Config.Labels,
		DaemonName:          "docker",
	}

	// Set logging file for "json-logger"
	if cfg.Type == jsonfilelog.Name {
		ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID))
		if err != nil {
			return nil, err
		}
	}
	return c(ctx)
}
#/container/container.go:978
func (container *Container) startLogging() error {
	if container.HostConfig.LogConfig.Type == "none" {
		return nil // do not start logging routines
	}

	l, err := container.StartLogger(container.HostConfig.LogConfig)
	if err != nil {
		return fmt.Errorf("Failed to initialize logging driver: %v", err)
	}

	copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
	container.LogCopier = copier
	copier.Run()
	container.LogDriver = l

	// set LogPath field only for json-file logdriver
	if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
		container.LogPath = jl.LogPath()
	}

	return nil
}

第一个方法是为container查找log-driver。首先根据容器配置的log-driver类别调用:logger.GetLogDriver(cfg.Type)返回一个方法类型:

/daemon/logger/factory.go:9
type Creator func(Context) (Logger, error)

实质就是从工厂类注册的logdriver插件去查找,具体源码下文分析。获取到c方法后构建调用参数具体就是容器的一些信息。然后使用调用c方法返回driver。driver是个接口类型,我们看看有哪些方法:

# /daemon/logger/logger.go:61
type Logger interface {
	Log(*Message) error
	Name() string
	Close() error
}

很简单的三个方法,也很容易理解,Log()发送日志消息到driver,Close()进行关闭操作(根据不同实现)。 也就是说我们自己实现一个logdriver,只需要实现如上三个方法,然后注册到logger工厂类中即可。下面我们来看/daemon/logger/factory.go

第二个方法就是处理日志了,获取到日志driver,在创建一个Copier,顾名思义就是复制日志,分别从stdout 和stderr复制到logger driver。下面看看具体关键实现:

#/daemon/logger/copir.go:41
func (c *Copier) copySrc(name string, src io.Reader) {
	defer c.copyJobs.Done()
	reader := bufio.NewReader(src)

	for {
		select {
		case <-c.closed:
			return
		default:
			line, err := reader.ReadBytes('\n')
			line = bytes.TrimSuffix(line, []byte{'\n'})

			// ReadBytes can return full or partial output even when it failed.
			// e.g. it can return a full entry and EOF.
			if err == nil || len(line) > 0 {
				if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
					logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
				}
			}

			if err != nil {
				if err != io.EOF {
					logrus.Errorf("Error scanning log stream: %s", err)
				}
				return
			}
		}
	}
}

每读取一行数据,构建一个消息,调用logdriver的log方法发送到driver处理。

日志driver注册器

位于/daemon/logger/factory.go的源码实现即时日志driver的注册器,其中几个重要的方法(上文已经提到一个):

# /daemon/logger/factory.go:21
func (lf *logdriverFactory) register(name string, c Creator) error {
	if lf.driverRegistered(name) {
		return fmt.Errorf("logger: log driver named '%s' is already registered", name)
	}

	lf.m.Lock()
	lf.registry[name] = c
	lf.m.Unlock()
	return nil
}
# /daemon/logger/factory.go:39
func (lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error {
	lf.m.Lock()
	defer lf.m.Unlock()

	if _, ok := lf.optValidator[name]; ok {
		return fmt.Errorf("logger: log validator named '%s' is already registered", name)
	}
	lf.optValidator[name] = l
	return nil
}

看起来很简单,就是将一个Creator方法类型添加到一个map结构中,将LogOptValidator添加到另一个map这里注意加锁的操作。

#/daemon/logger/factory.go:13
type LogOptValidator func(cfg map[string]string) error

这个主要是验证driver的参数 ,dockerd和docker启动参数中有:--log-opt

实例

云帮怎么实现的

使用自己实现的 zeroMQ-driver 直接将容器日志通过 0MQ 发到日志统一处理中心。在处理中心统一完成下一步处理。如果平台用户需要将日志向外输出或者直接对接平台内日志分析应用,我们的处理是在应用 pod 中启动日志收集插件容器(封装扩展的 fluentd ),根据用户的需要配置日志出口,实现应用级日志收集。容器日志首先是由 docker-daemon 收集到,再根据容器 log-driver 配置进行相应操作,也就是说如果你的宿主机网络与容器网络不通(k8s 集群),日志从宿主机到 pod 中的收集容器只有两种方式:走外层网络,文件挂载。 我们采用文件挂载方式。 以zmq-driver为例讲讲我们怎么实现自己的driver。直接接收容器的日志。

//定义一个struct,这里包含一个zmq套接字
type ZmqLogger struct {
	writer      *zmq.Socket
	containerId string
	tenantId    string
	serviceId   string
	felock      sync.Mutex
}
//定义init方法调用logger注册器的方法注册当前driver
//和参数验证方法。
func init() {
	if err := logger.RegisterLogDriver(name, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}
//实现一个上文提到的Creator方法注册logdriver.
//这里新建一个zmq套接字构建一个实例
func New(ctx logger.Context) (logger.Logger, error) {
	zmqaddress := ctx.Config[zmqAddress]

	puber, err := zmq.NewSocket(zmq.PUB)
	if err != nil {
		return nil, err
	}
	var (
		env       = make(map[string]string)
		tenantId  string
		serviceId string
	)
	for _, pair := range ctx.ContainerEnv {
		p := strings.SplitN(pair, "=", 2)
		//logrus.Errorf("ContainerEnv pair: %s", pair)
		if len(p) == 2 {
			key := p[0]
			value := p[1]
			env[key] = value
		}
	}
	tenantId = env["TENANT_ID"]
	serviceId = env["SERVICE_ID"]

	if tenantId == "" {
		tenantId = "default"
	}

	if serviceId == "" {
		serviceId = "default"
	}

	puber.Connect(zmqaddress)

	return &ZmqLogger{
		writer:      puber,
		containerId: ctx.ID(),
		tenantId:    tenantId,
		serviceId:   serviceId,
		felock:      sync.Mutex{},
	}, nil
}
//实现Log方法,这里使用zmq socket发送日志消息
//这里必须注意,zmq socket是线程不安全的,我们知道
//本方法可能被两个线程(复制stdout和肤质stderr)调用//必须使用锁保证线程安全。否则会发生错误。
func (s *ZmqLogger) Log(msg *logger.Message) error {
	s.felock.Lock()
	defer s.felock.Unlock()
	s.writer.Send(s.tenantId, zmq.SNDMORE)
	s.writer.Send(s.serviceId, zmq.SNDMORE)
	if msg.Source == "stderr" {
		s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
	} else {
		s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
	}
	return nil
}
//实现Close方法,这里用来关闭zmq socket。
//同样注意线程安全,调用此方法的是容器关闭协程。
func (s *ZmqLogger) Close() error {
	s.felock.Lock()
	defer s.felock.Unlock()
	if s.writer != nil {
		return s.writer.Close()
	}
	return nil
}

func (s *ZmqLogger) Name() string {
	return name
}
//验证参数的方法,我们使用参数传入zmq pub的地址。
func ValidateLogOpt(cfg map[string]string) error {
	for key := range cfg {
		switch key {
		case zmqAddress:
		default:
			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
		}
	}
	if cfg[zmqAddress] == "" {
		return fmt.Errorf("must specify a value for log opt '%s'", zmqAddress)
	}
	return nil
}

© 著作权归作者所有

好雨云帮
粉丝 39
博文 123
码字总数 154027
作品 1
朝阳
私信 提问
理解OpenShift(6):集中式日志处理

理解OpenShift(1):网络之 Router 和 Route 理解OpenShift(2):网络之 DNS(域名服务) 理解OpenShift(3):网络之 SDN 理解OpenShift(4):用户及权限管理 理解OpenShift(5):从 Do...

SammyLiu
2018/12/19
0
0
关于k8s集群容器日志收集的总结

【作者 云盟成员 barnett】本文介绍了k8s官方提供的日志收集方法,并介绍了Fluentd日志收集器并与其他产品做了比较。最后介绍了好雨云帮如何对k8s进行改造并使用ZeroMQ以消息的形式将日志传输...

好雨云帮
2016/12/22
845
0
关于K8s集群器日志收集的总结

【作者barnett】本文介绍了k8s官方提供的日志收集方法,并介绍了Fluentd日志收集器并与其他产品做了比较。最后介绍了好雨云帮如何对k8s进行改造并使用ZeroMQ以消息的形式将日志传输到统一的日...

good_rain
2016/12/15
126
0
再次升级!阿里云Kubernetes日志解决方案

背景 针对K8S日志采集存在的采集目标多、弹性伸缩难、运维成本大、侵入性高、采集性能低等问题,在18年2月份日志服务和容器服务团队一起发布了阿里云Kubernetes日志解决方案。1分钟内即可完成...

元乙
2018/05/24
0
0
Kubernetes-基于EFK进行统一的日志管理原理

EFK安装部署参考:https://blog.csdn.net/luanpeng825485697/article/details/83312662 1、统一日志管理的整体方案 通过应用和系统日志可以了解Kubernetes集群内所发生的事情,对于调试问题和...

数据架构师
2018/11/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

视频如何加水印?

很多视频制作者的视频都被他人盗用过,为了防止自己的劳动成果被他人窃取,给视频加水印对于视频制作者来说,是一件非常重要的事情。那么下面分享一个手机给视频加水印的方法,一起来看看吧!...

白米稀饭2019
33分钟前
5
0
004-Envelop-基于Blockstack的文件传输dapp

本篇文章主要介绍基于Blockstack的文件传输工具; ####A-链接地址 官网地址:https://envelop.app/ Github地址:https://github.com/envelop-app ####B-特性: 1: Share private files easil...

Riverzhou
35分钟前
7
0
SpringCloud——声明式调用Feign

Feign声明式调用 一、Feign简介 使用Ribbon和RestTemplate消费服务的时候,有一个最麻烦的点在于,每次都要拼接URL,组织参数,所以有了Feign声明式调用,Feign的首要目标是将Java HTTP客户端...

devils_os
41分钟前
7
0
《JAVA核心知识》学习笔记 (22. 数据结构)

22.1.1. 栈(stack) 栈( stack)是限制插入和删除只能在一个位置上进行的表,该位置是表的末端,叫做栈顶 (top)。它是后进先出(LIFO)的。对栈的基本操作只有 push(进栈)和 pop(出栈...

Shingfi
47分钟前
6
0
你对AJAX认知有多少(1)?

AJAX(一) AJAX技术对于前段或者后端工程师来说,都是必不可缺的 那我们这几期都来细细品味一下AJAX的相关知识,直接上干货喽~ 1、什么是AJAX,为什么要使用Ajax(请谈一下你对Ajax的认识) 什么...

理性思考
54分钟前
15
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部