
MapReduce: writing to multiple HBase tables using only a Mapper

 

Using only a Mapper and skipping the reduce phase can greatly shorten a MapReduce job's running time.

Sometimes a job also needs to write its results into several HBase tables.

Hence the requirement in the title.

The code below is not directly runnable; it only demonstrates the settings the driver must make, the interface the mapper class implements, the parameters the map function takes, and how the function body handles them.

Getting this to work was fairly tortuous, so here is just the code:

class Qos2HbaseDriver extends Configured implements Tool
{
    private static Logger logger = LoggerFactory
            .getLogger(Qos2HbaseDriver.class);
    private static final int DEFAULT_NUM_REDUCE = 0;

    /**
     * args[0] input HDFS path, args[1] first output table, args[2] second output table
     */
    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = HBaseConfiguration.create();
        conf.set("output", args[1]);   // output table 1
        conf.set("output2", args[2]);  // output table 2
        Job job = Job.getInstance(conf);
        job.setJobName("iplane_Qos2Hbase");
        job.setMapperClass(Qos2HbaseMapper.class);
        FileInputFormat.setInputPaths(job, args[0]);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // MultiTableOutputFormat: the key of each (key, Put) pair names the target table
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.addDependencyJars(job.getConfiguration());
        job.setJarByClass(Qos2Hbase.class);
        // number of reducers: 0, so map output goes straight to HBase
        job.setNumReduceTasks(DEFAULT_NUM_REDUCE);
        boolean b = job.waitForCompletion(true);
        if (!b)
        {
            logger.error("Job failed!");
            return -1;
        }
        return 0;
    }
}

/**
 * @ClassName: Qos2HbaseMapper
 * @Description: mapper class that writes the results into HBase
 * @author Dangzhang
 * @date 2014-9-16 13:18:49
 */
class Qos2HbaseMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
{
    private static Logger logger = LoggerFactory
            .getLogger(Qos2HbaseMapper.class);

    @Override
    public void map(LongWritable key, Text line, Context context)
            throws IOException, InterruptedException
    {
        String output = context.getConfiguration().get("output");
        String output2 = context.getConfiguration().get("output2");
        String[] values = line.toString().split("\t"); // field 6 holds the cell value
        // assemble the rowkey: ip_ip
        StringBuffer rowkeySb = new StringBuffer("aaaa"); // placeholder rowkey
        Put put = null;
        String family = "d";
        String qualifier = "";

        // write the result straight into HBase
        long ts = System.currentTimeMillis();
        put = new Put(Bytes.toBytes(rowkeySb.toString()));
        qualifier = "del";
        put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts,
                Bytes.toBytes(values[6])); // assemble one record
        if (!put.isEmpty())
        {
            ImmutableBytesWritable ib = new ImmutableBytesWritable();
            ib.set(Bytes.toBytes(output));
            context.write(ib, put); // write into the first HBase table
        }

        // write into the history table: the rowkey gets a date suffix
        rowkeySb.append(rowkeySeparator).append(myDate); // rowkeySeparator/myDate defined elsewhere
        put = new Put(Bytes.toBytes(rowkeySb.toString()));
        qualifier = "del";
        put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts,
                Bytes.toBytes(values[6])); // assemble one record
        if (!put.isEmpty())
        {
            ImmutableBytesWritable ib = new ImmutableBytesWritable();
            ib.set(Bytes.toBytes(output2));
            context.write(ib, put); // write into the second (history) table
        }
    }
}
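The core mechanism above, keying each emitted Put by the name of its destination table so that one Mapper can feed several tables, can be sketched without any Hadoop/HBase dependencies. Everything here (the class name MultiTableRoutingSketch, the route method, and the sample values) is an illustrative stand-in, not part of the original job:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of what MultiTableOutputFormat does with the map output:
// each record is paired with a table name, and records fan out to the table
// their key names. The history table additionally gets a date-suffixed rowkey,
// mirroring the mapper above.
public class MultiTableRoutingSketch {
    public static Map<String, List<String>> route(String[] rowkeys,
                                                  String currentTable,
                                                  String historyTable,
                                                  String date) {
        Map<String, List<String>> sink = new HashMap<>();
        sink.put(currentTable, new ArrayList<>());
        sink.put(historyTable, new ArrayList<>());
        for (String rowkey : rowkeys) {
            // current table: the plain rowkey
            sink.get(currentTable).add(rowkey);
            // history table: rowkey plus a separator and the date
            sink.get(historyTable).add(rowkey + "_" + date);
        }
        return sink;
    }
}
```

In the real job the "sink" is HBase itself: context.write(new ImmutableBytesWritable(tableNameBytes), put) plays the role of sink.get(table).add(rowkey), and MultiTableOutputFormat delivers each Put to the table its key names.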

 
