首页 > 代码库 > 利用mapreduce清洗日志
利用 MapReduce 清洗日志
package com.libc;import java.io.IOException;import java.io.UnsupportedEncodingException;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Set;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class Process { public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> { private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String datas = ""; try { datas = new String(value.getBytes(), 0, value.getLength(), "GBK"); } catch (UnsupportedEncodingException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } // datas = value.toString(); try { String[] split = datas.split(" time="); // 处理头中包含空格的字段 Pattern p = Pattern.compile("phonemodel=\"(.*?)\""); String pm = getIndex(split[0], p); split[0] = split[0].replaceAll(pm, pm.replace(" ", "")); Pattern p1 = Pattern.compile("networktype=\"(.*?)\""); String nt = getIndex(split[0], p1); split[0] = split[0].replaceAll(nt, nt.replace(" ", "")); for (int i = 1; i < split.length; i++) { String[] codes = split[i].split(" ", 4); int headLen = split[0].split(" ").length; if (headLen != 20) { // 丢掉错误日志 continue; } // 处理旧版本日志判别标准:| if (codes[2].equals("code=\"100\"")){ if(codes[3].indexOf("contact_name")>-1){ codes[3] = process100(codes[3]); } codes[3] = codes[3].replace(‘ ‘, ‘#‘); }else if(codes[2].equals("code=\"101\"") ){ if(codes[3].indexOf("message_to_")>-1){ codes[3] = process101(codes[3]); } codes[3] = codes[3].replace(‘ ‘, ‘#‘); } 
else if(codes[2].equals("code=\"102\"")){ if(codes[3].indexOf("caller_n")>-1||codes[3].indexOf("caller_d")>-1){ codes[3] = process102(codes[3]); } codes[3] = codes[3].replace(‘ ‘, ‘#‘); }else{ codes[3] = codes[3].replace(" ", " "); } String collect = split[0] + " time=" + codes[0] + " " + codes[1] + " " + codes[2] + " " + codes[3]; word.set(collect); context.write(word, new Text("")); } } catch (Exception e) { // TODO Auto-generated catch block } } } public static String process100(String code) throws Exception{ String[] codes = code.split(" "); HashMap<String, Contact> hs = new HashMap<String, Process.Contact>(); Pattern p0 = Pattern.compile("_(\\d*)="); Pattern p1 = Pattern.compile("\"(.*)\""); for (int i = 0; i < codes.length; i++) { if (codes[i].equals("")) continue; String index = getIndex(codes[i], p0); if (index == null) continue; String value = http://www.mamicode.com/getIndex(codes[i], p1);"contact_name_")) { contact.contactName = value; } else if (codes[i].startsWith("contact_num_")) { contact.contactNum = value; } contact.index = index; hs.put(index, contact); } return printToString(hs); } public static String process101(String code) throws Exception{ String[] codes = code.split("\" "); HashMap<String, Message> hs = new HashMap<String, Process.Message>(); Pattern p = Pattern.compile("_(\\d*)="); Pattern p1 = Pattern.compile("\"(.*)"); for (int i = 0; i < codes.length; i++) { String index = getIndex(codes[i], p); String value = http://www.mamicode.com/getIndex(codes[i], p1);"message_time_")) { message.messageTime = value; } else if (codes[i].startsWith("message_to_")) { message.messageTo = value; } message.index = index; hs.put(index, message); } return printToString(hs); } public static String process102(String code) throws Exception{ String[] codes = code.split("\" "); HashMap<String, CallLog> hs = new HashMap<String, Process.CallLog>(); Pattern p = Pattern.compile("_(\\d*)="); Pattern p1 = Pattern.compile("\"(.*)"); for (int i = 0; i < codes.length; 
i++) { String index = getIndex(codes[i], p); if (index == null) continue; String value = http://www.mamicode.com/getIndex(codes[i], p1);"caller_date_")) { callLog.callerDate = value; } else if (codes[i].startsWith("caller_duration_")) { callLog.callerDuration = value; } else if (codes[i].startsWith("caller_name_")) { callLog.callerName = value; } else if (codes[i].startsWith("caller_num_")) { callLog.callerNum = value; } callLog.index = index; hs.put(index, callLog); } return printToString(hs); } public static String printToString(Map hs) { Set set = hs.keySet(); Iterator<String> it = set.iterator(); String result = ""; while (it.hasNext()) { result = result + hs.get(it.next()).toString() + "|"; } return result; } public static String getIndex(String code, Pattern p) { String index = null; Matcher matcher = p.matcher(code); if (matcher.find()) { index = matcher.group(1); } return index; } public static class IntSumReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Text rr, Context context) throws IOException, InterruptedException { context.write(key, new Text("")); } } public static class Contact { public String index; public String contactName; public String contactNum; @Override public String toString() { // TODO Auto-generated method stub return "contact_" + index + "=" + this.contactName + ";" + this.contactNum; } } public static class Message { public String index; public String messageTime; public String messageTo; @Override public String toString() { // TODO Auto-generated method stub return "message_" + this.index + "=" + this.messageTo + ";" + this.messageTime; } } public static class CallLog { public String index; public String callerDuration; public String callerNum; public String callerName; public String callerDate; @Override public String toString() { // TODO Auto-generated method stub return "callLog_" + this.index + "=" + this.callerName + ";" + this.callerNum + ";" + this.callerDate + ";" + this.callerDuration; } } 
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: process <in> <out>"); System.exit(2); } Job job = new Job(conf, "process"); job.setJarByClass(Process.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
此版本为第一版。运行几天后，服务器日志量暴增，导致堆栈溢出错误；
因此在第二版中增加了对 JVM 内存的自定义配置。
利用mapreduce清洗日志
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。