Hadoop Cluster Mode in Depth: Customizing Splits, Replication Factor, and Block Size

A gzip (.gz) file cannot be split, so multiple map tasks cannot be created for it; the entire file goes into a single map task.
The workaround is to decompress the .gz file and process the raw file instead, as sketched below.
The split length can then be inspected inside the mapper (see setup() in the Mapper code further down).
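A minimal local-decompression sketch using java.util.zip.GZIPInputStream; the file name 1901.gz is an assumption, chosen to match the /usr/wgy/1901 input used in the run below:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;

public class Gunzip {
	public static void main(String[] args) throws IOException {
		//decompress 1901.gz into 1901 so FileInputFormat can split the raw file
		try (InputStream in = new GZIPInputStream(new FileInputStream("1901.gz"));
				OutputStream out = new FileOutputStream("1901")) {
			byte[] buf = new byte[8192];
			int n;
			while ((n = in.read(buf)) > 0) {
				out.write(buf, 0, n);
			}
		}
	}
}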

Split sizing rules (how they combine with the block size is sketched after this list):

     1. A split may not exceed the maximum split size; anything larger is capped at that maximum.

     2. A split may come out smaller than the minimum split size (typically the tail end of a file).

     3. Any size between the minimum and maximum is acceptable.
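In FileInputFormat these rules reduce to clamping the HDFS block size between the configured minimum and maximum: splitSize = max(minSize, min(maxSize, blockSize)). A standalone sketch of that computation, plugged with the sizes the driver below sets:

public class SplitSizeDemo {
	//same clamping logic as FileInputFormat.computeSplitSize()
	static long computeSplitSize(long blockSize, long minSize, long maxSize) {
		return Math.max(minSize, Math.min(maxSize, blockSize));
	}

	public static void main(String[] args) {
		long blockSize = 128L * 1024 * 1024; //default HDFS block size, 128 MB
		long minSize = 1024 * 10;            //SPLIT_MINSIZE, 10 KB, as in the driver
		long maxSize = 1024 * 20;            //SPLIT_MAXSIZE, 20 KB, as in the driver
		//prints 20480: the block size is capped at maxSize
		System.out.println(computeSplitSize(blockSize, minSize, maxSize));
	}
}

With a 20 KB effective split size, the ~90 KB input file is cut into multiple pieces; the run log below reports "number of splits:4".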

package com.wgy.hadoop;

import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	
	private InetAddress address;
	private String host;
	private String ip ;
	private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	private long startTime;
	//sentinel value marking a missing temperature reading
	private static final int MISSING = 9999;
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		
		//convert the Text value to a String
		String line = value.toString();
		//extract the year
		String year = line.substring(15, 19);
		//air temperature
		int airTemperature;
		//check the sign character
		if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
			airTemperature = Integer.parseInt(line.substring(88, 92));
		} else {
			airTemperature = Integer.parseInt(line.substring(87, 92));
		}
		//quality code
		String quality = line.substring(92, 93);
		//write valid records to the context; the types must match the generic declaration
		if (airTemperature != MISSING && quality.matches("[01459]")) {
			context.write(new Text(year), new IntWritable(airTemperature));
		}
	}
	
	/**
	 * Called once before the map task begins processing records.
	 */
	@Override
	protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		//get the length of this input split
		InputSplit split = context.getInputSplit();
		long length = split.getLength();
		
		//get host information
		address = InetAddress.getLocalHost();
		ip = address.getHostAddress();
		host = address.getHostName();
		
		startTime = System.currentTimeMillis();
		
		System.out.println("split Length: " + length + " \r\nhost info: " + host + ":" + ip + ":" + sdf.format(new Date()) + ": map.setup");
	}
	
	/**
	 * Called once after the map task finishes, to clean up.
	 */
	@Override
	protected void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		long duration = (System.currentTimeMillis() - startTime) / 1000; //elapsed seconds
		System.out.println("host info: " + host + ":" + ip + ":" + sdf.format(new Date()) + ": map.cleanup() :" + duration);
	}
	
}

package com.wgy.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: MaxTemperature <input path> <output path>");
			System.exit(-1);
		}
		//a job: each map + reduce pass over the data is one job
		Job job = Job.getInstance();
		
		//get the job's configuration
		Configuration conf = job.getConfiguration();
		
		//set the maximum split size (20 KB)
		conf.setLong(FileInputFormat.SPLIT_MAXSIZE, 1024*20);
		
		//set the minimum split size (10 KB)
		conf.setLong(FileInputFormat.SPLIT_MINSIZE, 1024*10);
		
		job.setJarByClass(MaxTemperature.class);
		//set the job name to ease debugging
		job.setJobName("Max temperature");
		//add an input path; multiple can be added, each a directory (not recursed into) or a single file
		FileInputFormat.addInputPath(job, new Path(args[0]));
		//set the output path; there is exactly one, and it must not already exist
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.setMapperClass(MaxTemperatureMapper.class);
		job.setReducerClass(MaxTemperatureReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//wait for the job to finish and exit with its status
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
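Replication factor and block size can be customized the same way, through the client-side Configuration. A minimal sketch, assuming a Hadoop 2.x client; the class name is illustrative, and the values are examples:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsTuning {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		//files written with this configuration get 2 replicas instead of the cluster default
		conf.setInt("dfs.replication", 2);
		//and 20 MB blocks (the value must be a multiple of the 512-byte checksum chunk)
		conf.setLong("dfs.blocksize", 20L * 1024 * 1024);

		FileSystem fs = FileSystem.get(conf);
		//replication of an existing file can also be changed after the fact
		fs.setReplication(new Path("/usr/wgy/1901"), (short) 2);
		fs.close();
	}
}

Note that dfs.blocksize only affects newly written files; setReplication adjusts the replica count of a file that already exists.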


ubuntu@s1:/Downloads$ hadoop jar HadoopDemo.jar /usr/wgy/1901 /usr/wgy/out1
17/04/23 01:32:37 INFO client.RMProxy: Connecting to ResourceManager at s1/192.168.10.134:8032
17/04/23 01:32:38 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/04/23 01:32:39 INFO input.FileInputFormat: Total input files to process : 1
17/04/23 01:32:39 INFO mapreduce.JobSubmitter: number of splits:4
17/04/23 01:32:40 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1492931751258_0002
17/04/23 01:32:40 INFO impl.YarnClientImpl: Submitted application application_1492931751258_0002
17/04/23 01:32:41 INFO mapreduce.Job: The url to track the job: http://s1:8088/proxy/application_1492931751258_0002/
17/04/23 01:32:41 INFO mapreduce.Job: Running job: job_1492931751258_0002
17/04/23 01:32:56 INFO mapreduce.Job: Job job_1492931751258_0002 running in uber mode : false
17/04/23 01:32:56 INFO mapreduce.Job:  map 0% reduce 0%
17/04/23 01:34:15 INFO mapreduce.Job:  map 25% reduce 0%
17/04/23 01:34:25 INFO mapreduce.Job:  map 50% reduce 0%
17/04/23 01:34:26 INFO mapreduce.Job:  map 75% reduce 0%
17/04/23 01:34:28 INFO mapreduce.Job:  map 100% reduce 0%
17/04/23 01:34:37 INFO mapreduce.Job:  map 100% reduce 8%
17/04/23 01:34:40 INFO mapreduce.Job:  map 100% reduce 100%
17/04/23 01:34:41 INFO mapreduce.Job: Job job_1492931751258_0002 completed successfully
17/04/23 01:34:41 INFO mapreduce.Job: Counters: 51
	File System Counters
		FILE: Number of bytes read=6485
		FILE: Number of bytes written=692133
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=92364
		HDFS: Number of bytes written=9
		HDFS: Number of read operations=15
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Killed map tasks=1
		Launched map tasks=5
		Launched reduce tasks=1
		Data-local map tasks=3
		Rack-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=358644
		Total time spent by all reduces in occupied slots (ms)=19807
		Total time spent by all map tasks (ms)=358644
		Total time spent by all reduce tasks (ms)=19807
		Total vcore-milliseconds taken by all map tasks=358644
		Total vcore-milliseconds taken by all reduce tasks=19807
		Total megabyte-milliseconds taken by all map tasks=367251456
		Total megabyte-milliseconds taken by all reduce tasks=20282368
	Map-Reduce Framework
		Map input records=589
		Map output records=589
		Map output bytes=5301
		Map output materialized bytes=6503
		Input split bytes=348
		Combine input records=0
		Combine output records=0
		Reduce input groups=1
		Reduce shuffle bytes=6503
		Reduce input records=589
		Reduce output records=1
		Spilled Records=1178
		Shuffled Maps =4
		Failed Shuffles=0
		Merged Map outputs=4
		GC time elapsed (ms)=26740
		CPU time spent (ms)=9300
		Physical memory (bytes) snapshot=742277120
		Virtual memory (bytes) snapshot=9406091264
		Total committed heap usage (bytes)=505143296
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=92016
	File Output Format Counters 
		Bytes Written=9