文档章节

Flume自定义Hbase Sink的EventSerializer序列化类

ivan-Zhao
 ivan-Zhao
发布于 2016/04/01 18:26
字数 708
阅读 1668
收藏 1

    最近要分析公司的日志,需要把日志从flume打到hbase中,但是我们的日志由于前期是存到MongoDb中的,所以都是Json格式的日志,这时候使用flume自带的SimpleHbaseEventSerializer和RegexHbaseEventSerializer这样的就不行了,于是开始痛苦的看源码,自己写序列化的类(这里需要注意,如果是在flume的hbasesink包下编写的代码,License信息一定要加上。就是最上面那段英文,要不然在运行的时候会报错),比较简单,编写好类之后,编译打包,传到flume的lib目录下,然后在配置agent的时候指定Serializer的类为编写的类即可。下面是代码(类注释没贴出来,见谅哈):

public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
	private byte[] table;//hbase表
	private byte[] cf;//列簇
	private byte[][] payload;//列集合
	private byte[][] payloadColumn;//列值
	private byte[] incrementColumn;
	private String rowSuffix;//roykey后缀
	private String rowPrefix;//rowkey前缀
	private byte[] incrementRow;
	private KeyType keyType;//rowkey后缀类型 
	private static final Logger logger = LoggerFactory.getLogger(PRTMSAsyncHbaseEventSerializer.class);

	@Override
	public void configure(Context context) {
		// TODO Auto-generated method stub
		//设置主键后缀类型,这里使用时间戳
		keyType = KeyType.TS;
		if (iCol != null && !iCol.isEmpty()) {
			incrementColumn = iCol.getBytes(Charsets.UTF_8);
		}
		incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
	}

	@Override
	public void configure(ComponentConfiguration conf) {
		// TODO Auto-generated method stub

	}

	@Override
	public void initialize(byte[] table, byte[] cf) {
		// TODO Auto-generated method stub
		this.table = table;
		this.cf = cf;
	}
	/**
	 * 
	 * @Title: setEvent 
	 * @Description: 获取日志信息,并解析出HBase的列以及列的value值 
	 * @param event   
	 * @throws 
	 * @see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent(org.apache.flume.Event)
	 */
	@Override
	public void setEvent(Event event) {
		// TODO Auto-generated method stub
		//获取日志信息
		String log = new String(event.getBody(), StandardCharsets.UTF_8);
		//headers包含日志中项目编号和host信息
		Map<String, String> headers = event.getHeaders();
		JsonReader jsonReader = new JsonReader(new StringReader(log));
		String name = "";
		String value = "";
		String path = "";
		Map<String, String> kv = new HashMap<String, String>();
		try {
			//解析日志中的键值对缓存到map中
			jsonReader.beginObject();
			while (jsonReader.hasNext()) {
				name = jsonReader.nextName();
				value = jsonReader.nextString();
				if(name.equals("uri"))
					path = value.split(" ")[1];
				kv.put(name, value);
			}
			jsonReader.endObject();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//解析headers中的项目id和服务host、路径
		if(path.contains("?")){
			path = path.substring(0, path.indexOf("?"));
		}
		String pcode = headers.get("pcode");
		String host = headers.get("host");
		//将项目编号和服务器host添加到map中
		kv.put("pcode",pcode);
		kv.put("host", host);
		//初始化列和value数组
		this.payloadColumn = new byte[kv.keySet().size()][];
		this.payload = new byte[kv.keySet().size()][];
		int i = 0;
		//给hbase的列和value赋值
		for (String key : kv.keySet()) {
			this.payloadColumn[i] = key.getBytes();
			this.payload[i] = kv.get(key).getBytes();
			i++;
		}
		//设置rowkey的前缀 格式是项目编号+路径
		
		this.rowSuffix = new StringBuilder(pcode).reverse().toString() + ":"+path+":"+kv.get("time");
	}
	
	@Override
	public List<PutRequest> getActions() {
		// TODO Auto-generated method stub
		List<PutRequest> actions = new ArrayList<PutRequest>();
		if (payloadColumn != null) {
			byte[] rowKey;
			try {
				rowKey = rowSuffix.getBytes();
				// for 循环,提交所有列和对于数据的put请求。
				for (int i = 0; i < this.payload.length; i++) {
					PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]);
					actions.add(putRequest);
				}

			} catch (Exception e) {
				throw new FlumeException("Could not get row key!", e);
			}
		}
		return actions;
	}

	@Override
	public List<AtomicIncrementRequest> getIncrements() {
		// TODO Auto-generated method stub
		List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
		if (incrementColumn != null) {
			AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn);
			actions.add(inc);
		}
		return actions;
	}
	@Override
	public void cleanUp() {
		// TODO Auto-generated method stub
	}

}



© 著作权归作者所有

ivan-Zhao
粉丝 10
博文 33
码字总数 29110
作品 0
深圳
程序员
私信 提问
Flume将MySQL表数据存入到HBase

Flume将MySQL表数据存入到HBase HBasesink的三种序列化模式 SimpleHbaseEventSerializer RegexHbaseEventSerializer SimpleAsyncHbaseEventSerializer 使用SimpleHbaseEventSerializer序列化......

lwenhao
03/06
242
0
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习
2018/04/14
0
0
flume 总结--flume入门介绍

flume介绍 flume被设计为一个灵活的分布式系统,可以很容易的扩展,而且是高度可定制化的,一个配置正确的Flume Agent和由互相连接的Agent创建的Agent管道,保证不会丢失数据,提供持久的cha...

u013362353
2018/05/28
0
0
Flume NG 学习笔记(一)简介

一、简介 Flume是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力。 Flume在0...

jackwxh
2018/06/29
0
0
Flume---大数据协作框架

flume是什么 Apache Flume是一个分布式的、可靠的、易用的系统,可以有效地将来自很多不同源系统的大量日志数据收集、汇总或者转移到一个数据中心存储。 Apache Flume的作用不仅限于日志汇总...

简心
2018/05/06
135
0

没有更多内容

加载失败,请刷新页面

加载更多

java通过ServerSocket与Socket实现通信

首先说一下ServerSocket与Socket. 1.ServerSocket ServerSocket是用来监听客户端Socket连接的类,如果没有连接会一直处于等待状态. ServetSocket有三个构造方法: (1) ServerSocket(int port);...

Blueeeeeee
今天
6
0
用 Sphinx 搭建博客时,如何自定义插件?

之前有不少同学看过我的个人博客(http://python-online.cn),也根据我写的教程完成了自己个人站点的搭建。 点此:使用 Python 30分钟 教你快速搭建一个博客 为防有的同学不清楚 Sphinx ,这...

王炳明
昨天
5
0
黑客之道-40本书籍助你快速入门黑客技术免费下载

场景 黑客是一个中文词语,皆源自英文hacker,随着灰鸽子的出现,灰鸽子成为了很多假借黑客名义控制他人电脑的黑客技术,于是出现了“骇客”与"黑客"分家。2012年电影频道节目中心出品的电影...

badaoliumang
昨天
14
0
很遗憾,没有一篇文章能讲清楚线程的生命周期!

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本。 简介 大家都知道线程是有生命周期,但是彤哥可以认真负责地告诉你网上几乎没有一篇文章讲得是完全正确的。 ...

彤哥读源码
昨天
15
0
jquery--DOM操作基础

本文转载于:专业的前端网站➭jquery--DOM操作基础 元素的访问 元素属性操作 获取:attr(name);$("#my").attr("src"); 设置:attr(name,value);$("#myImg").attr("src","images/1.jpg"); ......

前端老手
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部