Learning Flume: Interceptor Configuration

Original post, 2017/09/21 21:25

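Flume interceptors can add headers to an event before the event is put into the channel (for example, the timestamp header that the HDFS Sink uses). They can also modify the event body, or drop events entirely.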

Reference: http://flume.apache.org/FlumeUserGuide.html

Timestamp Interceptor

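Inserts a header with the key timestamp, whose value is the time (in milliseconds) at which the interceptor processed the event.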

| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name, has to be timestamp or the FQCN |
| preserveExisting | false | If the timestamp already exists, should it be preserved - true or false |
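To make the preserveExisting switch concrete, here is a minimal standalone Java sketch. It is not Flume's TimestampInterceptor: it only imitates the behavior described in the table on a plain header map, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch, not Flume source code: imitates how a
// timestamp interceptor treats the "timestamp" header.
class TimestampHeaderSketch {

    static final String HEADER = "timestamp";

    // Returns a copy of the headers after "interception" at time nowMillis.
    static Map<String, String> intercept(Map<String, String> headers,
                                         boolean preserveExisting, long nowMillis) {
        Map<String, String> out = new HashMap<>(headers);
        // Write the header unless it already exists and preserveExisting is true
        if (!preserveExisting || !out.containsKey(HEADER)) {
            out.put(HEADER, Long.toString(nowMillis)); // epoch milliseconds as text
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put(HEADER, "1000");
        System.out.println(intercept(headers, true, 2000L).get(HEADER));  // prints 1000 (kept)
        System.out.println(intercept(headers, false, 2000L).get(HEADER)); // prints 2000 (overwritten)
    }
}
```

With the default preserveExisting=false, an upstream timestamp is always replaced by the time this agent saw the event.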

Host Interceptor

Inserts a header containing the hostname or IP address of the host the agent is running on.

| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name, has to be host |
| preserveExisting | false | If the host header already exists, should it be preserved - true or false |
| useIP | true | Use the IP Address if true, else use hostname. |
| hostHeader | host | The header key to be used. |
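A minimal standalone sketch of the useIP and hostHeader settings, using only java.net.InetAddress. It is an illustration, not Flume's HostInterceptor: a real agent resolves its own local address, whereas here the address is passed in (built with InetAddress.getByAddress, which does no DNS lookup) so the result is deterministic. The host name agent01 and all class/method names are hypothetical.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch, not Flume source code.
class HostHeaderSketch {

    // Puts either the IP address or the host name under hostHeader.
    static Map<String, String> intercept(Map<String, String> headers, InetAddress addr,
                                         boolean useIP, String hostHeader,
                                         boolean preserveExisting) {
        Map<String, String> out = new HashMap<>(headers);
        if (!preserveExisting || !out.containsKey(hostHeader)) {
            out.put(hostHeader, useIP ? addr.getHostAddress() : addr.getHostName());
        }
        return out;
    }

    public static void main(String[] args) throws UnknownHostException {
        // Fixed address with a supplied name; no network lookup happens here
        InetAddress addr = InetAddress.getByAddress("agent01", new byte[] {10, 0, 0, 1});
        Map<String, String> empty = new HashMap<>();
        System.out.println(intercept(empty, addr, true, "host", false));  // prints {host=10.0.0.1}
        System.out.println(intercept(empty, addr, false, "host", false)); // prints {host=agent01}
    }
}
```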


Static Interceptor

Inserts a custom header with a user-specified key and value.

| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name, has to be static |
| preserveExisting | true | If configured header already exists, should it be preserved - true or false |
| key | key | Name of header that should be created |
| value | value | Static value that should be created |
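Each static interceptor adds a single key/value pair, so several instances are chained to add several headers. The standalone sketch below imitates that chaining and the preserveExisting rule on a plain map; it is not Flume code, and the names (including the datacenter/env headers and their values) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch, not Flume source code.
class StaticHeaderSketch {

    // Adds key=value unless the key exists and preserveExisting is true.
    static Map<String, String> intercept(Map<String, String> headers, String key,
                                         String value, boolean preserveExisting) {
        Map<String, String> out = new HashMap<>(headers);
        if (!preserveExisting || !out.containsKey(key)) {
            out.put(key, value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        // Two chained "instances", each contributing one header
        headers = intercept(headers, "datacenter", "NYC_01", true);
        headers = intercept(headers, "env", "prod", true);
        // preserveExisting=true (the default) keeps the value already present
        headers = intercept(headers, "datacenter", "SFO_02", true);
        System.out.println(headers.get("datacenter") + " " + headers.get("env")); // prints NYC_01 prod
    }
}
```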


UUID Interceptor

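Inserts a header holding a universally unique identifier (UUID). Note that its type must be given as the full class name below; it lives in the org.apache.flume.sink.solr.morphline package.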

| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
| headerName | id | The name of the Flume header to modify |
| preserveExisting | true | If the UUID header already exists, should it be preserved - true or false |
| prefix | "" | The prefix string constant to prepend to each generated UUID |
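A minimal standalone sketch of headerName, prefix and preserveExisting using java.util.UUID. It is not the UUIDInterceptor itself, just an imitation of the table above; the prefix "web-" and the class/method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical standalone sketch, not the Morphline UUIDInterceptor source.
class UuidHeaderSketch {

    static Map<String, String> intercept(Map<String, String> headers, String headerName,
                                         String prefix, boolean preserveExisting) {
        Map<String, String> out = new HashMap<>(headers);
        if (!preserveExisting || !out.containsKey(headerName)) {
            // Random UUID; its canonical string form is 36 characters long
            out.put(headerName, prefix + UUID.randomUUID());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> out = intercept(new HashMap<>(), "id", "web-", true);
        // Output differs on every run: "web-" followed by a random UUID
        System.out.println(out.get("id"));
    }
}
```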

Search and Replace Interceptor

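Finds the substrings of the event body that match a pattern (a Java regular expression) and replaces them with a replacement string.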

| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name has to be search_replace |
| searchPattern | – | The pattern to search for and replace. |
| replaceString | – | The replacement string. |
| charset | UTF-8 | The charset of the event body. Assumed by default to be UTF-8 |
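The body is bytes, which is why charset matters: the text has to be decoded before a regex can run over it. The standalone sketch below decodes with the given charset, applies a regex replace-all, and re-encodes. It is an imitation of the described behavior, not Flume's code; the password-masking pattern and the names are hypothetical. Because this sketch uses Matcher.replaceAll, "$" and "\" in the replacement string have special meaning here.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical standalone sketch, not Flume source code.
class SearchReplaceSketch {

    static byte[] intercept(byte[] body, String searchPattern, String replaceString,
                            Charset charset) {
        String text = new String(body, charset);                 // decode the body
        String replaced = Pattern.compile(searchPattern)
                .matcher(text).replaceAll(replaceString);        // replace every match
        return replaced.getBytes(charset);                       // re-encode
    }

    public static void main(String[] args) {
        byte[] body = "user=alice password=secret".getBytes(StandardCharsets.UTF_8);
        byte[] out = intercept(body, "password=\\S+", "password=***", StandardCharsets.UTF_8);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints user=alice password=***
    }
}
```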

Regex Filtering Interceptor

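Filters events by matching the event body against a regular expression: depending on excludeEvents, it either drops the events that match or keeps only the events that match.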

| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name has to be regex_filter |
| regex | ".*" | Regular expression for matching against events |
| excludeEvents | false | If true, regex determines events to exclude, otherwise regex determines events to include. |
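The include/exclude logic can be sketched in a few lines of standalone Java, treating each event body as a plain string. This is an imitation, not Flume's RegexFilteringInterceptor; the sample log lines and names are hypothetical, and the use of Matcher.find() (a match anywhere in the body counts) is an assumption of the sketch, so anchor patterns with ^ or $ when you mean the start or end of the body.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical standalone sketch, not Flume source code.
class RegexFilterSketch {

    // True if the event should be kept.
    static boolean keep(String body, Pattern regex, boolean excludeEvents) {
        boolean matches = regex.matcher(body).find(); // match anywhere in the body
        return excludeEvents ? !matches : matches;
    }

    static List<String> filter(List<String> bodies, String regex, boolean excludeEvents) {
        Pattern p = Pattern.compile(regex);
        List<String> kept = new ArrayList<>();
        for (String body : bodies) {
            if (keep(body, p, excludeEvents)) {
                kept.add(body);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("INFO started", "DEBUG cache hit", "ERROR disk full");
        System.out.println(filter(events, "^DEBUG", true)); // prints [INFO started, ERROR disk full]
        System.out.println(filter(events, "ERROR", false)); // prints [ERROR disk full]
    }
}
```

With the defaults (regex ".*", excludeEvents=false) every body matches, so nothing is filtered.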


Regex Extractor Interceptor

Extracts the parts of the event body captured by the regular expression's groups and stores them in event headers.

| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name has to be regex_extractor |
| regex | – | Regular expression for matching against events |
| serializers | – | Space-separated list of serializers for mapping matches to header names and serializing their values (see the Flume User Guide for an example). Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer or org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer |
| serializers.<s1>.type | default | Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer |
| serializers.<s1>.name | – | Name of the header that receives the value matched for this serializer |
| serializers.* | – | Serializer-specific properties |
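The mapping from capture groups to headers is positional: the first serializer in the list gets group 1, the second gets group 2, and so on. The standalone sketch below imitates that mapping with a pass-through serializer (the matched text is stored unchanged). It is not Flume's code; the sample body, pattern, header names and class names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone sketch, not Flume source code: capture group i is
// stored under headerNames.get(i - 1), like a pass-through serializer.
class RegexExtractorSketch {

    static Map<String, String> extract(String body, String regex, List<String> headerNames) {
        Map<String, String> headers = new HashMap<>();
        Matcher m = Pattern.compile(regex).matcher(body);
        if (m.find()) {
            int groups = Math.min(m.groupCount(), headerNames.size());
            for (int i = 1; i <= groups; i++) {
                headers.put(headerNames.get(i - 1), m.group(i));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> h = extract("1:2:3.4foobar5", "(\\d):(\\d):(\\d)",
                Arrays.asList("one", "two", "three"));
        System.out.println(h.get("one") + h.get("two") + h.get("three")); // prints 123
    }
}
```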

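Custom Interceptor

Writing a custom interceptor is straightforward: implement the org.apache.flume.interceptor.Interceptor interface (initialize, intercept(Event), intercept(List<Event>) and close), and provide a Builder class, conventionally nested, that implements Interceptor.Builder; Flume uses the Builder to configure and create the interceptor. In the agent configuration, the interceptor's type is set to the Builder's fully qualified name, here com.endless.flume.MyTimestampInterceptor$Builder. The example below adds a header containing a human-readable timestamp.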

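package com.endless.flume;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

/**
 * Adds a header holding the current local time formatted as "yyyy-MM-dd HH:mm:ss".
 * Requires Java 8+ (java.time).
 *
 * Note: the HDFS Sink expects the "timestamp" header to hold epoch milliseconds,
 * so if this interceptor feeds an HDFS Sink, configure a different headerName.
 */
public class MyTimestampInterceptor implements Interceptor {

	public static final String HEADER_NAME = "headerName";
	public static final String PRESERVE_EXISTING_NAME = "preserveExisting";

	// DateTimeFormatter is immutable and thread-safe; a shared SimpleDateFormat is not.
	private static final DateTimeFormatter FORMATTER =
			DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

	private final String headerName;
	private final boolean preserveExisting;

	public MyTimestampInterceptor(Context context) {
		// Header key to write (default "timestamp")
		headerName = context.getString(HEADER_NAME, "timestamp");
		// Keep a header that is already present instead of overwriting it (default true)
		preserveExisting = context.getBoolean(PRESERVE_EXISTING_NAME, true);
	}

	@Override
	public void initialize() {
		// No resources to set up
	}

	@Override
	public Event intercept(Event event) {
		Map<String, String> headers = event.getHeaders();
		// Write the header unless it exists and we were asked to preserve it
		if (!preserveExisting || !headers.containsKey(headerName)) {
			headers.put(headerName, LocalDateTime.now().format(FORMATTER));
		}
		return event;
	}

	@Override
	public List<Event> intercept(List<Event> events) {
		List<Event> results = new ArrayList<Event>(events.size());
		for (Event event : events) {
			// A null return from intercept(Event) means "drop this event"
			Event intercepted = intercept(event);
			if (intercepted != null) {
				results.add(intercepted);
			}
		}
		return results;
	}

	@Override
	public void close() {
		// Nothing to release
	}

	/**
	 * Flume creates the interceptor through this Builder: it calls configure()
	 * with the interceptor's properties, then build().
	 */
	public static class Builder implements Interceptor.Builder {
		private Context context;

		@Override
		public MyTimestampInterceptor build() {
			return new MyTimestampInterceptor(context);
		}

		@Override
		public void configure(Context context) {
			this.context = context;
		}
	}
}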
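The batch method above skips null results because, by the Interceptor contract, returning null from intercept(Event) drops the event. The standalone sketch below shows that convention without any Flume dependency: a stand-in "event" is just its String body, and the rule "drop blank bodies" plus all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch, not Flume code: an "event" is just its String body.
class DropEmptyBodiesSketch {

    // Single-event version: return null to drop the event.
    static String intercept(String body) {
        return body.trim().isEmpty() ? null : body;
    }

    // Batch version: same shape as MyTimestampInterceptor.intercept(List<Event>),
    // collecting only the events that were not dropped.
    static List<String> intercept(List<String> bodies) {
        List<String> results = new ArrayList<>(bodies.size());
        for (String body : bodies) {
            String out = intercept(body);
            if (out != null) {
                results.add(out);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(intercept(Arrays.asList("a", "  ", "b"))); // prints [a, b]
    }
}
```

The same pattern is what lets a custom interceptor act as a filter, like the built-in regex_filter.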
