首页 > 代码库 > MapReduce的reduce函数里的key用的是同一个引用

MapReduce的reduce函数里的key用的是同一个引用

最近写MapReduce程序,出现了这么一个问题,程序代码如下:

 1 package demo; 2  3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 import java.util.Map.Entry; 7  8 import org.apache.hadoop.fs.FSDataOutputStream; 9 import org.apache.hadoop.fs.FileSystem;10 import org.apache.hadoop.fs.Path;11 import org.apache.hadoop.io.IntWritable;12 import org.apache.hadoop.io.Text;13 import org.apache.hadoop.mapreduce.Reducer;14 15 public class ReducerDemo extends Reducer<Text, IntWritable, Text, IntWritable>{16 17     private FileSystem fs = null;18     private FSDataOutputStream outs = null;19     public Map<Text, Integer> wordNumMap = new HashMap<Text, Integer>();20     21 22     23     @Override24     protected void setup(Context context)25             throws IOException, InterruptedException {26         String logFile = context.getConfiguration().get(HdpDemo.LOG_FILE);27         fs = FileSystem.get(context.getConfiguration());28         if(null != logFile){29             int taskId = context.getTaskAttemptID().getTaskID().getId();30             logFile += ("_"+taskId);31             outs = fs.create(new Path(logFile));32         }33     }34     35 /*    public void reduce(Text key, IntWritable value, Context context){36         37     }*/38 39     public void reduce(Text key, Iterable<IntWritable> numberIter, Context context)40             throws IOException, InterruptedException {41         Text word = key;42         Integer currNum = wordNumMap.get(word);43         if(null == currNum){44             currNum = 0;45         }46         for(IntWritable num:numberIter){47             currNum += num.get();48         }49         wordNumMap.put(word, currNum);50 51     }52     53     @Override54     protected void cleanup(Context context)55             throws IOException, InterruptedException {56         for(Entry<Text, Integer> entry : wordNumMap.entrySet()){57             IntWritable num = new IntWritable(entry.getValue());58             context.write(entry.getKey(), num);59         }60         outs.close();61     }62 63     private void log(String content) throws IOException{64         if(null != outs){65             outs.write(content.getBytes());66         }67     }68 69 }

 

 

这是个单词统计的reducer类,按理说打印出来的结果应该是如下结果:

world   2ccc     2of      1best    1the     1is      1bbb     2james   2ddd     2hello   2aaa     1

而实际上的打印结果却为:

world:2world:2world:1world:1world:1world:1world:2world:2world:2world:2world:1

 

原因分析如下:

Hadoop的MapReduce框架每次调用reducer的reduce函数,代码中的第39行,每次传入的key都是对同一个地址的引用,导致了插入wordNumMap中的那些key都被修改了。

而如果把第41行的

Text word = key;

改为

Text word = new Text();
word.set(key);

这样结果就正确了,也印证了我的猜测。

 

MapReduce的reduce函数里的key用的是同一个引用