首页 > 代码库 > 使用MapReduce查询Hbase表指定列簇的全部数据输入到HDFS(一)

使用MapReduce查询Hbase表指定列簇的全部数据输入到HDFS(一)

package com.bank.service;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 查询hbase表指定列簇的全部数据输出到HDFS上
 * @author mengyao
 *
 */
public class ReadHbase extends Configured implements Tool {

    private static String tableName;
    private static String outputDir;

    static class ReadHbaseMapper extends TableMapper<Text, Text> {
        private static Text k = new Text();
        private static Text v = new Text();
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer("");
            for(java.util.Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){
                String str = new String(val.getValue());
                if (str != null) {
                    sb.append(new String(val.getKey()));
                    sb.append(":");
                    sb.append(str);
                    sb.append(",");
                }
            }
            String line = sb.toString();
            k.set(key.get());
            v.set(new String(line.substring(0,line.length()-1)));
            context.write(k, v);
        }
    }

    static class ReadHbaseReduce extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
            for (Text val : value) {
                result.set(val);
                context.write(key, result);
            }
        }
    }
    
    @Override
    public int run(String[] arg0) throws Exception {
        tableName = arg0[0];
        outputDir = arg0[1];
        Job job = Job.getInstance(getConf(), ReadHbase.class.getSimpleName());
        job.setJarByClass(ReadHbase.class);
        job.setReducerClass(ReadHbaseReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(outputDir));
        TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), ReadHbaseMapper.class, Text.class, Text.class, job);
        TableMapReduceUtil.addDependencyJars(job);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
    public static void main(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println(" Usage:" + ReadHbase.class.getSimpleName() + " <tableName> <outputDir> ");
            System.exit(2);
        }
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("dfs.socket.timeout", "3600000");
        int status = ToolRunner.run(conf, new ReadHbase(), otherArgs);
        System.exit(status);
    }

}

使用MapReduce查询Hbase表指定列簇的全部数据输入到HDFS(一)