文档章节

hadoop HDFS 操作

sky_hebiao
 sky_hebiao
发布于 2016/12/07 10:20
字数 1076
阅读 5
收藏 0
package com.clpc.core.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * 对hdfs文件进行操作
 * @author hebiao
 * @date   2016-11-11
 *
 */
public class ControlHDFS {
	final static String hdfsUrl = SystemConfigHelper.getValue("hdfsAddr");
	
	public static void main(String[] args) {
		SimpleDateFormat formatterYear = new SimpleDateFormat("yyyy");
		SimpleDateFormat formatterMonth = new SimpleDateFormat("MM");
		//获取hdfs文件保存到本地指定目录
		SimpleDateFormat dfFile = new SimpleDateFormat("yyyyMMddHHmmss");
		String formatYear = formatterYear.format(new Date());
		String formatMonth = formatterMonth.format(new Date());
		String dataFile = formatYear+"/"+formatMonth;
		//hdfs 文件生成路径
		String hdfsPath = hdfsUrl+"/callcenter/"+dataFile+"/";
		//6位顺序号  conttractNumber+orderNum 呼入人工接通率
		String orderNum = "000002";
		//交换文件编号 电话中心:10
		String conttractNumber = "10";
		String createFileDate = dfFile.format(new Date());
		String fileNameCTL = conttractNumber+orderNum+"#"+createFileDate+".CTL";
		String fileNameSND = conttractNumber+orderNum+"#"+createFileDate+".DAT";
		//生成文件路径 SND: 发送文件 最终生成的文件地址
		String sourPathSND = "D:/callcenter/"+conttractNumber+"/SND"+"/"+orderNum;
		String sourPathNameCTL = sourPathSND+"/"+fileNameCTL;
		String sourPathNameSND = sourPathSND+"/"+fileNameSND;
		//生成文件路径 INT: 接口文件 临时文件目录
	    String sourPathINT = "D:/callcenter/"+conttractNumber+"/INT/"+orderNum; 
		String fileNameINT = "INTERFACE#"+conttractNumber+orderNum+"#"+createFileDate+".DEF";
		String sourPathNameINT = sourPathINT+"/"+fileNameINT;
//		if(new ControlHDFS().mkdir(hdfsPath)){
//			System.out.println("创建目录成功!");
//		}
//		System.out.println("上传目录:"+hdfsPath);
//		if(new ControlHDFS().put2HDFS(fileName, hdfsPath)){
//			System.out.println("文件上传成功!");
//		}
		
		String fileNames = new ControlHDFS().getFilePathName(hdfsPath);
		System.out.println(fileNames);
		
		if(new ControlHDFS().getFileEnFlag("/callcenter/2016/11/out201611/_SUCCESS")){
			System.out.println("找到文件");
			ControlFile controlF= new ControlFile();
			//创建linux目录
			controlF.newFile("/usr/local/document/callcenter/201611/SND/000002/");
			
			//创建文件路径
			controlF.newFile(sourPathSND);
			//将文件下载到地址地址
			if(new ControlHDFS().writeFile(hdfsUrl+"/callcenter/2016/11/out201611/part-r-00000", sourPathNameSND)){
				System.out.println("文件下载完毕!");
				System.out.println(controlF.getFilePath(sourPathSND));
				try {
					//执行加密生成CTL 和INT 
					//生成000002 CTL
					BufferedWriter visitCTL =controlF.writerFile(sourPathSND, sourPathNameCTL);
					ControlOutPutFiles controlOutPutF= new ControlOutPutFiles();
					controlOutPutF.cratteTeletePhoneServiceServiceCTL(visitCTL,35,sourPathNameSND);
					//生成000002 INT
					BufferedWriter visitINT =controlF.writerFile(sourPathINT, sourPathNameINT);
					controlOutPutF.createTeletePhoneServiceINT(visitINT);
					//关闭流
					visitCTL.close();
					visitINT.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
				//上传文件到ftp服务器
				ControlFTPUtil ftpUtil= new ControlFTPUtil();
				String hostname = "9.1.23.65";
				int port = 21;
				String username = "admin";
				String password = "clpccc";
				String pathnameSND = "/callcenterMT/10/SND/000002"; 
				String originfilename = "D:/callcenter/10/SND/000002/10000002#20161114135931.DAT";
				ftpUtil.uploadFileFromProduction(hostname, port, username, password, pathnameSND, originfilename);
				originfilename = "D:/callcenter/10/SND/000002/10000002#20161114135931.CTL";
				ftpUtil.uploadFileFromProduction(hostname, port, username, password, pathnameSND, originfilename);
				pathnameSND = "/callcenterMT/10/INT/000002";
				originfilename = "D:/callcenter/10/INT/000002/INTERFACE#10000002#20161114135931.DEF";
				ftpUtil.uploadFileFromProduction(hostname, port, username, password, pathnameSND, originfilename);
			}
		}
		
	}
	
	
	/**
	 * 文件上传是否成功  成功返回true;失败返回false;
	 * @param src  本地文件路径
	 * @param dst  目标文件路径
	 * @return		
	 */
	public boolean put2HDFS(String src, String dst){
		boolean ret = true;
		Configuration conf = new Configuration();
		try {
			Path dstPath = new Path(dst) ;
			FileSystem hdfs = dstPath.getFileSystem(conf);
			hdfs.copyFromLocalFile(false, new Path(src), dstPath);
		} catch (Exception e) {
			ret = false;
			e.printStackTrace();
		}
		return ret;
	}
	
	/**
	 * 将HDFS服务器上文件下载到linux指定地址
	 * @param src	hdfs文件路径地址
	 * @param dst	linux绝对路径
	 * @return
	 */
	public boolean get2HDFS(String src, String dst){
		boolean ret = true;
		try {
			Configuration conf = new Configuration();
			Path dstPath = new Path(dst);
			FileSystem hdfs = dstPath.getFileSystem(conf);
			hdfs.copyToLocalFile(false,new Path(src), dstPath);
		} catch (Exception e) {
			ret = false;
			e.printStackTrace();
		}
		return ret;
	}
	
	
	/**
	 * 读取文件,调用fileSystem的open(path)
	 * 将读取内容通过输出流写入本地目录
	 * @param hdfsUrl  文件路径
	 */
	public boolean writeFile(String src, String dst) {
		FileSystem fileSystem = getFileSystem();
		FSDataInputStream openStream = null;
		FileOutputStream fop = null;
		boolean ret = true;
		try {
			File files = new File(dst);
			fop = new FileOutputStream(files);
			if (!files.exists()) {
			 files.createNewFile();
			}
			openStream = fileSystem.open(new Path(src));
			IOUtils.copyBytes(openStream, fop, 1024,false);
			IOUtils.closeStream(openStream);
			fop.close();
		} catch (Exception e) {
			e.printStackTrace();
			ret = false;
		}
		return ret;
	}
	
	
	
	/**
	 * 连接HDFS的FileSystem读取文件
	 * @return 
	 */
	public FileSystem getFileSystem() {
		Configuration conf = new Configuration();
		FileSystem fileSystem = null;
		try {
			URI uri = new URI(hdfsUrl);
			fileSystem = FileSystem.get(uri,conf);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return fileSystem;
	}

	
	/**
	 * 读取文件,调用fileSystem的open(path)
	 * @param hdfsUrl  文件路径
	 */
	public void readFile(String hdfsUrl) {
		FileSystem fileSystem = getFileSystem();
		FSDataInputStream openStream = null;
		try {
			openStream = fileSystem.open(new Path(hdfsUrl));
			IOUtils.copyBytes(openStream, System.out, 1024,false);
			IOUtils.closeStream(openStream);
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	
	/**
	 * 创建hdfs新的目录
	 * @param hdfsUrl
	 */
	public boolean mkdir(String hdfsUrl) {
		boolean ret = true;
		FileSystem fileSystem = getFileSystem();
		try {
			if(!fileSystem.isFile(new Path(hdfsUrl))){
				fileSystem.mkdirs(new Path(hdfsUrl));
			}else{
				System.out.println("文件已经存在不需要创建新目录");
			}
		} catch (Exception e) {
			ret = false;
			e.printStackTrace();
		}
		return ret;
	}

	
	/**
	 * 删除hdfs目录
	 * @param hdfsUrl
	 */
	public void rmdir(String hdfsUrl) {
		FileSystem fileSystem = getFileSystem();
		try {
//			fileSystem.delete();
			fileSystem.delete(new Path(hdfsUrl),true);
		} catch (IllegalArgumentException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	
	/**
	 * 遍历指定目录
	 * @param hdfsUrl
	 */
	public void list(String hdfsUrl) {
		FileSystem fileSystem = getFileSystem();
		FileStatus[] listStatus = null;
		try {
			listStatus = fileSystem.listStatus(new Path(hdfsUrl));
			for(FileStatus fileStatus : listStatus){
//				String isDir= fileStatus.isDir() ? "目录" :"文件";
				String isFile= fileStatus.isFile() ? "文件" :"目录";
				String name = fileStatus.getPath().toString();
				System.out.println("isFile:"+isFile+", name:"+name);
			}
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 遍历指定目录下文件,获取文件名称
	 * @param hdfsUrl	文件目录
	 * @return			文件名称
	 */
	public String getFilePathName(String hdfsUrl){
		FileSystem fileSystem = getFileSystem();
		FileStatus[] listStatus = null;
		String re = "";
		try {
			listStatus = fileSystem.listStatus(new Path(hdfsUrl));
			for(FileStatus fileStatus : listStatus){
				if(fileStatus.isFile()){
					re = fileStatus.getPath().toString();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return re;
	}
	
	/**
	 * 获取指定目录下文件判断是否存在
	 * @param hdfsUrl   目录地址
	 * @return			存在返回 true; 不存在返回false;
	 */
	public boolean getFileEnFlag(String hdfsUrl){
		FileSystem fileSystem = getFileSystem();
		FileStatus[] listStatus = null;
		boolean ret = true;
		try {
			listStatus = fileSystem.listStatus(new Path(hdfsUrl));
			for(FileStatus fileStatus : listStatus){
				if(fileStatus.isFile()){
					System.out.println("找到文件:"+fileStatus.getPath().toString());
					ret = true;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return ret;
	}
	
	
}

 

© 著作权归作者所有

共有 人打赏支持
sky_hebiao
粉丝 0
博文 7
码字总数 4148
作品 0
大兴
后端工程师
初学hadoop之linux系统操作的hdfs的常用命令

在对linux的一般操作有 ls mikdir rmdir vi 等操作 在对hadoop的hdfs的一般操作语法为hadoop fs -ls / /表示查看Hadoop跟目录文件/ hadoop fs -lsr / /*递归查看hadoop的文件目录/ hadoop fs...

viewsonic001
2016/11/09
19
0
Hadoop2.2.0 入门教程(三)之HDFS SHELL脚本

HDFS Shell hadoop fs -help: HDFS的帮助命令 hadoop fs -ls:查看文件或目录的命令 for example: hadoop fs -ls / 查看/(根目录下得文件或目录)for example: hadoop fs -lsr / 递归查看/(根目...

残风vs逝梦
2014/07/02
0
2
hadoop(02)、使用JAVA API对HDFS进行基本操作

本文源码码云地址:https://gitee.com/MaxBill/hadoop 在上篇《hadoop(01)、windows平台下hadoop环境搭建》中,实践了在windows平台下使用搭建hadoop开发环境,同时搭建完毕在基于命令行的形...

MaxBill
2017/11/16
0
0
Hadoop-HDFS分布式环境

HDFS简单介绍 HDFS的英文全称是Hadoop Distributed FileSystem,顾名思义,就是Hadoop分布式文件系统,是根据Google的GFS的论文,由Doug Cutting使用Java开发的开源项目。HDFS本身是Hadoop项...

bengozhong
2016/02/26
14
0
HDFS 实验 (五) 图形化管理

./bin/hdfs dfsadmin -report http://192.168.209.162:50070/dfshealth.html#tab-overview 可以操作目录 http://192.168.209.162:50070/explorer.html#/ 方式一:命令行方式 Hadoop文件操作命......

pcdog
04/13
0
0

没有更多内容

加载失败,请刷新页面

加载更多

五大云原生技术

云原生(Cloud-Native)是一种文化,更是一种潮流,它是云计算的一个必然导向,是让云成为云化战略成功的基石。云计算时代,云原生技术注定将对现代化应用的建设、交付与运维产生颠覆性的影响...

问题终结者
18分钟前
3
0
Android JNI开发系列(十二) JNI局部引用、全局引用和弱全局引用

JNI 局部引用、全局引用和弱全局引用 在JNI规范中定义了三种引用:局部引用(Local Reference)、全局引用(Global Reference)、弱全局引用(Weak Global Reference)。区别如下: 局部引用...

蔡小鹏
19分钟前
2
0
Android 实现类似考试座号表效果

类似于这种效果 1,新建一个Student类,用户添加学生信息 private int icon; private String name; private int age; private String sex ; private int id; publ...

lanyu96
25分钟前
1
0
聊聊storm的CustomStreamGrouping

序 本文主要研究一下storm的CustomStreamGrouping CustomStreamGrouping storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/CustomStreamGrouping.java public interface CustomS......

go4it
34分钟前
2
0
编程中的各种闲谈

service 是否一定要定义 interface 在学习ssh(spring, struts2, hibernate)时,老师教在 service 层要定义接口,再去实现此接口,方便解耦。 在 spring 框架中,自身定义了很多接口,并且有不...

seal_90
35分钟前
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部