Using MapReduce with HBase
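Our project needs to read from and write to HBase with MapReduce, which saves a great deal of development time.
The HBase jar itself already ships an API for this. Below is some material I found on the official site, shared here for reference.
In short: TableMapper is mainly for reading data from HBase, and TableReducer is mainly for writing data to HBase. They can be used together or separately.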
(1) Example: reading from HBase
public static class MyMapper extends TableMapper<Text, Text> {

  public void map(ImmutableBytesWritable row, Result value, Context context)
      throws InterruptedException, IOException {
    // process data for the row from the Result instance.
  }
}
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);     // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs...

TableMapReduceUtil.initTableMapperJob(
  tableName,        // input HBase table name
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper
  null,             // mapper output key
  null,             // mapper output value
  job);
job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
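The comment on scan.setCaching(500) above is easier to appreciate with some rough arithmetic. The sketch below assumes, as a simplification, that each scanner round trip returns at most `caching` rows, so scanning `rows` rows takes about ceil(rows / caching) round trips. It does not call HBase; the class name, method name, and row count are hypothetical, and real clients also bound batches by result size, so treat the numbers as an order-of-magnitude guide only.

```java
public class ScanCachingEstimate {

    /**
     * Rough number of scanner round trips, under the assumption that
     * each round trip returns up to `caching` rows.
     */
    public static long estimateRoundTrips(long rows, int caching) {
        if (rows < 0 || caching <= 0) {
            throw new IllegalArgumentException("rows must be >= 0 and caching must be > 0");
        }
        return (rows + caching - 1) / caching; // ceiling division
    }

    public static void main(String[] args) {
        long rows = 1_000_000L; // hypothetical number of rows scanned by one mapper
        System.out.println("caching=1   -> " + estimateRoundTrips(rows, 1) + " round trips");
        System.out.println("caching=500 -> " + estimateRoundTrips(rows, 500) + " round trips");
    }
}
```

Under this assumption, raising the caching value from 1 to 500 cuts the round trips for the same scan by a factor of 500, which is why a small value hurts full-table MapReduce scans.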
(2) Example: reading and writing
public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {

  public void map(ImmutableBytesWritable row, Result value, Context context)
      throws IOException, InterruptedException {
    // this example is just copying the data from the source table...
    context.write(row, resultToPut(row, value));
  }

  private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
    Put put = new Put(key.get());
    for (KeyValue kv : result.raw()) {
      put.add(kv);
    }
    return put;
  }
}
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,      // input table
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,      // output table
  null,             // reducer class
  job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
(3) Example: computing statistics
public static class MyMapper extends TableMapper<Text, IntWritable> {

  private final IntWritable ONE = new IntWritable(1);
  private Text text = new Text();

  public void map(ImmutableBytesWritable row, Result value, Context context)
      throws IOException, InterruptedException {
    String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1")));
    text.set(val);     // we can only emit Writables...
    context.write(text, ONE);
  }
}
public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));

    context.write(null, put);
  }
}
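One detail in the reducer above is worth spelling out: Bytes.toBytes(i) on an int stores the count as a 4-byte big-endian binary value, not as decimal text, so whoever reads cf:count later has to decode it the same way (in HBase, with Bytes.toInt). The sketch below reproduces that encoding using only java.nio.ByteBuffer so that it runs without HBase on the classpath; the class and method names are hypothetical stand-ins, not HBase API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CountEncodingSketch {

    /** Stand-in for Bytes.toBytes(int): 4 bytes, big-endian (ByteBuffer's default order). */
    public static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    /** Stand-in for Bytes.toInt(byte[]): the inverse of encodeInt. */
    public static int decodeInt(byte[] bytes) {
        if (bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("expected 4 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] binary = encodeInt(42);                       // what the reducer stores
        byte[] text = "42".getBytes(StandardCharsets.UTF_8); // what a string encoding would store
        System.out.println("binary length = " + binary.length);
        System.out.println("text length   = " + text.length);
        System.out.println("decoded count = " + decodeInt(binary));
    }
}
```

If the count should be human-readable in the HBase shell, one option is to store Bytes.toBytes(String.valueOf(i)) instead, at the cost of parsing it back into a number on read.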
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,          // output table
  MyTableReducer.class, // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
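To make the data flow of this job concrete, here is a plain-Java sketch of what the mapper and reducer compute together: every row contributes the pair (value of cf:attr1, 1), and the reducer sums the ones for each distinct value. It uses in-memory collections instead of HBase or Hadoop, and the class name, method name, and sample values are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SummarySketch {

    /** Counts occurrences of each value: the map step emits (value, 1), the reduce step sums. */
    public static Map<String, Integer> countByValue(List<String> attr1Values) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by key for stable output
        for (String val : attr1Values) {
            counts.merge(val, 1, Integer::sum); // add 1 to the running total for this value
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical cf:attr1 values, one per source row.
        List<String> attr1Values = Arrays.asList("red", "blue", "red", "green", "red");
        countByValue(attr1Values).forEach((value, count) ->
                System.out.println(value + " -> " + count));
    }
}
```

In the real job, each entry of this map corresponds to one row in the target table, keyed by the attr1 value, with the sum stored in cf:count.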
(4) Mixed example: results written to a file
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    context.write(key, new IntWritable(i));
  }
}
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class (the MyMapper from the previous example)
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
job.setReducerClass(MyReducer.class);    // reducer class
job.setNumReduceTasks(1);    // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
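Since this job sets no output format, Hadoop's default TextOutputFormat applies, which writes one key<TAB>value line per reducer output record into part files under the output directory. Below is a minimal sketch of parsing such lines back into counts, assuming the default tab separator has not been changed. The class name, method name, and sample lines are hypothetical, and reading the actual part files from HDFS is left out.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SummaryFileParser {

    /** Parses "key<TAB>count" lines into a map, keeping the order of the input lines. */
    public static Map<String, Integer> parse(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            // The count never contains a tab, so split on the last one.
            int tab = line.lastIndexOf('\t');
            if (tab < 0) {
                throw new IllegalArgumentException("no tab separator in line: " + line);
            }
            String key = line.substring(0, tab);
            int count = Integer.parseInt(line.substring(tab + 1).trim());
            counts.put(key, count);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical lines as they might appear in a part file.
        List<String> lines = Arrays.asList("blue\t1", "green\t1", "red\t3");
        parse(lines).forEach((key, count) -> System.out.println(key + " = " + count));
    }
}
```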