文档章节

大数据教程(9.6)map端join实现

em_aaron
 em_aaron
发布于 2018/12/13 01:17
字数 1254
阅读 17
收藏 0

        上一篇文章讲了mapreduce配合实现join,本节博主将讲述在map端的join实现;

        一、需求

               实现两个“表”的join操作,其中一个表数据量小,一个表很大,这种场景在实际中非常常见,比如“订单日志” join “产品信息”

        二、分析

               --原理阐述:适用于关联表中有小表的情形;可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度

               --示例:先在mapper类中预先定义好小表,进行join

               --并用distributedcache机制将小表的数据分发到每一个maptask执行节点,从而每一个maptask节点可以从本地加载到小表的数据,进而在本地即可实现join

        三、代码实现

package com.empire.hadoop.mr.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // 用一个hashmap来加载保存产品信息表
        Map<String, String> pdInfoMap = new HashMap<String, String>();

        Text                k         = new Text();

        /**
         * 通过阅读父类Mapper的源码,发现 setup方法是在maptask处理数据之前调用一次 可以用来做一些初始化工作
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product.txt")));
            String line;
            while (StringUtils.isNotEmpty(line = br.readLine())) {
                String[] fields = line.split("\t");
                pdInfoMap.put(fields[0], fields[2]);
            }
            br.close();
        }

        // 由于已经持有完整的产品信息表,所以在map方法中就能实现join逻辑了
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String orderLine = value.toString();
            String[] fields = orderLine.split("\t");
            String pdName = pdInfoMap.get(fields[1]);
            k.set(orderLine + "\t" + pdName);
            context.write(k, NullWritable.get());
        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(MapSideJoin.class);
        //job.setJar("D:/mapsidejoin.jar");

        job.setMapperClass(MapSideJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 指定需要缓存一个文件到所有的maptask运行节点工作目录
        /* job.addArchiveToClassPath(archive); */// 缓存jar包到task运行节点的classpath中
        /* job.addFileToClassPath(file); */// 缓存普通文件到task运行节点的classpath中
        /* job.addCacheArchive(uri); */// 缓存压缩包文件到task运行节点的工作目录
        /* job.addCacheFile(uri) */// 缓存普通文件到task运行节点的工作目录

        // 将产品表文件缓存到task工作节点的工作目录中去
        //job.addCacheFile(new URI("file:/D:/srcdata/mapjoincache/pdts.txt"));
        job.addCacheFile(new URI("hdfs://centos-aaron-h1:9000/rjoin/mapjoincache/product.txt"));

        //map端join的逻辑不需要reduce阶段,设置reducetask数量为0
        job.setNumReduceTasks(0);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);

    }

}

        四、执行程序

               

#上传jar

Alt+p
lcd d:/
put  mapsidejoin.jar

#准备hadoop处理的数据文件

cd /home/hadoop/apps/hadoop-2.9.1
hadoop fs  -mkdir -p /rjoin/mapjoinsideinput
hadoop fs  -mkdir -p /rjoin/mapjoincache
hdfs dfs -put  order.txt  /rjoin/mapjoinsideinput
hdfs dfs -put  product.txt  /rjoin/mapjoincache


#运行mapsidejoin程序

hadoop jar mapsidejoin.jar  com.empire.hadoop.mr.mapsidejoin.MapSideJoin /rjoin/mapjoinsideinput /rjoin/mapjoinsideoutput        

        五、运行效果

IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop sending #87 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getCounters
[IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop got value #87
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getCounters took 36ms
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=189612
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=218
		HDFS: Number of bytes written=108
		HDFS: Number of read operations=5
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=3057
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=3057
		Total vcore-milliseconds taken by all map tasks=3057
		Total megabyte-milliseconds taken by all map tasks=3130368
	Map-Reduce Framework
		Map input records=4
		Map output records=4
		Input split bytes=125
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=99
		CPU time spent (ms)=350
		Physical memory (bytes) snapshot=117669888
		Virtual memory (bytes) snapshot=845942784
		Total committed heap usage (bytes)=16121856
	File Input Format Counters 
		Bytes Read=93
	File Output Format Counters 
		Bytes Written=108
[main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:328)
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop sending #88 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getJobReport
[IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop got value #88
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getJobReport took 0ms
[pool-4-thread-1] DEBUG org.apache.hadoop.ipc.Client - stopping client from cache: org.apache.hadoop.ipc.Client@303c7016
[Thread-3] DEBUG org.apache.hadoop.util.ShutdownHookManager - ShutdownHookManger complete shutdown.

            六、运行结果

[hadoop@centos-aaron-h1 ~]$  hdfs dfs -cat  /rjoin/mapjoinsideoutput/part-m-00000
1001    20150710        P0001   2       小米5
1002    20150710        P0001   3       小米5
1002    20150710        P0002   3       锤子T1
1003    20150710        P0003   3       锤子

            最后寄语,以上是博主本次文章的全部内容,如果大家觉得博主的文章还不错,请点赞;如果您对博主其它服务器大数据技术或者博主本人感兴趣,请关注博主博客,并且欢迎随时跟博主沟通交流。

© 著作权归作者所有

共有 人打赏支持
em_aaron
粉丝 73
博文 98
码字总数 164936
作品 3
黄浦
高级程序员
私信 提问
hadoop中MapReduce多种join实现实例分析

一、概述 对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于...

zengzhaozheng
2018/07/02
0
0
Spark map-side-join 关联优化

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lsshlsw/article/details/50834858 将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统...

breeze_lsw
2016/03/09
0
0
Hadoop中MapReduce多种join实现实例分析

一、概述 对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于...

lgscofield
2015/09/06
309
0
MapReduce编程之Join多种应用场景与使用

MapReduce编程之Join多种应用场景与使用 Join操作概述 在关系型数据库中 Join 是非常常见的操作,各种优化手段已经到了极致。在海量数据的环境下,不可避免的也会碰到这种类型的需求,例如在...

wypersist
2018/05/06
0
0
几种 hive join 类型简介

作为数据分析中经常进行的join 操作,传统DBMS 数据库已经将各种算法优化到了极致,而对于hadoop 使用的mapreduce 所进行的join 操作,去年开始也是有各种不同的算法论文出现,讨论各种算法的...

大数据之路
2012/10/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Windows 上安装 Scala

在安装 Scala 之前需要先安装 Java 环境,具体安装的详细方法就不在这里描述了。 您可以自行搜索我们网站中的内容获得其他网站的帮助来获得如何安装 Java 环境的方法。 接下来,我们可以从 ...

honeymose
今天
1
0
数据库篇多表操作

第1章 多表操作 实际开发中,一个项目通常需要很多张表才能完成。例如:一个商城项目就需要分类表(category)、商品表(products)、订单表(orders)等多张表。且这些表的数据之间存在一定的关系...

stars永恒
今天
3
0
nginx日志自动切割

1.日志配置(Nginx 日志) access.log----记录哪些用户,哪些页面以及用户浏览器,IP等访问信息;error.log------记录服务器错误的日志 #配置日志存储路径:location / {      a...

em_aaron
昨天
5
0
java 反射

基本概念 RTTI,即Run-Time Type Identification,运行时类型识别。RTTI能在运行时就能够自动识别每个编译时已知的类型。   要想理解反射的原理,首先要了解什么是类型信息。Java让我们在运...

细节探索者
昨天
2
0
推荐转载连接

https://www.cnblogs.com/ysocean/p/7409779.html#_label0

小橙子的曼曼
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部