Flume-1.6.0 Custom Interceptor

Original
2017/05/19 16:22

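    Interceptors in Flume are pluggable components that sit between a source and its channel. They let you transform or drop the events a source receives before those events are written to the channel. Flume ships with several commonly used interceptors, and you can also write a custom interceptor to process log data. Creating a custom interceptor takes only the following steps: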

  •     Flume version used: apache-flume-1.6.0

1. Implement the org.apache.flume.interceptor.Interceptor interface, which is in flume-ng-core-1.6.0.jar. Maven coordinates:

<dependency>
   <groupId>org.apache.flume</groupId>
   <artifactId>flume-ng-core</artifactId>
   <version>1.6.0</version>
</dependency>

package org.ziyuzile.demo.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by Kang on 2017/5/19.
 */
public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charset.forName("UTF-8"));
        try{
            // body is the raw data; newBody is the processed data
            String newBody = body + "interceptor...";

            // Encode with the same charset used for decoding, not the platform default
            event.setBody(newBody.getBytes(Charset.forName("UTF-8")));
        }catch (Exception e){
           e.printStackTrace();
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = new ArrayList<>(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
            // This example reads no settings from the agent configuration
        }
    }
}
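Because the class above needs flume-ng-core on the classpath, here is a stdlib-only sketch that imitates the same contract so it can be compiled and run on its own. It is a simulation, not Flume's API: `SimpleEvent`, `EventInterceptor` and `FilterAndTagInterceptor` are hypothetical stand-ins for `Event`, `Interceptor` and a custom implementation. It shows both things an interceptor can do: rewrite the body (decoding and encoding with UTF-8 in both directions) and drop an event by returning null, which the batch method then filters out, just as `intercept(List<Event>)` does above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flume.Event (body only, headers omitted).
final class SimpleEvent {
    private byte[] body;

    SimpleEvent(byte[] body) { this.body = body; }
    byte[] getBody() { return body; }
    void setBody(byte[] body) { this.body = body; }
}

// Hypothetical stand-in for the per-event and batch methods of Interceptor.
interface EventInterceptor {
    SimpleEvent intercept(SimpleEvent event);

    // Batch version: apply the per-event method and skip events that came back null.
    default List<SimpleEvent> intercept(List<SimpleEvent> events) {
        List<SimpleEvent> out = new ArrayList<>(events.size());
        for (SimpleEvent e : events) {
            SimpleEvent r = intercept(e);
            if (r != null) {   // null means "drop this event"
                out.add(r);
            }
        }
        return out;
    }
}

// Hypothetical interceptor: drops blank lines and appends a suffix to the rest.
final class FilterAndTagInterceptor implements EventInterceptor {
    @Override
    public SimpleEvent intercept(SimpleEvent event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        if (body.trim().isEmpty()) {
            return null;   // this event never reaches the channel
        }
        event.setBody((body + "interceptor...").getBytes(StandardCharsets.UTF_8));
        return event;
    }
}

public class InterceptorDemo {
    public static void main(String[] args) {
        List<SimpleEvent> batch = new ArrayList<>();
        // Chinese text ("user login") as Unicode escapes so the source stays ASCII
        batch.add(new SimpleEvent("\u7528\u6237\u767b\u5f55".getBytes(StandardCharsets.UTF_8)));
        batch.add(new SimpleEvent("   ".getBytes(StandardCharsets.UTF_8)));

        List<SimpleEvent> result = new FilterAndTagInterceptor().intercept(batch);
        for (SimpleEvent e : result) {
            System.out.println(new String(e.getBody(), StandardCharsets.UTF_8));
        }
    }
}
```

Returning null from the single-event method is how an interceptor deletes events; the batch method must then skip those nulls, which is why the null check in the batch loop matters.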

2. Package the finished custom interceptor as a jar, upload it to the server where Flume runs, and place it in $FLUME_HOME/lib.

3. Edit the Flume agent's configuration file and add the interceptor configuration, as follows:

agent001.sources = s1
agent001.channels = c1
agent001.sinks = k1

# define sources
agent001.sources.s1.type = exec
agent001.sources.s1.command = tail -F /home/bd/tmp/user.log
# define interceptors
agent001.sources.s1.interceptors = i1
agent001.sources.s1.interceptors.i1.type = org.ziyuzile.demo.flume.MyInterceptor$Builder

# define channels
agent001.channels.c1.type = memory
agent001.channels.c1.capacity = 10000
agent001.channels.c1.transactionCapacity = 10000

# define sinks
agent001.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent001.sinks.k1.brokerList = xwhadoop225:9092
agent001.sinks.k1.topic = yyy

# relationship
agent001.sources.s1.channels = c1
agent001.sinks.k1.channel = c1
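The `type` value of interceptor `i1` is the binary name of a nested `Builder` class, which is why it contains `$`: Java separates a nested class from its enclosing class with `$` in binary names. Roughly, Flume creates that builder by name, passes it the interceptor's settings through `configure(Context)`, and then calls `build()`. As far as I understand Flume's configuration handling, properties placed under `agent001.sources.s1.interceptors.i1.` are what ends up in that Context. The stdlib-only sketch below imitates this flow with reflection; `SimpleBuilder`, `SuffixInterceptor` and the `suffix` property are hypothetical, and a plain `Map` stands in for Flume's `Context`. It illustrates the mechanism and is not Flume's actual loading code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class BuilderLoadingDemo {

    // Hypothetical stand-in for Interceptor.Builder; a Map replaces Flume's Context.
    public interface SimpleBuilder {
        void configure(Map<String, String> context);
        UnaryOperator<String> build();
    }

    // Hypothetical interceptor whose nested Builder reads a "suffix" setting.
    public static class SuffixInterceptor {
        public static class Builder implements SimpleBuilder {
            private String suffix = "";

            @Override
            public void configure(Map<String, String> context) {
                // Read an optional setting, falling back to a default value
                suffix = context.getOrDefault("suffix", "interceptor...");
            }

            @Override
            public UnaryOperator<String> build() {
                String s = suffix;
                return body -> body + s;
            }
        }
    }

    // Load a builder by its binary class name, configure it, then build.
    static UnaryOperator<String> load(String className, Map<String, String> context)
            throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className);
        SimpleBuilder builder = (SimpleBuilder) clazz.getDeclaredConstructor().newInstance();
        builder.configure(context);
        return builder.build();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        // Nested classes use '$' in their binary name: Outer$Inner$Builder
        String name = BuilderLoadingDemo.class.getName() + "$SuffixInterceptor$Builder";
        Map<String, String> context = new HashMap<>();
        context.put("suffix", " [etl]");
        UnaryOperator<String> interceptor = load(name, context);
        System.out.println(interceptor.apply("line"));   // prints "line [etl]"
    }
}
```

Writing the name with `.` instead of `$` before `Builder` would make `Class.forName` fail with `ClassNotFoundException`, which is a common cause of an agent failing to start with a custom interceptor.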

4. Start Flume; the interceptor takes effect.
