文档章节

Hadoop 2.5.1学习笔记6:不同数据来源的联结代码范例

强子哥哥
 强子哥哥
发布于 2014/11/13 13:42
字数 1067
阅读 228
收藏 11
点赞 0
评论 2

下面的程序有个bug,应该是

Path[] cacheFiles = context.getLocalCacheFiles();

if (null != cacheFiles && cacheFiles.length > 0) {
          readFile(cacheFiles[0].toString(), context);
}

代码中未修正,读者自行修正,谢谢!

数据源来源于2个: 1个是HDFS,数据量大,还一个是mongodb,数据量小。

需要2个来源一起做数据联结,然后分析。代码范例如下:

-----------------------------------------------------------------------

主要思路:

从mongodb中导入数据大hdfs,然后通过hadoop的分发机制分发此文件

到所有计算节点上作为“背景”数据。

然后计算节点的map类的setup函数中读取此文件即可。

----------------------------------------------------------------------- 实际代码如下:

package com.dew.task;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.BasicDBObject;

public class ComputeProfileHDFS extends Configured implements Tool {

	// map
	public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

		private Hashtable<String, String> joinData = new Hashtable<String, String>();

		private void readFile(String file) {
			BufferedReader joinReader = null;
			String line = null;
			try {
				joinReader = new BufferedReader(new FileReader(file));
				while ((line = joinReader.readLine()) != null) {
					String[] array = line.split("\t");
					if (null == array || array.length < 2)
						continue;

					String pkg = array[0];
					if (null == pkg || pkg.length() <= 0)
						continue;

					String tagStr = array[1];
					if (null == tagStr)
						continue;
					tagStr = tagStr.trim();
					if (tagStr.length() <= 0)
						continue;
					joinData.put(pkg, tagStr);
					System.out.println("[map,setup] " + pkg + "  |  " + tagStr);
				}
			} catch (Exception e) {
				// XXX
				System.out
						.println("--------------------------------------------\n"
								+ e.toString());
			} finally {
				if (null != joinReader)
					try {
						joinReader.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
			}

		}

		protected void setup(Context context) throws java.io.IOException,
				java.lang.InterruptedException {
			try {
				// Configuration conf = context.getConfiguration();
				URI[] cacheFiles = context.getCacheFiles();
				if (null != cacheFiles && cacheFiles.length > 0) {

					readFile(cacheFiles[0].getPath().toString());

				}
			} catch (IOException e) {
				// xxx
			}
		}

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			// key neglected
			if (null == value)
				return;
			String content = value.toString();
			if (null == content || content.trim().length() == 0)
				return;
			// split
			String[] strArray = content.split("\t");
			if (null == strArray || strArray.length < 29)
				return;
			String sender = strArray[12].trim();
			String receiver = strArray[14].trim();
			String pkg = strArray[28].trim();
			if (null == sender || sender.length() == 0 || null == receiver
					|| receiver.length() == 0 || null == pkg
					|| pkg.length() == 0) {
				return;
			}
			String tags = this.joinData.get(pkg);
			if (null == tags || tags.trim().length() == 0)
				return;
			// okay,output it
			System.out.println("sender---" + sender + "   tags---" + tags);
			System.out.println("receiver---" + receiver + "   tags---" + tags);
			context.write(new Text(sender), new Text(tags));
			context.write(new Text(receiver), new Text(tags));
		}

	}

	public static class Combiner extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			String totalTags = "";
			for (Text tag : values) {
				totalTags += " " + tag;
			}
			totalTags = totalTags.trim();
			if (totalTags.length() <= 0)
				return;
			// okay
			// System.out.println("combiner function invoked....********************");
			context.write(key, new Text(totalTags));

		}

	}

	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			String totalTags = "";
			for (Text tag : values) {
				totalTags += " " + tag;
			}
			totalTags = totalTags.trim();
			if (totalTags.length() <= 0)
				return;
			// okay,let us do it now!
			String[] tagArray = totalTags.split(" ");
			if (null == tagArray || tagArray.length <= 0)
				return;
			// context.write(arg0, arg1);
			HttpApiClient.writeRecord(key.toString(), tagArray);
		}

		protected void cleanup(Context context) throws java.io.IOException,
				java.lang.InterruptedException {
			HttpApiClient.submit();
		}

	}

	public static class HttpApiClient {

		private static ArrayList<BasicDBObject> persons = new ArrayList<BasicDBObject>();
		private static int lock = 50;

		public static void writeRecord(String key, String[] arrays) {
			System.out.println("writeRecord 4");
			// 数据校验
			if (null == key || key.length() <= 0 || null == arrays
					|| arrays.length <= 0)
				return;
			// 字符串数组--->数量统计
			Hashtable<String, Integer> table = new Hashtable<String, Integer>();
			for (String tag : arrays) {
				Integer number = table.get(tag);
				int count = (null == number ? 0 : number.intValue());
				count++;
				table.put(tag, count);
			}
			// 构造单个人标签
			ArrayList<BasicDBObject> tagDocument = new ArrayList<BasicDBObject>();
			Set<String> tagSet = table.keySet();
			for (String tag : tagSet) {
				BasicDBObject doc = new BasicDBObject();
				doc.put("n", tag);
				doc.put("#", table.get(tag).intValue());
				tagDocument.add(doc);
			}
			// 构造单个人的文档
			BasicDBObject person = new BasicDBObject();
			person.put("_id", key);
			person.put("t", tagDocument);
			System.out.println("*************************************");
			System.out.println(person.toString());
			System.out.println("*************************************");
			// 加入到全局当中
			persons.add(person);
			if (persons.size() >= lock) {
				submit();
			}
		}

		public static void submit() {
			// 提交上去并发送http请求...
			// persons.clear();
		}

	}

	@Override
	public int run(String[] args) throws Exception {

		Configuration conf = getConf();
		Job job = new Job(conf, "ComputeProfileHDFSPlusMongoDB");

		// add distributed file
		job.addCacheFile(new Path(args[1]).toUri());
		// DistributedCache.addCacheFile(new Path(args[1]).toUri(),
		// job.getConfiguration());

		// prepare
		FileInputFormat.setInputPaths(job, new Path(args[2]));
		FileOutputFormat.setOutputPath(job, new Path(args[3]));
		// FileOutputFormat.setOutputPath(job, new Path(args[2]));
		job.setJobName("ComputeProfileHDFSPlusMongoDB");
		job.setMapperClass(MapClass.class);
		job.setCombinerClass(Combiner.class);
		job.setReducerClass(Reduce.class);// job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(NullOutputFormat.class);

		// execute
		int exitCode = job.waitForCompletion(true) ? 0 : 1;
		try {
			FileSystem fs = FileSystem.get(conf);
			fs.delete(new Path(args[1]));
			fs.delete(new Path(args[3]));
		} catch (Exception e) {

		}
		return exitCode;
	}

	public static void main(String[] args) throws Exception {

		int res;
		res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);

		res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(),
				args);
		System.exit(res);
	}

}

 

package com.dew.task;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class PullMongoDB extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		if (null == args || args.length < 4) {
			return 0;
		}
		List list = new ArrayList();
		String[] array = args[0].split(":");
		list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
		MongoClient mongoClient = new MongoClient(list);
		DB database = mongoClient.getDB("" + array[2]);
		DBCollection collection = database.getCollection("" + array[3]);

		// 开始查询
		BasicDBObject query = new BasicDBObject();
		query.put("pkg", new BasicDBObject("$exists", true));
		query.put("tags", new BasicDBObject("$exists", true));
		BasicDBObject fields = new BasicDBObject();
		fields.put("pkg", 1);
		fields.put("tags", 1);

		// 准备写入的HDFS文件
		Configuration conf = new Configuration();
		FileSystem hdfs = FileSystem.get(conf);
		FSDataOutputStream outHandler = hdfs.create(new Path("" + args[1]));
		
		// 准备写入
		DBCursor cursor = collection.find(query, fields);
		while (cursor.hasNext()) {
			BasicDBObject record = (BasicDBObject) cursor.next();
			String pkg = record.getString("pkg");
			ArrayList<String> als = (ArrayList<String>) record.get("tags");
			String tags = "";
			for (String s : als) {
				tags += " " + s.trim();
			}
			tags = tags.trim();
			String finalString = pkg + "\t" + tags + System.getProperty("line.separator");
			outHandler.write(finalString.getBytes("UTF8"));
		}
		
		// 移除句柄
		outHandler.close();
		cursor.close();
		mongoClient.close();
		return 0;
	}

}

 

测试通过

 

 补充: job.setJarByClass(ComputeProfileHDFS.class); !!!

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 859
博文 551
码字总数 640910
作品 8
南京
架构师
加载中

评论(2)

强子哥哥
强子哥哥

引用来自“chenzhi1987”的评论

???????????
这是啥意思?
c
chenzhi1987
???????????
Hadoop学习笔记(二)设置单节点集群

本文描述如何设置一个单一节点的 Hadoop 安装,以便您可以快速执行简单的操作,使用 Hadoop MapReduce 和 Hadoop 分布式文件系统 (HDFS)。 参考官方文档:Hadoop MapReduce Next Generation ...

微wx笑
2014/10/07
0
0
大数据之Hadoop平台(二)Centos6.5(64bit)Hadoop2.5.1伪分布式安装记录,wordcount运行测试

注意:以下安装步骤在Centos6.5操作系统中进行,安装步骤同样适于其他操作系统,如有同学使用Ubuntu等其他Linux操作系统,只需注意个别命令略有不同。 注意一下不同用户权限的操作,比如关闭...

chaun
2015/04/14
0
0
[Hadoop][笔记]4个节点搭建Hadoop2.x HA测试集群

搭建Hadoop2.x HA 1.机器准备 虚拟机 4台 10.211.55.22 node1 10.211.55.23 node2 10.211.55.24 node3 10.211.55.25 node4 2.四台主机节点安排 node namenode datanode zk zkfc jn rm appli......

zemel
2016/08/22
37
0
Eclipse连接Hadoop分析的三种方式

Hadoop一般都部署在linux平台上,想让Hadoop执行我们写好的程序,首先需要在本地写好程序打包,然后上传到liunx,最后通过指定命令执行打包好的程序;一次两次还可以,如果进行频繁的调试是很...

ksfzhaohui
2016/10/27
1K
0
Linux防火墙iptables学习笔记(二)参数指令

iptables 指令 语法: iptables [-t table] command [match] [-j target/jump] -t 参数用来指定规则表,内建的规则表有三个,分别是:nat、mangle 和 filter,当未指定规则表时,则一律视为是...

露露露露张
2015/09/29
0
0
MySQL多表查询Left Join,Right Join学习笔记

http://my.oschina.net/adamboy/blog MySQL多表连接查询Left Join,Right Join php开源嘛 在讲MySQL的Join语法前还是先回顾一下联结的语法,呵呵,其实连我自己都忘得差不多了,那就大家一起温...

Adam-Lee
2011/06/01
0
0
centos6.5安装hadoop集群

环境准备:4台机器 192.168.217.174 node1 -----namenode 192.168.217.175 node2 -----secondary namenode 192.168.217.176 node3 -----datanode 192.168.217.177 node4 -----datanode 1. 4......

飞侠119
2017/05/03
0
0
MySQL快速入门

继恶补了C++基础之后,又来恶补MySQL了,花了将近一天的时间啃完了<MySQL必知必会>这本书,整理了有点糙的读书笔记。 1.SHOW语句的使用: 2.SELECT查询语句: 3.LIMIT子句限定显示结果行数:...

waffle930
2016/10/04
59
0
Nutch1.7结合Hadoop2.5.1的分布式爬取全攻略

Hadoop 2.5.1可执行文件及集群搭建 集群安装:http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/ClusterSetup.html【安装Hadoop集群】 http://blog.csdn.net/jiuti......

强子哥哥
2014/09/22
0
0
Sqoop1和Sqoop2简介

主要来源: http://www.linuxidc.com/Linux/2014-10/108337.htm 1.什么是Sqoop Sqoop即 SQL to Hadoop ,是一款方便的在传统型数据库与Hadoop之间进行数据迁移的工具,充分利用MapReduce并行特...

强子哥哥
2015/12/23
792
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

gcviewer的使用

1、没有安装git软件 在编译打包GCViewer的过程中,不能执行git命令,错误信息如下: [ERROR] Failed to execute goal org.codehaus.mojo:buildnumber-maven-plugin:1.4:create (create-build...

刀锋
10分钟前
0
0
Android LogUtil 日志优化 调试的时候打印 点击跳转

打印日志的时候,可以点击跳转 LogUtil.java public class LogUtil { private static boolean IS_DEBUG = BuildConfig.DEBUG; public static void i(String tag, String message) {......

Jay_kyzg
20分钟前
0
0
人工智能你必须掌握的32个算法(二)归并排序算法

归并排序(MERGE-SORT)是建立在归并操作上的一种有效的排序算法,该算法是采用分治法(Divide and Conquer)的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子...

木头释然
23分钟前
0
0
第十四章NFS服务搭建与配置

14.1 NFS介绍 NFS介绍 NFS是Network File System的缩写;这个文件系统是基于网路层面,通过网络层面实现数据同步 NFS最早由Sun公司开发,分2,3,4三个版本,2和3由Sun起草开发,4.0开始Netap...

Linux学习笔记
46分钟前
1
0
流利阅读笔记27-20180716待学习

生了娃照样能打,两位母亲温网会师 Lala 2018-07-16 1.今日导读 现今在生儿育女后回归事业的母亲们已经非常多见,但是很少有人想到,以高强度运动与竞争激烈为特色的竞技体育项目也会有 work...

aibinxiao
46分钟前
6
0
Guava 源码分析(Cache 原理【二阶段】)

前言 在上文「Guava 源码分析(Cache 原理)」中分析了 Guava Cache 的相关原理。 文末提到了回收机制、移除时间通知等内容,许多朋友也挺感兴趣,这次就这两个内容再来分析分析。 在开始之前...

crossoverJie
59分钟前
0
0
OSChina 周一乱弹 —— 如果是你喜欢的女同学找你借钱

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @guanglun :分享Michael Learns To Rock的单曲《Fairy Tale》 《Fairy Tale》- Michael Learns To Rock 手机党少年们想听歌,请使劲儿戳(这...

小小编辑
今天
749
19
NNS域名系统之域名竞拍

0x00 前言 其实在官方文档中已经对域名竞拍的过程有详细的描述,感兴趣的可以移步http://doc.neons.name/zh_CN/latest/nns_protocol.html#id30 此处查阅。 我这里主要对轻钱包开发中会用到的...

暖冰
今天
0
0
32.filter表案例 nat表应用 (iptables)

10.15 iptables filter表案例 10.16/10.17/10.18 iptables nat表应用 10.15 iptables filter表案例: ~1. 写一个具体的iptables小案例,需求是把80端口、22端口、21 端口放行。但是,22端口我...

王鑫linux
今天
0
0
shell中的函数&shell中的数组&告警系统需求分析

20.16/20.17 shell中的函数 20.18 shell中的数组 20.19 告警系统需求分析

影夜Linux
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部