文档章节

使用MapReduce解决矩阵乘法的问题

passionfly
 passionfly
发布于 2015/09/07 15:30
字数 2782
阅读 1308
收藏 45
摘要:在海量数据中淘金,已是各大互联网公司的既定目标,亚马逊是数据化运营的成功典范,Google、百度投巨资用于对海量数据进行深度学习研究,阿里把数据与平台、金融并列成为未来三大战略。想在海量数据中淘到金子,强大的挖掘工具是必不可少的,而诸如回归、聚类、主成分分析、决策树等数据挖掘算法常常涉及大规模矩阵运算。这其中,大矩阵乘法具有较大的时间消耗,是算法的瓶颈。所以将矩阵乘法移植到分布式系统中进行运算,可谓是基本需求,所以本文暂且从最基础开始,简单介绍使用MapReduce实现矩阵乘法的方式。

简单回顾一下矩阵乘法:

C=AB

矩阵乘法要求左矩阵的列数与右矩阵的行数相等,m×n的矩阵A,与n×p的矩阵B相乘,结果为m×p的矩阵C。

为了方便描述,先进行假设:

  • 矩阵A的行数为m,列数为n,aij为矩阵A第i行j列的元素。
  • 矩阵B的行数为n,列数为p,bij为矩阵B第i行j列的元素。

分析

因为分布式计算的特点,需要找到相互独立的计算过程,以便能够在不同的节点上进行计算而不会彼此影响。根据矩阵乘法的公式,C中各个元素的计算都是相互独立的,即各个cij在计算过程中彼此不影响。这样的话,在Map阶段可以把计算所需要的元素都集中到同一个key中,然后,在Reduce阶段就可以从中解析出各个元素来计算cij

另外,以a11为例,它将会在c11、c12……c1p的计算中使用。也就是说,在Map阶段,当我们从HDFS取出一行记录时,如果该记录是A的元素,则需要存储成p个<key, value>对,并且这p个key互不相同;如果该记录是B的元素,则需要存储成m个<key, value>对,同样的,m个key也应互不相同;但同时,用于存放计算cij的ai1、ai2……ain和b1j、b2j……bnj的<key, value>对的key应该都是相同的,这样才能被传递到同一个Reduce中。

设计

普遍有一个共识是:数据结构+算法=程序,所以在编写代码之前需要先理清数据存储结构和处理数据的算法。

算法

map阶段

在map阶段,需要做的是进行数据准备。把来自矩阵A的元素aij,标识成p条<key, value>的形式,key="i,k",(其中k=1,2,...,p),value="a:j,aij";把来自矩阵B的元素bij,标识成m条<key, value>形式,key="k,j"(其中k=1,2,...,m),value="b:i,bij"。

经过处理,用于计算cij需要的a、b就转变为有相同key("i,j")的数据对,通过value中"a:"、"b:"能区分元素是来自矩阵A还是矩阵B,以及具体的位置(在矩阵A的第几列,在矩阵B的第几行)。

shuffle阶段

这个阶段是Hadoop自动完成的阶段,具有相同key的value被分到同一个Iterable中,形成<key,Iterable(value)>对,再传递给reduce。

reduce阶段

通过map数据预处理和shuffle数据分组两个阶段,reduce阶段只需要知道两件事就行:

  • <key,Iterable(value)>对经过计算得到的是矩阵C的哪个元素?因为map阶段对数据的处理,key(i,j)中的数据对,就是其在矩阵C中的位置,第i行j列。
  • Iterable中的每个value来自于矩阵A和矩阵B的哪个位置?这个也在map阶段进行了标记,对于value(x:y,z),只需要找到y相同的来自不同矩阵(即x分别为a和b)的两个元素,取z相乘,然后加和即可。

数据结构

计算过程已经设计清楚了,就需要对数据结构进行设计。大体有两种设计方案:

第一种:使用最原始的表示方式,相同行内不同列数据通过","分割,不同行通过换行分割;

第二种:通过行列表示法,即文件中的每行数据有三个元素通过分隔符分割,第一个元素表示行,第二个元素表示列,第三个元素表示数据。这种方式对于可以不列出为0的元素,即可以减少稀疏矩阵的数据量。

http://img.blog.csdn.net/20141009222508641

在上图中,第一种方式存储的数据量小于第二种,但这只是因为例子中的数据设计成这样。在现实中,使用分布式计算矩阵乘法的环境中,大部分矩阵是稀疏矩阵,且数据量极大,在这种情况下,第二种数据结构的优势就显现了出来。而且,因为使用分布式计算,如果数据大于64m,在map阶段将不能够逐行处理,将不能确定数据来自于哪一行。不过,由于现实中对于大矩阵的乘法,考虑到存储空间和内存的情况,需要特殊的处理方式,有一种是将矩阵进行行列转换然后计算,这个时候第一种还是挺实用的。

编写代码

第一种数据结构

代码为:

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MatrixMultiply {
    public static class MatrixMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        private String flag = null;// 数据集名称
        private int rowNum = 4;// 矩阵A的行数
        private int colNum = 2;// 矩阵B的列数
        private int rowIndexA = 1; // 矩阵A,当前在第几行
        private int rowIndexB = 1; // 矩阵B,当前在第几行

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            flag = ((FileSplit) context.getInputSplit()).getPath().getName();// 获取文件名称
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            if ("ma".equals(flag)) {
                for (int i = 1; i <= colNum; i++) {
                    Text k = new Text(rowIndexA + "," + i);
                    for (int j = 0; j < tokens.length; j++) {
                        Text v = new Text("a," + (j + 1) + "," + tokens[j]);
                        context.write(k, v);
                    }
                }
                rowIndexA++;// 每执行一次map方法,矩阵向下移动一行
            } else if ("mb".equals(flag)) {
                for (int i = 1; i <= rowNum; i++) {
                    for (int j = 0; j < tokens.length; j++) {
                        Text k = new Text(i + "," + (j + 1));
                        Text v = new Text("b," + rowIndexB + "," + tokens[j]);
                        context.write(k, v);
                    }
                }
                rowIndexB++;// 每执行一次map方法,矩阵向下移动一行
            }
        }
    }

    public static class MatrixReducer extends
            Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            for (Text value : values) {
                String[] val = value.toString().split(",");
                if ("a".equals(val[0])) {
                    mapA.put(val[1], val[2]);
                } else if ("b".equals(val[0])) {
                    mapB.put(val[1], val[2]);
                }
            }

            int result = 0;
            Iterator<String> mKeys = mapA.keySet().iterator();
            while (mKeys.hasNext()) {
                String mkey = mKeys.next();
                if (mapB.get(mkey) == null) {// 因为mkey取的是mapA的key集合,所以只需要判断mapB是否存在即可。
                    continue;
                }
                result += Integer.parseInt(mapA.get(mkey))
                        * Integer.parseInt(mapB.get(mkey));
            }
            context.write(key, new IntWritable(result));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        String input1 = "hdfs://192.168.1.128:9000/user/lxh/matrix/ma";
        String input2 = "hdfs://192.168.1.128:9000/user/lxh/matrix/mb";
        String output = "hdfs://192.168.1.128:9000/user/lxh/matrix/out";

        Configuration conf = new Configuration();
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        conf.addResource("classpath:/hadoop/yarn-site.xml");

        Job job = Job.getInstance(conf, "MatrixMultiply");
        job.setJarByClass(MatrixMultiply.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MatrixMapper.class);
        // job.setReducerClass(MatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加载2个输入数据集
        Path outputPath = new Path(output);
        outputPath.getFileSystem(conf).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

绘图演示效果:

http://img.blog.csdn.net/20141010105520586

第二种数据结构

代码为:

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  public class SparseMatrixMultiply {
    public static class SMMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String flag = null;
        private int m = 4;// 矩阵A的行数
        private int p = 2;// 矩阵B的列数

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] val = value.toString().split(",");
            if ("t1".equals(flag)) {
                for (int i = 1; i <= p; i++) {
                    context.write(new Text(val[0] + "," + i), new Text("a,"
                            + val[1] + "," + val[2]));
                }
            } else if ("t2".equals(flag)) {
                for (int i = 1; i <= m; i++) {
                    context.write(new Text(i + "," + val[1]), new Text("b,"
                            + val[0] + "," + val[2]));
                }
            }
        }
    }

    public static class SMReducer extends
            Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            for (Text value : values) {
                String[] val = value.toString().split(",");
                if ("a".equals(val[0])) {
                    mapA.put(val[1], val[2]);
                } else if ("b".equals(val[0])) {
                    mapB.put(val[1], val[2]);
                }
            }

            int result = 0;
            // 可能在mapA中存在在mapB中不存在的key,或相反情况
            // 因为,数据定义的时候使用的是稀疏矩阵的定义
            // 所以,这种只存在于一个map中的key,说明其对应元素为0,不影响结果
            Iterator<String> mKeys = mapA.keySet().iterator();
            while (mKeys.hasNext()) {
                String mkey = mKeys.next();
                if (mapB.get(mkey) == null) {// 因为mkey取的是mapA的key集合,所以只需要判断mapB是否存在即可。
                    continue;
                }
                result += Integer.parseInt(mapA.get(mkey))
                        * Integer.parseInt(mapB.get(mkey));
            }
            context.write(key, new IntWritable(result));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        String input1 = "hdfs://192.168.1.128:9000/user/lxh/matrix/t1";
        String input2 = "hdfs://192.168.1.128:9000/user/lxh/matrix/t2";
        String output = "hdfs://192.168.1.128:9000/user/lxh/matrix/out";

        Configuration conf = new Configuration();
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        conf.addResource("classpath:/hadoop/yarn-site.xml");

        Job job = Job.getInstance(conf, "SparseMatrixMultiply");
        job.setJarByClass(SparseMatrixMultiply.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SMMapper.class);
        job.setReducerClass(SMReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加载2个输入数据集
        Path outputPath = new Path(output);
        outputPath.getFileSystem(conf).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

绘图演示效果:

http://img.blog.csdn.net/20141010101823682

代码分析

比较两种代码,可以很清楚的看出,两种实现只是在map阶段有些区别,reduce阶段基本相同。对于其中关于行i、列j定义不是从0计数(虽然我倾向于从0开始计数,不用写等号,简单),是为了更直观的观察数据处理过程是否符合设计。

在第一种实现中,需要记录当前是读取的哪一行数据,所以,这种仅适用于不需要分块的小文件中进行的矩阵乘法运算。第二种实现中,每行数据记录了所在行所在列,不会有这方面的限制。

在第二种实现中,遍历两个HashMap时,取mapA的key作为循环标准,是因为在一般情况下,mapA和mapB的key是相同的(如第一种实现),因为使用稀疏矩阵,两个不相同的key说明是0,可以舍弃不参与计算,所以只使用mapA的key,并判断mapB是否存在该key对应的值。

两种实现的reduce阶段,计算最后结果时,都是直接使用内存存储数据、计算结果,所以当数据量很大的时候(通常都会很大,否则不会用分布式处理),极易造成内存溢出,所以,对于大矩阵的运算,还需要其他的转换方式,比如行列相乘运算、分块矩阵运算、基于最小粒度相乘的算法等方式。另外,因为这两份代码都是demo,所以代码中缺少过滤错误数据的部分。

本文转载自:http://p.primeton.com/articles/54ccca6dbe20aa6a9000020f

passionfly
粉丝 15
博文 106
码字总数 76465
作品 0
西安
私信 提问
MapReduce 矩阵乘法之我见

一、最简单的算法 把mn 和nl的矩阵A和B相乘,这估计是最容易想到的方法了: 把A(mn)的元素,每个发送l次,把B(nl)的元素每个发送m次。将发送到一起的数据相乘求和,得到最后的结果。 优点...

一只小桃子
2014/05/18
2.2K
5
MapReduce 算法 - 反序模式 (Order Inversion)

这一篇其它段落的一系列MapReduce算法在" Data-Intensive Text Processing with MapReduce"这本书上呈现。以前分别是 Local Aggregation , Local Aggregation PartII 和 Creating a Co-Occu......

可观
2013/01/25
3.1K
5
使用 Hadoop 和 Mahout 实现推荐引擎

作为我之前博客的延续,在这篇博客中,我将探讨如何使用 Mahout 和 Hadoop 实现一个 推荐引擎 第一部分 介绍 MapReduce 和 为什么为了利用并行计算的优势,一些算法需要而重写 第二部分 我会...

oschina
2013/02/14
6.5K
5
MapReduce: 一个巨大的倒退

前言 databasecolumn 的数据库大牛们(其中包括PostgreSQL的最初伯克利领导:Michael Stonebraker)最近写了一篇评论当前如日中天的MapReduce技术的文章,引发剧烈的讨论。我抽空在这儿翻译一...

ddatsh
2011/11/04
4.6K
7
MapReduce 算法 - 反转排序 (Order Inversion)

本文另一地址请见MapReduce算法-反转排序 本文译自 MapReduce Algorithms - Order Inversion排序 译者注:在刚开始翻译的时候,我将Order Inversion按照字面意思翻译成“反序”或者“倒序”,...

可观
2013/01/27
986
0

没有更多内容

加载失败,请刷新页面

加载更多

领域驱动中的“贫血症和失忆症” --实践领域驱动--原文

贫血症严重危害着人类健康,并且伴随有危险的副作用。当贫血领域对象被首次提出来时,它并不是一个博得赞美的词汇,它描述的是一个缺少内在行为领域对象。奇怪的是,人们对于贫血领域对象的态...

还仙
28分钟前
5
0
条码打印软件中标签预览正常打印无反应怎么解决

在使用条码打印软件制作标签时,有客户反馈,标签打印预览正常的,但是打印无反应,咨询是怎么回事?今天针对这个情况,可以参考以下方法进行解决。 一、预览正常情况下,打印没反应 (1)在条码...

中琅软件
38分钟前
5
0
判断字符串的时候

判断字符串的时候一定把常量房前边, //报警程度 String leve = vo.getDeviceAlertDeal().getWarnLevel(); if(("0").equals(leve)) { row.add("无报警"); }else if(("1").equals(leve)) { ro......

简小姐
39分钟前
7
0
Linux maven3.6.2 install

PS:安装 maven 之前请先安装 jdk 1.安装 wget 命令(安装过就不用了) yum -y install wget 2.寻找需要的 maven 版本 https://maven.apache.org/download.cgi 3.进入 /var/local 文件夹 cd...

东方神祇
41分钟前
5
0
Tomcat源码分析二:先看看Tomcat的整体架构

Tomcat源码分析二:先看看Tomcat的整体架构 Tomcat架构图 我们先来看一张比较经典的Tomcat架构图: 从这张图中,我们可以看出Tomcat中含有Server、Service、Connector、Container等组件,接下...

flygrk
44分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部