文档章节

logstash笔记

各种打杂
 各种打杂
发布于 2017/08/22 17:07
字数 4795
阅读 353
收藏 0

把之前的logstash笔记分享,轻拍。

Logstash
    doc
        https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-update
        
    wget https://artifacts.elastic.co/downloads/logstash/logstash-5.4.0.tar.gz
    2.x需要java7的支持
    文件
        bin/logstash  主程序文件
        配置文件需要自己编写
    安装使用
        tar *.tar.gz
        cd logstash
        vi logstash.conf
            input  output是两个必要定义的元素
            filter 是可选的,作用是按规定修改数据
            格式
                1,命令形式 bin/logstash -e 'input { stdin { } } output { stdout {} }'    
                    stdin 是标准输入plugin
                    stdout 是标准输出,屏幕
                2,文件形式
                    input {
                       stdin {}
                    }
                    output {
                       stdout {}
                    }
                例:文件输入,输出esasticsearch
                    input {
                       file => {
                          path => "/var/log/syslog"
                          start_position => "beginning"
                          ignore_older => 0
                       }
                    }
                    output {
                       esasticsearch {
                          hosts => [ "localhost:9200" ]
                       }
                    }
                    查看导入es的数据
                        curl -XGET 'localhost:9200/logstash-*/_search?pretty&q=response=200'
    命令
        logstash-plugin
            list                          List all installed Logstash plugins
                Usage:
                    bin/logstash-plugin list [OPTIONS] [PLUGIN]
                Parameters:
                    [PLUGIN]                      Part of plugin name to search for, leave empty for all plugins
                Options:
                    --installed                   List only explicitly installed plugins using bin/logstash-plugin install ... (default: false)
                    --verbose                     Also show plugin version number (default: false)
                    --group NAME                  Filter plugins per group: input, output, filter or codec
                    -h, --help                    print help
            install                       Install a Logstash plugin
                Usage:
                    bin/logstash-plugin install [OPTIONS] [PLUGIN] ...
                Parameters:
                    [PLUGIN] ...                  plugin name(s) or file
                Options:
                    --version VERSION             version of the plugin to install
                    --[no-]verify                 verify plugin validity before installation (default: true)
                    --preserve                    preserve current gem options (default: false)
                    --development                 install all development dependencies of currently installed plugins (default: false)
                    --local                       force local-only plugin installation. see bin/logstash-plugin package|unpack (default: false)
                    -h, --help                    print help
            remove                        Remove a Logstash plugin
            update                        Update a plugin
            pack                          Package currently installed plugins, Deprecated: Please use prepare-offline-pack instead
            unpack                        Unpack packaged plugins, Deprecated: Please use prepare-offline-pack instead
            generate                      Create the foundation for a new plugin
            prepare-offline-pack          Create an archive of specified plugins to use for offline installation
    启动
        5.x以上默认开启了xpack, 如果连不上es会报错,在logstash.yml里增加xpack.monitoring.enabled: false
        命令:
            bin/logstash -e 'input { stdin { } } output { stdout {} }'    
        配置文件:
            bin/logstash -f logstash.conf
            logstash 还提供一个方便我们规划和书写配置的小功能。你可以直接用 bin/logstash -f /etc/logstash.d/ 来运行。logstash 会自动读取 /etc/logstash.d/ 目录下所有的文本文件,然后在自己内存里拼接成一个完整的大配置文件,再去执行。
                【logstash.d/*.conf 这样只会读取排第一后缀是.conf的文件】
            注意:
            logstash 列出目录下所有文件时,是字母排序的。而 logstash 配置段的 filter 和 output 都是顺序执行,所以顺序非常重要。采用多文件管理的用户,推荐采用数字编号方式命名配置文件,同时在配置中,严谨采用 if 判断限定不同日志的动作
        参数:
            --allow-unsafe-shutdown  迫使logstash退出时关闭,默认情况下,logstash将拒绝离开,直到所有收到事件输出完
            -w, --pipeline-workers COUNT  工作的pipeline数量,默认是 CPU 核数
            -b, --pipeline-batch-size     每个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数之前,最多能累积的日志条数。默认是 125 条。越大性能越好,同样也会消耗越多的 JVM 内存
            -u, --pipeline-batch-delay    每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。默认是 5 ms
            -l, --log FILE  自身的日志
            --quiet  只打印出error级别的日志
            --verbose  日志级别
            --debug  日志级别
            -v  日志级别
            -p, --pluginpath PATH   加载自己的插件
            -t, --configtest  检查配置文件的语法错误
            -r, --[no-]auto-reload   监视配置文件的更新,并自动加载
            --reload-interval RELOAD_INTERVAL   配置文件监控的间隔,默认是3秒
            --allow-env  支持shell的变量
    语法
        Logstash从 1.3.0 版开始支持条件判断和表达式。
            字段引用、sprintf格式、条件判断只能用于filter和output,不能用于input。
            if 
                支持: grok
                不支持: mutate
                if "ab" in [fieldname] {
                  ...
                } else if "a" in "ab" { 
                  ...
                } else {
                  ...
                }
                ==(等于), !=(不等于), <(小于), >(大于), <=(小于等于), >=(大于等于)
                =~(匹配正则), !~(不匹配正则)
                in(包含), not in(不包含)
                and(与), or(或), nand(非与), xor(非或)
                ()(复合表达式), !()(对复合表达式结果取反)
                filter {
                  if [action] == "login" {
                    mutate { remove => "secret" }
                  }
                }
                output {
                  if "_grokparsefailure" not in [tags] {
                    elasticsearch { ... }
                  }
                }
                if [loglevel] == "ERROR" and [deployment] == "production"
                if [foo] in [foobar]
                if [foo] in "foo"
                if "hello" in [greeting]
                if [foo] in ["hello", "world", "foo"]
                if [missing] in [alsomissing]
                if !("foo" in ["hello", "world"])
                if [foo] #判断filed是否存在,不能加双引号
        数据类型    
            bool
                debug => true
            string
                host => "hostname"
            number
                port => 514
            array
                match => ["datetime", "UNIX", "ISO8601"]
            hash
                options => {
                    key1 => "value1",
                    key2 => "value2"
                }
            注意:如果你用的版本低于 1.2.0,哈希的语法跟数组是一样的,像下面这样写:
                match => [ "field1", "pattern1", "field2", "pattern2" ]
        字段
            #字段引用使用[]号,比如使用status做判断,if [status] = 200 {}
            #若是要取得字段的值,使用 %{ip}
            #取os的值,需要这样:[ua][os],可以把ua看作数组名,os是下标。
            字段是 Logstash::Event 对象的属性
            字段引用
                [tags]
                多维哈希表,或者叫哈希的哈希值获取
                    options => {
                        a => {
                            b => "b"
                        }
                    }
                    [options][a][0]
                logstash 的数组也支持倒序下标,即 [geoip][location][-1] 可以获取数组最后一个元素的值。
                Logstash 还支持变量内插,在字符串里使用字段引用的方法是这样:
                    "the longitude is %{[geoip][location][0]}"
    配置
        host字段自定义
            host会使用hosts文件的第一行作为内容
            127.0.0.1  localhost
        通用
            每个 logstash 过滤插件,都会有四个方法叫 add_tag, remove_tag, add_field 和 remove_field
            type 和 tags 是 logstash 事件中两个特殊的字段。通常来说我们会在输入区段中通过 type 来标记事件类型。而 tags 则是在数据处理过程中,由具体的插件来添加或者删除的
        input
            file
                file {
                    path => ["/var/log/*.log", "/var/log/message"]
                    type => "system"
                    start_position => "beginning"
                }
                Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。
                sincedb 文件中记录了每个被监听的文件的 inode, major number, minor number 和 pos
                一些比较有用的配置项:
                    tag
                        此设置没有默认值。向您的活动添加任意数量的任意标签
                    discover_interval
                        logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。
                    exclude
                        不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。
                    close_older
                        一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是 3600 秒,即一小时。
                    ignore_older
                        在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是 86400 秒,即一天。
                    sincedb_path
                        记录文件读取到的位置(如果文件被切割,我的想法是:logstash只会写新的值进来,新进来的日志仍然正常读取)
                        如果你不想用默认的 $HOME/.sincedb(Windows 平台上在 C:\Windows\System32\config\systemprofile\.sincedb),可以通过这个配置定义 sincedb 文件到其他位置。
                    sincedb_write_interval
                        logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
                    stat_interval
                        logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
                    start_position
                        只有在一开始的时候才有效,如果有了sincedb的信息,此选项无效
                        logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 "beginning",logstash 进程就从头开始读取,有点类似 cat,但是读到最后一行不会终止,而是继续变成 tail -F。
                注意
                    FileWatch 只支持文件的绝对路径,而且会不自动递归目录。
                    能写成 path => "/path/to/*/*/*/*.log"。FileWatch 模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用 ** 来缩写表示递归全部子目录。
                    start_position 仅在该文件从未被监听过的时候起作用。如果 sincedb 文件中已经有这个文件的 inode 记录了,那么 logstash 依然会从记录过的 pos 开始读取数据。所以重复测试的时候每回需要删除 sincedb 文件(官方博客上提供了另一个巧妙的思路:将 sincedb_path 定义为 /dev/null,则每次重启自动从头开始读)。
                    因为 windows 平台上没有 inode 的概念,Logstash 某些版本在 windows 平台上监听文件不是很靠谱。windows 平台上,推荐考虑使用 nxlog。
            stdin
                标准输入
            syslog 
                本机的 syslog 就会默认发送到 logstash 里
            redis
                list => BLPOP
                channel => SUBSCRIBE
                pattern_channel => PSUBSCRIBE
                配置:
                input {
                    redis {
                        batch_count => 1
                        data_type => "list"
                        key => "logstash-*"
                        host => "192.168.0.2"
                        port => 6379
                        threads => 5
                    }
                }
            kafka {
                #https://www.elastic.co/guide/en/logstash/2.3/plugins-inputs-kafka.html#plugins-inputs-kafka-zk_connect
                #----------- 5.0以下的配置
                zk_connect => "10.26.93.65:2181,127.0.0.1:2181"  #Zookeeper的地址
                group_id => "nginx_log"
                #topic订阅有三个可选topic_id or white_list or black_list
                #white_list => "sudiyi_.*"  #Whitelist of topics to include for consumption.
                #black_list => "xx.*|aabb"  #Blacklist of topics to exclude from consumption.
                topic_id => "app_server" #单个topic订阅
                consumer_threads => 5  #应该与kafka的Partition一样
                # ----------5.0以上配置
                bootstrap_servers => "10.26.93.65:9092,127.0.0.1:9092" #kafka集群的地址
                group_id => "nginx_log"
                #topic
                codec => "json" #约定数据传输格式,就是编码和解码
                topics => ["sudiyi_express_admin","user_center_service"]
                #topics_pattern => "user_center.*" #如果这个存在,topics会被忽略
                #线程完美的配置应该和kafkapartitions一样,过多的线程是浪费的
                consumer_threads => 3
            }
        编码插件
            input | decode | filter | encode | output
        filter
            grok 
                调试:http://grokdebug.herokuapp.com/
                自带:
                    %{IPORHOST:clientip} 
                    %{USER:ident} 
                    %{USER:auth} 
                    %{HTTPDATE:timestamp}
                    %{WORD:verb} 
                    %{NOTSPACE:request}
                    %{NUMBER:httpversion}
                    %{DATA:rawrequest}
                    %{NUMBER:response}
                    %{NUMBER:bytes}
                    %{COMMONAPACHELOG} 
                    %{QS:referrer} 
                    %{QS:agent}
                pattern => “\[(?<datetime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s(?<level>\w*)\s\”Crawl\surl:(?<url>(.*))”
                Grok 支持把预定义的 grok 表达式 写入到文件中
                格式
                    定义:USERNAME [a-zA-Z0-9._-]+    
                    引用:USER %{USERNAME}
                            %{PATTERN_NAME:capture_name:data_type}
                                实例:%{NUMBER:request_time:float}
                patterns_dir 选项来指明grok 表达式文件
                    patterns_dir => "/path/to/your/own/patterns"
                多行匹配
                    在和 codec/multiline 搭配使用的时候,需要注意一个问题,grok 正则和普通正则一样,默认是不支持匹配回车换行的。就像你需要 =~ //m 一样也需要单独指定,具体写法是在表达式开始位置加 (?m) 标记
                多项选择
                    match => [
                        "message", "(?<request_time>\d+(?:\.\d+)?)",
                        "message", "%{SYSLOGBASE} %{DATA:message}",
                        "message", "(?m)%{WORD}"f
                    ]
                    logstash 会按照这个定义次序依次尝试匹配,到匹配成功为止
                remove_field 参数来删除掉 message 字段,或者用 overwrite 参数来重写默认的 message 字段,只保留最重要的部分
                    remove_fileld => ["message"]
            dissect    
                logstash不自带, 需要手动安装bin/logstash-plugin install logstash-filter-dissect
                这是一个跟grok比较像的过滤器
                一个dissect过滤器最好只mapping一个字段,尝试过mapping两个字段会报key的错误,因为第二个字段是从第一个字段产生的,可能是因为第二个字段不能及时产生造成的报错
                    例子数据:
                        2017-05-04 19:06:48,079 [ForkJoinPool-1-worker-11] INFO  c.s.t.s.s.i.ReceiveEventServiceImpl:148: Heartbeat deviceId= 1000269
                    mapping => {
                        "message" => "%{datetime} [%{thread_name}] %{level} %{action} %{info_name} %{info_body}"
                    }
                    # "要匹配的字段" => "匹配语法"
                    #结果: datetime: 2017-05-04 19:06:48,079
                            thread_name: ForkJoinPool-1-worker-11
                            level: INFO
                            ...
                    #Heartbeat后面归到一个fileld,然后可以用kv过滤器处理

            date
                #匹配上了会自动替换@timestamp字段
                #SSS是毫秒
                date {
                    #        [ "field_name", "时间格式1", "时间格式2" ]
                    match => ["date_field", "ISO8601", "Y-MM-dd HH:mm:ss,SSS"]
                    remove_field => [ "date_field", "my_extraneous_field" ]    删除
                }
                UNIX_MS  毫秒时间戳
                UNIX  秒时间戳

                很多中国用户经常提一个问题:为什么 @timestamp 比我们晚了 8 个小时?怎么修改成北京时间?
                其实,Elasticsearch 内部,对时间类型字段,是统一采用 UTC 时间,存成 long 长整形数据的!对日志统一采用 UTC 时间存储,是国际安全/运维界的一个通识——欧美公司的服务器普遍广泛分布在多个时区里——不像中国,地域横跨五个时区却只用北京时间。
                对于页面查看,ELK 的解决方案是在 Kibana 上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。
                所以,建议大家接受这种设定。否则,即便你用 .getLocalTime 修改,也还要面临在 Kibana 上反过去修改,以及 Elasticsearch 原有的 ["now-1h" TO "now"] 这种方便的搜索语句无法正常使用的尴尬。
            geoip 
                geoip {
                    source => "message"
                }
                过滤
                geoip {
                    fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
                }
                需要注意的是:geoip.location 是 logstash 通过 latitude 和 longitude 额外生成的数据。所以,如果你是想要经纬度又不想重复数据的话,应该像下面这样做:
                    filter { 
                        geoip { 
                            fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"] 
                            remove_field => ["[geoip][latitude]", "[geoip][longitude]"] 
                        } 
                    }
            mutate
                数据类型修改
                     mutate {
                        convert => ["request_time", "float"]
                        #5.2语法
                        #convert => { "info_content[deviceId]" => "integer" }
                        #convert => { "msgId" => "integer" }
                    }
                字符串处理
                    gsub
                        通过应用正则表达式和替换来转换字符串字段。 如果字段不是字符串,则不会执行任何操作。
                        add和gsub最好不要放到一个mutate里面,有时候刚add,gsub执行没有效果
                            gsub => [
                                "urlparams", "[\\?#]", "_",              #替换反斜杠,问号,#号
                                "fieldname", "/", "_"                 # _替换为/
                            ]  
                       split
                        filter {
                            mutate {
                                split => ["message", "|"]
                            }
                        }            
                    join
                        仅对数组类型字段有效
                        我们在之前已经用 split 割切的基础再 join 回去。配置改成:
                        filter {
                            mutate {
                                split => ["message", "|"]
                            }
                            mutate {
                                join => ["message", ","]
                            }
                        }
                    替换一个field的内容, %{}是引用字段的内容
                    replace => {
                        "type" => "app_nginx_%{type}"

                    }
                增加字段
                    add_field => { "visit_device" => "%{[http_user_agent][0]}" }
                    merge
                        合并两个数组或者哈希字段。依然在之前 split 的基础上继续:
                        filter {
                            mutate {
                                split => ["message", "|"]
                            }
                            mutate {
                                merge => ["message", "message"]
                            }
                        }
                字段处理
                    rename
                        重命名某个字段,如果目的字段已经存在,会被覆盖掉:
                            filter {
                                mutate {
                                    rename => ["syslog_host", "host"]
                                }
                            }
                    update
                        更新某个字段的内容。如果字段不存在,不会新建。
                            mutate {
                                update => { "sample" => "My new message" }
                            }
                    replace
                        作用和 update 类似,但是当字段不存在的时候,它会起到 add_field 参数一样的效果,自动添加新的字段。
                执行次序
                    需要注意的是,filter/mutate 内部是有执行次序的。其次序如下:
                        rename(event) if @rename
                        update(event) if @update
                        replace(event) if @replace
                        convert(event) if @convert
                        gsub(event) if @gsub
                        uppercase(event) if @uppercase
                        lowercase(event) if @lowercase
                        strip(event) if @strip
                        remove(event) if @remove
                        split(event) if @split
                        join(event) if @join
                        merge(event) if @merge
                        filter_matched(event)
            split 拆分事件
                split 插件中使用的是 yield 功能,其结果是 split 出来的新事件,会直接结束其在 filter 阶段的历程,也就是说写在 split 后面的其他 filter 插件都不起作用,进入到 output 阶段。所以,一定要保证 split 配置写在全部 filter 配置的最后
                terminator    指定切割的间隔符, Default value is "\n" 
                field 指定切割的域, default value is "message"
            kv
                处理key=value这种数据结构
                source  数据源,Default value is "message"
                target  把取出来的数据放到一个容器里面,相当于list
                    target => "kv"
                transform_key   转变key的值为大写或小写
                transform_value  转变value的值为大写或小写
                trimkey  修剪key包含的字符like [ or ] using \.
                trim  修剪value包含的字符like [ or ] using \.(有时候value后面会包括一些无用的分隔符,需要用这个设置处理掉)
                value_split  按什么符号切分,Default value is "="

        output
            file {
                path => "/path/xx"
                codec => line { format => "custom format: %{message}"}
            }
            codec => dots #把每个 event 都变成一个点(.)
            codec=>rubydebug
            null {} 抛弃所有 event
            输出插件统一具有一个参数是 workers。Logstash 为输出做了多线程的准备
            kafka
                kafka版本是0.9要对应使用logstash版本2.3.x以下,否则导不了数据到kafka
                fluntd也有kafka插件
                kafka 
                    kafka{
                        bootstrap_servers => "xx.sudiyi.cn:9092" #由于kafka集群配置,可能这里需要使用域名:端口(在host文件里配)
                        topic_id => "test01"
                        codec => "json" #约定数据传输格式,就是编码和解码
                            # https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#codec
                            # json 
                            # plain 无格式
                    }
            elasticsearch 
                HTTP , node 和 transport 方式
                elasticsearch {
                    hosts => ["es-node02:9200"] #最好在hosts里映射ip地址
                    index => "logstash-%{type}-%{+YYYY.MM.dd}"
                    document_type => "%{type}"
                    workers => 1  #5.x这个是string类型
                    flush_size => 20000
                    idle_flush_time => 10
                    template_overwrite => true
                    user => ""
                    password => ""
                }

                配置
                    批量发送
                        flush_size 和 idle_flush_time 共同控制 Logstash 向 Elasticsearch 发送批量数据的行为。以上面示例来说:Logstash 会努力攒到 20000 条数据一次性发送出去,但是如果 10 秒钟内也没攒够 20000 条,Logstash 还是会以当前攒到的数据量发一次。
                    索引名
                        写入的 ES 索引的名称,这里可以使用变量。为了更贴合日志场景,Logstash 提供了 %{+YYYY.MM.dd} 这种写法。在语法解析的时候,看到以 + 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。所以,之前处理过程中不要给自定义字段取个加号开头的名字……
                        此外,注意索引名中不能有大写字母,否则 ES 在日志中会报 InvalidIndexNameException,但是 Logstash 不会报错,这个错误比较隐晦,也容易掉进这个坑中。

                Logstash 1.4.2 在 transport 和 http 协议的情况下是固定连接指定 host 发送数据。从 1.5.0 开始,host 可以设置数组,它会从节点列表中选取不同的节点发送数据,达到 Round-Robin 负载均衡的效果。
                Kibana4 强制要求 ES 全集群所有 node 版本在 1.4 以上,Kibana4.2 要求 ES 2.0 以上。所以采用 node 方式发送数据的 logstash-1.4(携带的 Elasticsearch.jar 库是 1.1.1 版本) 会导致 Kibana4 无法运行,采用 Kibana4 的读者务必改用 http 方式。
                开发者在 IRC freenode#logstash 频道里表示:"高于 1.0 版本的 Elasticsearch 应该都能跟最新版 logstash 的 node 一起正常工作"。此信息仅供参考,请认真测试后再上线。
                经常有同学问,为什么 Logstash 在有多个 conf 文件的情况下,进入 ES 的数据会重复,几个 conf 数据就会重复几次。其实问题原因在之前启动参数章节有提过,output 段顺序执行,没有对日志 type 进行判断的各插件配置都会全部执行一次。在 output 段对 type 进行判断的语法如下所示:
                    output {
                        if [type] == "nginxaccess" {
                          elasticsearch { }
                        }
                    }
                老版本的性能问题
                    Logstash 1.4.2 在 http 协议下默认使用作者自己的 ftw 库,随同分发的是 0.0.39 版。该版本有内存泄露问题,长期运行下输出性能越来越差!
                    解决办法:
                        对性能要求不高的,可以在启动 logstash 进程时,配置环境变量 ENV["BULK"],强制采用 elasticsearch 官方 Ruby 库。命令如下:
                        export BULK="esruby"
                        对性能要求高的,可以尝试采用 logstash-1.5.0RC2 。新版的 outputs/elasticsearch 放弃了 ftw 库,改用了一个 JRuby 平台专有的 Manticore 库。根据测试,性能跟 ftw 比相当接近。
                        对性能要求极高的,可以手动更新 ftw 库版本,目前最新版是 0.0.42 版,据称内存问题在 0.0.40 版即解决。
                模板
                    Elasticsearch 支持给索引预定义设置和 mapping(前提是你用的 elasticsearch 版本支持这个 API)。Logstash 自带有一个优化好的模板
                    默认使用./vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/elasticsearch-template.json 这个模板去插入elasticsearch数据,在比较老的版本模板不会自动创建.raw,这会造成elasticsearch在对字符串做聚合查询的时间报Field data loading is forbidden on [xxxx]错误

 

 

附,之前的一个匹配:

patterns:

STATUS1 [A-Z]+
TIME \d+\.\d+
DATE \d{4}-\d{1,2}-\d{1,2}\ \d{1,2}:\d{1,2}:\d{1,2}
RESPONSE_AGREEMENT [A-Z]+
METHOD (POST)|(GET)
RESPONSE_METHOD /\S+
DEVICE_ID \d{1,7}
mail_nu \d{1,50}
mobiles \d+
tag1 [a-zA-Z0-9._-]+


MAIN_SC \[%{DATE:logtime}\] device_id: %{DEVICE_ID:device_id}, mail_no: %{mail_nu:mail_no}, mobile: %{mobiles:mobile}, status: %{STATUS1:status}, \[%{tag1:tag}\]

 

日志:

[2017-05-22 17:06:57] device_id: 1022456, mail_no: 3965330139219, mobile: 13603840208, status: OK, [NEW_API]


 

© 著作权归作者所有

共有 人打赏支持
各种打杂
粉丝 2
博文 5
码字总数 8673
作品 0
成都
运维
Logstash学习笔记

image 欢迎访问我的博客查看原文:http://wangnan.tech 背景 先介绍下ELK stack Elasticsearch Elasticsearch 是基于 JSON 的分布式搜索和分析引擎,专为实现水平扩展、高可用和管理便捷性而...

wanna
2017/11/03
0
0
Redis+Logstash+Elasticsearch配置笔记

1、 开机自动启动Redis 1),拷贝redis目录utils下的redisinitscript文件到/etc/init.d,并将其重命名为redisd,再运行chmod u+x redisd 2),修改redis根目录下的redis.conf,将daemonize修改...

go2school
2015/04/10
0
0
logstash 使用笔记

logstash-2.3.4的logstash.conf的配置文件 一、监控日志文件,匹配关键字,输出到指定文件/发送邮件。 input { file { path => ["你的日志文件"] } } filter { if ([message] !~ "你的匹配关...

netpeak
2017/01/18
0
0
elasticSearch2.4.1安装笔记

1.Elasticsearch安装 参考地址:https://www.elastic.co/guide/en/elasticsearch/reference/2.4/installation.html 下载地址:https://download.elastic.co/elasticsearch/release/org/elast......

hollowJ丶
2016/12/07
72
0
Logstash输出到Elasticsearch笔记

image 欢迎访问我的博客查看原文:http://wangnan.tech output配置中elasticsearch配置 action index 给一个文档建立索引 delete 通过id值删除一个文档(这个action需要指定一个id值) crea...

wanna
2017/11/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Linux命令备忘录: jobs 显示Linux中的任务列表及任务状态命令

jobs命令用于显示Linux中的任务列表及任务状态,包括后台运行的任务。该命令可以显示任务号及其对应的进程号。其中,任务号是以普通用户的角度进行的,而进程号则是从系统管理员的角度来看的...

开元中国2015
46分钟前
1
0
springboot Whitelabel Error Page(Not Found)解决方案

当出现上图图的错误时注意 报错信息 There was an unexpected error (type=Not Found, status=404). Not Found代表未访问到资源 解决方案:比较访问路径和代码的路径有没有写错 正确的访问路...

斩神魂
46分钟前
2
0
记一次hbase master停止服务的原因以及恢复

在Hdfs空间不足的情况下,拒绝写入,hbase会down掉。如果hdfs空间没有清理的情况下,重新启动hbase,会报splitlog失败,原因是wal日志重写过程中会写hdfs,写不进去导致的。重启不成功。 解决...

PageYi
50分钟前
1
0
如何从平面设计转行到UI设计?

时代的变迁,科技的进步,工具的发展,薪资的差距,促使许多人转行的原因,但平面与界面两者之间有着哪些的差异呢?如果,想要转行又该具备哪些条件呢? 平面、界面设计之间的差异性 平面设计...

mo311
53分钟前
4
0
线程池整理

一般在生产环境中,我们都不会直接new一个Thread,然后再去start(),因为这么做会不断频繁的创建线程,销毁线程,过大的线程会耗尽CPU和内存资源,大量的垃圾回收,也会给GC带来压力,延长GC停顿时间...

算法之名
55分钟前
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部