文档章节

flume自定义拦截器

J
 Joseph_hu
发布于 2017/06/25 20:55
字数 406
阅读 64
收藏 0

下面看一下源码中时间戳的拦截器

public class TimestampInterceptor implements Interceptor {

  private final boolean preserveExisting;

  /**
   * Only {@link TimestampInterceptor.Builder} can build me
   */
  private TimestampInterceptor(boolean preserveExisting) {
    this.preserveExisting = preserveExisting;
  }

  @Override
  public void initialize() {
    // no-op
  }

  /**
   * Modifies events in-place.
    *这个是对单个事件的处理
   */
  @Override
  public Event intercept(Event event) {
//获得事件中的所有头信息,都是以key-value的形式存储
    Map<String, String> headers = event.getHeaders();
    if (preserveExisting && headers.containsKey(TIMESTAMP)) {
      // we must preserve the existing timestamp
    } else {
      long now = System.currentTimeMillis();
    //把时间戳放到头信息
      headers.put(TIMESTAMP, Long.toString(now));
    }
    return event;
  }

  /**
   * Delegates to {@link #intercept(Event)} in a loop.
   * @param events
    *这是对多个事件的处理
   * @return
   */
  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // no-op
  }

  /**
   * Builder which builds new instances of the TimestampInterceptor.
    *这个方法是必须实现的,这个接口Interceptor中注释明确必须实现
   */
  public static class Builder implements Interceptor.Builder {

    private boolean preserveExisting = PRESERVE_DFLT;

    @Override
    public Interceptor build() {
      return new TimestampInterceptor(preserveExisting);
    }

    @Override
    public void configure(Context context) {
      preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
    }

  }

  public static class Constants {
    public static String TIMESTAMP = "timestamp";
    public static String PRESERVE = "preserveExisting";
    public static boolean PRESERVE_DFLT = false;
  }

}

大家可以写自己的过滤器,特别对于log4j的日志处理,例如如果你的日志每一个小时生成一个文件,但是我要把它按照天来存储,大家可以按照这个这种形式实现. 把生成的包放到/app/apache-flume-1.7.0-bin/plugins.d/interceptor/lib下,如果/plugins.d不存在,自己就建立这个目录就可以. 在配置文件中添加:xxx.sources.r1.interceptors.i1.type = com.hc360.interceptor.DateTimeSuffixInterceptor$Builder就可以。 现在想把自己做的一些东西记录一下,坚持,坚持。。。。。。。。。。。。。。

© 著作权归作者所有

J
粉丝 2
博文 31
码字总数 10345
作品 0
菏泽
程序员
私信 提问
Flume学习系列(五)---- Custom Interceptors(自定义拦截器)

前言:接上一篇,本篇文章实现一个自定义的拦截器。主要功能是在Event的body中添加IP地址。因为没有拦截器可以在Body中添加(host是在header中添加),所以需要自定义。掌握了这个,其他的情...

小北觅
2018/08/21
0
0
Flume学习系列(四)---- Interceptors(拦截器)

前言:flume通过使用Interceptors(拦截器)实现修改和过滤事件的功能。举个栗子,一个网站每天产生海量数据,但是可能会有很多数据是不完整的(缺少重要字段),或冗余的,如果不对这些数据...

小北觅
2018/08/21
0
0
flume 总结--flume入门介绍

flume介绍 flume被设计为一个灵活的分布式系统,可以很容易的扩展,而且是高度可定制化的,一个配置正确的Flume Agent和由互相连接的Agent创建的Agent管道,保证不会丢失数据,提供持久的cha...

u013362353
2018/05/28
0
0
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习
2018/04/14
0
0
Flume简介

1、什么是Flume? 2、 Flume的结构? 3、Source、Sink和Channel的关系? 4、Flume的基本数据单位是什么? 5、什么是Flume拦截器? 6、Flume的工作流程是什么? 7、Flume可靠吗? 下载地址:h...

antrol
2015/10/26
168
1

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
2.1K
14
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
38
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
40
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
61
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部