文档章节

flume自定义Interceptor

c
 chunhei2008
发布于 2015/01/19 19:46
字数 905
阅读 246
收藏 1
点赞 0
评论 0

背景:

在工作中遇到这样一个问题,nginx老的收集的日志格式为前面使用“,”分割,后面使用“空格”分割,最后一个user-agent字段(历史遗留问题),现有业务是采用PHP 写脚本跑,将数据转换为使用“\t”分割,user-agent字段抽取出os、browser、browser version三个字段写到文件再有scp同步到日志中转机器,日志数量巨大,PHP处理遇到瓶颈,我们也正在改造日志收集系统接入hadoop集群。

flume现状:

这种比较个性化的转换flume没有相关插件

分析:

flume event 针对source为文本文件时,会一行一个event(默认小于2048长度)

而拦截器就是针对event来做处理的

代码:

package com.wy.flume.interceptor;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;  

public class AdRefererLogFormatInterceptor implements Interceptor {
    //匹配user-agent
    private static final Pattern pattern = Pattern.compile("^\"(.*)\"\\s\"(.*)\"(.*)$");
    private static final HashMap<String, String> platform = initPlatforms();
    private static final HashMap<String, String> browser = initBrowsers();

    private static HashMap<String, String> initPlatforms() {
        HashMap<String, String> platforms = new HashMap<>();
        platforms.put("windows nt 6.2", "Win8");
        platforms.put("windows nt 6.2", "Win8");
        platforms.put("windows nt 6.1", "Win7");
        platforms.put("windows nt 6.0", "Win Longhorn");
        platforms.put("windows nt 5.2", "Win2003");
        platforms.put("windows nt 5.0", "Win2000");
        platforms.put("windows nt 5.1", "WinXP");
        platforms.put("windows nt 4.0", "Windows NT 4.0");
        platforms.put("winnt4.0", "Windows NT 4.0");
        platforms.put("winnt 4.0", "Windows NT");
        platforms.put("winnt", "Windows NT");
        platforms.put("windows 98", "Win98");
        platforms.put("win98", "Win98");
        platforms.put("windows 95", "Win95");
        platforms.put("win95", "Win95");
        platforms.put("windows", "Unknown Windows OS");
        platforms.put("os x", "MacOS X");
        platforms.put("ppc mac", "Power PC Mac");
        platforms.put("freebsd", "FreeBSD");
        platforms.put("ppc", "Macintosh");
        platforms.put("linux", "Linux");
        platforms.put("debian", "Debian");
        platforms.put("sunos", "Sun Solaris");
        platforms.put("beos", "BeOS");
        platforms.put("apachebench", "ApacheBench");
        platforms.put("aix", "AIX");
        platforms.put("irix", "Irix");
        platforms.put("osf", "DEC OSF");
        platforms.put("hp-ux", "HP-UX");
        platforms.put("netbsd", "NetBSD");
        platforms.put("bsdi", "BSDi");
        platforms.put("openbsd", "OpenBSD");
        platforms.put("gnu", "GNU/Linux");
        platforms.put("unix", "Unknown Unix OS");
        return platforms;
    }

    private static HashMap<String, String> initBrowsers() {
        HashMap<String, String> browsers = new HashMap<>();
        browsers.put("Flock", "Flock");
        browsers.put("Chrome", "Chrome");
        browsers.put("Opera", "Opera");
        browsers.put("MSIE", "IE");
        browsers.put("Internet Explorer", "IE");
        browsers.put("Shiira", "Shiira");
        browsers.put("Firefox", "Firefox");
        browsers.put("Chimera", "Chimera");
        browsers.put("Phoenix", "Phoenix");
        browsers.put("Firebird", "Firebird");
        browsers.put("Camino", "Camino");
        browsers.put("Netscape", "Netscape");
        browsers.put("OmniWeb", "OmniWeb");
        browsers.put("Safari", "Safari");
        browsers.put("Mozilla", "Mozilla");
        browsers.put("Konqueror", "Konqueror");
        browsers.put("icab", "iCab");
        browsers.put("Lynx", "Lynx");
        browsers.put("Links", "Links");
        browsers.put("hotjava", "HotJava");
        browsers.put("amaya", "Amaya");
        browsers.put("IBrowse", "IBrowse");
        return browsers;
    }

    private AdRefererLogFormatInterceptor() {
    }

    @Override
    public void initialize() {
        // NO-OP...
    }

    @Override
    public void close() {
        // NO-OP...
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charsets.UTF_8);
        String[] fields = body.split(",", 8);
        StringBuilder sb = new StringBuilder();
        sb.append(fields[0]);
        sb.append('\t');
        sb.append(fields[1]);
        sb.append('\t');
        sb.append(fields[2]);
        sb.append('\t');
        sb.append(fields[3]);
        sb.append('\t');
        sb.append(fields[4]);
        sb.append('\t');
        sb.append(fields[5]);
        sb.append('\t');
        sb.append(fields[6]);
        sb.append('\t');

        Matcher submatcher = pattern.matcher(fields[7].trim());
        String url = "";
        String os = "others";
        String br = "others";
        String ver = "";
        if (submatcher.matches()) {
            url = submatcher.group(1);
            String agent = submatcher.group(2);
            //匹配操作系统
            Set<String> platformKeys = platform.keySet();
            for (String platformKey : platformKeys) {
                Pattern pattern = Pattern.compile( Pattern.quote(platformKey) , Pattern.CASE_INSENSITIVE);
                Matcher matcher = pattern.matcher(agent);
                if (matcher.find()) {
                    os = platform.get(platformKey);
                    break;
                }
            }
            //匹配浏览器 和版本
            Set<String> browserKeys = browser.keySet();
            for (String browserKey : browserKeys) {
                Pattern pattern = Pattern.compile( Pattern.quote(browserKey) + ".*?([0-9\\.]+)", Pattern.CASE_INSENSITIVE);
                Matcher matcher = pattern.matcher(agent);
                if (matcher.find()) {
                    ver = matcher.group(1);
                    br = browser.get(browserKey);
                    break;
                }
            }
        }
        sb.append(url);
        sb.append('\t');
        sb.append(os);
        sb.append('\t');
        sb.append(br);
        sb.append('\t');
        sb.append(ver);
        //修改event body
        event.setBody(sb.toString().getBytes());
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }
    
    public static class Builder implements Interceptor.Builder {
        //使用Builder初始化Interceptor
        @Override
        public Interceptor build() {
            return new AdRefererLogFormatInterceptor();
        }

        @Override
        public void configure(Context context) {
            
        }
    }
}



部署:

1、将程序打包成AdRerfererLogInterceptor.jar

2、将jar包上传到FLUME_HOME的lib目录下(flume1.5采用bin安装)

3、在配置文件中使用Interceptor

hdp2.sources.s1.interceptors = i1
hdp2.sources.s1.interceptors.i1.type = com.wy.flume.interceptor.AdRefererLogFormatInterceptor$Builder

优势:

在数据传输的同时进行数据的处理,节省步骤,而且有flume帮组管理文件进度,程序中断时不用手动做恢复(file channel)

总结:

在Interceptor中可以对event的header 和 body 进行处理,进而达到定制化的目的。

© 著作权归作者所有

共有 人打赏支持
c
粉丝 3
博文 27
码字总数 8480
作品 2
广州
高级程序员
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习 ⋅ 04/14 ⋅ 0

kafka来读取flume的数据

一、查看kafka topic ./kafka-topics.sh --list --zookeeper bigdata-test-3:2181, bigdata-test-2:2181, bigdata-test-1:2181, bigdata-test-4:2181, bigdata-test-5:2181 ./kafka-topics.s......

weixin_41876523 ⋅ 05/24 ⋅ 0

解决Flume采集数据时在HDFS上产生大量小文件的问题

问题:flume指定HDFS类型的Sink时,采集数据至HDFS指定目录,会产生大量小文件。 问题重现: 1、创建flume配置文件flume-env.sh,: flume配置文件如下(根据自身需要修改): 因为flume可以...

舒运 ⋅ 06/10 ⋅ 0

Kafka实战-Flume到Kafka

1.概述   前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据。下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览   下面开...

smartloli ⋅ 2015/07/02 ⋅ 0

【Strom篇】Flume+Kafaka+Strom整合完成信息记录

第一部分:流程分析 由flume收集客户端发送的信息,经过kafka集群消费者收集,然后给storm集群数据处理(数据清洗),最后再发给kafka集群收集。 第二部分:集群搭建 flume+kafka 一、配合f...

chenshi_2753 ⋅ 05/06 ⋅ 0

其他消息中间件及场景应用(下3)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51516329 目...

yunlielai ⋅ 04/15 ⋅ 0

flume 总结--flume入门介绍

flume介绍 flume被设计为一个灵活的分布式系统,可以很容易的扩展,而且是高度可定制化的,一个配置正确的Flume Agent和由互相连接的Agent创建的Agent管道,保证不会丢失数据,提供持久的cha...

u013362353 ⋅ 05/28 ⋅ 0

Flume---大数据协作框架

flume是什么 Apache Flume是一个分布式的、可靠的、易用的系统,可以有效地将来自很多不同源系统的大量日志数据收集、汇总或者转移到一个数据中心存储。 Apache Flume的作用不仅限于日志汇总...

简心 ⋅ 05/06 ⋅ 0

flume集成CDH步骤与异常解决

1、 确定你的flume在哪台主机上 2、 确认该台主机上的flume是否可以正常使用? 在指定的目录下,创建一个bigdatapageto_hive.conf 内容可以是官网的实例:http://flume.apache.org/FlumeUser...

hexinghua0126 ⋅ 05/12 ⋅ 0

追加内容到文件末尾 - flume-append-file-sink

flume追加内容到文件末尾的sink。 仅适合1g以内的日志,再大估计会有压力,没具体测试。 依赖 Flume-NG >= 1.7 Linux MacOS 构建 $ mvn clean package jar文件下载 flume-append-file-sink-1...

五十风 ⋅ 04/13 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

内核线程、轻量级进程、用户线程

线程与进程概念 在现代操作系统中,进程支持多线程。 进程是资源管理的最小单元; 线程是程序执行的最小单元。 即线程作为调度和分配的基本单位,进程作为资源分配的基本单位 一个进程的组成...

117 ⋅ 10分钟前 ⋅ 0

elasticsearch2.4.6升级为elasticsearch-5.5.0的经历

将elasticsearch-5.5.0 中的配置 path.data 指向原来的数据路径 即 path.data: /usr/local/src/elasticsearch-2.4.6/data 注意: elasticsearch-5.5.0 需要将jdk版本升级到1.8...

晨猫 ⋅ 11分钟前 ⋅ 1

lvm讲解 磁盘故障小案例

1

oschina130111 ⋅ 15分钟前 ⋅ 0

那些提升开发人员工作效率的在线工具

本文转载自公众号 Hollis 作为一个Java开发人员,经常要和各种各样的工具打交道,除了我们常用的IDE工具以外,其实还有很多工具是我们在日常开发及学习过程中要经常使用到的。 Hollis偏爱使用...

时刻在奔跑 ⋅ 27分钟前 ⋅ 0

restful风格 实现DELETE PUT请求 的web.xml的配置

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframe......

泉天下 ⋅ 32分钟前 ⋅ 0

Shell数组

Shell数组 Shell在编程方面比Windows批处理强大很多,无论是在循环、运算。 bash支持一维数组(不支持多维数组),并且没有限定数组的大小。类似与C语言,数组元素的下标由0开始编号。获取数...

蜗牛奔跑 ⋅ 42分钟前 ⋅ 0

nmap为了开发方便 可以做简单的修改

因为nmap扫描是默认使用的是nse脚本,但是在开发的过程中需要修改后缀(主要是因为后缀为lua才能显示高亮,所以这里用一个取巧的办法) nse_main.lua文件中我们找到如下代码 local t, path = cn...

超级大黑猫 ⋅ 46分钟前 ⋅ 0

springmvc获取axios数据为null情况

场景:前端用了vue没有用ajax与后台通信,用了axios,但是在代码运行过程中发现axios传递到后台的值接受到数据为null。 问题原因:此处的问题在与axios返回给后台的数据为json类型的,后台接...

王子城 ⋅ 48分钟前 ⋅ 0

hadoop技术入门学习之发行版选择

经常会看到这样的问题:零基础学习hadoop难不难?有的人回答说:零基础学习hadoop,没有想象的那么难,也没有想象的那么容易。看到这样的答案不免觉得有些尴尬,这个问题算是白问了,因为这个...

左手的倒影 ⋅ 48分钟前 ⋅ 0

806. Number of Lines To Write String - LeetCode

Question 806. Number of Lines To Write String Solution 思路:注意一点,如果a长度为4,当前行已经用了98个单元,要另起一行。 Java实现: public int[] numberOfLines(int[] widths, Str...

yysue ⋅ 56分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部