文档章节

hadoop HDFS 操作

sky_hebiao
 sky_hebiao
发布于 2016/12/07 10:20
字数 1076
阅读 4
收藏 0
点赞 0
评论 0
package com.clpc.core.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * 对hdfs文件进行操作
 * @author hebiao
 * @date   2016-11-11
 *
 */
public class ControlHDFS {
	final static String hdfsUrl = SystemConfigHelper.getValue("hdfsAddr");
	
	public static void main(String[] args) {
		SimpleDateFormat formatterYear = new SimpleDateFormat("yyyy");
		SimpleDateFormat formatterMonth = new SimpleDateFormat("MM");
		//获取hdfs文件保存到本地指定目录
		SimpleDateFormat dfFile = new SimpleDateFormat("yyyyMMddHHmmss");
		String formatYear = formatterYear.format(new Date());
		String formatMonth = formatterMonth.format(new Date());
		String dataFile = formatYear+"/"+formatMonth;
		//hdfs 文件生成路径
		String hdfsPath = hdfsUrl+"/callcenter/"+dataFile+"/";
		//6位顺序号  conttractNumber+orderNum 呼入人工接通率
		String orderNum = "000002";
		//交换文件编号 电话中心:10
		String conttractNumber = "10";
		String createFileDate = dfFile.format(new Date());
		String fileNameCTL = conttractNumber+orderNum+"#"+createFileDate+".CTL";
		String fileNameSND = conttractNumber+orderNum+"#"+createFileDate+".DAT";
		//生成文件路径 SND: 发送文件 最终生成的文件地址
		String sourPathSND = "D:/callcenter/"+conttractNumber+"/SND"+"/"+orderNum;
		String sourPathNameCTL = sourPathSND+"/"+fileNameCTL;
		String sourPathNameSND = sourPathSND+"/"+fileNameSND;
		//生成文件路径 INT: 接口文件 临时文件目录
	    String sourPathINT = "D:/callcenter/"+conttractNumber+"/INT/"+orderNum; 
		String fileNameINT = "INTERFACE#"+conttractNumber+orderNum+"#"+createFileDate+".DEF";
		String sourPathNameINT = sourPathINT+"/"+fileNameINT;
//		if(new ControlHDFS().mkdir(hdfsPath)){
//			System.out.println("创建目录成功!");
//		}
//		System.out.println("上传目录:"+hdfsPath);
//		if(new ControlHDFS().put2HDFS(fileName, hdfsPath)){
//			System.out.println("文件上传成功!");
//		}
		
		String fileNames = new ControlHDFS().getFilePathName(hdfsPath);
		System.out.println(fileNames);
		
		if(new ControlHDFS().getFileEnFlag("/callcenter/2016/11/out201611/_SUCCESS")){
			System.out.println("找到文件");
			ControlFile controlF= new ControlFile();
			//创建linux目录
			controlF.newFile("/usr/local/document/callcenter/201611/SND/000002/");
			
			//创建文件路径
			controlF.newFile(sourPathSND);
			//将文件下载到地址地址
			if(new ControlHDFS().writeFile(hdfsUrl+"/callcenter/2016/11/out201611/part-r-00000", sourPathNameSND)){
				System.out.println("文件下载完毕!");
				System.out.println(controlF.getFilePath(sourPathSND));
				try {
					//执行加密生成CTL 和INT 
					//生成000002 CTL
					BufferedWriter visitCTL =controlF.writerFile(sourPathSND, sourPathNameCTL);
					ControlOutPutFiles controlOutPutF= new ControlOutPutFiles();
					controlOutPutF.cratteTeletePhoneServiceServiceCTL(visitCTL,35,sourPathNameSND);
					//生成000002 INT
					BufferedWriter visitINT =controlF.writerFile(sourPathINT, sourPathNameINT);
					controlOutPutF.createTeletePhoneServiceINT(visitINT);
					//关闭流
					visitCTL.close();
					visitINT.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
				//上传文件到ftp服务器
				ControlFTPUtil ftpUtil= new ControlFTPUtil();
				String hostname = "9.1.23.65";
				int port = 21;
				String username = "admin";
				String password = "clpccc";
				String pathnameSND = "/callcenterMT/10/SND/000002"; 
				String originfilename = "D:/callcenter/10/SND/000002/10000002#20161114135931.DAT";
				ftpUtil.uploadFileFromProduction(hostname, port, username, password, pathnameSND, originfilename);
				originfilename = "D:/callcenter/10/SND/000002/10000002#20161114135931.CTL";
				ftpUtil.uploadFileFromProduction(hostname, port, username, password, pathnameSND, originfilename);
				pathnameSND = "/callcenterMT/10/INT/000002";
				originfilename = "D:/callcenter/10/INT/000002/INTERFACE#10000002#20161114135931.DEF";
				ftpUtil.uploadFileFromProduction(hostname, port, username, password, pathnameSND, originfilename);
			}
		}
		
	}
	
	
	/**
	 * 文件上传是否成功  成功返回true;失败返回false;
	 * @param src  本地文件路径
	 * @param dst  目标文件路径
	 * @return		
	 */
	public boolean put2HDFS(String src, String dst){
		boolean ret = true;
		Configuration conf = new Configuration();
		try {
			Path dstPath = new Path(dst) ;
			FileSystem hdfs = dstPath.getFileSystem(conf);
			hdfs.copyFromLocalFile(false, new Path(src), dstPath);
		} catch (Exception e) {
			ret = false;
			e.printStackTrace();
		}
		return ret;
	}
	
	/**
	 * 将HDFS服务器上文件下载到linux指定地址
	 * @param src	hdfs文件路径地址
	 * @param dst	linux绝对路径
	 * @return
	 */
	public boolean get2HDFS(String src, String dst){
		boolean ret = true;
		try {
			Configuration conf = new Configuration();
			Path dstPath = new Path(dst);
			FileSystem hdfs = dstPath.getFileSystem(conf);
			hdfs.copyToLocalFile(false,new Path(src), dstPath);
		} catch (Exception e) {
			ret = false;
			e.printStackTrace();
		}
		return ret;
	}
	
	
	/**
	 * 读取文件,调用fileSystem的open(path)
	 * 将读取内容通过输出流写入本地目录
	 * @param hdfsUrl  文件路径
	 */
	public boolean writeFile(String src, String dst) {
		FileSystem fileSystem = getFileSystem();
		FSDataInputStream openStream = null;
		FileOutputStream fop = null;
		boolean ret = true;
		try {
			File files = new File(dst);
			fop = new FileOutputStream(files);
			if (!files.exists()) {
			 files.createNewFile();
			}
			openStream = fileSystem.open(new Path(src));
			IOUtils.copyBytes(openStream, fop, 1024,false);
			IOUtils.closeStream(openStream);
			fop.close();
		} catch (Exception e) {
			e.printStackTrace();
			ret = false;
		}
		return ret;
	}
	
	
	
	/**
	 * 连接HDFS的FileSystem读取文件
	 * @return 
	 */
	public FileSystem getFileSystem() {
		Configuration conf = new Configuration();
		FileSystem fileSystem = null;
		try {
			URI uri = new URI(hdfsUrl);
			fileSystem = FileSystem.get(uri,conf);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return fileSystem;
	}

	
	/**
	 * 读取文件,调用fileSystem的open(path)
	 * @param hdfsUrl  文件路径
	 */
	public void readFile(String hdfsUrl) {
		FileSystem fileSystem = getFileSystem();
		FSDataInputStream openStream = null;
		try {
			openStream = fileSystem.open(new Path(hdfsUrl));
			IOUtils.copyBytes(openStream, System.out, 1024,false);
			IOUtils.closeStream(openStream);
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	
	/**
	 * 创建hdfs新的目录
	 * @param hdfsUrl
	 */
	public boolean mkdir(String hdfsUrl) {
		boolean ret = true;
		FileSystem fileSystem = getFileSystem();
		try {
			if(!fileSystem.isFile(new Path(hdfsUrl))){
				fileSystem.mkdirs(new Path(hdfsUrl));
			}else{
				System.out.println("文件已经存在不需要创建新目录");
			}
		} catch (Exception e) {
			ret = false;
			e.printStackTrace();
		}
		return ret;
	}

	
	/**
	 * 删除hdfs目录
	 * @param hdfsUrl
	 */
	public void rmdir(String hdfsUrl) {
		FileSystem fileSystem = getFileSystem();
		try {
//			fileSystem.delete();
			fileSystem.delete(new Path(hdfsUrl),true);
		} catch (IllegalArgumentException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	
	/**
	 * 遍历指定目录
	 * @param hdfsUrl
	 */
	public void list(String hdfsUrl) {
		FileSystem fileSystem = getFileSystem();
		FileStatus[] listStatus = null;
		try {
			listStatus = fileSystem.listStatus(new Path(hdfsUrl));
			for(FileStatus fileStatus : listStatus){
//				String isDir= fileStatus.isDir() ? "目录" :"文件";
				String isFile= fileStatus.isFile() ? "文件" :"目录";
				String name = fileStatus.getPath().toString();
				System.out.println("isFile:"+isFile+", name:"+name);
			}
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 遍历指定目录下文件,获取文件名称
	 * @param hdfsUrl	文件目录
	 * @return			文件名称
	 */
	public String getFilePathName(String hdfsUrl){
		FileSystem fileSystem = getFileSystem();
		FileStatus[] listStatus = null;
		String re = "";
		try {
			listStatus = fileSystem.listStatus(new Path(hdfsUrl));
			for(FileStatus fileStatus : listStatus){
				if(fileStatus.isFile()){
					re = fileStatus.getPath().toString();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return re;
	}
	
	/**
	 * 获取指定目录下文件判断是否存在
	 * @param hdfsUrl   目录地址
	 * @return			存在返回 true; 不存在返回false;
	 */
	public boolean getFileEnFlag(String hdfsUrl){
		FileSystem fileSystem = getFileSystem();
		FileStatus[] listStatus = null;
		boolean ret = true;
		try {
			listStatus = fileSystem.listStatus(new Path(hdfsUrl));
			for(FileStatus fileStatus : listStatus){
				if(fileStatus.isFile()){
					System.out.println("找到文件:"+fileStatus.getPath().toString());
					ret = true;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return ret;
	}
	
	
}

 

© 著作权归作者所有

共有 人打赏支持
sky_hebiao
粉丝 0
博文 7
码字总数 4148
作品 0
大兴
后端工程师
初学hadoop之linux系统操作的hdfs的常用命令

在对linux的一般操作有 ls mikdir rmdir vi 等操作 在对hadoop的hdfs的一般操作语法为hadoop fs -ls / /表示查看Hadoop跟目录文件/ hadoop fs -lsr / /*递归查看hadoop的文件目录/ hadoop fs...

viewsonic001
2016/11/09
19
0
Hadoop2.2.0 入门教程(三)之HDFS SHELL脚本

HDFS Shell hadoop fs -help: HDFS的帮助命令 hadoop fs -ls:查看文件或目录的命令 for example: hadoop fs -ls / 查看/(根目录下得文件或目录)for example: hadoop fs -lsr / 递归查看/(根目...

残风vs逝梦
2014/07/02
0
2
hadoop(02)、使用JAVA API对HDFS进行基本操作

本文源码码云地址:https://gitee.com/MaxBill/hadoop 在上篇《hadoop(01)、windows平台下hadoop环境搭建》中,实践了在windows平台下使用搭建hadoop开发环境,同时搭建完毕在基于命令行的形...

MaxBill
2017/11/16
0
0
Hadoop-HDFS分布式环境

HDFS简单介绍 HDFS的英文全称是Hadoop Distributed FileSystem,顾名思义,就是Hadoop分布式文件系统,是根据Google的GFS的论文,由Doug Cutting使用Java开发的开源项目。HDFS本身是Hadoop项...

bengozhong
2016/02/26
14
0
hadoop的体系结构 hadoop hdfs 命令

hadoop的体系结构 NameNode - 主节点主服务器 SecondaryNameNode– 是辅助nameNode DataNode -数据保存用的 TaskTracker – 接收任务 JobTracker - 分数据 -100M Datanode1,DataNode2,DataNo...

八戒_o
2015/12/03
218
0
HDFS常用文件操作命令及注意事项

HDFS 文件操作命令 HDFS 文件系统提供了相当多的shell 操作命令,大大方便了程序员和系统管理人员查看、修改HDFS 上的文件。进一步,HDFS 的操作命令和Unix/Linux 的命令名称和格式相当一致,...

2846613430
2016/04/01
90
0
hadoop(01)、windows平台下hadoop环境搭建

hadoop是运行在linux系统下的一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序,充分利用集群的威力进行高速运算和存储。今天我们要...

MaxBill
2017/11/13
0
0
HDFS 实验 (五) 图形化管理

./bin/hdfs dfsadmin -report http://192.168.209.162:50070/dfshealth.html#tab-overview 可以操作目录 http://192.168.209.162:50070/explorer.html#/ 方式一:命令行方式 Hadoop文件操作命......

pcdog
04/13
0
0
hue(02)、Hue集成Hadoop集群(HDFS和YARN)

在上文 hue(01)、Hue4.1的编译安装启动 中,我们完整的进行了Hue的源码下载编译安装,Hue的web控制台与Hdfs、Hive、Hbase等集成才能展现它的魅力。本文我们在Hue中集成hadoop的hdfs和yarn服务,...

MaxBill
01/25
0
0
Hadoop namenode无法启动问题解决

原文:http://www.cnblogs.com/unflynaomi/p/4476870.html 原因:在root账户(非hadoop账户)下操作hadoop会导致很大的问题。 首先运行bin/start-all.sh发现namenode没有启动 只有它们 9428 D...

夏春涛
2017/10/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

【面试题】盲人坐飞机

有100位乘客乘坐飞机,其中有一位是盲人,每位乘客都按自己的座位号就坐。由于盲人看不见自己的座位号,所以他可能会坐错位置,而自己的座位被占的乘客会随便找个座位就坐。问所有乘客都坐对...

garkey
今天
0
0
谈谈神秘的ES6——(二)ES6的变量

谈谈神秘的ES6——(二)ES6的变量 我们在《零基础入门JavaScript》的时候就说过,在ES5里,变量是有弊端的,我们先来回顾一下。 首先,在ES5中,我们所有的变量都是通过关键字var来定义的。...

JandenMa
今天
1
0
arts-week1

Algorithm 594. Longest Harmonious Subsequence - LeetCode 274. H-Index - LeetCode 219. Contains Duplicate II - LeetCode 217. Contains Duplicate - LeetCode 438. Find All Anagrams ......

yysue
今天
0
0
NNS拍卖合约

前言 关于NNS的介绍,这里就不多做描述,相关的信息可以查看NNS的白皮书http://doc.neons.name/zh_CN/latest/nns_background.html。 首先nns中使用的竞价货币是sgas,关于sgas介绍可以戳htt...

红烧飞鱼
今天
1
0
Java IO类库之管道流PipeInputStream与PipeOutputStream

一、java管道流介绍 在java多线程通信中管道通信是一种重要的通信方式,在java中我们通过配套使用管道输出流PipedOutputStream和管道输入流PipedInputStream完成线程间通信。多线程管道通信的...

老韭菜
今天
0
0
用Python绘制红楼梦词云图,竟然发现了这个!

Python在数据分析中越来越受欢迎,已经达到了统计学家对R的喜爱程度,Python的拥护者们当然不会落后于R,开发了一个个好玩的数据分析工具,下面我们来看看如何使用Python,来读红楼梦,绘制小...

猫咪编程
今天
1
0
Java中 发出请求获取别人的数据(阿里云 查询IP归属地)

1.效果 调用阿里云的接口 去定位IP地址 2. 代码 /** * 1. Java中远程调用方法 * http://localhost:8080/mavenssm20180519/invokingUrl.action * @Title: invokingUrl * @Description: * @ret......

Lucky_Me
今天
1
0
protobuf学习笔记

相关文档 Protocol buffers(protobuf)入门简介及性能分析 Protobuf学习 - 入门

OSC_fly
昨天
0
0
Mybaties入门介绍

Mybaties和Hibernate是我们在Java开发中应用的比较多的两个ORM框架。当然,目前Mybaties正在慢慢取代Hibernate,这是因为相比较Hibernate而言Mybaties性能更好,响应更快,更加灵活。我们在开...

王子城
昨天
2
0
编程学习笔记之python深入之装饰器案例及说明文档[图]

编程学习笔记之python深入之装饰器案例及说明文档[图] 装饰器即在不对一个函数体进行任何修改,以及不改变整体的原本意思的情况下,增加函数功能的新函数,因为这个新函数对旧函数进行了装饰...

原创小博客
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部