矩阵乘法的mapreduce程序实现

原创
2017/05/25 20:36
阅读数 365

map函数:对于矩阵M中的每个元素m(ij),产生一系列的key-value对<(i,k),(M,j,m(ij))>

其中k=1,2,……,直到矩阵N的总列数;对于矩阵N中的每个元素n(jk),产生一系列的key-value对<(i,k),(N,j,n(jk))>,其中i=1,2,……,直到矩阵M的总行数。

map

package com.cb.matrix;

import static org.mockito.Matchers.intThat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;

import com.sun.org.apache.bcel.internal.generic.NEW;


public class MatrixMapper extends Mapper<Object, Text, Text, Text> {
	private Text map_key=new Text();
	private Text map_value= new Text();
	private int columnN;
	private int rowM;
	/**
	 * 执行map()函数前先由conf.get()得到main函数中提供的必要变量
	 * 也就是从输入文件名中得到的矩阵维度信息
	 */
	
	@Override
	protected void setup(Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration config=context.getConfiguration();
		columnN=Integer.parseInt(config.get("columnN"));
		rowM =Integer.parseInt(config.get("rowM"));
	}
	
	@Override
	protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		//得到文件名,从而区分输入矩阵M和N
		FileSplit fileSplit=(FileSplit)context.getInputSplit();
		String fileName=fileSplit.getPath().getName();
		
		if (fileName.contains("M")) {
			String[] tuple =value.toString().split(",");
			int i =Integer.parseInt(tuple[0]);
			String[] tuples=tuple[1].split("\t");
			int j=Integer.parseInt(tuples[0]);
			int Mij=Integer.parseInt(tuples[1]);
			for(int k=1;k<columnN+1;k++){
				map_key.set(i+","+k);
				map_value.set("M"+","+j+","+Mij);
				context.write(map_key, map_value);
			}
			
		}
		else if(fileName.contains("N")){
			String[] tuple=value.toString().split(",");
			int j=Integer.parseInt(tuple[0]);
			String[] tuples =tuple[1].split("\t");
			int k=Integer.parseInt(tuples[0]);
			int Njk=Integer.parseInt(tuples[1]);
			for(int i=1;i<rowM+1;i++){
				map_key.set(i+","+k);
				map_value.set("N"+","+j+","+Njk);
				context.write(map_key, map_value);
			}
		}
		
	}

}

reduce函数:对于每个键(i,k)相关联的值(M,j,m(ij))及(N,j,n(jk)),根据相同的j值将m(ij)和n(jk)分别存入不同的数组中,然后将两者的第j个元素抽取出来分别相乘,最后相加,即可得到p(ik)的值。

reducer

package com.cb.matrix;


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;



/**
 * Reduce side of a one-pass MapReduce matrix product P = M x N.
 *
 * <p>For one key {@code "i,k"} the incoming values have the form {@code "M,j,value"} or
 * {@code "N,j,value"}. The values are scattered by j into two arrays and the reducer
 * writes a single pair: the key and the sum over j of {@code M[j] * N[j]}.
 */
public class MatrixReducer extends Reducer<Text, Text, Text, Text> {
	/** Number of columns of M, read from the job configuration key {@code "columnM"}. */
	private int columnM;

	/**
	 * Reads the shared dimension from the job configuration before any call to
	 * {@code reduce()}.
	 *
	 * @throws NumberFormatException if {@code "columnM"} is missing from the
	 *         configuration or is not an integer
	 */
	@Override
	protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
		Configuration conf = context.getConfiguration();
		columnM = Integer.parseInt(conf.get("columnM"));
	}

	/**
	 * Computes one element of the product matrix.
	 *
	 * @param arg0 output cell key in the form {@code "i,k"}
	 * @param arg1 all {@code "M,j,value"} / {@code "N,j,value"} contributions for that cell
	 * @param arg2 task context used to emit the result
	 */
	@Override
	protected void reduce(Text arg0, Iterable<Text> arg1, Reducer<Text, Text, Text, Text>.Context arg2)
			throws IOException, InterruptedException {
		// Indexed directly by j in 1..columnM; slot 0 is unused. Entries that never
		// arrive stay 0 and therefore contribute nothing to the sum.
		int[] M = new int[columnM + 1];
		int[] N = new int[columnM + 1];

		// First collect every contribution for this cell.
		for (Text val : arg1) {
			String[] tuple = val.toString().split(",");
			if (tuple[0].equals("M")) {
				M[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
			} else {
				N[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
			}
		}

		// Only after all values are in: form the dot product and emit it exactly once.
		// Doing this inside the loop above would write a partial sum per incoming value.
		int sum = 0;
		for (int j = 1; j < columnM + 1; j++) {
			sum += M[j] * N[j];
		}
		arg2.write(arg0, new Text(Integer.toString(sum)));
	}

}

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部