首页 > 代码库 > 利用mapreduce清洗日志

利用 MapReduce 清洗日志

package com.libc;import java.io.IOException;import java.io.UnsupportedEncodingException;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Set;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class Process {	public static class TokenizerMapper extends			Mapper<Object, Text, Text, Text> {		private Text word = new Text();		public void map(Object key, Text value, Context context)				throws IOException, InterruptedException {			// TODO Auto-generated method stub			String datas = "";			try {				datas = new String(value.getBytes(), 0, value.getLength(),						"GBK");			} catch (UnsupportedEncodingException e1) {				// TODO Auto-generated catch block				e1.printStackTrace();			}			// datas = value.toString();			try {				String[] split = datas.split(" time=");				// 处理头中包含空格的字段				Pattern p = Pattern.compile("phonemodel=\"(.*?)\"");				String pm = getIndex(split[0], p);				split[0] = split[0].replaceAll(pm, pm.replace(" ", ""));				Pattern p1 = Pattern.compile("networktype=\"(.*?)\"");				String nt = getIndex(split[0], p1);				split[0] = split[0].replaceAll(nt, nt.replace(" ", ""));				for (int i = 1; i < split.length; i++) {					String[] codes = split[i].split(" ", 4);					int headLen = split[0].split(" ").length;					if (headLen != 20) {						// 丢掉错误日志						continue;					}					// 处理旧版本日志判别标准:|					if (codes[2].equals("code=\"100\"")){						if(codes[3].indexOf("contact_name")>-1){							codes[3] = process100(codes[3]);						}							codes[3] = codes[3].replace(‘ ‘, ‘#‘);											}else 
if(codes[2].equals("code=\"101\"") ){						if(codes[3].indexOf("message_to_")>-1){							codes[3] = process101(codes[3]);						}							codes[3] = codes[3].replace(‘ ‘, ‘#‘);					}					else if(codes[2].equals("code=\"102\"")){						if(codes[3].indexOf("caller_n")>-1||codes[3].indexOf("caller_d")>-1){							codes[3] = process102(codes[3]);						}							codes[3] = codes[3].replace(‘ ‘, ‘#‘);											}else{						codes[3] = codes[3].replace("  ", " ");					}											String collect = split[0] + " time=" + codes[0] + " "							+ codes[1] + " " + codes[2] + " " + codes[3];					word.set(collect);					context.write(word, new Text(""));				}			} catch (Exception e) {				// TODO Auto-generated catch block			}		}	}	public static String process100(String code) throws Exception{		String[] codes = code.split(" ");		HashMap<String, Contact> hs = new HashMap<String, Process.Contact>();		Pattern p0 = Pattern.compile("_(\\d*)=");		Pattern p1 = Pattern.compile("\"(.*)\"");		for (int i = 0; i < codes.length; i++) {			if (codes[i].equals(""))				continue;			String index = getIndex(codes[i], p0);			if (index == null)				continue;			String value = http://www.mamicode.com/getIndex(codes[i], p1);"contact_name_")) {				contact.contactName = value;			} else if (codes[i].startsWith("contact_num_")) {				contact.contactNum = value;			}			contact.index = index;			hs.put(index, contact);		}		return printToString(hs);	}	public static String process101(String code) throws Exception{		String[] codes = code.split("\"  ");		HashMap<String, Message> hs = new HashMap<String, Process.Message>();		Pattern p = Pattern.compile("_(\\d*)=");		Pattern p1 = Pattern.compile("\"(.*)");		for (int i = 0; i < codes.length; i++) {			String index = getIndex(codes[i], p);			String value = http://www.mamicode.com/getIndex(codes[i], p1);"message_time_")) {				message.messageTime = value;			} else if (codes[i].startsWith("message_to_")) {				message.messageTo = value;			}			message.index = index;			hs.put(index, 
message);		}		return printToString(hs);	}	public static String process102(String code) throws Exception{		String[] codes = code.split("\"  ");		HashMap<String, CallLog> hs = new HashMap<String, Process.CallLog>();		Pattern p = Pattern.compile("_(\\d*)=");		Pattern p1 = Pattern.compile("\"(.*)");		for (int i = 0; i < codes.length; i++) {			String index = getIndex(codes[i], p);			if (index == null)				continue;			String value = http://www.mamicode.com/getIndex(codes[i], p1);"caller_date_")) {				callLog.callerDate = value;			} else if (codes[i].startsWith("caller_duration_")) {				callLog.callerDuration = value;			} else if (codes[i].startsWith("caller_name_")) {				callLog.callerName = value;			} else if (codes[i].startsWith("caller_num_")) {				callLog.callerNum = value;			}			callLog.index = index;			hs.put(index, callLog);		}		return printToString(hs);	}	public static String printToString(Map hs) {		Set set = hs.keySet();		Iterator<String> it = set.iterator();		String result = "";		while (it.hasNext()) {			result = result + hs.get(it.next()).toString() + "|";		}		return result;	}	public static String getIndex(String code, Pattern p) {		String index = null;		Matcher matcher = p.matcher(code);		if (matcher.find()) {			index = matcher.group(1);		}		return index;	}	public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {		public void reduce(Text key, Text rr, Context context)				throws IOException, InterruptedException {			context.write(key, new Text(""));		}	}	public static class Contact {		public String index;		public String contactName;		public String contactNum;		@Override		public String toString() {			// TODO Auto-generated method stub			return "contact_" + index + "=" + this.contactName + ";"					+ this.contactNum;		}	}	public static class Message {		public String index;		public String messageTime;		public String messageTo;		@Override		public String toString() {			// TODO Auto-generated method stub			return "message_" + this.index + "=" + 
this.messageTo + ";"					+ this.messageTime;		}	}	public static class CallLog {		public String index;		public String callerDuration;		public String callerNum;		public String callerName;		public String callerDate;		@Override		public String toString() {			// TODO Auto-generated method stub			return "callLog_" + this.index + "=" + this.callerName + ";"					+ this.callerNum + ";" + this.callerDate + ";"					+ this.callerDuration;		}	}	public static void main(String[] args) throws Exception {		Configuration conf = new Configuration();		String[] otherArgs = new GenericOptionsParser(conf, args)				.getRemainingArgs();		if (otherArgs.length != 2) {			System.err.println("Usage: process <in> <out>");			System.exit(2);		}		Job job = new Job(conf, "process");		job.setJarByClass(Process.class);		job.setMapperClass(TokenizerMapper.class);		job.setCombinerClass(IntSumReducer.class);		job.setReducerClass(IntSumReducer.class);		job.setOutputKeyClass(Text.class);		job.setOutputValueClass(Text.class);		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));		System.exit(job.waitForCompletion(true) ? 0 : 1);	}}

此版本为第一版。运行几天后，服务器日志量暴增，导致了堆栈溢出错误；因此在修改后的第二版中，可以对 JVM 内存进行自定义配置。

利用mapreduce清洗日志