
Hadoop 2.5.1 Study Notes 8: A Complete Program Template

强子大叔的码田
Published on 2014/11/19 13:14

1. Pull data from MongoDB into HDFS.

2. Distribute that HDFS file as side data via the distributed cache.

3. Run one MapReduce computation whose output is an HDFS file.

4. Take the HDFS file from step 3 as input and write the results to a remote database over HTTP. (A driver sketch that chains these stages follows this list.)
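
For orientation, here is a minimal driver sketch (the class name StageDriver is hypothetical, not part of the original post) showing how the three Tool classes below could be chained with ToolRunner. It assumes the argument layout used throughout this post: args[0] = host:port:db:collection for MongoDB, args[1] = the HDFS dump that is also shipped through the distributed cache, args[3] = the MR output directory, and args[4] = the HTTP target as ip:port:path.

package com.dew.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver that runs the three stages in sequence.
public class StageDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		// Stage 1: dump MongoDB documents (args[0]) into an HDFS file (args[1]).
		int rc = ToolRunner.run(conf, new PullMongoDB(), args);
		if (rc != 0) System.exit(rc);

		// Stages 2-3: MR job that joins the logs against the cached dump (args[1])
		// and writes the merged profiles to args[3].
		rc = ToolRunner.run(conf, new ComputeProfileHDFS(), args);
		if (rc != 0) System.exit(rc);

		// Stage 4: map-only job that PUTs the profiles from args[3] to the HTTP API (args[4]).
		rc = ToolRunner.run(conf, new HttpApiClient(), args);
		System.exit(rc);
	}
}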

--------------------------------------------------------------------------

package com.dew.task;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class PullMongoDB extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		// args[0] = host:port:db:collection, args[1] = target HDFS file
		// (the same full argument array is passed to every stage of the pipeline)
		if (null == args || args.length < 4) {
			return 1; // missing arguments: do not report success
		}
		List<ServerAddress> list = new ArrayList<ServerAddress>();
		String[] array = args[0].split(":");
		list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
		MongoClient mongoClient = new MongoClient(list);
		DB database = mongoClient.getDB(array[2]);
		DBCollection collection = database.getCollection(array[3]);

		// select documents that carry both "pkg" and "tags", projecting only those two fields
		BasicDBObject query = new BasicDBObject();
		query.put("pkg", new BasicDBObject("$exists", true));
		query.put("tags", new BasicDBObject("$exists", true));
		BasicDBObject fields = new BasicDBObject();
		fields.put("pkg", 1);
		fields.put("tags", 1);

		// open the target HDFS file for writing
		Configuration conf = getConf();
		FileSystem hdfs = FileSystem.get(conf);
		FSDataOutputStream outHandler = hdfs.create(new Path(args[1]));
		
		// stream the cursor and write one "pkg\ttag1 tag2 ..." line per document
		DBCursor cursor = collection.find(query, fields);
		while (cursor.hasNext()) {
			BasicDBObject record = (BasicDBObject) cursor.next();
			String pkg = record.getString("pkg");
			ArrayList<String> als = (ArrayList<String>) record.get("tags");
			StringBuilder tags = new StringBuilder();
			for (String s : als) {
				tags.append(" ").append(s.trim());
			}
			String finalString = pkg + "\t" + tags.toString().trim()
					+ System.getProperty("line.separator");
			outHandler.write(finalString.getBytes("UTF-8"));
		}
		
		// release resources
		outHandler.close();
		cursor.close();
		mongoClient.close();
		return 0;
	}

}

 

package com.dew.task;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Calendar;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class ComputeProfileHDFS extends Configured implements Tool {

	// map: join each transfer record against the pkg -> tags table loaded from the distributed cache
	public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

		// side data: pkg -> space-separated tag string, read in setup()
		private Hashtable<String, String> joinData = new Hashtable<String, String>();

		// load "pkg\ttags" lines from the cache file into joinData
		private void readFile(String file) {
			BufferedReader joinReader = null;
			String line = null;
			try {
				joinReader = new BufferedReader(new FileReader(file));
				while ((line = joinReader.readLine()) != null) {
					String[] array = line.split("\t");
					if (null == array || array.length < 2)
						continue;

					String pkg = array[0];
					if (null == pkg || pkg.length() <= 0)
						continue;

					String tagStr = array[1];
					if (null == tagStr)
						continue;
					tagStr = tagStr.trim();
					if (tagStr.length() <= 0)
						continue;
					joinData.put(pkg, tagStr);

				}
			} catch (Exception e) {
				// ignore unreadable side data; the join table simply stays (partially) empty
			} finally {
				if (null != joinReader)
					try {
						joinReader.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
			}

		}

		@Override
		protected void setup(Context context) throws java.io.IOException,
				java.lang.InterruptedException {
			try {
				// load the distributed-cache file (pkg -> tags) produced by PullMongoDB
				URI[] cacheFiles = context.getCacheFiles();
				if (null != cacheFiles && cacheFiles.length > 0) {
					readFile(cacheFiles[0].getPath().toString());
				}
			} catch (IOException e) {
				// leave joinData empty if the cache file cannot be read
			}
		}

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			// the byte-offset key is ignored
			if (null == value)
				return;
			String content = value.toString();
			if (null == content || content.trim().length() == 0)
				return;
			// split the TSV record; at least 26 columns are expected
			String[] strArray = content.split("\t");
			if (null == strArray || strArray.length < 26)
				return;

			// keep only application-transfer records (category "4")
			String cat = strArray[20].trim();
			if (!cat.equals("4")) {
				return;
			}

			String sender = strArray[11].trim();
			String receiver = strArray[13].trim();
			String pkg = strArray[25].trim();

			if (null == sender || sender.length() == 0 || null == receiver
					|| receiver.length() == 0 || null == pkg
					|| pkg.length() == 0) {
				return;
			}
			String tags = this.joinData.get(pkg);
			if (null == tags || tags.trim().length() == 0) {
				return;
			}
			// okay, emit the tag list once for the sender and once for the receiver
			context.write(new Text(sender), new Text(tags));
			context.write(new Text(receiver), new Text(tags));
			Counter c = context.getCounter("ComputeProfileHDFS",
					"MapWriteRecord");
			c.increment(2);
		}

	}

	// combiner: concatenate tag lists locally to cut shuffle volume
	public static class Combiner extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			String totalTags = "";
			for (Text tag : values) {
				totalTags += " " + tag;
			}
			totalTags = totalTags.trim();
			if (totalTags.length() <= 0)
				return;
			// okay

			context.write(key, new Text(totalTags));
			Counter c = context.getCounter("ComputerProfileHDFS",
					"CombineWriteRecord");
			c.increment(1);

		}

	}

	// reduce: merge every tag observed for one user into a single profile line
	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			String totalTags = "";
			for (Text tag : values) {
				totalTags += " " + tag;
			}
			totalTags = totalTags.trim();
			if (totalTags.length() <= 0)
				return;
			// okay, write the merged profile
			context.write(key, new Text(totalTags));
			Counter c = context.getCounter("ComputeProfileHDFS",
					"ReduceWriteRecord");
			c.increment(1);
		}

	}

	// Build glob paths covering the last three months of daily .tsv logs,
	// e.g. /user/f/transfer2*/year=*/month=*/day=*/*.tsv; the first two
	// branches handle the year boundary (January and February).
	private static String[] getInputPathsByMonth(int year, int month) {

		String[] path = new String[3];
		if (1 == month) {
			path[0] = "/user/f/transfer2*/year=" + (year - 1)
					+ "/month=11/day=*/*.tsv";
			path[1] = "/user/f/transfer2*/year=" + (year - 1)
					+ "/month=12/day=*/*.tsv";
			path[2] = "/user/f/transfer2*/year=" + (year)
					+ "/month=01/day=*/*.tsv";
		} else if (2 == month) {
			path[0] = "/user/f/transfer2*/year=" + (year - 1)
					+ "/month=12/day=*/*.tsv";
			path[1] = "/user/f/transfer2*/year=" + (year)
					+ "/month=01/day=*/*.tsv";
			path[2] = "/user/f/transfer2*/year=" + (year)
					+ "/month=02/day=*/*.tsv";
		} else {
			path[0] = "/user/f/transfer2*/year=" + (year) + "/month="
					+ ((month - 2) < 10 ? "0" : "") + (month - 2)
					+ "/day=*/*.tsv";
			path[1] = "/user/f/transfer2*/year=" + (year) + "/month="
					+ ((month - 1) < 10 ? "0" : "") + (month - 1)
					+ "/day=*/*.tsv";
			path[2] = "/user/f/transfer2*/year=" + (year) + "/month="
					+ ((month - 0) < 10 ? "0" : "") + (month - 0)
					+ "/day=*/*.tsv";
		}
		return path;
	}

	// Add the last three months of logs as the job input
	// (this replaces a fixed FileInputFormat.setInputPaths(job, new Path(args[2]))).
	private void setInputPathByMonth(Job job) throws Exception {

		Calendar cal = Calendar.getInstance();
		int year = cal.get(Calendar.YEAR);
		int month = cal.get(Calendar.MONTH) + 1;
		String[] paths = getInputPathsByMonth(year, month);
		for (String path : paths) {
			FileInputFormat.addInputPaths(job, path);
		}

	}

	@Override
	public int run(String[] args) throws Exception {

		Configuration conf = getConf();
		Job job = Job.getInstance(conf, "ComputeProfileHDFS");

		// ship the pkg -> tags dump (args[1]) to every mapper via the distributed cache
		job.addCacheFile(new Path(args[1]).toUri());

		// prepare
		setInputPathByMonth(job);
		// FileInputFormat.setInputPaths(job, new Path(args[2]));
		FileOutputFormat.setOutputPath(job, new Path(args[3]));

		job.setJobName("ComputeProfileHDFS");
		job.setJarByClass(ComputeProfileHDFS.class);
		job.setMapperClass(MapClass.class);
		job.setCombinerClass(Combiner.class);
		job.setReducerClass(Reduce.class);// job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		// execute
		int exitCode = job.waitForCompletion(true) ? 0 : 1;

		return exitCode;
	}

	public static String[] args = null;

	public static void main(String[] a) throws Exception {
		args = a;

		// Optionally clean up intermediate files from a previous run:
		// args[1] is the Mongo dump, args[3] is the MR output directory.
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		try {
			//fs.delete(new Path(args[1]), true);
		} catch (Exception e) {
		}
		try {
			//fs.delete(new Path(args[3]), true);
		} catch (Exception e) {
		}

		// Run one stage at a time by uncommenting it; here only the HTTP upload stage is enabled.
		int res;
		//res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);
		//res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(), args);
		res = ToolRunner.run(new Configuration(), new HttpApiClient(), args);

		// Optionally clean up the intermediate files again once the pipeline has finished.
		try {
			//fs.delete(new Path(args[1]), true);
		} catch (Exception e) {
		}
		try {
			//fs.delete(new Path(args[3]), true);
		} catch (Exception e) {
		}
		System.exit(res);
	}

}

 

[Note: ideally this stage would read the data in the map phase and issue the HTTP requests in the reduce phase; it has not been reworked here, so readers can adapt it themselves. A sketch of that variant follows the code below.]

package com.dew.task;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;

import com.mongodb.BasicDBObject;

public class HttpApiClient extends Configured implements Tool {

	// buffer of person documents, flushed to the HTTP API whenever `lock` (the batch size) records accumulate
	private static ArrayList<BasicDBObject> persons = new ArrayList<BasicDBObject>();
	private static int lock = 500;

	// Turn one "user \t tag1 tag2 ..." record into a {_id, t: [{n: tag, #: count}]} document and buffer it.
	public static void writeRecord(String key, String[] arrays, Context context) {
		if (null == key || key.length() <= 0 || null == arrays
				|| arrays.length <= 0)
			return;
		Hashtable<String, Integer> table = new Hashtable<String, Integer>();
		for (String tag : arrays) {
			Integer number = table.get(tag);
			int count = (null == number ? 0 : number.intValue());
			count++;
			table.put(tag, count);
		}
		// single person tag
		ArrayList<BasicDBObject> tagDocument = new ArrayList<BasicDBObject>();
		Set<String> tagSet = table.keySet();
		for (String tag : tagSet) {
			BasicDBObject doc = new BasicDBObject();
			doc.put("n", tag);
			doc.put("#", table.get(tag).intValue());
			tagDocument.add(doc);
		}
		// person document
		BasicDBObject person = new BasicDBObject();
		person.put("_id", key);
		person.put("t", tagDocument);

		// add it
		persons.add(person);
		if (persons.size() >= lock) {
			submit(context);
		}

	}

	// PUT the buffered documents as a JSON array to the endpoint configured under HTTP_URL (ip:port:path).
	public static void submit(Context context) {
		context.getCounter("Record", "submit").increment(1);

		try {
			// submit
			String entityString = persons.toString();
			if (null == entityString || entityString.length() <= 0) {
				return;
			}
			StringEntity se = new StringEntity(entityString, HTTP.UTF_8);

			String[] args = context.getConfiguration().getStrings(HTTP_URL);
			if (null == args) {
				context.getCounter("Record", "args_null").increment(1);
				return;
			}
			if (args.length < 1) {
				return;
			}
			context.getCounter("Record", "args_length_valid").increment(1);
			String httpTarget = args[0];
			if (null == httpTarget) {
				return;
			}
			context.getCounter("Record", "httpTarget_OK").increment(1);
			String[] parameters = httpTarget.split(":");
			if (null == parameters || parameters.length < 3) {
				return;
			}
			context.getCounter("Record", "parameters_valid").increment(1);
			String ip = parameters[0];
			int port = Integer.parseInt(parameters[1]);
			String path = parameters[2];
			context.getCounter("Record", "3_parameters_valid").increment(1);
			HttpPut request = new HttpPut("http://" + ip + ":" + port + ""
					+ path);
			request.setHeader("Content-type", "application/json");
			request.setHeader("User-Agent", "Mozilla");
			request.setHeader("Connection", "Close");

			request.setEntity(se);
			// HttpClient
			HttpClient client = new DefaultHttpClient();
			HttpResponse response = client.execute(request);
			if (null == response) {
				context.getCounter("Record", "response_is_null").increment(1);
				return;
			}
			
			int code = response.getStatusLine().getStatusCode();
			context.getCounter("Record", "" + code).increment(1);
			
			// consume the response entity so the connection can be released
			EntityUtils.toString(response.getEntity(), HTTP.UTF_8);
			context.getCounter("Record", "respStr").increment(1);

		} catch (Exception e) {
			context.getCounter("Record", "exception-" + e.toString())
					.increment(1);
		} finally {
			persons.clear();
		}
	}

	// //////////////////////////////////////
	// map
	// map-only stage: parse "user \t tags" lines and push them to the HTTP API in batches
	public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String content = value.toString();
			String[] arrays = content.split("\t");
			if (arrays.length < 2) {
				return; // skip malformed lines
			}
			String k = arrays[0];
			String[] tags = arrays[1].split(" ");
			HttpApiClient.writeRecord(k, tags, context);
			context.getCounter("Record", "write").increment(1);
		}

		// flush whatever is still buffered when the mapper finishes
		public void cleanup(Context context) throws java.io.IOException,
				java.lang.InterruptedException {
			HttpApiClient.submit(context);
		}

	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = Job.getInstance(conf, "ComputeProfileHDFS_HttpApiClient");
		// input is the profile file written by ComputeProfileHDFS (args[3])
		FileInputFormat.addInputPath(job, new Path(args[3]));
		job.setJobName("ComputeProfileHDFS_HttpApiClient");
		job.setJarByClass(HttpApiClient.class);
		job.setMapperClass(MapClass.class);
		// map-only job: the mapper itself pushes results over HTTP
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		// nothing is written to HDFS, so NullOutputFormat needs no output path
		job.setOutputFormatClass(NullOutputFormat.class);
		if (null != args && args.length > 4 && null != args[4])
			job.getConfiguration().set(HTTP_URL, args[4]);
		// execute
		int exitCode = job.waitForCompletion(true) ? 0 : 1;
		return exitCode;
	}

	private static final String HTTP_URL = "http_url";

}
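
Following up on the note above: a cleaner split keeps the map phase as a pure parser and moves the HTTP PUT into the reduce phase. Below is a minimal sketch of such a reducer, assuming the mapper emits (user, tags) pairs as Text; the class name HttpPushReducer and its batch size are hypothetical, the actual HTTP request would be built exactly as in submit() above, and the per-tag counting from writeRecord() could be reused when building the person document.

package com.dew.task;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.mongodb.BasicDBObject;

// Hypothetical reducer: the mapper only parses and emits (user, tags); the HTTP upload happens here.
public class HttpPushReducer extends Reducer<Text, Text, Text, Text> {

	private final ArrayList<BasicDBObject> buffer = new ArrayList<BasicDBObject>();
	private static final int BATCH_SIZE = 500; // assumed batch size, mirrors `lock` above

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// merge all tag strings for this user
		StringBuilder tags = new StringBuilder();
		for (Text t : values) {
			tags.append(" ").append(t.toString());
		}
		// build the person document (the {n, #} tag counting from writeRecord() could be reused here)
		BasicDBObject person = new BasicDBObject();
		person.put("_id", key.toString());
		person.put("t", tags.toString().trim());
		buffer.add(person);
		if (buffer.size() >= BATCH_SIZE) {
			push(context);
		}
	}

	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		push(context); // flush the tail of the last batch
	}

	private void push(Context context) {
		if (buffer.isEmpty()) {
			return;
		}
		// PUT buffer.toString() to the configured endpoint, exactly as HttpApiClient.submit() does;
		// the request construction is omitted here for brevity.
		context.getCounter("Record", "reduce_submit").increment(1);
		buffer.clear();
	}
}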

 

Afterwards, simply check the results on the job's Counters page in the web UI.
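
If you would rather not open the web UI, the same counters can be read from the driver once waitForCompletion has returned. A minimal sketch, assuming the Job object from HttpApiClient.run() is passed in (the helper class CounterReport is hypothetical):

package com.dew.task;

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical helper: print the custom counters of a finished job from the driver.
public class CounterReport {

	public static void print(Job job) throws Exception {
		Counters counters = job.getCounters();
		long written = counters.findCounter("Record", "write").getValue();
		long batches = counters.findCounter("Record", "submit").getValue();
		System.out.println("records written: " + written
				+ ", batches submitted: " + batches);
	}
}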
