首页 > 代码库 > hbase->Mapreduce->hbase

hbase->Mapreduce->hbase

Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。


package taglib.customer;
import java.io.IOException;  

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.hbase.HBaseConfiguration;  
import org.apache.hadoop.hbase.client.Put;  
import org.apache.hadoop.hbase.client.Result;  
import org.apache.hadoop.hbase.client.Scan;  
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
import org.apache.hadoop.hbase.mapreduce.TableMapper;  
import org.apache.hadoop.hbase.mapreduce.TableReducer;  
import org.apache.hadoop.hbase.util.Bytes;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;
public class MrHbase {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration conf = HBaseConfiguration.create();  
		conf.set("hbase.zookeeper.quorum", "192.168.58.101");
        Job job = new Job(conf,"ExampleSummary");  
        job.setJarByClass(MrHbase.class);     // class that contains mapper and reducer  

        Scan scan = new Scan();  
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs  
        scan.setCacheBlocks(false);  // don't set to true for MR jobs  
        // set other scan attrs  
        //scan.addColumn(family, qualifier);  
        TableMapReduceUtil.initTableMapperJob(  
                "blog",        // input table  
                scan,               // Scan instance to control CF and attribute selection  
                MyMapper.class,     // mapper class  
                Text.class,         // mapper output key  
                IntWritable.class,  // mapper output value  
                job);  
        TableMapReduceUtil.initTableReducerJob(  
                "blog2",        // output table  
                MyTableReducer.class,    // reducer class  
                job);  
        job.setNumReduceTasks(1);   // at least one, adjust as required  

        boolean b = job.waitForCompletion(true);  
        if (!b) {  
            throw new IOException("error with job!");  
        }
	}
	public static class MyMapper extends TableMapper<Text, IntWritable>  {  
		  
        private final IntWritable ONE = new IntWritable(1);  
        private Text text = new Text();  
  
        public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {  
            String ip = Bytes.toString(row.get());  
            String url = new String(value.getValue(Bytes.toBytes("article"), Bytes.toBytes("title")));  
            text.set(ip+"&"+url);  
            context.write(text, ONE);  
        }  
    }
	public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>  {  
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {  
            int sum = 0;  
            for (IntWritable val : values) {  
                sum += val.get();  
            }  
  
            Put put = new Put(key.getBytes());  
            put.add(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes(String.valueOf(sum)));  
  
            context.write(null, put);  
        }  
    }  

}


hbase->Mapreduce->hbase