HBase -> MapReduce -> HBase
HBase extends the Hadoop MapReduce API so that MapReduce jobs can read HTable data as input and write results back into HBase. The example below scans the 'blog' table with a TableMapper, emits one count per rowkey/title combination, and uses a TableReducer to write the summed counts into the 'blog2' table.
package taglib.customer;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class MrHbase {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.58.101");

        Job job = new Job(conf, "ExampleSummary");
        job.setJarByClass(MrHbase.class); // class that contains mapper and reducer

        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which is bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs here, e.g. scan.addColumn(family, qualifier);

        TableMapReduceUtil.initTableMapperJob(
                "blog",            // input table
                scan,              // Scan instance to control CF and attribute selection
                MyMapper.class,    // mapper class
                Text.class,        // mapper output key
                IntWritable.class, // mapper output value
                job);
        TableMapReduceUtil.initTableReducerJob(
                "blog2",              // output table
                MyTableReducer.class, // reducer class
                job);
        job.setNumReduceTasks(1); // at least one, adjust as required

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }

    // Mapper: receives one input record per row of the 'blog' table and
    // emits (rowkey + "&" + article:title, 1).
    public static class MyMapper extends TableMapper<Text, IntWritable> {

        private final IntWritable ONE = new IntWritable(1);
        private Text text = new Text();

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            String ip = Bytes.toString(row.get());
            byte[] titleBytes = value.getValue(Bytes.toBytes("article"), Bytes.toBytes("title"));
            if (titleBytes == null) {
                return; // skip rows without an article:title cell to avoid a NullPointerException
            }
            String url = Bytes.toString(titleBytes);
            text.set(ip + "&" + url);
            context.write(text, ONE);
        }
    }

    // Reducer: sums the counts for each key and writes the total back to the
    // 'blog2' table as article:title, using the key as the rowkey.
    public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // Use Bytes.toBytes(key.toString()) rather than key.getBytes():
            // Text.getBytes() returns the backing array, which can be longer than getLength().
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes(String.valueOf(sum)));
            context.write(null, put);
        }
    }
}
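For completeness, the sketch below shows one way to create the 'blog' and 'blog2' tables (each with an 'article' column family) and insert a sample row before running the job. The class name MrHbaseSetup and the sample row are hypothetical; the snippet assumes the same ZooKeeper quorum and the same pre-1.0 HBase client API (HBaseAdmin, HTableDescriptor, HTable) as the job above, so adapt it to your cluster and client version.

package taglib.customer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MrHbaseSetup {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.58.101");
        HBaseAdmin admin = new HBaseAdmin(conf);

        // Input table read by the mapper: rowkey = ip, column article:title = url
        if (!admin.tableExists("blog")) {
            HTableDescriptor in = new HTableDescriptor("blog");
            in.addFamily(new HColumnDescriptor("article"));
            admin.createTable(in);
        }
        // Output table written by the reducer
        if (!admin.tableExists("blog2")) {
            HTableDescriptor out = new HTableDescriptor("blog2");
            out.addFamily(new HColumnDescriptor("article"));
            admin.createTable(out);
        }
        admin.close();

        // A sample row so the job has something to count (values are placeholders)
        HTable blog = new HTable(conf, "blog");
        Put put = new Put(Bytes.toBytes("192.168.58.101"));
        put.add(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes("hello-hbase"));
        blog.put(put);
        blog.close();
    }
}

Once the tables exist, the job is typically packaged into a jar and launched with hadoop jar, with the HBase client jars on the classpath (for example via HADOOP_CLASSPATH=$(hbase classpath)); the exact invocation depends on the Hadoop and HBase installation.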