Matrix Multiplication in MapReduce
First, randomly generate the two input matrices. The script below takes three arguments: the number of rows of M, the number of columns of M (which is also the number of rows of N), and the number of columns of N:
#!/bin/bash
# Usage: <script> <rowM> <columnM (= rowN)> <columnN>
# Generate matrix M ($1 x $2); each line has the form "i,j<TAB>value"
for i in `seq 1 $1`
do
    for j in `seq 1 $2`
    do
        s=$((RANDOM%100))
        echo -e "$i,$j\t$s" >> M_$1_$2
    done
done
# Generate matrix N ($2 x $3) in the same format
for i in `seq 1 $2`
do
    for j in `seq 1 $3`
    do
        s=$((RANDOM%100))
        echo -e "$i,$j\t$s" >> N_$2_$3
    done
done
Running the shell script above (for example, with the arguments 3 4 5 it produces the files M_3_4 and N_4_5) generates the two input matrices. The parallel implementation of matrix multiplication on Hadoop follows this basic idea:
map: for every element mij of matrix M, emit a series of key-value pairs <(i,k), (M, j, mij)>, one for each k = 1..columnN; for every element njk of matrix N, emit a series of key-value pairs <(i,k), (N, j, njk)>, one for each i = 1..rowM.
reduce: for each key (i,k), the associated values (M, j, mij) and (N, j, njk) are stored into two arrays indexed by j (one array for M, one for N); then, for each j, the j-th elements of the two arrays are multiplied and the products are summed, which yields the value of Pik.
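For example, with a 2×2 M and a 2×2 N (rowM = 2, columnN = 2), the mapper emits m11 under the keys (1,1) and (1,2) as (M,1,m11), and emits n11 under the keys (1,1) and (2,1) as (N,1,n11). The reducer for key (1,1) therefore receives (M,1,m11), (M,2,m12), (N,1,n11) and (N,2,n21), and computes P11 = m11*n11 + m12*n21.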
The Hadoop mapper implementation is given below, followed by a sketch of the reducer.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public static class MatrixMapper extends Mapper<Object, Text, Text, Text> {
    private static int rowM;    // number of rows of M, passed in via the job configuration
    private static int columnN; // number of columns of N, passed in via the job configuration
    private Text map_key = new Text();
    private Text map_value = new Text();

    @Override
    public void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        columnN = Integer.parseInt(conf.get("columnN"));
        rowM = Integer.parseInt(conf.get("rowM"));
    }

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // The input file name tells us whether this record belongs to M or N.
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        // Each input line has the form "i,j\tvalue".
        String[] tuple = value.toString().split(",");
        if (fileName.startsWith("M")) {
            int i = Integer.parseInt(tuple[0]);
            String[] tuples = tuple[1].split("\t");
            int j = Integer.parseInt(tuples[0]);
            int Mij = Integer.parseInt(tuples[1]);
            // Element mij contributes to every Pik, k = 1..columnN.
            for (int k = 1; k < columnN + 1; k++) {
                map_key.set(i + "," + k);
                map_value.set("M" + "," + j + "," + Mij);
                context.write(map_key, map_value);
            }
        } else if (fileName.startsWith("N")) {
            int j = Integer.parseInt(tuple[0]);
            String[] tuples = tuple[1].split("\t");
            int k = Integer.parseInt(tuples[0]);
            int Njk = Integer.parseInt(tuples[1]);
            // Element njk contributes to every Pik, i = 1..rowM.
            for (int i = 1; i < rowM + 1; i++) {
                map_key.set(i + "," + k);
                map_value.set("N" + "," + j + "," + Njk);
                context.write(map_key, map_value);
            }
        }
    }
}
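The original post stops at the mapper. Below is a minimal reducer sketch consistent with the reduce step described above; the class name MatrixReducer and the configuration key columnM (the shared dimension, i.e. the number of columns of M and rows of N) are assumptions rather than part of the original code.

import org.apache.hadoop.mapreduce.Reducer;

public static class MatrixReducer extends Reducer<Text, Text, Text, Text> {
    // Shared dimension (columns of M == rows of N); "columnM" is an assumed configuration key.
    private static int columnM;

    @Override
    public void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        columnM = Integer.parseInt(conf.get("columnM"));
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // M[j] holds mij for the row i of this key; N[j] holds njk for the column k.
        int[] M = new int[columnM + 1];
        int[] N = new int[columnM + 1];
        for (Text val : values) {
            // Each value has the form "M,j,mij" or "N,j,njk".
            String[] tuple = val.toString().split(",");
            if (tuple[0].equals("M")) {
                M[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
            } else {
                N[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
            }
        }
        // Pik = sum over j of mij * njk
        int sum = 0;
        for (int j = 1; j <= columnM; j++) {
            sum += M[j] * N[j];
        }
        context.write(key, new Text(Integer.toString(sum)));
    }
}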
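Finally, the mapper's setup() expects rowM and columnN (and the reducer sketch expects columnM) to be present in the job configuration. A possible driver sketch is shown below; the class name MatrixMultiply, the argument order, and the assumption that MatrixMapper and MatrixReducer are static nested classes of it are illustrative, not taken from the original post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MatrixMultiply {
    // MatrixMapper and MatrixReducer above are assumed to be nested inside this class.
    public static void main(String[] args) throws Exception {
        // args: <input dir with M_* and N_* files> <output dir> <rowM> <columnM> <columnN>
        Configuration conf = new Configuration();
        conf.set("rowM", args[2]);    // read by MatrixMapper.setup()
        conf.set("columnM", args[3]); // read by the reducer sketch (assumed key)
        conf.set("columnN", args[4]); // read by MatrixMapper.setup()

        Job job = Job.getInstance(conf, "matrix multiply");
        job.setJarByClass(MatrixMultiply.class);
        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}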