文档章节

Flume-1.6.0自定义拦截器(Interceptor)

K_Zhiqiang
 K_Zhiqiang
发布于 2017/05/19 16:22
字数 381
阅读 960
收藏 0

    Flume中的拦截器是插件式的组件,作用在source和channel之间。可以实现source接收的事件,在写入channel之前,进行转换或者删除。Flume官方提供了一些常用的拦截器,也可以自定义拦截器对日志进行处理。自定义拦截器只需以下几步:

  •     使用的Flume版本为:apache-flume-1.6.0

1. 实现org.apache.flume.interceptor.Interceptor接口,位于flume-ng-core-1.6.0.jar中,maven坐标:

<dependency>
   <groupId>org.apache.flume</groupId>
   <artifactId>flume-ng-core</artifactId>
   <version>1.6.0</version>
</dependency>
package org.ziyuzile.demo.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by Kang on 2017/5/19.
 */
public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charset.forName("UTF-8"));
        try{
            // body为原始数据,newBody为处理后的数据
            String newBody = body + "interceptor...";

            event.setBody(newBody.toString().getBytes());
        }catch (Exception e){
           e.printStackTrace();
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = new ArrayList<>(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

2. 将开发好的自定义interceptor打jar包,上传至flume所在服务器,放置于$FLUME_HOME/lib中。

3. 配置flume agent的配置文件,增加interceptor配置,如图:

agent001.sources = s1
agent001.channels = c1
agent001.sinks = k1

# define sources
agent001.sources.s1.type = exec
agent001.sources.s1.command = tail -F /home/bd/tmp/user.log
# define interceptors
agent001.sources.s1.interceptors = i1
agent001.sources.s1.interceptors.i1.type = com.xiwei.flume.interceptor.ETLInterceptor$Builder

# define channels
agent001.channels.c1.type = memory
agent001.channels.c1.capacity = 10000
agent001.channels.c1.transactionCapacity = 10000

# define sinks
agent001.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent001.sinks.k1.brokerList = xwhadoop225:9092
agent001.sinks.k1.topic = yyy

# relationship
agent001.sources.s1.channels = c1
agent001.sinks.k1.channel = c1

4. 启动flume,interceptor生效

© 著作权归作者所有

下一篇: Spark-2.1.1概述
K_Zhiqiang
粉丝 0
博文 24
码字总数 13488
作品 0
海淀
程序员
私信 提问
flume源码编译/拦截器分析(一)

flume介绍 ---由于是第一次进行源码编译与开发,步骤有点复杂,后续再进行简化 Flume是Cloudera提供的一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统。Flume支持在日志系统中定...

-九天-
2018/01/11
309
0
Flume学习系列(四)---- Interceptors(拦截器)

前言:flume通过使用Interceptors(拦截器)实现修改和过滤事件的功能。举个栗子,一个网站每天产生海量数据,但是可能会有很多数据是不完整的(缺少重要字段),或冗余的,如果不对这些数据...

小北觅
2018/08/21
0
0
Flume学习系列(五)---- Custom Interceptors(自定义拦截器)

前言:接上一篇,本篇文章实现一个自定义的拦截器。主要功能是在Event的body中添加IP地址。因为没有拦截器可以在Body中添加(host是在header中添加),所以需要自定义。掌握了这个,其他的情...

小北觅
2018/08/21
0
0
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习
2018/04/14
0
0
两个flume的拦截器(interceptor)

flume支持拦截器(interceptors)机制,是在source这个层面上工作,这里有两个拦截器 1,支持将日志体(event body)里面的字符串替换成另一个字符串。配置文件http://git.oschina.net/atuc...

午火
2014/06/11
3.9K
0

没有更多内容

加载失败,请刷新页面

加载更多

《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
今天
6
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
今天
7
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
今天
5
0
OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
1K
11
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
40
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部