首页 > 代码库 > Hadoop通过HCatalog编写Mapreduce任务
Hadoop通过HCatalog编写Mapreduce任务
1、Driver
package com.kangaroo.hadoop.drive; import java.util.Map; import java.util.Properties; import com.kangaroo.hadoop.mapper.AggregateMapper; import com.kangaroo.hadoop.reducer.AggregateReducer; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.kangaroo.hadoop.utils.PropertiesUtil; public class DriveMain extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(DriveMain.class); private Configuration conf; private PropertiesUtil propUtil; public DriveMain() { this.conf = new Configuration(); this.propUtil = new PropertiesUtil("configure.properties"); } public int run(String[] args) throws Exception { try { logger.info("MapReduce Job Beginning."); String dbName = args[0]; String tableName = args[1]; String partition = args[2]; String sumField = args[3]; String outPath = args[4]; String partFilter = partitionFormat(partition); logger.info("[Params] dbName:{}; tableName:{}, partition:{}, sumField:{}, outPath:{}, partFilter:{}", dbName, tableName, partition, sumField, outPath, partFilter); this.conf.set("sumField", sumField); this.setMapRedConfiguration(); Job job = this.setJobConfiguration(this.conf); HCatInputFormat.setInput(job, dbName, tableName, partFilter); logger.info("setInput successfully."); FileOutputFormat.setOutputPath(job, new Path(outPath)); logger.info("setOutput successfully."); return (job.waitForCompletion(true) ? 
0 : 1); } catch (Exception ex) { logger.error(ex.getMessage()); throw ex; } } private Job setJobConfiguration(Configuration conf) throws Exception { try { logger.info("enter setJobConfiguration"); Job job = Job.getInstance(conf); job.setJarByClass(DriveMain.class); job.setInputFormatClass(HCatInputFormat.class); job.setMapperClass(AggregateMapper.class); job.setReducerClass(AggregateReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(1); logger.info("setJobConfiguration successfully."); return job; } catch (Exception ex) { logger.error("setJobConfiguration: " + ex.getMessage()); throw new Exception(ex); } } private void setMapRedConfiguration() { try { Properties properties = propUtil.getProperties(); logger.info("Load MapReduce Configuration Successfully."); for (Map.Entry entry : properties.entrySet()) { if (entry.getKey().toString().startsWith("mapred")) { conf.set(entry.getKey().toString(), entry.getValue().toString()); logger.info("[MR][Config] key:{}, value:{}", entry.getKey().toString(), entry.getValue().toString()); } } logger.info("[MR][Config] Set MapReduce Configuration Successfully."); } catch (Exception e) { } } private String partitionFormat(String partition) { String format = ""; if(!partition.contains("pt") && ! partition.contains("dt")) { String[] items = partition.split("/"); String[] keys = {"year","month","day", "hour"}; for(int i=0; i<items.length; i++) { if (i == items.length-1) { format += keys[i] + "=‘" + items[i] + "‘"; } else { format += keys[i] + "=‘" + items[i] + "‘ and "; } } } else { format = partition; } return format; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new DriveMain(), args); System.exit(exitCode); } }
2、Mapper
package com.kangaroo.hadoop.mapper; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.schema.HCatSchema; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; @SuppressWarnings("rawtypes") public class AggregateMapper extends Mapper<WritableComparable, HCatRecord, Text, Text> { private static final Logger logger = LoggerFactory.getLogger(AggregateMapper.class); private HCatSchema schema; private Text outKey; private Text outValue; private IntWritable one; @Override protected void setup(Context context) throws IOException, InterruptedException { outKey = new Text(); outValue = new Text(); schema = HCatInputFormat.getTableSchema(context.getConfiguration()); } @Override protected void map(WritableComparable key, HCatRecord value, Context context) throws IOException, InterruptedException { String sumField = context.getConfiguration().get("sumField"); Map<String, String> recordMap = new HashMap<String, String>(); for (String fieldName : schema.getFieldNames()) { logger.info("fieldName={}", fieldName); String fieldValue = value.get(fieldName, schema).toString(); logger.info("fieldName={}, fieldValue=http://www.mamicode.com/{}", fieldName, fieldValue); recordMap.put(fieldName, fieldValue); logger.info("recordMap={}", recordMap.toString()); } outKey.set(recordMap.get(sumField)); outValue.set("1"); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { context.write(outKey, outValue); } }
3、Reducer
package com.kangaroo.hadoop.reducer; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hive.hcatalog.data.schema.HCatSchema; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @SuppressWarnings("rawtypes") public class AggregateReducer extends Reducer<Text, Text, Text, Text> { protected static final Logger logger = LoggerFactory.getLogger(AggregateReducer.class); HCatSchema schema; Text outKey; Text outValue; @Override protected void setup(Context context) throws IOException, InterruptedException { schema = HCatInputFormat.getTableSchema(context.getConfiguration()); } @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException { outKey.set(key); int sum = 0; for (Text value : values) { sum += Integer.parseInt(value.toString()); } outValue.set(String.valueOf(sum)); } protected void cleanup(Context context) throws IOException, InterruptedException { context.write(outKey, outValue); } }
4、PropertiesUtil
package com.kangaroo.hadoop.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Loads a {@code .properties} file from the classpath.
 */
public class PropertiesUtil {

    /** Classpath-relative name of the properties resource. */
    private final String filePath;

    /** Uses the default resource name {@code configure.properties}. */
    public PropertiesUtil() {
        this.filePath = "configure.properties";
    }

    /**
     * @param filePath classpath-relative name of the properties resource
     */
    public PropertiesUtil(String filePath) {
        this.filePath = filePath;
    }

    /**
     * Reads the configured resource through this class's class loader.
     *
     * @return a new {@link Properties} instance holding the loaded entries
     * @throws FileNotFoundException if the resource does not exist on the classpath
     * @throws IOException if the resource cannot be read
     */
    public Properties getProperties() throws IOException {
        // A null resource is permitted by try-with-resources and is simply not closed.
        try (InputStream inStream =
                PropertiesUtil.class.getClassLoader().getResourceAsStream(this.filePath)) {
            if (inStream == null) {
                // getResourceAsStream signals "not found" with null; fail with a
                // descriptive error instead of a NullPointerException inside load().
                throw new FileNotFoundException("Properties resource not found on classpath: " + this.filePath);
            }
            Properties prop = new Properties();
            prop.load(inStream);
            return prop;
        }
    }
}
5、配置
# Job settings, presumably the contents of configure.properties, which DriveMain
# loads from the classpath. DriveMain copies only keys that start with "mapred"
# into the job Configuration.
mapred.job.queue.name=root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop
mapred.jar=./rulecheck.jar
mapred.map.tasks=300
# NOTE(review): DriveMain calls job.setNumReduceTasks(1) after these settings are
# applied, so this value appears to be overridden at runtime - confirm.
mapred.reduce.tasks=100
# Capacity limits, currently disabled.
#mapred.map.capacity=1
#mapred.reduce.capacity=1
mapred.job.priority=HIGH
mapred.job.name=bigdata_qa_data_monitor
Hadoop通过HCatalog编写Mapreduce任务
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。