Logstash扩展开发 - Input 与 Codec

原创
2015/12/26 22:32
阅读数 3K

Logstash扩展开发 - Input 与 Codec

1.Input的run方法中诞生了Event

Input插件负责从原始数据源中‘发射’出Logstash Processing PipeLine中流动的Event对象,在Input之后的Filter与Output中就以Event说话了。一个简单的Input插件代码样例如下:

# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "stud/interval"
require "socket" # for Socket.gethostname

# Add any asciidoc formatted documentation here
# Generate a repeating message.
#
# This plugin is intended only as an example.

class LogStash::Inputs::Example < LogStash::Inputs::Base
  config_name "example"

  # If undefined, Logstash will complain, even if codec is unused.
  default :codec, "plain"

  # The message string to use in the event.
  config :message, :validate => :string, :default => "Hello World!"

  # Set how frequently messages should be sent.
  #
  # The default, `1`, means send a message every second.
  config :interval, :validate => :number, :default => 1

  public
  def register
    @host = Socket.gethostname
  end # def register

  def run(queue)
    Stud.interval(@interval) do
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
    end # loop
  end # def run

end # class LogStash::Inputs::Example

上例中的run方法构造了event对象实例放入了input-filter队列queue。该例中的事件对象简单地被直接构造了出来,但实际情况中往往更复杂。这其中包括以下关键问题:

1.1 获取数据的消息模式

为了与各种异构数据源进行通信,如果抛开不同数据源涉及到的不同协议参数和SDK不管的话,最主要的问题在于数据获取的模式。若以消息的观点来看,消息的点对点传输分为两种模式:

  • 推模式
  • 拉模式

从Logstash实例角度出发,“推模式”意味着Logstash被动地等待数据发送请求;“拉模式”意味着Logstash要主动向数据源发起查询、读取请求。以通过HTTP协议获取数据为例,Logstash中存在着两个相关插件,分别是httphttp_poller,前者应用于“推模式”场景,后者应用于“拉模式”场景。

多数情况下,“推模式”相对简单,logstash实例作为server角色被动地接收数据发送请求即可,不过有时也会涉及到请求负载、复杂协议解析等问题。而对于“拉模式”,logstash作为主动查询方,需要考虑的事情就稍多一些比如控制查询速率、记录上次的查询位置,既要避免遗漏又要避免重复,保证数据整体上的完整一致性。

下方的代码示例是jdbc输入插件的代码片段(run方法):

  def run(queue)
    if @schedule
      @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
      @scheduler.cron @schedule do
        execute_query(queue)
      end

      @scheduler.join
    else
      execute_query(queue)
    end
  end # def run

#...

  def execute_query(queue)
    # update default parameters
    @parameters['sql_last_start'] = @sql_last_start
    execute_statement(@statement, @parameters) do |row|
      event = LogStash::Event.new(row)
      decorate(event)
      queue << event
    end
  end

从中可以清晰地看到,需要安排查询任务的调度并处理上次查询位置记录。

1.2 识别并包装事件对象

另外,从上例中可以看到event对象的构造过程——直接用jdbc查询结果集中的row来构造事件。这个过程在这里表现地很简单很清晰,但这并不意味着所有的情况都是如此简单清晰。事实上大多数情况都要比这复杂。先说这里为什么简单?其原因主要有两个:

原因1. 数据交互协议是严格定义的:对于数据消费方来说经过规范化封装的数据结果集是可以直接拿来用的,比如直接以一个row对应一个事件即可,不需要再为事件划分做什么多余的事情。 原因2. 数据对象本身结构化的:row可以被直接作为事件并非因为它是个Object的,而在于它自身的内容是结构化的(Hash),正好迎合了event的结构需求。

能够具备以上两点优势的多数集中于“基于专用通信协议针对结构性数据源进行数据获取”的场景。比如针对各种数据库的查询,不管是关系型DB还是NoSQL DB;再比如通过JMS这样的消息协议传输的数据以及Log4J Socket Appender发送的数据,等等。

关于事件对象的构造工作,以上讨论只是交代了在具备良好协议与数据结构的环境下如何搞定。但是,更多情况下并不具备这样的条件,又该如何应对呢?接下来就要讨论如何从非结构化数据中构造事件的问题了。

这里所谓的非结构化指的是数据本身缺乏基本的格式约束,相比较于使用jdbc从关系库中直接获取清晰完整的数据对象的情况,那么如何处理包含任意内容的日志文本文件呢?如何从纯文本(plain text)的序列数据中划分事件边界,进而包装事件对象呢?相比较jdbc api提供的模板化工具性支持,从纯文本(plain text)中读取数据可能需要我们从最基本的字符编码做起。好吧,说到这里,我们面临的其实是事件数据的反序列化问题。在处理这种稍复杂的问题时,Logstash以委托的手法,确切的说是基于“策略模式”引入了codec接口,将复杂的事件对象反序列化过程与数据源信息交换进行了隔离,一来代码优雅清晰易维护,二来扩展性强,三来与问题域自然对应方便做到搞定任意数据源。下方的代码示例是stdin输入插件的代码片段(run方法与codec配置)。

  default :codec, "line"

  #……

  def run(queue)
    while !stop?
      begin
        # Based on some testing, there is no way to interrupt an IO.sysread nor
        # IO.select call in JRuby. Bummer :(
        data = $stdin.sysread(16384)
        @codec.decode(data) do |event|
          decorate(event)
          event["host"] = @host if !event.include?("host")
          queue << event
        end
      rescue IOError, EOFError # stdin closed
        break
      rescue => e
        # ignore any exception in the shutdown process
        break if stop?
        raise(e)
      end
    end
  end

从上方的代码样例中可以看到event对象是由@codec实例变量执行decode方法得到的,而非如同前例中直接在input的run过程中直接构造。另外还可以看到stdin默认执行的是line codec。接下来的内容就要重点就要介绍一下codec的概念了。

2.Codec的概念、作用和用法

“codec”的字面意思是解码编码器。在Logstash Processing Pipeline中它会出现在两个位置上,一是在input里,再者就是output里。虽然从位置上看是对称的,但在两个位置分别起到的作用其实并不严格对应它名字中的对称含义——“解码--编码”。关于这点我们后置讨论。暂时地,我们先重点关注codec在input插件中的作用。其实,codec就是帮助input搞定了复杂情形下识别并包装事件对象的问题。

下方代码示例为plain codec的源码:

# encoding: utf-8
require "logstash/codecs/base"
require "logstash/util/charset"

# The "plain" codec is for plain text with no delimiting between events.
#
# This is mainly useful on inputs and outputs that already have a defined
# framing in their transport protocol (such as zeromq, rabbitmq, redis, etc)
class LogStash::Codecs::Plain < LogStash::Codecs::Base
  config_name "plain"

  # Set the message you which to emit for each event. This supports `sprintf`
  # strings.
  #
  # This setting only affects outputs (encoding of events).
  config :format, :validate => :string

  # The character encoding used in this input. Examples include `UTF-8`
  # and `cp1252`
  #
  # This setting is useful if your log files are in `Latin-1` (aka `cp1252`)
  # or in another character set other than `UTF-8`.
  #
  # This only affects "plain" format logs since json is `UTF-8` already.
  config :charset, :validate => ::Encoding.name_list, :default => "UTF-8"

  public
  def register
    @converter = LogStash::Util::Charset.new(@charset)
    @converter.logger = @logger
  end

  public
  def decode(data)
    yield LogStash::Event.new("message" => @converter.convert(data))
  end # def decode

  public
  def encode(event)
    if event.is_a?(LogStash::Event) and @format
      @on_event.call(event, event.sprintf(@format))
    else
      @on_event.call(event, event.to_s)
    end
  end # def encode

end # class LogStash::Codecs::Plain

可以看到,decode方法中处理了data参数的字符编码,然后构造了事件对象(data作为事件的message属性)。

说到这里,先明确几点问题:

  • Tip1: 每个input只能设置一个codec,通过input的codec属性直接赋值对应codec名字即可
  • Tip2: input默认的codec是plain codec
  • Tip3: codec.decode对于input而言是作为一个工具方法来使用的,可用可不用,有时用于解析整体数据,有时用于解析数据片段(如http的body部分)

codec要解决的问题其实是针对各种具体需求场景的非常个性化的问题,如果硬要总结共性的话,那么这其中主要牵扯两个问题:

  • 问题1:字符编码处理
  • 问题2:事件边界划分

2.1 字符编码处理

比如前文中plain codec的源码示例中就以@converter.convert(data)处理了数据的编码,确保以正确的编码方式读入源数据。否则,很容易会遭遇事件“乱码”。

2.2 事件边界划分

plain codec中并没有处理事件边界问题,其手法是直接将data参数完完整整地当成一个事件的message,如此就完全依赖input中将事件划分搞定。其实并不是所有的情况都如此,要自己搞定事件划分的codec有很多,比如line multiline,可以看到这些codec都跟“文本行”有着很硬的关系。事实即是如此,当处理面向事件的纯文本数据时,以“文本行”为基本处理单元是惯用并通用的处理手段。 (在无协议的纯文本中有什么字符比\n更能代表分割切换含义的呢?)

不过,不按换行套路来的自然会有。比如json codec,它的decode函数代码如下:

  public
  def decode(data)
    data = @converter.convert(data)
    begin
      decoded = LogStash::Json.load(data)
      if decoded.is_a?(Array)
        decoded.each {|item| yield(LogStash::Event.new(item)) }
      elsif decoded.is_a?(Hash)
        yield LogStash::Event.new(decoded)
      else
        @logger.info? && @logger.info("JSON codec received a scalar instead of an Arary or Object!", :data => data)
        yield LogStash::Event.new("message" => data, "tags" => ["_jsonparsefailure"])
      end

    rescue LogStash::Json::ParserError => e
      @logger.info("JSON parse failure. Falling back to plain-text", :error => e, :data => data)
      yield LogStash::Event.new("message" => data, "tags" => ["_jsonparsefailure"])
    rescue StandardError => e
      # This should NEVER happen. But hubris has been the cause of many pipeline breaking things
      # If something bad should happen we just don't want to crash logstash here.
      @logger.warn("An unexpected error occurred parsing input to JSON",
                   :input => data,
                   :message => e.message,
                   :class => e.class.name,
                   :backtrace => e.backtrace)
    end
  end # def decode

可以看到,在这里无所谓行(hang)不行(hang),而是按照json规范来处理data。就像它的Doc中描述的:

This codec may be used to decode (via inputs) and encode (via outputs) full JSON messages. If the data being sent is a JSON array at its root multiple events will be created (one per element).

其实它还有个兄弟json_lines

If you are streaming JSON messages delimited by '\n' then see the json_lines codec.

处理纯文本,“行”划分是最吃得开的。不过关于这个,最后要强调的是,总体上,logstash在处理这块问题时,从实现方面而言并不是很规整,有时候会在input中在读取数据源时直接暗含了事件划分过程,然后再codec中就无需再做考虑。所以这种情况造成了一个codec有时并不能适用于所有的input,甚至是会限定仅有个别的input才可以使用,而对于一个input也并非所有的codec都能直接用。而这并不是最糟糕的,最糟糕的问题在于文档中关于这些并没有太详细的说法,只能多看看源码了。

2.3 Codec的decode与encode

codec实例中会提供两个方法实现:decodeencode,前者用于input过程,后者用于output过程。看上去是互逆且对称,但事实并非如此。其实,甚至有很多codec只是仅仅只会工作于inputoutput其一。举些例子来说,dotsrubydebug只有在output中才有意义;collectd压根就没实现encode方法。

所以,从事实情况来看,与其将codec按字面意思理解,倒不如将其看作processing pipe line中的aop切入点。当input与output的框架既定之后,利用codec切入,优雅地做一些数据加工和转换,有时候会比使用filter来的更合适。

附录:关于字符编码

以下按时间发展顺序列述:

(F1.)ANSI:ASCII

长度:1Byte (8bit) 共256种状态

内容:0x00-0x20为控制码,0x21-0x7F为标准字符集,0x80-0xFF为扩展字符集。

ANSI:American National Standards Institute ASCII:American Standard Code for Information Interchange

(F2.)GB2312

长度:2Byte (16bit)

内容:兼容ASCII,是中国人第一次对 ASCII 做出中文扩展。在 ASCII 的基础上,规定一个小于127(0x7F)的字符的意义与原来相同,但两个大于127的字符连在一起时,就表示一个汉字。大约7000多个简体汉字了。还把数学符号、罗马希腊的 字母、日文的假名们都编进去了,连在 ASCII 里本来就有的数字、标点、字母都统统重新编了两个字节长的编码,这就是常说的"全角"字符,而原来在127号以下的那些就叫"半角"字符了。

(F3.)GBK

长度:2Byte (16bit)

内容:GB2312逐渐不能够满足罕见汉字的编码需求时,中国人进一步扩展了GB2312,或者说是第二次对 ASCII 做出中文扩展,不再要求低字节一定是127号之后的内码,只要第一个字节是大于127就固定表示这是一个汉字的开始,不管后面跟的是不是扩展字符集里的内容。GBK 包括了 GB2312 的所有内容,同时又增加了近20000个新的汉字(包括繁体字)和符号。

(F4.)GB18030

长度:2Byte (16bit)

内容:GBK逐渐不能满足我国少数民族文化的信息化发展需求,中国人再进一步扩展GBK,或者说是第三次对 ASCII 做出中文扩展,又加了几千个新的少数民族的字,GBK 扩成了 GB18030。

(F5.)双字节编码小结

GB2312、GBK与GB18030是ASCII在中文世界的逐步扩展。

GB2312、GBK与GB18030通称他们叫做 "DBCS"(Double Byte Charecter Set 双字节字符集)。在DBCS系列标准里,最大的特点是两字节长的汉字字符和一字节长的英文字符并存于同一套编码方案里,程序里为了支持中文处理,必须要注意字串里的每一个字节的值,如果这个值是大于127的,那么就认为一个双字节字符集里的字符出现了。

因为当时各个国家都像中国这样搞出一套自己的编码标准,结果就是兼容性问题很难搞。即使同为一种语言系统的台湾地区,也区别于大陆搞了套BIG5。

(F6.)UNICODE

ISO (国际标谁化组织)的国际组织决定着手解决这个问题。他们采用的方法很简单:废了所有的地区性编码方案,重新搞一个包括了地球上所有文化、所有字母和符号 的编码!叫它"Universal Multiple-Octet Coded Character Set",简称 UCS,俗称 "UNICODE"。

UNICODE 开始制订时,计算机的存储器容量极大地发展了,空间再也不成为问题了。于是 ISO 就直接规定必须用两个字节,也就是16位来统一表示所有的字符,对于ASCII里的那些“半角”字符,UNICODE 包持其原编码不变,只是将其长度由原来的8位扩展为16位,而其他文化和语言的字符则全部重新统一编码。由于"半角"英文符号只需要用到低8位,所以其高 8位永远是0,因此这种大气的方案在保存英文文本时会多浪费一倍的空间。

需要注意的是,UNICODE 在制订时没有考虑与任何一种现有的编码方案保持兼容,这使得 GBK 与 UNICODE 在汉字的内码编排上完全是不一样的,没有一种简单的算术方法可以把文本内容从UNICODE编码和另一种编码进行转换,这种转换必须通过查表来进行。

如前所述,UNICODE 是用两个字节来表示为一个字符,他总共可以组合出65535不同的字符,这大概已经可以覆盖世界上所有文化的符号。如果还不够也没有关系,ISO已经准备 了UCS-4方案(平时所说的UNICODE都是UCS2),说简单了就是四个字节来表示一个字符,这样我们就可以组合出21亿个不同的字符出来(最高位有其他用途)。

(F7.)UNICODE 与 UTF

UCS只是规定如何编码,并没有规定如何传输、保存这个编码。UTF(UCS Transfer Format)就是专门解决这个问题的。顾名思义,UTF8就是每次8个位传输数据,而UTF16就是每次16个位。从UNICODE到 UTF时并不是直接的对应,而是要过一些算法和规则来转换。

从UCS-2到UTF-8的编码方式如下:

UCS-2编码(16进制) UTF-8 字节流(二进制) 0000 - 007F 0xxxxxxx 0080 - 07FF 110xxxxx 10xxxxxx 0800 - FFFF 1110xxxx 10xxxxxx 10xxxxxx

可以看到UTF8本身起到了节省空间的作用,并且UTF8的一个特别的好处是它与ISO-8859-1完全兼容(ISO/IEC8859-1,又称Latin-1或“西欧语言”,是国际标准化组织内ISO/IEC 8859的第一个8位字符集。它以ASCII为基础,在空置的0xA0-0xFF的范围内,加入96个字母及符号,藉以供使用变音符号的拉丁字母语言使用)。

UTF-16以16位为单元对UCS进行编码。对于小于0x10000的UCS码,UTF-16编码就等于UCS码对应的16位无符号整数。对于不小于0x10000的UCS码,定义了一个算法。不过由于实际使用的UCS2,或者UCS4的BMP必然小于0x10000,所以就目前而言,可以认为UTF-16和UCS-2基本相同。但UCS-2只是一个编码方案,UTF-16却要用于实际的传输,所以就不得不考虑字节序的问题。

(F8.)UTF的字节序和BOM

UTF-8以字节为编码单元,没有字节序的问题。UTF-16以两个字节为编码单元,在解释一个UTF-16文本前,首先要弄清楚每个编码单元的字节序。例如“奎”的Unicode编码是594E,“乙”的Unicode编码是4E59。如果我们收到UTF-16字节流“594E”,那么这是“奎”还是“乙”?

Unicode规范中推荐的标记字节顺序的方法是BOM。BOM是Byte order Mark。BOM是一个有点小聪明的想法:

在UCS编码中有一个叫做"ZERO WIDTH NO-BREAK SPACE"的字符,它的编码是FEFF。而FFFE在UCS中是不存在的字符,所以不应该出现在实际传输中。UCS规范建议我们在传输字节流前,先传输字符"ZERO WIDTH NO-BREAK SPACE"。

这样如果接收者收到FEFF,就表明这个字节流是Big-Endian的;如果收到FFFE,就表明这个字节流是Little-Endian的。因此字符"ZERO WIDTH NO-BREAK SPACE"又被称作BOM。

UTF-8不需要BOM来表明字节顺序,但可以用BOM来表明编码方式。字符"ZERO WIDTH NO-BREAK SPACE"的UTF-8编码是EF BB BF(读者可以用我们前面介绍的编码方法验证一下)。所以如果接收者收到以EF BB BF开头的字节流,就知道这是UTF-8编码了。

Windows就是使用BOM来标记文本文件的编码方式的。

(F9.)参考资料

"Short overview of ISO-IEC 10646 and Unicode" (http://www.nada.kth.se/i18n/ucs/unicode-iso10646-oview.html)。

"Understanding Unicode A general introduction to the Unicode Standard" (http://scripts.sil.org/cms/scripts/page.php?site_id=nrsi&item_id=IWS-Chapter04a)

"Character set encoding basics Understanding character set encodings and legacy encodings" (http://scripts.sil.org/cms/scripts/page.php?site_id=nrsi&item_id=IWS-Chapter03)

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部