
Hadoop 2.5.1 Study Notes 8: A Complete Program Template

强子1985, published 2014/11/19 13:14

1 Pull the data from MongoDB into HDFS

2 Distribute that HDFS file as side ("background") data via the distributed cache

3 Run one MapReduce computation whose output is an HDFS file

4 Take the HDFS file from step 3 as input and write it to a remote database over HTTP. A rough end-to-end driver sketch follows.
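Step 1 is handled by PullMongoDB, steps 2 and 3 by ComputeProfileHDFS, and step 4 by HttpApiClient. As a rough sketch (not the post's actual driver; the real main() at the end of ComputeProfileHDFS toggles the stages by commenting them in and out), the three stages could be chained like this, with one shared argument array (args[0] mongo host:port:db:collection, args[1] side-file path, args[3] MR output dir, args[4] HTTP ip:port:path):

package com.dew.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hedged sketch only: runs the three stages back to back and stops at the first failure.
public class PipelineDriver {
	public static void main(String[] args) throws Exception {
		int rc = ToolRunner.run(new Configuration(), new PullMongoDB(), args);                 // step 1
		if (rc == 0) rc = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(), args); // steps 2 + 3
		if (rc == 0) rc = ToolRunner.run(new Configuration(), new HttpApiClient(), args);      // step 4
		System.exit(rc);
	}
}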

--------------------------------------------------------------------------

package com.dew.task;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class PullMongoDB extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		if (null == args || args.length < 4) {
			return 0;
		}
		List<ServerAddress> list = new ArrayList<ServerAddress>();
		String[] array = args[0].split(":");
		list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
		MongoClient mongoClient = new MongoClient(list);
		DB database = mongoClient.getDB(array[2]);
		DBCollection collection = database.getCollection(array[3]);

		// query: only documents that have both "pkg" and "tags"; project just those two fields
		BasicDBObject query = new BasicDBObject();
		query.put("pkg", new BasicDBObject("$exists", true));
		query.put("tags", new BasicDBObject("$exists", true));
		BasicDBObject fields = new BasicDBObject();
		fields.put("pkg", 1);
		fields.put("tags", 1);

		// open the HDFS output file (args[1])
		Configuration conf = new Configuration();
		FileSystem hdfs = FileSystem.get(conf);
		FSDataOutputStream outHandler = hdfs.create(new Path(args[1]));
		
		//write
		DBCursor cursor = collection.find(query, fields);
		while (cursor.hasNext()) {
			BasicDBObject record = (BasicDBObject) cursor.next();
			String pkg = record.getString("pkg");
			ArrayList<String> als = (ArrayList<String>) record.get("tags");
			String tags = "";
			for (String s : als) {
				tags += " " + s.trim();
			}
			tags = tags.trim();
			String finalString = pkg + "\t" + tags + System.getProperty("line.separator");
			outHandler.write(finalString.getBytes("UTF-8"));
		}
		
		// release resources
		outHandler.close();
		cursor.close();
		mongoClient.close();
		return 0;
	}

}
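For reference, a stand-alone invocation of the exporter might look like the snippet below. Everything in it is a placeholder, and note that run() insists on at least four arguments because the same array is shared with the later stages, even though only args[0] and args[1] are used here.

package com.dew.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical exporter invocation with placeholder values only.
public class PullMongoDBDemo {
	public static void main(String[] ignored) throws Exception {
		String[] pipelineArgs = {
				"mongo-host:27017:mydb:apps", // args[0]: mongo host:port:database:collection
				"/tmp/pkg_tags.tsv",          // args[1]: HDFS file the dump is written to
				"unused-here",                // args[2]: raw input dir (see the commented-out setInputPaths below)
				"/tmp/profile_out"            // args[3]: MR output dir consumed by the later stages
		};
		// each exported line has the form: <pkg>\t<tag1 tag2 ... tagN>
		System.exit(ToolRunner.run(new Configuration(), new PullMongoDB(), pipelineArgs));
	}
}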

 

package com.dew.task;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Calendar;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class ComputeProfileHDFS extends Configured implements Tool {

	// map
	public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

		private Hashtable<String, String> joinData = new Hashtable<String, String>();

		private void readFile(String file) {
			BufferedReader joinReader = null;
			String line = null;
			try {
				joinReader = new BufferedReader(new FileReader(file));
				while ((line = joinReader.readLine()) != null) {
					String[] array = line.split("\t");
					if (null == array || array.length < 2)
						continue;

					String pkg = array[0];
					if (null == pkg || pkg.length() <= 0)
						continue;

					String tagStr = array[1];
					if (null == tagStr)
						continue;
					tagStr = tagStr.trim();
					if (tagStr.length() <= 0)
						continue;
					joinData.put(pkg, tagStr);

				}
			} catch (Exception e) {
				// swallow read errors; the join table simply stays (partially) empty

			} finally {
				if (null != joinReader)
					try {
						joinReader.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
			}

		}

		protected void setup(Context context) throws java.io.IOException,
				java.lang.InterruptedException {
			try {
				// Configuration conf = context.getConfiguration();
				URI[] cacheFiles = context.getCacheFiles();
				if (null != cacheFiles && cacheFiles.length > 0) {
					// note: depending on how the file is localized, it may be necessary to
					// open it via its symlinked base name in the task working directory
					// rather than this full path
					readFile(cacheFiles[0].getPath());
				}
			} catch (IOException e) {
				// ignore; the mapper will simply find no tags and skip its records
			}
		}

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			// the key (byte offset) is not needed
			if (null == value)
				return;
			String content = value.toString();
			if (null == content || content.trim().length() == 0)
				return;
			// split
			String[] strArray = content.split("\t");
			if (null == strArray || strArray.length < 26)
				return;

			// keep only records whose category column marks an app transfer (cat == "4")
			String cat = strArray[20].trim();
			if (null == cat || !cat.equals("4")) {
				return;
			}

			String sender = strArray[11].trim();
			String receiver = strArray[13].trim();
			String pkg = strArray[25].trim();

			if (null == sender || sender.length() == 0 || null == receiver
					|| receiver.length() == 0 || null == pkg
					|| pkg.length() == 0) {
				return;
			}
			String tags = this.joinData.get(pkg);
			if (null == tags || tags.trim().length() == 0) {
				return;
			}
			// emit the tag list once for the sender and once for the receiver
			context.write(new Text(sender), new Text(tags));
			context.write(new Text(receiver), new Text(tags));
			Counter c = context.getCounter("ComputeProfileHDFS",
					"MapWriteRecord");
			c.increment(2);
		}

	}

	public static class Combiner extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			String totalTags = "";
			for (Text tag : values) {
				totalTags += " " + tag;
			}
			totalTags = totalTags.trim();
			if (totalTags.length() <= 0)
				return;
			// forward the partially merged tag list
			context.write(key, new Text(totalTags));
			Counter c = context.getCounter("ComputeProfileHDFS",
					"CombineWriteRecord");
			c.increment(1);

		}

	}

	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			String totalTags = "";
			for (Text tag : values) {
				totalTags += " " + tag;
			}
			totalTags = totalTags.trim();
			if (totalTags.length() <= 0)
				return;
			// emit the merged tag list for this user
			context.write(key, new Text(totalTags));
			Counter c = context.getCounter("ComputeProfileHDFS",
					"ReduceWriteRecord");
			c.increment(1);
		}

	}

	private static String[] getInputPathsByMonth(int year, int month) {

		// /user/flume/transfer2*/year=*/month=*/day=*/*.tsv
		// returns the globs for the three months ending in (year, month); e.g. for
		// 2014-11 it covers month=09, month=10 and month=11 of year=2014
		String[] path = new String[3];
		if (1 == month) {
			path[0] = "/user/f/transfer2*/year=" + (year - 1)
					+ "/month=11/day=*/*.tsv";
			path[1] = "/user/f/transfer2*/year=" + (year - 1)
					+ "/month=12/day=*/*.tsv";
			path[2] = "/user/f/transfer2*/year=" + (year)
					+ "/month=01/day=*/*.tsv";
		} else if (2 == month) {
			path[0] = "/user/f/transfer2*/year=" + (year - 1)
					+ "/month=12/day=*/*.tsv";
			path[1] = "/user/f/transfer2*/year=" + (year)
					+ "/month=01/day=*/*.tsv";
			path[2] = "/user/f/transfer2*/year=" + (year)
					+ "/month=02/day=*/*.tsv";
		} else {
			path[0] = "/user/f/transfer2*/year=" + (year) + "/month="
					+ ((month - 2) < 10 ? "0" : "") + (month - 2)
					+ "/day=*/*.tsv";
			path[1] = "/user/f/transfer2*/year=" + (year) + "/month="
					+ ((month - 1) < 10 ? "0" : "") + (month - 1)
					+ "/day=*/*.tsv";
			path[2] = "/user/f/transfer2*/year=" + (year) + "/month="
					+ ((month - 0) < 10 ? "0" : "") + (month - 0)
					+ "/day=*/*.tsv";
		}
		return path;
	}

	private void setInputPathByMonth(Job job) throws Exception {

		// FileInputFormat.setInputPaths(job, new Path(args[2]));
		Calendar cal = Calendar.getInstance();
		int year = cal.get(Calendar.YEAR);
		int month = cal.get(Calendar.MONTH) + 1;
		String[] paths = getInputPathsByMonth(year, month);
		for (String path : paths) {
			FileInputFormat.addInputPaths(job, path);
		}

	}

	@Override
	public int run(String[] args) throws Exception {

		Configuration conf = getConf();
		Job job = Job.getInstance(conf, "ComputeProfileHDFS");

		// add distributed file
		job.addCacheFile(new Path(args[1]).toUri());

		// prepare
		setInputPathByMonth(job);
		// FileInputFormat.setInputPaths(job, new Path(args[2]));
		FileOutputFormat.setOutputPath(job, new Path(args[3]));

		job.setJobName("ComputeProfileHDFS");
		job.setJarByClass(ComputeProfileHDFS.class);
		job.setMapperClass(MapClass.class);
		job.setCombinerClass(Combiner.class);
		job.setReducerClass(Reduce.class);// job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		// execute
		int exitCode = job.waitForCompletion(true) ? 0 : 1;

		return exitCode;
	}

	public static String[] args = null;

	public static void main(String[] a) throws Exception {
		// optionally delete intermediate files from a previous run (uncomment as needed)
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		try {
			//fs.delete(new Path(args[1]), true);
		} catch (Exception e) {
		}

		try {
			//fs.delete(new Path(args[3]), true);
		} catch (Exception e) {
		}

		args = a;

		// The stages are toggled by (un)commenting the corresponding call;
		// as posted, only the HTTP upload stage (step 4) is run.
		int res;
		//res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);

		//res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(),
			//	args);

		res = ToolRunner.run(new Configuration(), new HttpApiClient(), args);
		// optionally clean up intermediate files afterwards (uncomment as needed)
		try {
			//fs.delete(new Path(args[1]), true);
		} catch (Exception e) {
		}

		try {
			//fs.delete(new Path(args[3]), true);
		} catch (Exception e) {
		}
		System.exit(res);
	}

}
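MapClass above opens the cached side file with a plain FileReader on the cache path. Depending on how the file is localized on the cluster this can be fragile, so here is a hedged alternative sketch (the class and method names are mine, not the post's) that reads the file straight from HDFS instead; it could be called from setup() with context.getConfiguration() and cacheFiles[0]:

package com.dew.task;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hedged helper sketch: load the pkg -> tags side file directly from HDFS.
class SideFileLoader {
	static Hashtable<String, String> load(Configuration conf, URI cacheFile) throws Exception {
		Hashtable<String, String> joinData = new Hashtable<String, String>();
		FileSystem fs = FileSystem.get(conf);
		BufferedReader reader = new BufferedReader(new InputStreamReader(
				fs.open(new Path(cacheFile)), "UTF-8"));
		try {
			String line;
			while ((line = reader.readLine()) != null) {
				String[] parts = line.split("\t");
				if (parts.length >= 2 && parts[0].length() > 0 && parts[1].trim().length() > 0) {
					joinData.put(parts[0], parts[1].trim());
				}
			}
		} finally {
			reader.close();
		}
		return joinData;
	}
}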

 

[Note: ideally this program would read the data in map() and issue the HTTP requests in reduce(); it has not been changed yet, so readers can adapt it themselves. A hedged sketch of that variant follows the class.]

package com.dew.task;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;

import com.mongodb.BasicDBObject;

public class HttpApiClient extends Configured implements Tool {

	private static ArrayList<BasicDBObject> persons = new ArrayList<BasicDBObject>();
	private static int lock = 500; // batch size: flush over HTTP once this many person documents are buffered

	public static void writeRecord(String key, String[] arrays, Context context) {
		if (null == key || key.length() <= 0 || null == arrays
				|| arrays.length <= 0)
			return;
		Hashtable<String, Integer> table = new Hashtable<String, Integer>();
		for (String tag : arrays) {
			Integer number = table.get(tag);
			int count = (null == number ? 0 : number.intValue());
			count++;
			table.put(tag, count);
		}
		// single person tag
		ArrayList<BasicDBObject> tagDocument = new ArrayList<BasicDBObject>();
		Set<String> tagSet = table.keySet();
		for (String tag : tagSet) {
			BasicDBObject doc = new BasicDBObject();
			doc.put("n", tag);
			doc.put("#", table.get(tag).intValue());
			tagDocument.add(doc);
		}
		// person document
		BasicDBObject person = new BasicDBObject();
		person.put("_id", key);
		person.put("t", tagDocument);

		// add it
		persons.add(person);
		if (persons.size() >= lock) {
			submit(context);
		}

	}

	public static void submit(Context context) {
		context.getCounter("Record", "submit").increment(1);

		try {
			// submit
			String entityString = persons.toString();
			if (null == entityString || entityString.length() <= 0) {
				return;
			}
			StringEntity se = new StringEntity(entityString, HTTP.UTF_8);

			String[] args = context.getConfiguration().getStrings(HTTP_URL);
			if (null == args) {
				context.getCounter("Record", "args_null").increment(1);
				return;
			}
			if (args.length < 1) {
				return;
			}
			context.getCounter("Record", "args_length_valid").increment(1);
			String httpTarget = args[0];
			if (null == httpTarget) {
				return;
			}
			context.getCounter("Record", "httpTarget_OK").increment(1);
			String[] parameters = httpTarget.split(":");
			if (null == parameters || parameters.length < 3) {
				return;
			}
			context.getCounter("Record", "parameters_valid").increment(1);
			String ip = parameters[0];
			int port = Integer.parseInt(parameters[1]);
			String path = parameters[2];
			context.getCounter("Record", "3_parameters_valid").increment(1);
			HttpPut request = new HttpPut("http://" + ip + ":" + port + path);
			request.setHeader("Content-type", "application/json");
			request.setHeader("User-Agent", "Mozilla");
			request.setHeader("Connection", "Close");

			request.setEntity(se);
			// HttpClient
			HttpClient client = new DefaultHttpClient();
			HttpResponse response = client.execute(request);
			if (null == response) {
				context.getCounter("Record", "response_is_null").increment(1);
				return;
			}
			
			int code = response.getStatusLine().getStatusCode();
			context.getCounter("Record", "" + code).increment(1);
			
			String respStr = EntityUtils.toString(response.getEntity(),
					HTTP.UTF_8);
			context.getCounter("Record", "respStr").increment(1);

		} catch (Exception e) {
			context.getCounter("Record", "exception-" + e.toString())
					.increment(1);
		} finally {
			persons.clear();
		}
	}

	// //////////////////////////////////////
	// map
	public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String content = value.toString();
			String[] arrays = content.split("\t");
			if (null == arrays || arrays.length < 2)
				return;
			String k = arrays[0];
			String[] tags = arrays[1].split(" ");
			HttpApiClient.writeRecord(k, tags, context);
			context.getCounter("Record", "write").increment(1);
		}

		public void cleanup(Context context) throws java.io.IOException,
				java.lang.InterruptedException {
			HttpApiClient.submit(context);
		}

	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = Job.getInstance(conf, "ComputeProfileHDFS_HttpApiClient");
		FileInputFormat.addInputPath(job, new Path(args[3]));
		FileOutputFormat.setOutputPath(job, new Path("dev/null")); // dummy path; NullOutputFormat ignores it
		job.setJobName("ComputeProfileHDFS_HttpApiClient");
		job.setJarByClass(HttpApiClient.class);
		job.setMapperClass(MapClass.class);
		// job.setCombinerClass(Combiner.class);
		// job.setReducerClass(Reduce.class);//
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(NullOutputFormat.class);
		if (null != args && args.length > 4 && null != args[4])
			job.getConfiguration().set(HTTP_URL, args[4]);
		// execute
		int exitCode = job.waitForCompletion(true) ? 0 : 1;
		return exitCode;
	}

	private static final String HTTP_URL = "http_url";

}
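As promised in the note above the class, here is a hedged sketch of how the HTTP submission could be moved to the reduce side: a pass-through mapper would forward the (user, tags) lines and each reducer batches and PUTs its own documents. The class name and batch size are my assumptions, it is meant to be another static nested class inside HttpApiClient (so it can see HTTP_URL), and it additionally needs import org.apache.hadoop.mapreduce.Reducer. To wire it up, the job would call setReducerClass(HttpSubmitReducer.class) with a non-zero number of reduce tasks instead of setNumReduceTasks(0).

	public static class HttpSubmitReducer extends Reducer<Text, Text, Text, Text> {
		private final ArrayList<BasicDBObject> batch = new ArrayList<BasicDBObject>();
		private static final int BATCH_SIZE = 500; // assumed, mirrors "lock" above

		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// aggregate tag counts for one person, as writeRecord() does on the map side
			Hashtable<String, Integer> counts = new Hashtable<String, Integer>();
			for (Text value : values) {
				for (String tag : value.toString().split(" ")) {
					Integer n = counts.get(tag);
					counts.put(tag, (null == n ? 0 : n.intValue()) + 1);
				}
			}
			ArrayList<BasicDBObject> tagDocument = new ArrayList<BasicDBObject>();
			for (String tag : counts.keySet()) {
				tagDocument.add(new BasicDBObject("n", tag).append("#", counts.get(tag).intValue()));
			}
			batch.add(new BasicDBObject("_id", key.toString()).append("t", tagDocument));
			if (batch.size() >= BATCH_SIZE) {
				flush(context);
			}
		}

		protected void cleanup(Context context) throws IOException, InterruptedException {
			flush(context);
		}

		private void flush(Context context) {
			if (batch.isEmpty())
				return;
			try {
				String target = context.getConfiguration().get(HTTP_URL); // "ip:port:/path", as in submit()
				if (null == target)
					return;
				String[] p = target.split(":");
				HttpPut request = new HttpPut("http://" + p[0] + ":" + p[1] + p[2]);
				request.setHeader("Content-type", "application/json");
				request.setEntity(new StringEntity(batch.toString(), HTTP.UTF_8));
				new DefaultHttpClient().execute(request);
				context.getCounter("Record", "reduce_submit").increment(1);
			} catch (Exception e) {
				context.getCounter("Record", "reduce_submit_error").increment(1);
			} finally {
				batch.clear();
			}
		}
	}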

 

Then simply check the results on the job's Counters page in the web UI.
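If you would rather not dig through the web UI, the same counters can also be read back in the driver. A minimal sketch, assuming it is placed right after waitForCompletion(true) in HttpApiClient.run() where the job variable is in scope, using the counter names from above:

		org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
		long batchesSubmitted = counters.findCounter("Record", "submit").getValue();
		long recordsWritten = counters.findCounter("Record", "write").getValue();
		System.out.println("submit batches = " + batchesSubmitted
				+ ", map records = " + recordsWritten);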
