
Using MapReduce with HBase

Our project needed to read from and write to HBase with MapReduce, which saves a great deal of development time.

The HBase jars themselves already include this API. Below is some material I found on the official site, which I'd like to share.

A general note first: TableMapper is mainly for reading HBase data, while TableReducer is mainly for writing HBase data. They can be used together or separately.
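For reference, here are the imports these examples rely on — a minimal sketch assuming a pre-1.0 HBase client, to match the KeyValue-based API used below:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;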

(1) Example: reading from HBase


public static class MyMapper extends TableMapper<Text, Text> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
    // process data for the row from the Result instance.
  }
}

Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);     // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs...

TableMapReduceUtil.initTableMapperJob(
  tableName,        // input HBase table name
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper
  null,             // mapper output key
  null,             // mapper output value
  job);
job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
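Where the comment says "set other scan attrs...", you would typically narrow the Scan so the job only ships the data it actually needs over the wire. A small illustration — the family, qualifier, and row-key values below are placeholders, not part of the original example:

scan.addFamily(Bytes.toBytes("cf"));                          // restrict to one column family
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("attr1"));  // or to a single column
scan.setStartRow(Bytes.toBytes("startKey"));                  // optionally bound the row range
scan.setStopRow(Bytes.toBytes("stopKey"));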


(2) Example: reading and writing


public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // this example is just copying the data from the source table...
    context.write(row, resultToPut(row, value));
  }

  private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
    Put put = new Put(key.get());
    for (KeyValue kv : result.raw()) {
      put.add(kv);
    }
    return put;
  }
}

Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,      // input table
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,      // output table
  null,             // reducer class
  job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
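Result.raw() and the KeyValue-based Put.add(KeyValue) above come from the older client API. On HBase 0.96 and later, where KeyValue is deprecated in favor of Cell (org.apache.hadoop.hbase.Cell), the helper might look roughly like this — a sketch, not verified against every release:

private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
  Put put = new Put(key.get());
  for (Cell cell : result.rawCells()) {  // rawCells() is the Cell-based replacement for raw()
    put.add(cell);                       // copies the cell into the Put unchanged
  }
  return put;
}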


(3) Example: computing a summary


public static class MyMapper extends TableMapper<Text, IntWritable> {

  private final IntWritable ONE = new IntWritable(1);
  private Text text = new Text();

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1")));
    text.set(val);     // we can only emit Writables...
    context.write(text, ONE);
  }
}


public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));
    context.write(null, put);
  }
}
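A side note on the reducer: passing null as the output key in context.write(null, put) is fine here, because TableOutputFormat ignores the key and writes each Put according to the row key the Put itself carries.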


Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,          // output table
  MyTableReducer.class, // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
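Keep in mind that Bytes.toBytes(i) in the reducer stores the count as a 4-byte binary int, not printable text, so reading it back requires Bytes.toInt(). A quick way to check the result — the row key here is made up for illustration, and this uses the same old-style client API (HTable, Get) as the rest of the examples:

HTable table = new HTable(config, targetTable);
Result r = table.get(new Get(Bytes.toBytes("someValue")));  // row key = one of the values emitted by the mapper
int count = Bytes.toInt(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count")));
System.out.println("count = " + count);
table.close();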


(4) Mixed example: results written to a file


public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    context.write(key, new IntWritable(i));
  }
}


Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
job.setReducerClass(MyReducer.class);    // reducer class
job.setNumReduceTasks(1);    // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
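With the default TextOutputFormat, each reducer writes its output as tab-separated key/value lines, so after the job finishes the counts land in files such as /tmp/mr/mySummaryFile/part-r-00000, one "value<TAB>count" line per distinct key emitted by the mapper.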