
Hadoop 2.5.1 Study Notes 8: A Complete Program Template

强子1985 · Published 2014/11/19 13:14

1 Pull data from MongoDB and write it into HDFS.

2 Distribute that HDFS file via the distributed cache as side data (for a map-side join).

3 Run a MapReduce computation whose output is an HDFS file.

4 Take the HDFS file from step 3 as input and push it to a remote database over HTTP. (A driver sketch chaining the three stages follows this list.)
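The three stages below share a single argument array. A minimal driver sketch chaining them with ToolRunner (the class name PipelineDriver is hypothetical and the argument layout is inferred from the code that follows; it mirrors the commented-out ToolRunner calls in the main() method of ComputeProfileHDFS):

package com.dew.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Sketch only: runs the three stages in order with the argument layout the
// classes below expect. Paths and addresses are placeholders.
public class PipelineDriver {
	public static void main(String[] args) throws Exception {
		// args[0] = "host:port:db:collection"   MongoDB source
		// args[1] = HDFS path of the pulled pkg/tags file (later the distributed-cache file)
		// args[2] = (reserved for an explicit input path; unused, input is month-based)
		// args[3] = HDFS output directory of the MR job / input of the HTTP push
		// args[4] = "ip:port:/path" of the remote HTTP endpoint
		int res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);
		if (res == 0)
			res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(), args);
		if (res == 0)
			res = ToolRunner.run(new Configuration(), new HttpApiClient(), args);
		System.exit(res);
	}
}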

--------------------------------------------------------------------------

package com.dew.task;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class PullMongoDB extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		if (null == args || args.length < 4) {
			return 1; // missing arguments
		}
		// args[0] = "host:port:db:collection", args[1] = HDFS output path
		List<ServerAddress> list = new ArrayList<ServerAddress>();
		String[] array = args[0].split(":");
		list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
		MongoClient mongoClient = new MongoClient(list);
		DB database = mongoClient.getDB(array[2]);
		DBCollection collection = database.getCollection(array[3]);

		// query: only documents that have both "pkg" and "tags"; project just those two fields
		BasicDBObject query = new BasicDBObject();
		query.put("pkg", new BasicDBObject("$exists", true));
		query.put("tags", new BasicDBObject("$exists", true));
		BasicDBObject fields = new BasicDBObject();
		fields.put("pkg", 1);
		fields.put("tags", 1);

		// open the HDFS output file (args[1])
		Configuration conf = new Configuration();
		FileSystem hdfs = FileSystem.get(conf);
		FSDataOutputStream outHandler = hdfs.create(new Path("" + args[1]));
		
		// stream matching documents to HDFS as "pkg<TAB>tag1 tag2 ..." lines
		DBCursor cursor = collection.find(query, fields);
		while (cursor.hasNext()) {
			BasicDBObject record = (BasicDBObject) cursor.next();
			String pkg = record.getString("pkg");
			ArrayList<String> als = (ArrayList<String>) record.get("tags");
			StringBuilder tags = new StringBuilder();
			for (String s : als) {
				tags.append(" ").append(s.trim());
			}
			String finalString = pkg + "\t" + tags.toString().trim()
					+ System.getProperty("line.separator");
			outHandler.write(finalString.getBytes("UTF-8"));
		}
		
		// close the output stream, cursor and client
		outHandler.close();
		cursor.close();
		mongoClient.close();
		return 0;
	}

}

 

package com.dew.task;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.BasicDBObject;

public class ComputeProfileHDFS extends Configured implements Tool {

	// map
	public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

		private Hashtable<String, String> joinData = new Hashtable<String, String>();

		private void readFile(String file) {
			BufferedReader joinReader = null;
			String line = null;
			try {
				joinReader = new BufferedReader(new FileReader(file));
				while ((line = joinReader.readLine()) != null) {
					String[] array = line.split("\t");
					if (null == array || array.length < 2)
						continue;

					String pkg = array[0];
					if (null == pkg || pkg.length() <= 0)
						continue;

					String tagStr = array[1];
					if (null == tagStr)
						continue;
					tagStr = tagStr.trim();
					if (tagStr.length() <= 0)
						continue;
					joinData.put(pkg, tagStr);

				}
			} catch (Exception e) {
				// ignore a malformed or unreadable cache file; joinData simply stays empty
			} finally {
				if (null != joinReader)
					try {
						joinReader.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
			}

		}

		protected void setup(Context context) throws java.io.IOException,
				java.lang.InterruptedException {
			try {
				// Configuration conf = context.getConfiguration();
				URI[] cacheFiles = context.getCacheFiles();
				if (null != cacheFiles && cacheFiles.length > 0) {

					readFile(cacheFiles[0].getPath().toString());

				}
			} catch (IOException e) {
				// ignore; with no cache file loaded the mapper will simply emit nothing
			}
		}

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			// key neglected
			if (null == value)
				return;
			String content = value.toString();
			if (null == content || content.trim().length() == 0)
				return;
			// split
			String[] strArray = content.split("\t");
			if (null == strArray || strArray.length < 26)
				return;

			// only category 4 ("app") records are of interest
			String cat = strArray[20].trim();
			if (!"4".equals(cat)) {
				return;
			}

			String sender = strArray[11].trim();
			String receiver = strArray[13].trim();
			String pkg = strArray[25].trim();

			if (null == sender || sender.length() == 0 || null == receiver
					|| receiver.length() == 0 || null == pkg
					|| pkg.length() == 0) {
				return;
			}
			String tags = this.joinData.get(pkg);
			if (null == tags || tags.trim().length() == 0) {
				return;
			}
			// okay,output it
			context.write(new Text(sender), new Text(tags));
			context.write(new Text(receiver), new Text(tags));
			Counter c = context.getCounter("ComputerProfileHDFS",
					"MapWriteRecord");
			c.increment(2);
		}

	}

	public static class Combiner extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			String totalTags = "";
			for (Text tag : values) {
				totalTags += " " + tag;
			}
			totalTags = totalTags.trim();
			if (totalTags.length() <= 0)
				return;
			// okay

			context.write(key, new Text(totalTags));
			Counter c = context.getCounter("ComputerProfileHDFS",
					"CombineWriteRecord");
			c.increment(1);

		}

	}

	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			String totalTags = "";
			for (Text tag : values) {
				totalTags += " " + tag;
			}
			totalTags = totalTags.trim();
			if (totalTags.length() <= 0)
				return;
			// okay, emit the merged tag list for this user
			context.write(key, new Text(totalTags));
			Counter c = context.getCounter("ComputeProfileHDFS",
					"ReduceWriteRecord");
			c.increment(1);
		}

	}

	private static String[] getInputPathsByMonth(int year, int month) {

		// build globs covering the current month and the two preceding months, e.g.
		// /user/flume/transfer2*/year=*/month=*/day=*/*.tsv
		String[] path = new String[3];
		if (1 == month) {
			path[0] = "/user/f/transfer2*/year=" + (year - 1)
					+ "/month=11/day=*/*.tsv";
			path[1] = "/user/f/transfer2*/year=" + (year - 1)
					+ "/month=12/day=*/*.tsv";
			path[2] = "/user/f/transfer2*/year=" + (year)
					+ "/month=01/day=*/*.tsv";
		} else if (2 == month) {
			path[0] = "/user/f/transfer2*/year=" + (year - 1)
					+ "/month=12/day=*/*.tsv";
			path[1] = "/user/f/transfer2*/year=" + (year)
					+ "/month=01/day=*/*.tsv";
			path[2] = "/user/f/transfer2*/year=" + (year)
					+ "/month=02/day=*/*.tsv";
		} else {
			path[0] = "/user/f/transfer2*/year=" + (year) + "/month="
					+ ((month - 2) < 10 ? "0" : "") + (month - 2)
					+ "/day=*/*.tsv";
			path[1] = "/user/f/transfer2*/year=" + (year) + "/month="
					+ ((month - 1) < 10 ? "0" : "") + (month - 1)
					+ "/day=*/*.tsv";
			path[2] = "/user/f/transfer2*/year=" + (year) + "/month="
					+ ((month - 0) < 10 ? "0" : "") + (month - 0)
					+ "/day=*/*.tsv";
		}
		return path;
	}

	private void setInputPathByMonth(Job job) throws Exception {

		// FileInputFormat.setInputPaths(job, new Path(args[2]));
		Calendar cal = Calendar.getInstance();
		int year = cal.get(Calendar.YEAR);
		int month = cal.get(Calendar.MONTH) + 1;
		String[] paths = getInputPathsByMonth(year, month);
		for (String path : paths) {
			FileInputFormat.addInputPaths(job, path);
		}

	}

	@Override
	public int run(String[] args) throws Exception {

		Configuration conf = getConf();
		Job job = new Job(conf, "ComputeProfileHDFS");

		// add distributed file
		job.addCacheFile(new Path(args[1]).toUri());

		// prepare
		setInputPathByMonth(job);
		// FileInputFormat.setInputPaths(job, new Path(args[2]));
		FileOutputFormat.setOutputPath(job, new Path(args[3]));

		job.setJobName("ComputeProfileHDFS");
		job.setJarByClass(ComputeProfileHDFS.class);
		job.setMapperClass(MapClass.class);
		job.setCombinerClass(Combiner.class);
		job.setReducerClass(Reduce.class);// job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		// execute
		int exitCode = job.waitForCompletion(true) ? 0 : 1;

		return exitCode;
	}

	public static String[] args = null;

	public static void main(String[] a) throws Exception {
		args = a; // keep the shared argument array before anything touches it
		// delete temp files possibly left over from a previous run
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		try {
			//fs.delete(new Path(args[1]), true);
		} catch (Exception e) {
		}

		try {
			//fs.delete(new Path(args[3]), true);
		} catch (Exception e) {
		}

		int res;
		// Run the stages one at a time by (un)commenting the lines below;
		// as published, this template runs only the HTTP push stage.
		//res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);

		//res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(), args);

		res = ToolRunner.run(new Configuration(), new HttpApiClient(), args);
		// delete all temp files
		try {
			//fs.delete(new Path(args[1]), true);
		} catch (Exception e) {
		}

		try {
			//fs.delete(new Path(args[3]), true);
		} catch (Exception e) {
		}
		System.exit(res);
	}

}

 

package com.dew.task;
[Note: ideally this job would read the data in the map phase and issue the HTTP requests in the reduce phase; that change has not been made here and readers can adapt it themselves (a sketch follows this class).]
import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;

import com.mongodb.BasicDBObject;

public class HttpApiClient extends Configured implements Tool {

	private static ArrayList<BasicDBObject> persons = new ArrayList<BasicDBObject>();
	// flush the batch to the HTTP endpoint once it reaches this size
	private static final int BATCH_SIZE = 500;

	public static void writeRecord(String key, String[] arrays, Context context) {
		if (null == key || key.length() <= 0 || null == arrays
				|| arrays.length <= 0)
			return;
		Hashtable<String, Integer> table = new Hashtable<String, Integer>();
		for (String tag : arrays) {
			Integer number = table.get(tag);
			int count = (null == number ? 0 : number.intValue());
			count++;
			table.put(tag, count);
		}
		// single person tag
		ArrayList<BasicDBObject> tagDocument = new ArrayList<BasicDBObject>();
		Set<String> tagSet = table.keySet();
		for (String tag : tagSet) {
			BasicDBObject doc = new BasicDBObject();
			doc.put("n", tag);
			doc.put("#", table.get(tag).intValue());
			tagDocument.add(doc);
		}
		// person document
		BasicDBObject person = new BasicDBObject();
		person.put("_id", key);
		person.put("t", tagDocument);

		// add it
		persons.add(person);
		if (persons.size() >= BATCH_SIZE) {
			submit(context);
		}

	}

	public static void submit(Context context) {
		context.getCounter("Record", "submit").increment(1);

		try {
			// BasicDBObject.toString() renders JSON, so the list prints as a JSON array
			String entityString = persons.toString();
			if (null == entityString || entityString.length() <= 0) {
				return;
			}
			StringEntity se = new StringEntity(entityString, HTTP.UTF_8);

			String[] args = context.getConfiguration().getStrings(HTTP_URL);
			if (null == args) {
				context.getCounter("Record", "args_null").increment(1);
				return;
			}
			if (args.length < 1) {
				return;
			}
			context.getCounter("Record", "args_length_valid").increment(1);
			String httpTarget = args[0];
			if (null == httpTarget) {
				return;
			}
			context.getCounter("Record", "httpTarget_OK").increment(1);
			String[] parameters = httpTarget.split(":");
			if (null == parameters || parameters.length < 3) {
				return;
			}
			context.getCounter("Record", "parameters_valid").increment(1);
			String ip = parameters[0];
			int port = Integer.parseInt(parameters[1]);
			String path = parameters[2];
			context.getCounter("Record", "3_parameters_valid").increment(1);
			HttpPut request = new HttpPut("http://" + ip + ":" + port + ""
					+ path);
			request.setHeader("Content-type", "application/json");
			request.setHeader("User-Agent", "Mozilla");
			request.setHeader("Connection", "Close");

			request.setEntity(se);
			// HttpClient
			HttpClient client = new DefaultHttpClient();
			HttpResponse response = client.execute(request);
			if (null == response) {
				context.getCounter("Record", "response_is_null").increment(1);
				return;
			}
			
			int code = response.getStatusLine().getStatusCode();
			context.getCounter("Record", "" + code).increment(1);
			
			String respStr = EntityUtils.toString(response.getEntity(),
					HTTP.UTF_8);
			context.getCounter("Record", "respStr").increment(1);

		} catch (Exception e) {
			context.getCounter("Record", "exception-" + e.toString())
					.increment(1);
		} finally {
			persons.clear();
		}
	}

	// //////////////////////////////////////
	// map
	public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String content = value.toString();
			String[] arrays = content.split("\t");
			if (null == arrays || arrays.length < 2)
				return; // skip malformed lines
			String k = arrays[0];
			String[] tags = arrays[1].split(" ");
			HttpApiClient.writeRecord(k, tags, context);
			context.getCounter("Record", "write").increment(1);
		}

		public void cleanup(Context context) throws java.io.IOException,
				java.lang.InterruptedException {
			HttpApiClient.submit(context);
		}

	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = Job.getInstance(conf, "ComputeProfileHDFS_HttpApiClient");
		FileInputFormat.addInputPath(job, new Path(args[3]));
		// the output path is required by FileOutputFormat but ignored by NullOutputFormat
		FileOutputFormat.setOutputPath(job, new Path("dev/null"));
		job.setJobName("ComputeProfileHDFS_HttpApiClient");
		job.setJarByClass(HttpApiClient.class);
		job.setMapperClass(MapClass.class);
		// map-only job: the mapper batches records and submits them over HTTP
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(NullOutputFormat.class);
		if (null != args && args.length > 4 && null != args[4])
			job.getConfiguration().set(HTTP_URL, args[4]);
		// execute
		int exitCode = job.waitForCompletion(true) ? 0 : 1;
		return exitCode;
	}

	private static String HTTP_URL = "http_url";

}
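As the note at the top of this class says, a cleaner structure would batch and submit in the reduce phase. A minimal sketch of such a reducer, nested inside HttpApiClient next to MapClass (the class name HttpReduce is hypothetical; it assumes the writeRecord/submit parameter type is widened from Mapper.Context to org.apache.hadoop.mapreduce.TaskAttemptContext so both map and reduce contexts are accepted, and it needs an import of org.apache.hadoop.mapreduce.Reducer):

	// Sketch only: HTTP submission moved to the reduce phase. The mapper would
	// then just emit (user, tags) pairs, and the job would register this class
	// via job.setReducerClass(HttpReduce.class) with a non-zero reduce count.
	public static class HttpReduce extends Reducer<Text, Text, Text, Text> {

		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			StringBuilder tags = new StringBuilder();
			for (Text t : values) {
				tags.append(" ").append(t.toString());
			}
			String merged = tags.toString().trim();
			if (merged.length() == 0)
				return;
			HttpApiClient.writeRecord(key.toString(), merged.split(" "), context);
		}

		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			// flush whatever is still batched in persons
			HttpApiClient.submit(context);
		}
	}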

 

Finally, check the results on the job's Counters page. The same counters can also be read programmatically, as sketched below.
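A minimal sketch, assuming a Job handle like the one created in run(), placed after waitForCompletion(true) has returned (the group/counter names match the ones used above):

		// Read the custom counters back from the finished job.
		org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
		long mapWrites = counters.findCounter("ComputeProfileHDFS", "MapWriteRecord").getValue();
		long reduceWrites = counters.findCounter("ComputeProfileHDFS", "ReduceWriteRecord").getValue();
		long submits = counters.findCounter("Record", "submit").getValue();
		System.out.println("map=" + mapWrites + " reduce=" + reduceWrites + " submit=" + submits);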

© Copyright belongs to the author.
