文档章节

Hadoop 2.5.1学习笔记7: 计数器的使用

强子1985
 强子1985
发布于 2014/11/17 18:15
字数 721
阅读 706
收藏 13

因为有的字段无法爬到标签,那么需要评估标签的缺失对整个数据分析的影响。

代码很简单,比上一个例子要简单很多,直接拿来改改就可以了。

----------------------------------------------------------------------------

package com.dew.counter;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class PullMongoDB extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		if (null == args || args.length < 4) {
			return 0;
		}
		List list = new ArrayList();
		String[] array = args[0].split(":");
		list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
		MongoClient mongoClient = new MongoClient(list);
		DB database = mongoClient.getDB("" + array[2]);
		DBCollection collection = database.getCollection("" + array[3]);

		//
		BasicDBObject query = new BasicDBObject();
		query.put("pkg", new BasicDBObject("$exists", true));
		query.put("tags", new BasicDBObject("$exists", true));
		BasicDBObject fields = new BasicDBObject();
		fields.put("pkg", 1);
		fields.put("tags", 1);

		//write hdfs
		Configuration conf = new Configuration();
		FileSystem hdfs = FileSystem.get(conf);
		FSDataOutputStream outHandler = hdfs.create(new Path("" + args[1]));
		
		//write
		DBCursor cursor = collection.find(query, fields);
		while (cursor.hasNext()) {
			BasicDBObject record = (BasicDBObject) cursor.next();
			String pkg = record.getString("pkg");
			ArrayList<String> als = (ArrayList<String>) record.get("tags");
			String tags = "";
			for (String s : als) {
				tags += " " + s.trim();
			}
			tags = tags.trim();
			String finalString = pkg + "\t" + tags + System.getProperty("line.separator");
			outHandler.write(finalString.getBytes("UTF8"));
		}
		
		//remove handle 
		outHandler.close();
		cursor.close();
		mongoClient.close();
		return 0;
	}

} 
package com.dew.counter;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.BasicDBObject;

public class TagCounter extends Configured implements Tool {
	private static String PREFIX = "";
	private static String NULL = "Null";
	private static String ZERO = "Zero";
	private static String HIT = "Hit";

	// map
	public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

		private Hashtable<String, String> joinData = new Hashtable<String, String>();

		private void readFile(String file) {
			BufferedReader joinReader = null;
			String line = null;
			try {
				joinReader = new BufferedReader(new FileReader(file));
				while ((line = joinReader.readLine()) != null) {
					String[] array = line.split("\t");
					if (null == array || array.length < 2)
						continue;

					String pkg = array[0];
					if (null == pkg || pkg.length() <= 0)
						continue;

					String tagStr = array[1];
					if (null == tagStr)
						continue;
					tagStr = tagStr.trim();
					if (tagStr.length() <= 0)
						continue;
					joinData.put(pkg, tagStr);
					System.out.println("[map,setup] " + pkg + "  |  " + tagStr);
				}
			} catch (Exception e) {
				// XXX
				System.out
						.println("--------------------------------------------\n"
								+ e.toString());
			} finally {
				if (null != joinReader)
					try {
						joinReader.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
			}

		}

		protected void setup(Context context) throws java.io.IOException,
				java.lang.InterruptedException {
			try {
				// Configuration conf = context.getConfiguration();
				URI[] cacheFiles = context.getCacheFiles();
				if (null != cacheFiles && cacheFiles.length > 0) {

					readFile(cacheFiles[0].getPath().toString());

				}
			} catch (IOException e) {
				// xxx
			}
		}

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			// key neglected
			if (null == value)
				return;
			String content = value.toString();
			if (null == content || content.trim().length() == 0)
				return;
			// split
			String[] strArray = content.split("\t");
			if (null == strArray || strArray.length < 29)
				return;
			String sender = strArray[12].trim();
			String receiver = strArray[14].trim();
			String pkg = strArray[28].trim();
			if (null == sender || sender.length() == 0 || null == receiver
					|| receiver.length() == 0 || null == pkg
					|| pkg.length() == 0) {
				return;
			}
			String tags = this.joinData.get(pkg);
			if (null == tags) {
				Counter c = context.getCounter(PREFIX, NULL);
				c.increment(1);
			} else if (tags.trim().length() == 0) {
				Counter c = context.getCounter(PREFIX, ZERO);
				c.increment(1);
			} else {
				Counter c = context.getCounter(PREFIX, HIT);
				c.increment(1);
			}
			// okay,output it

			// context.write(new Text(sender), new Text(tags));
			// context.write(new Text(receiver), new Text(tags));
		}
	}

	@Override
	public int run(String[] args) throws Exception {

		Configuration conf = getConf();
		Job job = new Job(conf, "ComputeProfileHDFSPlusMongoDB");

		// add distributed file
		job.addCacheFile(new Path(args[1]).toUri());
		// DistributedCache.addCacheFile(new Path(args[1]).toUri(),
		// job.getConfiguration());

		// prepare
		FileInputFormat.setInputPaths(job, new Path(args[2]));
		FileOutputFormat.setOutputPath(job, new Path(args[3]));
		// FileOutputFormat.setOutputPath(job, new Path(args[2]));
		job.setJobName("TagCounter");
		job.setJarByClass(TagCounter.class);
		job.setMapperClass(MapClass.class);
		// job.setCombinerClass(Combiner.class);
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(NullOutputFormat.class);

		// execute
		int exitCode = job.waitForCompletion(true) ? 0 : 1;
		try {
			FileSystem fs = FileSystem.get(conf);
			fs.delete(new Path(args[1]));
			fs.delete(new Path(args[3]));
		} catch (Exception e) {

		}
		return exitCode;
	}

	public static String[] args = null;

	public static void main(String[] a) throws Exception {
		args = a;

		int res;
		res = ToolRunner.run(new Configuration(), new PullMongoDB(), a);

		res = ToolRunner.run(new Configuration(), new TagCounter(), args);
		System.exit(res);
	}

}

 

很简单,没啥好说的。

结果截图如下:

 

 

© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 864
博文 992
码字总数 675251
作品 8
南京
架构师
Hadoop学习笔记(二)设置单节点集群

本文描述如何设置一个单一节点的 Hadoop 安装,以便您可以快速执行简单的操作,使用 Hadoop MapReduce 和 Hadoop 分布式文件系统 (HDFS)。 参考官方文档:Hadoop MapReduce Next Generation ...

微wx笑
2014/10/07
0
0
[Hadoop][笔记]4个节点搭建Hadoop2.x HA测试集群

搭建Hadoop2.x HA 1.机器准备 虚拟机 4台 10.211.55.22 node1 10.211.55.23 node2 10.211.55.24 node3 10.211.55.25 node4 2.四台主机节点安排 node namenode datanode zk zkfc jn rm appli......

zemel
2016/08/22
37
0
Nutch1.7结合Hadoop2.5.1的分布式爬取全攻略

Hadoop 2.5.1可执行文件及集群搭建 集群安装:http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/ClusterSetup.html【安装Hadoop集群】 http://blog.csdn.net/jiuti......

强子哥哥
2014/09/22
0
0
大数据之Hadoop平台(二)Centos6.5(64bit)Hadoop2.5.1伪分布式安装记录,wordcount运行测试

注意:以下安装步骤在Centos6.5操作系统中进行,安装步骤同样适于其他操作系统,如有同学使用Ubuntu等其他Linux操作系统,只需注意个别命令略有不同。 注意一下不同用户权限的操作,比如关闭...

chaun
2015/04/14
0
0
Eclipse连接Hadoop分析的三种方式

Hadoop一般都部署在linux平台上,想让Hadoop执行我们写好的程序,首先需要在本地写好程序打包,然后上传到liunx,最后通过指定命令执行打包好的程序;一次两次还可以,如果进行频繁的调试是很...

ksfzhaohui
2016/10/27
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

腾讯投资最高1.75亿美元正式进军菲律宾移动支付市场

菲律宾长途电话公司(PLDT)公司今日宣布,中国互联网巨头腾讯和私募股权公司KKR将获得该公司旗下金融科技公司Voyager Innovations的少数股权。 PLDT在一份声明中称:“腾讯和KKR最多将分别收...

linuxCool
22分钟前
2
0
正则介绍及grep/egrep用法

10月16日任务 9.1 正则介绍_grep上 9.2 grep中 9.3 grep下 扩展 把一个目录下,过滤所有*.php文档中含有eval的行 grep -r --include="*.php" 'eval' /data 正则介绍 正则就是一串有规律的字符...

hhpuppy
33分钟前
1
0
J2Cache 中使用 Lettuce 替代 Jedis 管理 Redis 连接

一直以来 J2Cache 都是使用 Jedis 连接 Redis 服务的。Jedis 是一个很老牌的 Redis 的 Java 开发包,使用很稳定,作者维护很勤勉,社区上能搜到的文章也非常非常多。算是使用范围最广的 Redi...

红薯
今天
12
0
一个可能的NEO链上安全随机数解决方案

0x00 困境 链上安全随机数生成应该算是一个比较蛋疼的问题,哪怕你的系统再牛逼,合约程序困在小小的虚拟机里,哪怕天大的本事也施展不开。 更悲催的是,交易执行的时候,是在每一个节点都执...

暖冰
今天
1
0
【大福利】极客时间专栏返现二维码大汇总

我已经购买了如下专栏,大家通过我的二维码你可以获得一定额度的返现! 然后,再给大家来个福利,只要你通过我的二维码购买,并且关注了【飞鱼说编程】公众号,可以加我微信或者私聊我,我再...

飞鱼说编程
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部