文档章节

HDFS的客户端编写-重构版

强子大叔的码田
 强子大叔的码田
发布于 2015/11/13 14:14
字数 721
阅读 57
收藏 3

3 月,跳不动了?>>>

public class MyHDFSSupport {

 

private static final Log logger = LogFactory.getLog(MyHDFSSupport.class);

private static ThreadLocal<MyInformation> threadLocalInformations = new ThreadLocal<MyInformation>() {

public MyInformation initialValue() {

return new MyInformation();

}

};

 

public static void sendFlowEvent(String project, String eventthrows URISyntaxException {

 

MyInformation information = threadLocalInformations.get();

synchronized (information) {// 所有的逻辑都在这里

information.checkHourEqual();// 1 跨小时的文件需要关闭

information.ensureOpen(project); // 2 开启

information.write(event); // 3 写文件

information.closeForFileSize();// 4 跨1G的文件需要关闭

}

 

}

}

 

=============

public class MyInformation {

 

private static final Log logger = LogFactory.getLog(MyInformation.class);

private static Configuration conf = null;

 

static {

conf = new Configuration();

conf.set("fs.hdfs.impl""org.apache.hadoop.hdfs.DistributedFileSystem");

}

 

private static final ThreadLocal<SimpleDateFormat> HOURFORMAT = new ThreadLocal<SimpleDateFormat>() {

protected SimpleDateFormat initialValue() {

return new SimpleDateFormat("/yyyy/MM/dd/HH/");

}

};

 

private String hdfsMasterIP;

private String hdfsBackIP;

 

private FSDataOutputStream hdfsStream;

private String streamCreateHour;// 记录上面文件流写文件的小时

private long hdfsFileSize;

 

private long lastWriteTime;

 

public MyInformation() {

hdfsMasterIP = MonitorRpcHdfsConfig.getInstance().getHdfsIp();

hdfsBackIP = MonitorRpcHdfsConfig.getInstance().getHdfsIp1();

hdfsStream = null;

streamCreateHour = null;

hdfsFileSize = 0;

lastWriteTime = 0;

// 初始化时增加进入超时监控队列

MonitorQueue.addObject(this);

}

 

public void checkHourEqual() {

if (null == this.hdfsStream || null == this.streamCreateHour)

return;// 在合理的前提下

 

boolean hourEqual = HOURFORMAT.get().format(new Date()).equals(this.streamCreateHour);

if (!hourEqual) {// 不是同一个小时就关闭流

this.close(false);

}

}

 

public void ensureOpen(String projectName) {

// 在合理的前提下

if (null != this.hdfsStream)

return;

 

// 尝试打开

MonitorRpcHdfsConfig config = MonitorRpcHdfsConfig.getInstance();

final String dateStr = HOURFORMAT.get().format(new Date());

String path = "/BDM/" + projectName + dateStr + UUID.randomUUID().toString();

FileSystem hdfsFileSystem = null;

FSDataOutputStream fsStream = null;

try {

URI masterUri = new URI(config.getHdfsScheme() + this.hdfsMasterIP + ":" + config.getHdfsPort());

hdfsFileSystem = FileSystem.get(masterUriconf);

fsStream = hdfsFileSystem.create(new Path(path), true);

catch (Exception e) {

logger.error("通过master ip 获取 file system 失败"e);

try {

URI backupUri = new URI(config.getHdfsScheme() + this.hdfsBackIP + ":" + config.getHdfsPort());

hdfsFileSystem = FileSystem.get(backupUriconf);

fsStream = hdfsFileSystem.create(new Path(path), true);

this.exchangeIP();

catch (Exception e1) {

logger.error("通过slave ip 获取 file system 失败" + e1.toString());

}

}

// 保留结果

if (null != fsStream) {

this.hdfsStream = fsStream;

this.streamCreateHour = dateStr;

this.hdfsFileSize = 0;

this.lastWriteTime = System.currentTimeMillis();

}

 

}

 

public void write(String event) {

if (null == this.hdfsStream)//在合理的条件下

return;

try {//写文件

this.hdfsStream.writeBytes(event);

this.hdfsFileSize += event.length();

this.lastWriteTime = System.currentTimeMillis();

catch (IOException e) {

this.close(true);

}

 

}

 

public void closeForFileSize() {

if (null == this.hdfsStream) {//在必要的前提下

return;

}

// 必要时关闭文件,更新内容大小

if (this.hdfsFileSize >= MonitorRpcHdfsConfig.getInstance().getHdfsFileMax()) {

this.close(false);

}

}

 

public void closeForWriteIdleTime() {

if (null == this.hdfsStream) {

return;

}

MonitorRpcHdfsConfig config = MonitorRpcHdfsConfig.getInstance();

if (System.currentTimeMillis() - this.lastWriteTime >= config.getHdfsFileOperationTimeSpan()) {

this.close(false);

}

}

 

private void close(boolean exception) {

// 重新设置stream

logger.debug("MyInformation.close() is invoked..." + (exception ? "写异常发生" : ""));

if (null != hdfsStream) {

try {

hdfsStream.close();

catch (Exception e) {

logger.error(""e);

}

}

// 全部复原

hdfsStream = null;

streamCreateHour = null;

hdfsFileSize = (long) 0;

lastWriteTime = 0;

}

 

public void exchangeIP() {

String oldMaster = this.hdfsMasterIP;

String oldBackUp = this.hdfsBackIP;

this.hdfsMasterIP = oldBackUp;

this.hdfsBackIP = oldMaster;

}

 

}

 

===

 

public class MonitorQueue {

// poll: 若队列为空,返回null。

// remove:若队列为空,抛出NoSuchElementException异常。

// take:若队列为空,发生阻塞,等待有元素。

 

// put---无空间会等待

// add--- 满时,立即返回,会抛出异常

// offer---满时,立即返回,不抛异常

// private static final Logger logger =

// LoggerFactory.getLogger(MonitorQueue.class);

public static BlockingQueue<MyInformation> objectQueue = new LinkedBlockingQueue<MyInformation>();

 

public static void addObject(MyInformation obj) {

objectQueue.offer(obj);

}

 

public static MyInformation getObject() {

return objectQueue.poll();

}

 

static {

// 一旦有对象加入时,只启动一次检测线程

Runnable r = new MonitorRunnable();

new Thread(r).start();

}

 

}

===

 

public class MonitorRunnable implements Runnable {

 

private ArrayList<MyInformation> informations = new ArrayList<MyInformation>();

 

public MonitorRunnable() {

}

 

@Override

public void run() {

MonitorRpcHdfsConfig config = MonitorRpcHdfsConfig.getInstance();

while (true) {

// 然后再处理MonitorQueue里的对象

MyInformation myInfor = null;

while ((myInfor = MonitorQueue.getObject()) != null) {

// 添加到本地

informations.add(myInfor);

}

// 先遍历每一个ArrayList

for (MyInformation information : informations) {

// 尝试获取锁,获取锁后,写句柄是无法写入的

synchronized (information) {

information.closeForWriteIdleTime();

}

}

// 睡眠一段时间,防止过度占用写句柄

try {

Thread.sleep(config.getHdfsCheckFsstreamPeriod());

catch (Exception e) {

e.printStackTrace();

}

}

 

}

}

© 著作权归作者所有

强子大叔的码田

强子大叔的码田

粉丝 923
博文 1612
码字总数 1280768
作品 9
南京
架构师
私信 提问
加载中

评论(0)

重构(Ruby版)迷你书免费下载

《重构(Ruby版)》前言 差不多十年前,我(Martin)曾经和Kent Beck一起做过一个项目。这个项目的名字叫C3,它后来成为极限编程诞生的标志性项目,并帮助我们看清了敏捷软件运动的方向。 我们...

hzbook2010
2010/05/21
1.2K
1
使用 React + Koa 从零开始一步一步的带你开发一个 36kr SSR 案例(二)

前言 本来在上周就想写下这篇文章,但是在学习的过程中,越来越觉得之前的很多思路需要修改,所以就下定决心,等我重构完这个项目之后再写第二篇教程。 先上代码仓库github 看过我第一篇文章...

zwmmm
2019/04/23
0
0
Hadoop编写调试MapReduce程序详解

编程学习,最好的方法还是自己动手,所以这里简单介绍在Hadoop上编写调试一个MapReduce程序。 先说一下我的开发环境,我的操作系统是Centos6.0,Hadoop版本是0.20.2,开发环境是eclipse。在H...

miaosu
2013/03/20
1.2W
3
开源图书--《全栈增长工程师指南》

依据在《Repractise简介篇:Web开发的七天里》中所说的 Web 开发的七个步骤而展开的电子书。当然它也是一个 APP,它一本关于如何成为全栈增长工程师的指南。 简介 我们都会学习,但是有时候我...

Phodal
2016/04/15
6.4K
1
Hadoop 2.0中单点故障解决方案总结

项目构建 Hadoop 1.0内核主要由两个分支组成:MapReduce和HDFS,众所周知,这两个系统的设计缺陷是单点故障,即MR的JobTracker和HDFS的NameNode两个核心服务均存在单点问题,该问题在很长时间...

jackwxh
2018/06/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Pandas选项和自定义

Pandas提供API来自定义其行为的某些方面,大多使用来显示。 API由五个相关函数组成。它们分别是 - get_option() set_option() reset_option() describe_option() option_context() get_opti...

沙门行道
14分钟前
9
0
leetcode150(逆波兰运算)--C语言实现

求: 根据逆波兰表示法,求表达式的值。 有效的运算符包括 +, -, *, / 。每个运算对象可以是整数,也可以是另一个逆波兰表达式。 说明: 整数除法只保留整数部分。 给定逆波兰表达式总是有效...

拓拔北海
17分钟前
9
0
masm的IDE环境

masm有自带IDE环境:qeditor 注意:在include/includelib时要写完整路径 .386 .model flat,stdcall optioncasemap:none;includeinclude<E:\masm32/include/windows.inc>inc......

_Heisenberg_
27分钟前
17
0
使用 Redis 用户登录,整合JWT

1、使用 Redis 用户登录分析 2、使用工具类生成验证码 将随机生成的验证码存放到 redis 使用 for循环随机生成,使用StringBuilder保存4个字符串 使用 HttpServletResponse 将图片响应到浏览器...

庭前云落
27分钟前
15
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部