首页 > 代码库 > 写一个读取hfile的mapreduce之获取HFile内容

写一个读取hfile的mapreduce之获取HFile内容

  之前介绍了关于Mapreduce是进行输入处理的。这一篇将会介绍如何从Hfile中获取内容。这里和一般获取hbase的数据过程不太一样,不会去创建HTable对象而是直接去读取HFile文件。闲话不多说,直接上代码。先写一个单进程读取HFile的程序

 1 public class HFileReaderUtil {
 2 
 3     private Configuration conf ;
 4 
 5     private Path path ;
 6 
 7     private HFile.Reader reader;
 8 
 9     private HFileScanner scanner;
10 
11     public HFileReaderUtil()  {
12         if(conf==null){
13             conf= HBaseConfiguration.create();
14         }
15 
16     }
17 
18 public void scanHfile(String pathstr)throws IOException{
19     path = new Path(pathstr);
20     reader = HFile.createReader(FileSystem.get(conf),path ,new CacheConfig(conf),conf);
21     scanner = reader.getScanner(false,false);
22     reader.loadFileInfo();
23     scanner.seekTo();
24 
25     do{
26         KeyValue kv = scanner.getKeyValue();
27         System.out.println("rowkey = "+Bytes.toString(CellUtil.cloneRow(kv)));
28         System.out.println("cf = "+Bytes.toString(CellUtil.cloneFamily(kv)));
29         System.out.println("column value = "http://www.mamicode.com/+Bytes.toString(CellUtil.cloneValue(kv)));
30         System.out.println("column name = "+CellUtil.cloneQualifier(kv));
31 
32     }while (scanner.next());
33 
34 }
35 
36 
37 
38 }

   接着实现一个从HFile中获取数据的RecordReader,看一下RecordReader的描述,会不断从HFile中读取数据并返回键值对数据交给map去处理

The record reader breaks the data into key/value pairs for input
 1 package spdbccc.mapreduce.inputformat;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.FileSystem;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 7 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 8 import org.apache.hadoop.hbase.io.hfile.HFile;
 9 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
10 import org.apache.hadoop.mapreduce.InputSplit;
11 import org.apache.hadoop.mapreduce.RecordReader;
12 import org.apache.hadoop.mapreduce.TaskAttemptContext;
13 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
14 
15 import java.io.IOException;
16 
17 
18 public class HFileRecordReader<K,V>extends RecordReader{
19 
20 
21     private HFile.Reader reader;
22     private final HFileScanner scanner;
23     private int entryNumber = 0;
24 
25     public HFileRecordReader(FileSplit split, Configuration conf)
26             throws IOException {
27         final Path path = split.getPath();
28         reader = HFile.createReader(FileSystem.get(conf), path,new CacheConfig(conf), conf);
29         scanner = reader.getScanner(false, false);
30         reader.loadFileInfo(); 
31         scanner.seekTo(); 
32     }
33 
34 
35 
36     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
37 
38     }
39 
40     public boolean nextKeyValue() throws IOException, InterruptedException {
41         entryNumber++;
42         return scanner.next();
43     }
44 
45     public Object getCurrentKey() throws IOException, InterruptedException {
46         // TODO Auto-generated method stub
47         return new ImmutableBytesWritable(scanner.getKeyValue().getRow());
48     }
49 
50     public Object getCurrentValue() throws IOException, InterruptedException {
51         return scanner.getKeyValue();
52     }
53 
54 
55     /**
56      * 返回运行进度
57      * @return
58      * @throws IOException
59      * @throws InterruptedException
60      */
61     public float getProgress() throws IOException, InterruptedException {
62         if (reader != null) {
63             return (entryNumber / reader.getEntries());
64         }
65         return 1;
66     }
67 
68 
69     /**
70      * 关闭读取资源
71      * @throws IOException
72      */
73     public void close() throws IOException {
74         if (reader != null) {
75             reader.close();
76         }
77     }
78 }

 

最后实现FileInputFormat,这样就实现了一个读取HFile的InputFormat。

 1 package spdbccc.mapreduce.inputformat;
 2 
 3 
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.hbase.KeyValue;
 6 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 7 import org.apache.hadoop.mapreduce.InputSplit;
 8 import org.apache.hadoop.mapreduce.JobContext;
 9 import org.apache.hadoop.mapreduce.RecordReader;
10 import org.apache.hadoop.mapreduce.TaskAttemptContext;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
13 
14 import java.io.IOException;
15 
16 public class HFileInputFormat extends
17         FileInputFormat<ImmutableBytesWritable, KeyValue> {
18 
19     @Override
20     protected boolean isSplitable(JobContext context, Path filename) {
21         return false;
22     }
23 
24     @Override
25     public RecordReader<ImmutableBytesWritable, KeyValue> createRecordReader(
26             InputSplit split, TaskAttemptContext context) throws IOException,
27             InterruptedException {
28         return new HFileRecordReader((FileSplit) split, context
29                 .getConfiguration());
30     }
31 
32 
33 }

 

  到这里是不是感觉自己蓄满了洪荒之力?从此HBase读取再没什么难度可以走上人生巅峰了,然而!!!

  这样的实现方式并没有什么卵用!

  这样的实现方式并没有什么卵用!

  这样的实现方式并没有什么卵用!

  重要的事情说三遍!

  因为到这里,如果直接使用这个InputFormat会有两个问题。一,虽然可以读取到HFile中的数据,但是同一个kv会有不同时间戳的版本在不同的HFile,无法保证获取到的是最新的记录。  二,任务运行的时候会产生大量的Map任务,因为HBase数据落地时会产生大量的小文件,而FileInputFormat会对每个小文件生成一个map任务,最终结果时短时间占满集群资源。

  而这两个问题都是由于HBase自身的原理有关,下一篇将会介绍这些背后的故事——————LSM。

 

写一个读取hfile的mapreduce之获取HFile内容