使用开源流式数据库进行实时日志流分析

原创
2023/12/10 12:52
阅读数 55

原文链接:https://www.timeplus.com/post/log-stream-analysis

以上是我的某个服务器日志中显示的错误日志。由于这个应用程序是我的云上为客户提供关键服务的应用,因此,就必须迅速确定问题的根本原因,并在错误发生时立即纠正。我需要一种工具,可以帮助我来监视这些错误,对错误进行快速分析,并立即采取行动。

作为可观测性的支柱之一,日志分析是IT运维和安全领域中最重要的任务之一,这一过程中会收集、存储和处理大量的日志。其实现在也有许多知名工具,可以帮助你处理复杂的日志分析,包括:ELK(elastic, logstash, kibana)、Splunk、Sumologic和Datadog。

典型的日志分析工作流程包括:

  1. 数据收集:从不同来源收集日志数据

  2. 数据存储:将收集到的日志保存/索引在服务器存储上

  3. 数据搜索/查询:使用搜索/查询语言处理日志数据

  4. 数据呈现:将查询结果可视化为图表和仪表板

  5. 告警:基于数据查询,发送有关事件的通知

这些工具大多数都是基于文档搜索架构构建的,产品的核心是一个搜索引擎,利用倒排索引或布隆(bloom)过滤器帮助用户在大量数据,或所谓的索引中,快速搜索特定信息。当用户积累了大量日志并希望在日志中找到某些内容时,这些工具便成为了最佳选择。然而,如果你想进行基于日志流的监控,比如当日志在特定错误发生时发送告警,这些工具的搜索架构通常就不是最佳选择,因为它们的搜索是建立在需要时间构建的倒排索引之上的。因此,在相关的数据结构(索引)完全构建之前,数据不能立即可供搜索。这就是流式处理可以发挥作用的地方。今天,我将谈论如何使用开源工具链以及关键组件流式数据库“Proton”来实时监控你的日志流。

Proton:一个流式数据库

“Proton”在单一内核中提供了简单、高效、统一的流式分析与历史分析的融合。

上述的图是“Proton”的架构概要,它具有以下核心组件:

  • 流式存储:类似于Apache Kafka,是一个仅追加日志,可以处理实时流式数据,具有极低的延迟和高可伸缩性。

  • 历史存储:基于ClickHouse的历史在线分析技术构建,提供基于列数据的高性能历史数据查询。

  • 统一查询处理:可以运行基于SQL的、增量的、有状态的流式处理,就像Flink一样,但统一了流式和批处理(历史)模式。

使用流式处理,如果将日志流作为流式数据传输到“Proton”中,你可以运行一个无边界的推送查询。一旦日志中有新的信息,查询将以增量方式处理信息,发出结果并推送给用户。

本文开头我所展示的日志片段实际上是来自“Proton”服务器的错误日志。在接下来的例子中,我将使用“Proton”结合不同的开源工具来监视其自身的错误日志,让我们一起开始接下来的探索旅程吧!

Filebeat + Kafka + Proton

Filebeat是一个基于golang的轻量级开源日志收集器,属于Elastic Stack。Filebeat可用于转发和集中日志数据。

FileBeat软件栈

Filebeat的工作方式类似于追踪日志文件,将新生成的日志内容发送到下游组件,如ElasticSearch、Kafka、Redis等。

在这种情况下,我们配置Filebeat将日志数据发送到一个名为“logs”的Kafka主题,然后在“Proton”中创建一个外部流。

你可以使用Docker compose运行这个软件栈。

使用Filebeat收集日志数据并发送到Kafka

第一步是使用Filebeat将日志流数据发送到Kafka,以下是Filebeat的配置:

filebeat.inputs: - type:log   enabled:true   paths:     -/var/log/proton-server/proton-server.err.log   multiline:     type:pattern     pattern:'^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+)'     negate:true     match:after
output.kafka: hosts:["kafka:9092"] topic:logs

根据上述配置,Filebeat从“/var/log/proton-server/proton-server.err.log”中读取日志,使用正则表达式模式“^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+)”将多行日志拆分为事件。该模式匹配每个日志行的起始时间戳(如“2023.11.20 18:02:55.472620”)。然后,将日志事件发送到名为“logs”的Kafka主题。

在“Proton”中创建一个外部流以连接到Kafka主题

现在,日志数据正在实时发送到Kafka主题“logs”。如果你从Kafka中使用该主题,以下便是一些数据样本:

{"topic": "logs","value": "{\"@timestamp\":\"2023-11-23T00:41:56.655Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"8.11.1\"},\"message\":\"2023.11.23 00:41:41.234843 [ 1 ] {} \u003cWarning\u003e Access(local directory): Recovering lists in directory /var/lib/proton/access/\",\"input\":{\"type\":\"log\"},\"ecs\":{\"version\":\"8.0.0\"},\"host\":{\"name\":\"83c6f2d59442\"},\"agent\":{\"name\":\"83c6f2d59442\",\"type\":\"filebeat\",\"version\":\"8.11.1\",\"ephemeral_id\":\"d88e8a06-63aa-4e62-87fe-3fd79be0b585\",\"id\":\"5dff8233-5b5d-49c7-ae02-8c325a1f6ac5\"},\"log\":{\"file\":{\"path\":\"/var/log/proton-server/proton-server.err.log\"},\"offset\":1268}}","timestamp": 1700700116655,"partition": 0,"offset": 9}{"topic": "logs","value": "{\"@timestamp\":\"2023-11-23T00:41:56.655Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"8.11.1\"},\"ecs\":{\"version\":\"8.0.0\"},\"host\":{\"name\":\"83c6f2d59442\"},\"agent\":{\"ephemeral_id\":\"d88e8a06-63aa-4e62-87fe-3fd79be0b585\",\"id\":\"5dff8233-5b5d-49c7-ae02-8c325a1f6ac5\",\"name\":\"83c6f2d59442\",\"type\":\"filebeat\",\"version\":\"8.11.1\"},\"log\":{\"file\":{\"path\":\"/var/log/proton-server/proton-server.err.log\"},\"offset\":1393},\"message\":\"2023.11.23 00:41:41.356154 [ 1 ] {} \u003cWarning\u003e TelemetryCollector: Please note that telemetry is enabled. This is used to collect the version and runtime environment information to Timeplus, Inc. You can disable it by setting telemetry_enabled to false in config.yaml\",\"input\":{\"type\":\"log\"}}","timestamp": 1700700116655,"partition": 0,"offset": 10}

让我们将这些样本数据从字符串转换为JSON,以便更好地查看:

{  "@timestamp": "2023-11-23T00:41:56.655Z",  "@metadata": {    "beat": "filebeat",    "type": "_doc",    "version": "8.11.1"  },  "ecs": {    "version": "8.0.0"  },  "host": {    "name": "83c6f2d59442"  },  "agent": {    "ephemeral_id": "d88e8a06-63aa-4e62-87fe-3fd79be0b585",    "id": "5dff8233-5b5d-49c7-ae02-8c325a1f6ac5",    "name": "83c6f2d59442",    "type": "filebeat",    "version": "8.11.1"  },  "log": {    "file": {      "path": "/var/log/proton-server/proton-server.err.log"    },    "offset": 1393  },  "message": "2023.11.23 00:41:41.356154 [ 1 ] {} <Warning> TelemetryCollector: Please note that   telemetry is enabled. This is used to collect the version and runtime environment information to   Timeplus, Inc. You can disable it by setting telemetry_enabled to false in config.yaml",  "input": {    "type": "log"  }}

除了原始服务器日志消息外,Filebeat还向事件添加了一些额外的元数据,包括:时间戳、主机、代理、版本等。

为了分析该日志流,我们可以创建一个“Proton”外部流。

CREATE STREAM IF NOT EXISTS logs(`raw` string)ENGINE = ExternalStreamSETTINGS type = 'kafka', brokers = 'kafka:9092', topic = 'logs'

有了上述创建的流,用户可以使用“Proton”的流式SQL实时分析该日志数据。

使用“Proton”的流式SQL监控日志流

“Proton”流类似于数据库表。不同之处在于,如果在流之上运行查询,查询是无边界的。一旦没有数据到达,查询将立即将查询结果推送给用户。

使用“Proton”的流式SQL监控日志流

在上面的演示中,我运行了两个“Proton”客户端,其中一个运行了一个流式查询“select * from logs”。当我在另一个“Proton”客户端中输入一些无效的SQL,比如“select abc”,这就会生成一些错误日志,第一个查询将立即捕获到这个错误并将错误结果推送给用户。

使用“Proton”的流式SQL,你便可以使用SQL对该流进行更多的实时监控和分析。由于日志消息是非结构化数据,你可以使用grok或正则表达式从消息中提取信息,然后根据你的用例使用SQL来转换或聚合数据。

优点和缺点

Filebeat、Kafka、Proton的软件栈是非常典型的数据流水线软件栈,其中Kafka用于解耦数据生产者和数据消费者,从而在灵活性和组件多样性之间平衡。这种架构有其优点和缺点。

优点

  • 实时日志流式传输

  • 灵活的且可扩展的

  • Kafka提供了日志生产者(Filebeat)和日志消费者(Proton)的解耦,以处理新的消费者或背压等情况

缺点

  • 有一定的复杂性,并且涉及三个不同的组件

Vector + Proton

Vector是一个Rust开发的开源、高性能的数据流水线。该团队和产品已经被Datadog收购

Vector架构

在先前的解决方案中,Kafka被用作日志流传输器。这种做法的好处在于隔离了日志生产者和消费者。然而,在某些情况下,用户可能并不真的关心这种隔离,我们可以使用Vector的HTTP输出直接将日志数据传送到“Proton”,从而简化整体部署。

Vector软件栈

你可以使用Docker compose运行这个软件栈。

使用Vector收集日志数据并发送到“Proton”流

在这种情况下,我们从日志中收集数据,然后使用“Proton”的HTTP摄取API将这些数据发送到“Proton”。

首先,我们需要在“Proton”中创建一个流。

CREATE STREAM IF NOT EXISTS logs(raw string)

然后,我们配置Vector以读取日志数据,将其转换为“Proton”摄取负载格式,然后通过HTTP进行摄取。以下是Vector的配置:

Filebeat、Kafka、Proton的软件栈是非常典型的数据流水线软件栈,其中Kafka用于解耦数据生产者和数据消费者,从而在灵活性和组件多样性之间平衡。这种架构有其优点和缺点。

sources: logs:   type:"file"   multiline:start_pattern:^(\d{4}\.\d{2}\.\d{2}\d{2}:\d{2}:\d{2}\.\d+)mode:halt_beforecondition_pattern:^(\d{4}\.\d{2}\.\d{2}\d{2}:\d{2}:\d{2}\.\d+)timeout_ms:1000   include:     -"/var/log/proton-server/proton-server.err.log"
transforms: payload:   type:"lua"   inputs:     -"logs"   version:"2"   hooks:     process:|-function(event,emit)event.log.message={columns={"raw"},data={{event.log.message}}}emit(event)end
sinks: proton:   type:http   inputs:     -"payload"   compression:none   method:post   uri:http://proton:3218/proton/v1/ingest/streams/logs   batch:     max_events:1   encoding:     codec:"text"

在这种情况下,我们调用Vector的LUA脚本将原始日志事件转换为“Proton”摄取负载,参考以下的摄取curl的示例命令:

curl  -X POST \  'http://localhost:3218/proton/v1/ingest/streams/logs' \  --header 'Content-Type: application/json' \  --data-raw '{"columns":["raw"],"data":[["log message"]]}'

使用Vector收集日志数据并发送到“Proton”流

与之前的演示类似,日志流将在事件发生时实时捕获任何新事件。

然而,这种配置存在一个限制,即我们的转换将每个日志事件转换为一个“Proton” HTTP摄取调用。在日志事件很多的情况下,每个事件发送一次HTTP摄取将导致性能问题,最好发送一个事件批次。可以利用Vector提供的dedupe变换来支持这一点。但是调试这样的数据流水线并不容易,未来我会开始研究这部分。

优点和缺点

使用Vector和“Proton”软件栈,我们不再需要Kafka。但现在我们需要在Vector数据流水线配置中处理数据转换和批处理。

优点

  • 实时日志流式传输

  • 高性能的日志收集器

缺点

  • Vector数据流水线难以调试,很难支持批量摄取

仅使用“Proton”

在先前的解决方案中,Kafka已被移除,但在某些情况下,你可能希望能够快速运行日志流分析,并且希望可以减少所用组件。那这样有没有可能呢?是的,实际上你可以只运行“Proton”来完成,使用“Proton”提供的外部日志流功能

仅使用“Proton”的软件栈

这是我能想到的可用于实时监控日志流的最简单的软件栈。“Proton”充当了日志数据收集器、存储器和查询处理器,所有这些功能都集成在一个组件中。

只需运行“docker run -d --pull always --name proton ghcr.io/timeplus-io/proton:latest

”即可完成!你现在就已经拥有了监控日志流所需的一切啦!

在这种情况下,我们只需使用“Proton”创建一个外部日志流,并参考以下SQL进行相关配置:

CREATE EXTERNAL STREAM proton_log(  raw string)SETTINGStype='log',   log_files='proton-server.log',   log_dir='/var/log/proton-server',   timestamp_regex='^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+)',   row_delimiter='(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+) \[ \d+ \] \{'

优点和缺点

现在,我们已经拥有了一个超紧凑的日志流式部署。它快速、简单且强大,唯一的限制是由于在与日志相同的盒子中运行“Proton”,性能和可扩展性受到该盒子上的资源限制。

优点

  • 实时日志流式传输

  • 低延迟和快速响应

  • 通过使用流式SQL形成的强大的事件流式分析能力

  • 低成本地使用简化的软件栈

缺点

  • 由于本地资源限制,可扩展性受到限制

使用仅包含“Proton”的解决方案,“Proton”必须部署在与日志文件位于同一位置的本地,这意味着“Proton”将与你的服务器位于同一台机器上共享计算资源。虽然“Proton”被设计为高性能、低占用的,对于大多数日志分析场景来说已经足够好了。如果可扩展性是一个问题,那么你可以考虑之前介绍的两个选项。

还值得一提的是,外部流不会在本地保留任何数据。如果需要进行历史日志分析,你可以考虑创建一个物化视图,将这些日志数据存储在本地。

总结

日志数据出于多种原因而至关重要,涵盖了IT、安全和业务运营等各个领域。虽然存在许多成熟的工具来帮助用户进行日志分析,但这些工具并非设计用于以实时低延迟处理日志流。这就是流式数据库(如“Proton”)发挥作用的地方,能够在事件发生时立即处理日志流数据。

在这篇文章中,我演示了使用“Proton”监控日志流的三种不同的架构选项,你可以选择适合你的最佳选项。“Proton”也是我们实时流数据平台Timeplus的核心引擎,快来试一试,告诉我们你的想法吧!

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部