文档章节

flume自定义Interceptor

c
 chunhei2008
发布于 2015/01/19 19:46
字数 905
阅读 246
收藏 1

背景:

在工作中遇到这样一个问题,nginx老的收集的日志格式为前面使用“,”分割,后面使用“空格”分割,最后一个user-agent字段(历史遗留问题),现有业务是采用PHP 写脚本跑,将数据转换为使用“\t”分割,user-agent字段抽取出os、browser、browser version三个字段写到文件再有scp同步到日志中转机器,日志数量巨大,PHP处理遇到瓶颈,我们也正在改造日志收集系统接入hadoop集群。

flume现状:

这种比较个性化的转换flume没有相关插件

分析:

flume event 针对source为文本文件时,会一行一个event(默认小于2048长度)

而拦截器就是针对event来做处理的

代码:

package com.wy.flume.interceptor;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;  

public class AdRefererLogFormatInterceptor implements Interceptor {
    //匹配user-agent
    private static final Pattern pattern = Pattern.compile("^\"(.*)\"\\s\"(.*)\"(.*)$");
    private static final HashMap<String, String> platform = initPlatforms();
    private static final HashMap<String, String> browser = initBrowsers();

    private static HashMap<String, String> initPlatforms() {
        HashMap<String, String> platforms = new HashMap<>();
        platforms.put("windows nt 6.2", "Win8");
        platforms.put("windows nt 6.2", "Win8");
        platforms.put("windows nt 6.1", "Win7");
        platforms.put("windows nt 6.0", "Win Longhorn");
        platforms.put("windows nt 5.2", "Win2003");
        platforms.put("windows nt 5.0", "Win2000");
        platforms.put("windows nt 5.1", "WinXP");
        platforms.put("windows nt 4.0", "Windows NT 4.0");
        platforms.put("winnt4.0", "Windows NT 4.0");
        platforms.put("winnt 4.0", "Windows NT");
        platforms.put("winnt", "Windows NT");
        platforms.put("windows 98", "Win98");
        platforms.put("win98", "Win98");
        platforms.put("windows 95", "Win95");
        platforms.put("win95", "Win95");
        platforms.put("windows", "Unknown Windows OS");
        platforms.put("os x", "MacOS X");
        platforms.put("ppc mac", "Power PC Mac");
        platforms.put("freebsd", "FreeBSD");
        platforms.put("ppc", "Macintosh");
        platforms.put("linux", "Linux");
        platforms.put("debian", "Debian");
        platforms.put("sunos", "Sun Solaris");
        platforms.put("beos", "BeOS");
        platforms.put("apachebench", "ApacheBench");
        platforms.put("aix", "AIX");
        platforms.put("irix", "Irix");
        platforms.put("osf", "DEC OSF");
        platforms.put("hp-ux", "HP-UX");
        platforms.put("netbsd", "NetBSD");
        platforms.put("bsdi", "BSDi");
        platforms.put("openbsd", "OpenBSD");
        platforms.put("gnu", "GNU/Linux");
        platforms.put("unix", "Unknown Unix OS");
        return platforms;
    }

    private static HashMap<String, String> initBrowsers() {
        HashMap<String, String> browsers = new HashMap<>();
        browsers.put("Flock", "Flock");
        browsers.put("Chrome", "Chrome");
        browsers.put("Opera", "Opera");
        browsers.put("MSIE", "IE");
        browsers.put("Internet Explorer", "IE");
        browsers.put("Shiira", "Shiira");
        browsers.put("Firefox", "Firefox");
        browsers.put("Chimera", "Chimera");
        browsers.put("Phoenix", "Phoenix");
        browsers.put("Firebird", "Firebird");
        browsers.put("Camino", "Camino");
        browsers.put("Netscape", "Netscape");
        browsers.put("OmniWeb", "OmniWeb");
        browsers.put("Safari", "Safari");
        browsers.put("Mozilla", "Mozilla");
        browsers.put("Konqueror", "Konqueror");
        browsers.put("icab", "iCab");
        browsers.put("Lynx", "Lynx");
        browsers.put("Links", "Links");
        browsers.put("hotjava", "HotJava");
        browsers.put("amaya", "Amaya");
        browsers.put("IBrowse", "IBrowse");
        return browsers;
    }

    private AdRefererLogFormatInterceptor() {
    }

    @Override
    public void initialize() {
        // NO-OP...
    }

    @Override
    public void close() {
        // NO-OP...
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charsets.UTF_8);
        String[] fields = body.split(",", 8);
        StringBuilder sb = new StringBuilder();
        sb.append(fields[0]);
        sb.append('\t');
        sb.append(fields[1]);
        sb.append('\t');
        sb.append(fields[2]);
        sb.append('\t');
        sb.append(fields[3]);
        sb.append('\t');
        sb.append(fields[4]);
        sb.append('\t');
        sb.append(fields[5]);
        sb.append('\t');
        sb.append(fields[6]);
        sb.append('\t');

        Matcher submatcher = pattern.matcher(fields[7].trim());
        String url = "";
        String os = "others";
        String br = "others";
        String ver = "";
        if (submatcher.matches()) {
            url = submatcher.group(1);
            String agent = submatcher.group(2);
            //匹配操作系统
            Set<String> platformKeys = platform.keySet();
            for (String platformKey : platformKeys) {
                Pattern pattern = Pattern.compile( Pattern.quote(platformKey) , Pattern.CASE_INSENSITIVE);
                Matcher matcher = pattern.matcher(agent);
                if (matcher.find()) {
                    os = platform.get(platformKey);
                    break;
                }
            }
            //匹配浏览器 和版本
            Set<String> browserKeys = browser.keySet();
            for (String browserKey : browserKeys) {
                Pattern pattern = Pattern.compile( Pattern.quote(browserKey) + ".*?([0-9\\.]+)", Pattern.CASE_INSENSITIVE);
                Matcher matcher = pattern.matcher(agent);
                if (matcher.find()) {
                    ver = matcher.group(1);
                    br = browser.get(browserKey);
                    break;
                }
            }
        }
        sb.append(url);
        sb.append('\t');
        sb.append(os);
        sb.append('\t');
        sb.append(br);
        sb.append('\t');
        sb.append(ver);
        //修改event body
        event.setBody(sb.toString().getBytes());
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }
    
    public static class Builder implements Interceptor.Builder {
        //使用Builder初始化Interceptor
        @Override
        public Interceptor build() {
            return new AdRefererLogFormatInterceptor();
        }

        @Override
        public void configure(Context context) {
            
        }
    }
}



部署:

1、将程序打包成AdRerfererLogInterceptor.jar

2、将jar包上传到FLUME_HOME的lib目录下(flume1.5采用bin安装)

3、在配置文件中使用Interceptor

hdp2.sources.s1.interceptors = i1
hdp2.sources.s1.interceptors.i1.type = com.wy.flume.interceptor.AdRefererLogFormatInterceptor$Builder

优势:

在数据传输的同时进行数据的处理,节省步骤,而且有flume帮组管理文件进度,程序中断时不用手动做恢复(file channel)

总结:

在Interceptor中可以对event的header 和 body 进行处理,进而达到定制化的目的。

© 著作权归作者所有

共有 人打赏支持
c
粉丝 3
博文 27
码字总数 8480
作品 2
广州
高级程序员
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习
04/14
0
0
Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,...

大数据之路
2014/07/08
0
9
Flume - Kafka日志平台整合

1. Flume介绍 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行...

mantoudev
03/21
0
0
两个flume的拦截器(interceptor)

flume支持拦截器(interceptors)机制,是在source这个层面上工作,这里有两个拦截器 1,支持将日志体(event body)里面的字符串替换成另一个字符串。配置文件http://git.oschina.net/atuc...

午火
2014/06/11
0
0
flume之sink与channel(4)

Hdfs sink(也是最重要的一个) #sink test.sinks.si1.type=logger logger的意思就是把我们收集到的日志打印到我们的屏幕上。是提供我们测试用的 hadoop fs -mkdir /flume/log hadoop dfsadmi...

lixiyuan
2014/04/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

php 使用redis锁限制并发访问类

1.并发访问限制问题 对于一些需要限制同一个用户并发访问的场景,如果用户并发请求多次,而服务器处理没有加锁限制,用户则可以多次请求成功。 例如换领优惠券,如果用户同一时间并发提交换领...

豆花饭烧土豆
22分钟前
0
0
Linux环境搭建 | 手把手教你配置Linux虚拟机

在上一节 「手把你教你安装Linux虚拟机」 里,我们已经安装好了Linux虚拟机,在这一节里,我们将配置安装好的Linux虚拟机,使其达到可以开发的程度。 Ubuntu刚安装完毕之后,还无法进行开发,...

良许Linux
23分钟前
0
0
Nginix开启SSL支持HTTPS访问(自签名方法)

Nginix开启SSL支持HTTPS访问(自签名方法) 超文本传输安全协议(缩写:HTTPS,英语:Hypertext Transfer Protocol Secure)是超文本传输协议和SSL/TLS的组合,用以提供加密通讯及对网络服务器...

openthings
40分钟前
0
0
(三)Nginx配置·续

概述 前文写了关于Nginx环境配置,但是还没有完,接下来将会继续讲三个相关的配置 主要是以下三个 1.Nginx访问日志 2.Nginx日志切割 3.静态文件不记录日志和过期时间 Nginx访问日志 1.先看看...

杉下
今天
1
0
jquery创建类似于java的map

var map = {}; // Map map = new HashMap(); map[key] = value; // map.put(key, value); var value = map[key]; // Object value = map.get(key); var has = key in map; // boolean has = ......

SuperDabai
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部