文档章节

flume-ng 自定义拦截器,对header中的字段进行正则匹配分离出更多header

c
 chunhei2008
发布于 2015/03/17 17:03
字数 529
阅读 2789
收藏 0


代码如下:

package com.wy.flume.interceptor;

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;
import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;

public class RegexExtractorHeaderInterceptor implements Interceptor {

    static final String REGEX = "regex";
    static final String SERIALIZERS = "serializers";
    
    
    static final String EXTRACTOR_HEADER = "extractorHeader";  
    static final boolean DEFAULT_EXTRACTOR_HEADER = false;  
    static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey"; 

    private static final Logger logger = LoggerFactory
        .getLogger(RegexExtractorHeaderInterceptor.class);

    private final Pattern regex;
    private final List<NameAndSerializer> serializers;

    private final boolean extractorHeader;  
    private final String extractorHeaderKey;  
    
    private RegexExtractorHeaderInterceptor(Pattern regex,
        List<NameAndSerializer> serializers,boolean extractorHeader, String extractorHeaderKey) {
      this.regex = regex;
      this.serializers = serializers;
      
      this.extractorHeader = extractorHeader;
      this.extractorHeaderKey = extractorHeaderKey;
      
    }

    @Override
    public void initialize() {
      // NO-OP...
    }

    @Override
    public void close() {
      // NO-OP...
    }

    @Override
    public Event intercept(Event event) {
      String extractorHeaderVal;
      if (extractorHeader){
          
          extractorHeaderVal = event.getHeaders().get(extractorHeaderKey);
          
      }else{
          
          extractorHeaderVal = new String(event.getBody(),Charsets.UTF_8);
          
      }
      
      Matcher matcher = regex.matcher(extractorHeaderVal);
      Map<String, String> headers = event.getHeaders();
      if (matcher.find()) {
        for (int group = 0, count = matcher.groupCount(); group < count; group++) {
          int groupIndex = group + 1;
          if (groupIndex > serializers.size()) {
            if (logger.isDebugEnabled()) {
              logger.debug("Skipping group {} to {} due to missing serializer",
                  group, count);
            }
            break;
          }
          NameAndSerializer serializer = serializers.get(group);
          if (logger.isDebugEnabled()) {
            logger.debug("Serializing {} using {}", serializer.headerName,
                serializer.serializer);
          }
          headers.put(serializer.headerName,
              serializer.serializer.serialize(matcher.group(groupIndex)));
        }
      }
      return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
      List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
      for (Event event : events) {
        Event interceptedEvent = intercept(event);
        if (interceptedEvent != null) {
          intercepted.add(interceptedEvent);
        }
      }
      return intercepted;
    }

    public static class Builder implements Interceptor.Builder {

      private Pattern regex;
      private List<NameAndSerializer> serializerList;
      
      private boolean extractorHeader;
      private String extractorHeaderKey;
      
      private final RegexExtractorInterceptorPassThroughSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
      

      @Override
      public void configure(Context context) {
        String regexString = context.getString(REGEX);
        Preconditions.checkArgument(!StringUtils.isEmpty(regexString),
            "Must supply a valid regex string");
        regex = Pattern.compile(regexString);
        regex.pattern();
        regex.matcher("").groupCount();
        configureSerializers(context);
        
        extractorHeader = context.getBoolean(EXTRACTOR_HEADER,DEFAULT_EXTRACTOR_HEADER);
        
        if (extractorHeader){
            
            extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY);
            Preconditions.checkArgument(!StringUtils.isEmpty(extractorHeaderKey),"header key must");
            
        }
        
      }

      private void configureSerializers(Context context) {
        String serializerListStr = context.getString(SERIALIZERS);
        Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr),
            "Must supply at least one name and serializer");

        String[] serializerNames = serializerListStr.split("\\s+");

        Context serializerContexts =
            new Context(context.getSubProperties(SERIALIZERS + "."));

        serializerList = Lists.newArrayListWithCapacity(serializerNames.length);
        for(String serializerName : serializerNames) {
          Context serializerContext = new Context(
              serializerContexts.getSubProperties(serializerName + "."));
          String type = serializerContext.getString("type", "DEFAULT");
          String name = serializerContext.getString("name");
          Preconditions.checkArgument(!StringUtils.isEmpty(name),
              "Supplied name cannot be empty.");

          if("DEFAULT".equals(type)) {
            serializerList.add(new NameAndSerializer(name, defaultSerializer));
          } else {
            serializerList.add(new NameAndSerializer(name, getCustomSerializer(
                type, serializerContext)));
          }
        }
      }

      private RegexExtractorInterceptorSerializer getCustomSerializer(
          String clazzName, Context context) {
        try {
          RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class
              .forName(clazzName).newInstance();
          serializer.configure(context);
          return serializer;
        } catch (Exception e) {
          logger.error("Could not instantiate event serializer.", e);
          Throwables.propagate(e);
        }
        return defaultSerializer;
      }

      @Override
      public Interceptor build() {
        Preconditions.checkArgument(regex != null,
            "Regex pattern was misconfigured");
        Preconditions.checkArgument(serializerList.size() > 0,
            "Must supply a valid group match id list");
        return new RegexExtractorHeaderInterceptor(regex, serializerList, extractorHeader, extractorHeaderKey);
      }
    }

    static class NameAndSerializer {
      private final String headerName;
      private final RegexExtractorInterceptorSerializer serializer;

      public NameAndSerializer(String headerName,
          RegexExtractorInterceptorSerializer serializer) {
        this.headerName = headerName;
        this.serializer = serializer;
      }
    }
  }

应用配置:

hdp2.sources.s1.interceptors = i2
hdp2.sources.s1.interceptors.i2.type = com.wy.flume.interceptor.RegexExtractorHeaderInterceptor$Builder
hdp2.sources.s1.interceptors.i2.regex = ([^_]+)_(\\d{8}).*
hdp2.sources.s1.interceptors.i2.extractorHeader = true
hdp2.sources.s1.interceptors.i2.extractorHeaderKey = basename
hdp2.sources.s1.interceptors.i2.serializers = s1 s2
hdp2.sources.s1.interceptors.i2.serializers.s1.name = log_type
hdp2.sources.s1.interceptors.i2.serializers.s2.name = log_day

© 著作权归作者所有

共有 人打赏支持
c
粉丝 3
博文 27
码字总数 8480
作品 2
广州
高级程序员
私信 提问
Flume学习系列(四)---- Interceptors(拦截器)

前言:flume通过使用Interceptors(拦截器)实现修改和过滤事件的功能。举个栗子,一个网站每天产生海量数据,但是可能会有很多数据是不完整的(缺少重要字段),或冗余的,如果不对这些数据...

小北觅
08/21
0
0
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习
04/14
0
0
Flume NG 学习笔记(八)Interceptors(拦截器)测试

版权声明:本文为博主原创文章,未经博主允许不得转载。 目录(?)[+] 拦截器主要是对事件的header信息信息操作,要么直接忽略他,要么修改他的数据 一、Event Serializers file_roll sink 和h...

jackwxh
06/29
0
0
Flume学习系列(五)---- Custom Interceptors(自定义拦截器)

前言:接上一篇,本篇文章实现一个自定义的拦截器。主要功能是在Event的body中添加IP地址。因为没有拦截器可以在Body中添加(host是在header中添加),所以需要自定义。掌握了这个,其他的情...

小北觅
08/21
0
0
flume源码编译/拦截器分析(一)

flume介绍 ---由于是第一次进行源码编译与开发,步骤有点复杂,后续再进行简化 Flume是Cloudera提供的一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统。Flume支持在日志系统中定...

-九天-
01/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

RestClientUtil和ConfigRestClientUtil区别说明

RestClientUtil directly executes the DSL defined in the code. ConfigRestClientUtil gets the DSL defined in the configuration file by the DSL name and executes it. RestClientUtil......

bboss
38分钟前
6
0

中国龙-扬科
昨天
1
0
Linux系统设置全局的默认网络代理

更改全局配置文件/etc/profile all_proxy="all_proxy=socks://rahowviahva.ml:80/"ftp_proxy="ftp_proxy=http://rahowviahva.ml:80/"http_proxy="http_proxy=http://rahowviahva.ml:80/"......

临江仙卜算子
昨天
5
0
java框架学习日志-6(bean作用域和自动装配)

本章补充bean的作用域和自动装配 bean作用域 之前提到可以用scope来设置单例模式 <bean id="type" class="cn.dota2.tpye.Type" scope="singleton"></bean> 除此之外还有几种用法 singleton:......

白话
昨天
8
0
在PC上测试移动端网站和模拟手机浏览器的5大方法

总结很全面,保存下来以备不时之需。原文地址:https://www.cnblogs.com/coolfeng/p/4708942.html

kitty1116
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部