首页 > 代码库 > Hadoop通过HCatalog编写Mapreduce任务

Hadoop通过HCatalog编写Mapreduce任务

 

1、driver

package com.kangaroo.hadoop.drive;

import java.util.Map;
import java.util.Properties;

import com.kangaroo.hadoop.mapper.AggregateMapper;
import com.kangaroo.hadoop.reducer.AggregateReducer;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.kangaroo.hadoop.utils.PropertiesUtil;


public class DriveMain extends Configured implements Tool {

    private static final Logger logger = LoggerFactory.getLogger(DriveMain.class);
    private Configuration conf;
    private PropertiesUtil propUtil;

    public DriveMain() {
        this.conf = new Configuration();
        this.propUtil = new PropertiesUtil("configure.properties");
    }

    public int run(String[] args) throws Exception {
        try {
            logger.info("MapReduce Job Beginning.");
            String dbName = args[0];
            String tableName = args[1];
            String partition = args[2];
            String sumField = args[3];
            String outPath = args[4];
            String partFilter = partitionFormat(partition);
            logger.info("[Params] dbName:{}; tableName:{}, partition:{}, sumField:{}, outPath:{}, partFilter:{}",
                    dbName, tableName, partition, sumField, outPath, partFilter);
            this.conf.set("sumField", sumField);
            this.setMapRedConfiguration();
            Job job = this.setJobConfiguration(this.conf);
            HCatInputFormat.setInput(job, dbName, tableName, partFilter);
            logger.info("setInput successfully.");
            FileOutputFormat.setOutputPath(job, new Path(outPath));
            logger.info("setOutput successfully.");
            return (job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception ex) {
            logger.error(ex.getMessage());
            throw ex;
        }
    }

    private Job setJobConfiguration(Configuration conf) throws Exception {
        try {
            logger.info("enter setJobConfiguration");
            Job job = Job.getInstance(conf);
            job.setJarByClass(DriveMain.class);
            job.setInputFormatClass(HCatInputFormat.class);
            job.setMapperClass(AggregateMapper.class);
            job.setReducerClass(AggregateReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(1);
            logger.info("setJobConfiguration successfully.");
            return job;
        } catch (Exception ex) {
            logger.error("setJobConfiguration: " + ex.getMessage());
            throw new Exception(ex);
        }
    }

    private void setMapRedConfiguration() {
        try {
            Properties properties = propUtil.getProperties();
            logger.info("Load MapReduce Configuration Successfully.");
            for (Map.Entry entry : properties.entrySet()) {
                if (entry.getKey().toString().startsWith("mapred")) {
                    conf.set(entry.getKey().toString(), entry.getValue().toString());
                    logger.info("[MR][Config] key:{}, value:{}", entry.getKey().toString(), entry.getValue().toString());
                }
            }
            logger.info("[MR][Config] Set MapReduce Configuration Successfully.");
        } catch (Exception e) {

        }

    }


    private String partitionFormat(String partition) {
        String format = "";
        if(!partition.contains("pt") && ! partition.contains("dt")) {
            String[] items = partition.split("/");
            String[] keys = {"year","month","day", "hour"};
            for(int i=0; i<items.length; i++) {
                if (i == items.length-1) {
                    format += keys[i] + "=‘" + items[i] + "‘";
                } else {
                    format += keys[i] + "=‘" + items[i] + "‘ and ";
                }
            }
        } else {
            format = partition;
        }
        return format;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new DriveMain(), args);
        System.exit(exitCode);
    }

}

 

2、Mapper

package com.kangaroo.hadoop.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Emits {@code (value of the configured column, "1")} for every input record so
 * that the reducer can count records per distinct column value.
 *
 * <p>The column name is read from the job configuration key {@code sumField}.
 * Output is written from {@link #map}, once per record; writing only from
 * {@code cleanup} would emit just the last record seen by each map task.
 */
@SuppressWarnings("rawtypes")
public class AggregateMapper extends Mapper<WritableComparable, HCatRecord, Text, Text> {

    private static final Logger logger = LoggerFactory.getLogger(AggregateMapper.class);

    /** Counter group used to report records that could not be counted. */
    private static final String COUNTER_GROUP = "AggregateMapper";

    private HCatSchema schema;
    private String sumField;
    // Writable instances are reused across records to avoid per-record allocation.
    private final Text outKey = new Text();
    private final Text outValue = new Text("1");

    /**
     * Loads the table schema and the name of the column to group by.
     *
     * @throws IOException if {@code sumField} is not configured or is not a column of the table
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
        sumField = context.getConfiguration().get("sumField");
        // Fail once, with a clear message, rather than on every record.
        if (sumField == null || !schema.getFieldNames().contains(sumField)) {
            throw new IOException("sumField '" + sumField + "' is not a column of the input table; columns: "
                    + schema.getFieldNames());
        }
    }

    /**
     * Writes one {@code (column value, "1")} pair for the record. Records whose
     * column value is null are skipped and tallied in the
     * {@code AggregateMapper/NULL_SUM_FIELD} counter.
     */
    @Override
    protected void map(WritableComparable key, HCatRecord value, Context context) throws IOException, InterruptedException {
        Object fieldValue = value.get(sumField, schema);
        if (fieldValue == null) {
            context.getCounter(COUNTER_GROUP, "NULL_SUM_FIELD").increment(1);
            return;
        }
        // Debug level only: this runs once per input record.
        logger.debug("fieldName={}, fieldValue={}", sumField, fieldValue);
        outKey.set(fieldValue.toString());
        context.write(outKey, outValue);
    }
}

 

3、Reducer

package com.kangaroo.hadoop.reducer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Sums the integer counts emitted for each key and writes one
 * {@code (key, total)} pair per key.
 *
 * <p>Output is written from {@link #reduce}; writing only from {@code cleanup}
 * would emit just the last key processed by the reduce task.
 */
@SuppressWarnings("rawtypes")
public class AggregateReducer extends Reducer<Text, Text, Text, Text> {
    protected static final Logger logger = LoggerFactory.getLogger(AggregateReducer.class);
    // NOTE(review): schema is loaded in setup but not read anywhere in this class;
    // kept in case other code in this package relies on it — confirm and remove if unused.
    HCatSchema schema;
    // Initialised eagerly: reduce() calls set() on these, which requires non-null instances.
    Text outKey = new Text();
    Text outValue = new Text();

    /** Loads the input table schema from the job configuration. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
    }

    /**
     * Adds up the numeric values for {@code key} and writes the total.
     *
     * @param key     the grouping value
     * @param values  decimal integer strings to add up
     * @param context task context the result is written to
     * @throws NumberFormatException if a value is not a decimal integer
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException {
        // long accumulator so that very frequent keys cannot overflow the total.
        long sum = 0;
        for (Text value : values) {
            sum += Long.parseLong(value.toString());
        }
        outKey.set(key);
        outValue.set(String.valueOf(sum));
        context.write(outKey, outValue);
    }
}

 

4、PropertiesUtil

package com.kangaroo.hadoop.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;


/**
 * Loads a {@code .properties} file from the classpath.
 */
public class PropertiesUtil {
    /** Resource name used by the no-argument constructor. */
    private static final String DEFAULT_FILE = "configure.properties";

    private final String filePath;

    /** Creates a loader for the default resource {@code configure.properties}. */
    public PropertiesUtil() {
        this(DEFAULT_FILE);
    }

    /**
     * Creates a loader for the given classpath resource.
     *
     * @param filePath resource name, resolved through this class's class loader
     */
    public PropertiesUtil(String filePath) {
        this.filePath = filePath;
    }

    /**
     * Reads the resource and returns its contents as a new {@link Properties}.
     *
     * @return the loaded properties
     * @throws IOException if the resource is not on the classpath or cannot be read
     */
    public Properties getProperties() throws IOException {
        try (InputStream inStream = PropertiesUtil.class.getClassLoader()
                .getResourceAsStream(this.filePath)) {
            // getResourceAsStream signals "not found" by returning null; report it
            // with the resource name instead of letting load() fail on a null stream.
            if (inStream == null) {
                throw new IOException("Properties resource not found on classpath: " + this.filePath);
            }
            Properties prop = new Properties();
            prop.load(inStream);
            return prop;
        }
    }
}

 

5、配置

mapred.job.queue.name=root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop
mapred.jar=./rulecheck.jar
mapred.map.tasks=300
mapred.reduce.tasks=100
#mapred.map.capacity=1
#mapred.reduce.capacity=1
mapred.job.priority=HIGH
mapred.job.name=bigdata_qa_data_monitor

 

Hadoop通过HCatalog编写Mapreduce任务