
Implementing Matrix Multiplication in MapReduce

First, randomly generate the input matrices; the numbers of rows and columns are supplied by the user as command-line arguments:

#!/bin/bash
# Generate matrix M ($1 x $2) into file M_$1_$2 and matrix N ($2 x $3) into file N_$2_$3.
# Each output line has the form "i,j<TAB>value", with values drawn from [0, 99].
for i in `seq 1 $1`
do
  for j in `seq 1 $2`
  do
    s=$((RANDOM%100))
    echo -e "$i,$j\t$s" >> M_$1_$2
  done
done

for i in `seq 1 $2`
do
  for j in `seq 1 $3`
  do
    s=$((RANDOM%100))
    echo -e "$i,$j\t$s" >> N_$2_$3
  done
done
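For example, assuming the script is saved as gen_matrix.sh (the file name is not given in the original), running "bash gen_matrix.sh 3 4 5" writes a 3x4 matrix to the file M_3_4 and a 4x5 matrix to the file N_4_5.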

Running the shell script above generates the two input matrices. The parallel implementation of matrix multiplication on Hadoop then follows this basic idea:

map: for each element mij of matrix M, emit a key-value pair <(i,k),(M,j,mij)> for every column k of N (k = 1..columnN); for each element njk of matrix N, emit a key-value pair <(i,k),(N,j,njk)> for every row i of M (i = 1..rowM);

reduce: for each key (i,k), collect the associated values (M,j,mij) and (N,j,njk), store the mij and njk into two separate arrays indexed by the shared j, then multiply the j-th elements of the two arrays pairwise and sum the products; the result is Pik, the (i,k) entry of the product matrix.
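To make this concrete with two 2x2 matrices: m11 is emitted under keys (1,1) and (1,2), while n12 (j=1, k=2) is emitted under keys (1,2) and (2,2). The reducer for key (1,2) therefore receives (M,1,m11), (M,2,m12), (N,1,n12), (N,2,n22) and computes P12 = m11*n12 + m12*n22.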

The Hadoop map implementation is given below; a sketch of the corresponding reduce implementation follows it.

// Required imports for the mapper (declared as a static nested class of the job's driver class):
// import java.io.IOException;
// import org.apache.hadoop.conf.Configuration;
// import org.apache.hadoop.io.Text;
// import org.apache.hadoop.mapreduce.Mapper;
// import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public static class MatrixMapper extends Mapper<Object,Text,Text,Text>{

    private static int columnN;   // number of columns of N, read from the job configuration
    private static int rowM;      // number of rows of M, read from the job configuration

    private Text map_key=new Text();
    private Text map_value=new Text();

    public void setup(Context context) throws IOException{
        Configuration conf=context.getConfiguration();
        columnN=Integer.parseInt(conf.get("columnN"));
        rowM=Integer.parseInt(conf.get("rowM"));
    }

    public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
        // Each input line looks like "i,j\tvalue"; the file name tells us which matrix it belongs to.
        FileSplit fileSplit=(FileSplit)context.getInputSplit();
        String fileName=fileSplit.getPath().getName();
        String[] tuple=value.toString().split(",");
        if(fileName.startsWith("M")){
            int i=Integer.parseInt(tuple[0]);
            String[] tuples=tuple[1].split("\t");
            int j=Integer.parseInt(tuples[0]);
            int Mij=Integer.parseInt(tuples[1]);
            // mij contributes to every element of row i of the product, so emit it once per column k of N.
            for(int k=1;k<columnN+1;k++){
                map_key.set(i+","+k);
                map_value.set("M"+","+j+","+Mij);
                context.write(map_key,map_value);
            }
        }else if(fileName.startsWith("N")){
            int j=Integer.parseInt(tuple[0]);
            String[] tuples=tuple[1].split("\t");
            int k=Integer.parseInt(tuples[0]);
            int Njk=Integer.parseInt(tuples[1]);
            // njk contributes to every element of column k of the product, so emit it once per row i of M.
            for(int i=1;i<rowM+1;i++){
                map_key.set(i+","+k);
                map_value.set("N"+","+j+","+Njk);
                context.write(map_key,map_value);
            }
        }
    }
}
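The original post stops at the mapper. The block below is a minimal sketch of the matching reducer, not the original author's code: it assumes the value format emitted above ("M,j,mij" / "N,j,njk") and a configuration entry named "columnM" holding the shared dimension (columns of M = rows of N); that name is an assumption introduced here for illustration.

// Sketch of the reducer (not from the original post); assumes conf key "columnM".
// import org.apache.hadoop.mapreduce.Reducer;
public static class MatrixReducer extends Reducer<Text,Text,Text,Text>{

    private static int columnM;   // shared dimension: columns of M = rows of N (assumed conf key)
    private Text reduce_value=new Text();

    public void setup(Context context) throws IOException{
        Configuration conf=context.getConfiguration();
        columnM=Integer.parseInt(conf.get("columnM"));
    }

    public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
        // Arrays indexed by j (1-based): M[j]=mij and N[j]=njk for the current key (i,k).
        int[] M=new int[columnM+1];
        int[] N=new int[columnM+1];
        for(Text value:values){
            String[] tuple=value.toString().split(",");
            int j=Integer.parseInt(tuple[1]);
            if(tuple[0].equals("M")){
                M[j]=Integer.parseInt(tuple[2]);
            }else{
                N[j]=Integer.parseInt(tuple[2]);
            }
        }
        // Pik = sum over j of mij * njk
        int sum=0;
        for(int j=1;j<columnM+1;j++){
            sum+=M[j]*N[j];
        }
        reduce_value.set(Integer.toString(sum));
        context.write(key,reduce_value);
    }
}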

 

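For completeness, a hypothetical driver that wires the two classes together and passes the matrix dimensions through the Configuration might look like the following. The class name MatrixMultiply, the argument order, and the input/output paths are placeholders, not taken from the original; the MatrixMapper and MatrixReducer classes above are assumed to be nested inside this class.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver (not from the original post): stores the dimensions in the
// Configuration so that setup() in the mapper and reducer can read them.
public class MatrixMultiply {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("rowM", args[0]);      // rows of M
        conf.set("columnM", args[1]);   // columns of M = rows of N (assumed key, see reducer)
        conf.set("columnN", args[2]);   // columns of N

        Job job = Job.getInstance(conf, "matrix multiplication");
        job.setJarByClass(MatrixMultiply.class);
        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[3]));    // directory holding the M_* and N_* files
        FileOutputFormat.setOutputPath(job, new Path(args[4]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}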