Hadoop HDFS Operations

package com.clpc.core.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Utility class for operating on HDFS files.
 * @author hebiao
 * @date   2016-11-11
 *
 */
public class ControlHDFS {
	final static String hdfsUrl = SystemConfigHelper.getValue("hdfsAddr");
	
	public static void main(String[] args) {
		SimpleDateFormat formatterYear = new SimpleDateFormat("yyyy");
		SimpleDateFormat formatterMonth = new SimpleDateFormat("MM");
		// fetch the HDFS result file and save it to a local directory
		SimpleDateFormat dfFile = new SimpleDateFormat("yyyyMMddHHmmss");
		String formatYear = formatterYear.format(new Date());
		String formatMonth = formatterMonth.format(new Date());
		String dataFile = formatYear+"/"+formatMonth;
		// path on HDFS where the job output is generated
		String hdfsPath = hdfsUrl+"/callcenter/"+dataFile+"/";
		// 6-digit sequence number; conttractNumber + orderNum identifies the inbound-call answer rate feed
		String orderNum = "000002";
		// exchange file prefix; 10 = call center
		String conttractNumber = "10";
		String createFileDate = dfFile.format(new Date());
		String fileNameCTL = conttractNumber+orderNum+"#"+createFileDate+".CTL";
		String fileNameSND = conttractNumber+orderNum+"#"+createFileDate+".DAT";
		// SND (send) file path: where the final generated files go
		String sourPathSND = "D:/callcenter/"+conttractNumber+"/SND"+"/"+orderNum;
		String sourPathNameCTL = sourPathSND+"/"+fileNameCTL;
		String sourPathNameSND = sourPathSND+"/"+fileNameSND;
		// INT (interface) file path: temporary file directory
	    String sourPathINT = "D:/callcenter/"+conttractNumber+"/INT/"+orderNum; 
		String fileNameINT = "INTERFACE#"+conttractNumber+orderNum+"#"+createFileDate+".DEF";
		String sourPathNameINT = sourPathINT+"/"+fileNameINT;
//		if (new ControlHDFS().mkdir(hdfsPath)) {
//			System.out.println("Directory created.");
//		}
//		System.out.println("Upload directory: " + hdfsPath);
//		if (new ControlHDFS().put2HDFS(fileName, hdfsPath)) {
//			System.out.println("File uploaded.");
//		}
		
		String fileNames = new ControlHDFS().getFilePathName(hdfsPath);
		System.out.println(fileNames);
		
		if(new ControlHDFS().getFileEnFlag("/callcenter/2016/11/out201611/_SUCCESS")){
			System.out.println("找到文件");
			ControlFile controlF= new ControlFile();
			// create the Linux directory
			controlF.newFile("/usr/local/document/callcenter/201611/SND/000002/");
			
			// create the local file directory
			controlF.newFile(sourPathSND);
			// download the HDFS file to the local path
			if(new ControlHDFS().writeFile(hdfsUrl+"/callcenter/2016/11/out201611/part-r-00000", sourPathNameSND)){
				System.out.println("文件下载完毕!");
				System.out.println(controlF.getFilePath(sourPathSND));
				try {
					// encrypt and generate the CTL and INT files
					// generate the 000002 CTL file
					BufferedWriter visitCTL =controlF.writerFile(sourPathSND, sourPathNameCTL);
					ControlOutPutFiles controlOutPutF= new ControlOutPutFiles();
					controlOutPutF.cratteTeletePhoneServiceServiceCTL(visitCTL,35,sourPathNameSND);
					// generate the 000002 INT file
					BufferedWriter visitINT =controlF.writerFile(sourPathINT, sourPathNameINT);
					controlOutPutF.createTeletePhoneServiceINT(visitINT);
					// close the writers
					visitCTL.close();
					visitINT.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
				// upload the generated files to the FTP server
				ControlFTPUtil ftpUtil = new ControlFTPUtil();
				String hostname = "9.1.23.65";
				int port = 21;
				String username = "admin";
				String password = "clpccc";
				String pathnameSND = "/callcenterMT/10/SND/000002";
				// upload the freshly generated files rather than hard-coded file names
				ftpUtil.uploadFileFromProduction(hostname, port, username, password, pathnameSND, sourPathNameSND);
				ftpUtil.uploadFileFromProduction(hostname, port, username, password, pathnameSND, sourPathNameCTL);
				String pathnameINT = "/callcenterMT/10/INT/000002";
				ftpUtil.uploadFileFromProduction(hostname, port, username, password, pathnameINT, sourPathNameINT);
			}
		}
		
	}
	
	
	/**
	 * Upload a local file to HDFS.
	 * @param src  local file path
	 * @param dst  destination path on HDFS
	 * @return true if the upload succeeded, false otherwise
	 */
	public boolean put2HDFS(String src, String dst){
		boolean ret = true;
		Configuration conf = new Configuration();
		try {
			Path dstPath = new Path(dst) ;
			FileSystem hdfs = dstPath.getFileSystem(conf);
			hdfs.copyFromLocalFile(false, new Path(src), dstPath);
		} catch (Exception e) {
			ret = false;
			e.printStackTrace();
		}
		return ret;
	}
	
	/**
	 * Download a file from HDFS to the given local (Linux) path.
	 * @param src	HDFS file path
	 * @param dst	absolute local path
	 * @return true if the download succeeded, false otherwise
	 */
	public boolean get2HDFS(String src, String dst){
		boolean ret = true;
		try {
			Configuration conf = new Configuration();
			Path dstPath = new Path(dst);
			FileSystem hdfs = dstPath.getFileSystem(conf);
			hdfs.copyToLocalFile(false,new Path(src), dstPath);
		} catch (Exception e) {
			ret = false;
			e.printStackTrace();
		}
		return ret;
	}
	
	
	/**
	 * Read a file via FileSystem.open(path) and write its contents
	 * to a local file through an output stream.
	 * @param src  HDFS file path
	 * @param dst  local destination file
	 */
	public boolean writeFile(String src, String dst) {
		FileSystem fileSystem = getFileSystem();
		FSDataInputStream openStream = null;
		FileOutputStream fop = null;
		boolean ret = true;
		try {
			File file = new File(dst);
			if (!file.exists()) {
				file.createNewFile();
			}
			fop = new FileOutputStream(file);
			openStream = fileSystem.open(new Path(src));
			IOUtils.copyBytes(openStream, fop, 1024, false);
		} catch (Exception e) {
			e.printStackTrace();
			ret = false;
		} finally {
			// close both streams even when the copy fails
			IOUtils.closeStream(openStream);
			IOUtils.closeStream(fop);
		}
		return ret;
	}
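	/*
	 * Note (sketch, not part of the original post): the same download could be
	 * done in one call by letting IOUtils close both streams, assuming the
	 * destination path is writable:
	 *
	 *   IOUtils.copyBytes(fileSystem.open(new Path(src)),
	 *                     new FileOutputStream(dst), 1024, true); // true = close streams when done
	 */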
	
	
	
	/**
	 * Connect to HDFS and return a FileSystem handle.
	 * @return the FileSystem, or null if the connection could not be established
	 */
	public FileSystem getFileSystem() {
		Configuration conf = new Configuration();
		FileSystem fileSystem = null;
		try {
			URI uri = new URI(hdfsUrl);
			fileSystem = FileSystem.get(uri,conf);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return fileSystem;
	}

	
	/**
	 * Read a file via FileSystem.open(path) and print its contents to standard output.
	 * @param hdfsUrl  file path on HDFS
	 */
	public void readFile(String hdfsUrl) {
		FileSystem fileSystem = getFileSystem();
		FSDataInputStream openStream = null;
		try {
			openStream = fileSystem.open(new Path(hdfsUrl));
			IOUtils.copyBytes(openStream, System.out, 1024,false);
			IOUtils.closeStream(openStream);
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	
	/**
	 * Create a new directory on HDFS.
	 * @param hdfsUrl  directory path to create
	 * @return true unless an exception occurred
	 */
	public boolean mkdir(String hdfsUrl) {
		boolean ret = true;
		FileSystem fileSystem = getFileSystem();
		try {
			if(!fileSystem.isFile(new Path(hdfsUrl))){
				fileSystem.mkdirs(new Path(hdfsUrl));
			}else{
				System.out.println("文件已经存在不需要创建新目录");
			}
		} catch (Exception e) {
			ret = false;
			e.printStackTrace();
		}
		return ret;
	}

	
	/**
	 * Recursively delete an HDFS directory.
	 * @param hdfsUrl  directory path to delete
	 */
	public void rmdir(String hdfsUrl) {
		FileSystem fileSystem = getFileSystem();
		try {
			fileSystem.delete(new Path(hdfsUrl), true);
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	
	/**
	 * List the entries of the given directory.
	 * @param hdfsUrl  directory path
	 */
	public void list(String hdfsUrl) {
		FileSystem fileSystem = getFileSystem();
		FileStatus[] listStatus = null;
		try {
			listStatus = fileSystem.listStatus(new Path(hdfsUrl));
			for (FileStatus fileStatus : listStatus) {
				String isFile = fileStatus.isFile() ? "file" : "directory";
				String name = fileStatus.getPath().toString();
				System.out.println("isFile: " + isFile + ", name: " + name);
			}
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * Iterate over the files in the given directory and return a file path.
	 * @param hdfsUrl	directory path
	 * @return			full path of the last regular file found, or an empty string
	 */
	public String getFilePathName(String hdfsUrl){
		FileSystem fileSystem = getFileSystem();
		FileStatus[] listStatus = null;
		String re = "";
		try {
			listStatus = fileSystem.listStatus(new Path(hdfsUrl));
			for(FileStatus fileStatus : listStatus){
				if(fileStatus.isFile()){
					re = fileStatus.getPath().toString();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return re;
	}
	
	/**
	 * Check whether a regular file exists under the given path.
	 * @param hdfsUrl   path to check
	 * @return			true if a file was found, false otherwise
	 */
	public boolean getFileEnFlag(String hdfsUrl){
		FileSystem fileSystem = getFileSystem();
		FileStatus[] listStatus = null;
		boolean ret = false;
		try {
			listStatus = fileSystem.listStatus(new Path(hdfsUrl));
			for (FileStatus fileStatus : listStatus) {
				if (fileStatus.isFile()) {
					System.out.println("Found file: " + fileStatus.getPath().toString());
					ret = true;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return ret;
	}
	
	
}
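
A note on checking for the _SUCCESS marker: getFileEnFlag above lists the path and scans the results, which throws if the path does not exist. A simpler alternative is FileSystem.exists(), which just returns false for a missing path. The sketch below is a minimal, standalone version under that assumption; the cluster address hdfs://namenode:9000 and the class name SuccessMarkerCheck are placeholders standing in for the hdfsAddr value read from SystemConfigHelper.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SuccessMarkerCheck {
	public static void main(String[] args) throws Exception {
		// Assumed address; in the post this comes from SystemConfigHelper.getValue("hdfsAddr").
		String hdfsUrl = "hdfs://namenode:9000";
		String marker = "/callcenter/2016/11/out201611/_SUCCESS";
		// FileSystem implements Closeable, so try-with-resources releases the handle when done.
		try (FileSystem fs = FileSystem.get(new URI(hdfsUrl), new Configuration())) {
			// exists() returns false for a missing path instead of throwing,
			// so there is no need to list the parent directory.
			if (fs.exists(new Path(marker))) {
				System.out.println("Job output is complete: " + marker);
			} else {
				System.out.println("_SUCCESS marker not found yet.");
			}
		}
	}
}

Compared with listStatus, this also avoids the FileNotFoundException raised when the output directory has not been created yet.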