文档章节

flume的hdfssink自定义EventSerializer序列化类

ivan-Zhao
 ivan-Zhao
发布于 2016/04/01 18:41
字数 643
阅读 168
收藏 0

    因为之前做了hbasesink的序列化类,觉得写hdfs的应该会很简单,可是没想到竟然不一样。hdfs并没有直接配置序列化类的选项需要根据fileType来选择对相应序列化类,我们使用的datastream的类型,对应的类是HDFSDataStream,这个类默认的序列化类TEXT(这是个枚举类型)

serializerType = context.getString("serializer", "TEXT");



枚举的类如下:

public enum EventSerializerType {
  TEXT(BodyTextEventSerializer.Builder.class),
  HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class),
  AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class),
  CUSTOM(CUSTOMEventSerializer.Builder.class),//自定义的序列化类
  OTHER(null);

  private final Class<? extends EventSerializer.Builder> builderClass;

  EventSerializerType(Class<? extends EventSerializer.Builder> builderClass) {
    this.builderClass = builderClass;
  }

  public Class<? extends EventSerializer.Builder> getBuilderClass() {
    return builderClass;
  }

}

在里面加了自定义的类型和枚举,在配置agent的时候配置好filetype和serializer即可,同样需要编译上传。

自定义的序列化类如下:

public class CUSTOMEventSerializer implements EventSerializer {
	private final static Logger logger = LoggerFactory.getLogger(CUSTOMEventSerializer.class);
	private final String SPLITCHAR = "\001";//列分隔符
	// for legacy reasons, by default, append a newline to each event written
	// out
	private final String APPEND_NEWLINE = "appendNewline";
	private final boolean APPEND_NEWLINE_DFLT = true;

	private final OutputStream out;
	private final boolean appendNewline;

	private CUSTOMEventSerializer(OutputStream out, Context ctx) {
		this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT);
		this.out = out;
	}

	@Override
	public boolean supportsReopen() {
		return true;
	}

	@Override
	public void afterCreate() {
		// noop
	}

	@Override
	public void afterReopen() {
		// noop
	}

	@Override
	public void beforeClose() {
		// noop
	}

	@Override
	public void write(Event e) throws IOException {
		// 获取日志信息
		String log = new String(e.getBody(), StandardCharsets.UTF_8);
		logger.info("-----------logs-------" + log);
		// headers包含日志中项目编号和host信息
		Map<String, String> headers = e.getHeaders();
		String parsedLog = parseJson2Value(log, headers);
		out.write(parsedLog.getBytes());
		logger.info("-----------values-------" + parsedLog);
		logger.info("-----------valueSSSSSS-------" + parsedLog.getBytes());
		out.write('\n');
	}
	/**
	 * 
	 * @Title: parseJson2Value 
	 * @Description: 解析出json日志中的value。 
	 * @param log json格式日志
	 * @param headers event头信息
	 * @return  
	 * @return String 解析后的日志
	 * @throws
	 */
	private String parseJson2Value(String log, Map<String, String> headers) {
		log.replace("\\", "/");
		String time = "";
		String path = "";
		Object value = "";
		StringBuilder values = new StringBuilder();
		ObjectMapper objectMapper = new ObjectMapper();
		try {
			Map<String,Object> m = objectMapper.readValue(log, Map.class);
			for(String key:m.keySet()){
				value = m.get(key);
				if (key.equals("uri")){
					//解析访问路径
					path = pasreUriToPath(value.toString());
				}
				if(key.equals("time")){
					time = value.toString().substring(10);
				}
				values.append(value).append(this.SPLITCHAR);
			}
		} catch (JsonParseException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JsonMappingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		// 解析headers中的项目编号和服务host
		String pcode = headers.get("pcode");
		String host = headers.get("host");
		values.append(path).append(this.SPLITCHAR).
		append(pcode).append(this.SPLITCHAR).
		append(host).append(this.SPLITCHAR).
		append(time).append(this.SPLITCHAR);
		//value字符串
		return values.toString();
	}

	@Override
	public void flush() throws IOException {
		// noop
	}

	public static class Builder implements EventSerializer.Builder {

		@Override
		public EventSerializer build(Context context, OutputStream out) {
			CUSTOMEventSerializer s = new CUSTOMEventSerializer(out, context);
			return s;
		}

	}
	/**
	 * 把请求uri转换成具体的访问路径
	 * 
	 * @param uri 请求uri
	 * @return   访问路径
	 */
	protected String pasreUriToPath(String uri){
		if(uri == null || "".equals(uri.trim())){
			return uri;
		}
		int index = uri.indexOf("/");
		if(index > -1){
			uri = uri.substring(index);
		}
		index = uri.indexOf("?");
		if(index > -1){
			uri = uri.substring(0, index);
		}
		index = uri.indexOf(";");
		if(index > -1){
			uri = uri.substring(0, index);
		}
		index = uri.indexOf(" HTTP/1.1");
		if(index > -1){
			uri = uri.substring(0, index);
		}
		index = uri.indexOf("HTTP/1.1");
		if(index > -1){
			uri = uri.substring(0, index);
		}
		return uri;
	}
}



© 著作权归作者所有

ivan-Zhao
粉丝 10
博文 33
码字总数 29110
作品 0
深圳
程序员
私信 提问
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习
2018/04/14
0
0
使用Flume将日志导入OSS

前言 Apache Flume是一个高可用、高可靠的分布式日志采集、聚合与传输的系统。它基于流式的数据传输,架构简单、灵活。它简单可扩展的模型,也适合在线的数据分析。 上图是它的简单数据流模型...

冷月_wjh
2018/08/29
0
0
flume 总结--flume入门介绍

flume介绍 flume被设计为一个灵活的分布式系统,可以很容易的扩展,而且是高度可定制化的,一个配置正确的Flume Agent和由互相连接的Agent创建的Agent管道,保证不会丢失数据,提供持久的cha...

u013362353
2018/05/28
0
0
flume收集到的日志数据无法下沉到hbase

最近用三台虚拟机练手一个新闻日志收集项目,如下: hadoop2和hadoop3将收集到的日志发送给Hadoop1上的flume进行合并,日志可以接受到,source源没有问题: Hadoop1上的flume-conf.properti...

不是靠颜值
02/03
60
0
Flume将MySQL表数据存入到HBase

Flume将MySQL表数据存入到HBase HBasesink的三种序列化模式 SimpleHbaseEventSerializer RegexHbaseEventSerializer SimpleAsyncHbaseEventSerializer 使用SimpleHbaseEventSerializer序列化......

lwenhao
03/06
242
0

没有更多内容

加载失败,请刷新页面

加载更多

浅谈Visitor访问者模式

一、前言 什么叫访问,如果大家学过数据结构,对于这点就很清晰了,遍历就是访问的一般形式,单独读取一个元素进行相应的处理也叫作访问,读取到想要查看的内容+对其进行处理就叫作访问,那么...

青衣霓裳
22分钟前
5
0
JS内嵌多个页面,页面之间如何更快捷的查找相关联的页面

假设parent为P页面, P页面有两个子页面,分别为B页面和C页面; B页面和C页面分别内嵌一个iframe,分别为:D页面和E页面 现在通过B页面的内嵌页面D的方法refreshEpage(eUrl)来加载内嵌页面E的内容...

文文1
23分钟前
6
0
Hibernate 5 升级后 getProperties 错误

升级到 Hibernate 5 后,提示有错误: org.hibernate.engine.spi.SessionFactoryImplementor.getProperties()Ljava/util/Map; 完整的错误栈为: java.lang.NoSuchMethodError: org.hibernate......

honeymoose
24分钟前
4
0
mysql-connector-java升级到8.0后保存时间到数据库出现了时差

在一个新项目中用到了新版的mysql jdbc 驱动 <dependency>     <groupId>mysql</groupId>     <artifactId>mysql-connector-java</artifactId>     <version>8.0.18</version> ......

ValSong
28分钟前
6
0
Spring中BeanFactory与FactoryBean的区别

在Spring中有BeanFactory和FactoryBean这2个接口,从名字来看很相似,比较容易搞混。 一、BeanFactory BeanFactory是一个接口,它是Spring中工厂的顶层规范,是SpringIoc容器的核心接口,它定...

大王叫下
30分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部