InAction: Log Analysis (KPI)
This post follows the original author's article at http://blog.fens.me/hadoop-mapreduce-log-kpi/,
ported from Hadoop 1.x to 2.x; very little needed to change. (Honestly, the accompanying video isn't worth watching; the article is the best way to follow along.)
First, set up the Hadoop project with Maven:
download Maven, add the environment variables, replace Eclipse's default Maven configuration, relocate the default local repository, and so on.
Unlike the original author, I didn't create the project with the Maven command line; using Eclipse directly is convenient enough.
The important part is the dependencies in pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.admln</groupId>
  <artifactId>getKPI</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>getKPI</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
  </dependencies>
</project>
Then let Maven download the jars (the first run is slow because there is a lot to fetch; after that everything is cached locally and builds are fast).
Next come the MapReduce jobs.
Their task is to extract a set of KPI metrics from the access log.
Log format:
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
The useful fields:
- remote_addr: the client IP address, 222.68.172.190
- remote_user: the client user name ("-" when absent)
- time_local: access time and time zone, [18/Sep/2013:06:49:57 +0000]
- request: the requested URL and HTTP protocol, "GET /images/my.jpg HTTP/1.1"
- status: the response status code (200 on success), 200
- body_bytes_sent: size of the response body sent to the client, 19939
- http_referer: the page the request was linked from, "http://www.angularjs.cn/A00n"
- http_user_agent: client browser information, "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
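Since the parser that follows simply splits each line on single spaces, it helps to see which token each field lands in. A minimal demo of the indexing that KPI.parser below relies on (my own snippet; the sample line is abbreviated):

// Where each field lands after line.split(" ") — the same indexing KPI.parser uses.
public class SplitDemo {
    public static void main(String[] args) {
        String line = "222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "
                + "\"GET /images/my.jpg HTTP/1.1\" 200 19939 "
                + "\"http://www.angularjs.cn/A00n\" \"Mozilla/5.0 (Windows NT 6.1)...\"";
        String[] arr = line.split(" ");
        System.out.println(arr[0]);  // 222.68.172.190        -> remote_addr
        System.out.println(arr[3]);  // [18/Sep/2013:06:49:57 -> time_local (leading '[' stripped later)
        System.out.println(arr[6]);  // /images/my.jpg        -> request (the path token only)
        System.out.println(arr[8]);  // 200                   -> status
        System.out.println(arr[9]);  // 19939                 -> body_bytes_sent
        System.out.println(arr[10]); // "http://www.angularjs.cn/A00n" -> http_referer (quotes kept)
        // arr[11] onwards: user-agent tokens -> http_user_agent
    }
}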
KPI targets:
- PV (PageView): page view counts
- IP: unique visitor IPs per page
- Time: per-hour PV counts
- Source: visit-source (referrer) statistics
- Browser: visitor browser statistics
The MapReduce code itself:
KPI.java
package org.admln.kpi;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/**
 * @author admln
 */
public class KPI {
    private String remote_addr;     // client IP address
    private String remote_user;     // client user name; "-" when absent
    private String time_local;      // access time and time zone
    private String request;         // requested URL and HTTP protocol
    private String status;          // response status code; 200 on success
    private String body_bytes_sent; // size of the response body sent to the client
    private String http_referer;    // the page the request was linked from
    private String http_user_agent; // client browser information

    private boolean valid = true;   // whether this record is usable

    public static KPI parser(String line) {
        KPI kpi = new KPI();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            kpi.setRemote_addr(arr[0]);
            kpi.setRemote_user(arr[1]);
            kpi.setTime_local(arr[3].substring(1)); // strip the leading '['
            kpi.setRequest(arr[6]);
            kpi.setStatus(arr[8]);
            kpi.setBody_bytes_sent(arr[9]);
            kpi.setHttp_referer(arr[10]);

            if (arr.length > 12) {
                kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                kpi.setHttp_user_agent(arr[11]);
            }

            if (Integer.parseInt(kpi.getStatus()) > 400) { // drop error responses
                kpi.setValid(false);
            }
        } else {
            kpi.setValid(false);
        }
        return kpi;
    }

    public static KPI filterPVs(String line) {
        KPI kpi = parser(line);
        // Only these pages count toward PV; adjust the whitelist to your own site.
        Set<String> pages = new HashSet<String>();
        pages.add("/about");
        pages.add("/black-ip-list/");
        pages.add("/cassandra-clustor/");
        pages.add("/finance-rhive-repurchase/");
        pages.add("/hadoop-family-roadmap/");
        pages.add("/hadoop-hive-intro/");
        pages.add("/hadoop-zookeeper-intro/");
        pages.add("/hadoop-mahout-roadmap/");

        if (!pages.contains(kpi.getRequest())) {
            kpi.setValid(false);
        }
        return kpi;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return time_local;
    }

    public Date getTime_local_Date() throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        return df.parse(this.time_local);
    }

    // Hour-granularity bucket for the per-hour statistics.
    public String getTime_local_Date_Hour() throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
        return df.format(this.getTime_local_Date());
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }
}
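A quick sanity check of the parser against the sample line, before wiring it into a job. This test class is my own addition, not part of the original project:

package org.admln.kpi;

// Hypothetical sanity check for KPI.parser, using the sample log line above.
public class KPITest {
    public static void main(String[] args) throws Exception {
        String line = "222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "
                + "\"GET /images/my.jpg HTTP/1.1\" 200 19939 "
                + "\"http://www.angularjs.cn/A00n\" \"Mozilla/5.0 (Windows NT 6.1)\"";
        KPI kpi = KPI.parser(line);
        System.out.println(kpi.isValid());                 // true
        System.out.println(kpi.getRemote_addr());          // 222.68.172.190
        System.out.println(kpi.getRequest());              // /images/my.jpg
        System.out.println(kpi.getTime_local_Date_Hour()); // 2013091806
    }
}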
KPIBrowser.java
package org.admln.kpi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author admln
 */
public class KPIBrowser {

    public static class browserMapper extends Mapper<Object, Text, Text, IntWritable> {
        Text word = new Text();
        IntWritable ONE = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            KPI kpi = KPI.parser(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getHttp_user_agent());
                context.write(word, ONE);
            }
        }
    }

    public static class browserReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path input = new Path("hdfs://hadoop:9001/fens/kpi/input/");
        Path output = new Path("hdfs://hadoop:9001/fens/kpi/browser/output");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "get KPI Browser");

        job.setJarByClass(KPIBrowser.class);

        job.setMapperClass(browserMapper.class);
        job.setCombinerClass(browserReducer.class); // summing is associative, so the reducer doubles as combiner
        job.setReducerClass(browserReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
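One caveat: because KPI.parser keeps only the first one or two user-agent tokens, the Browser keys come out as truncated strings like "Mozilla/5.0 (Windows". If you want real browser families, the mapper could normalize first; a rough sketch (browserFamily is my own hypothetical helper, not in the original):

// Hypothetical helper: collapse a raw user-agent string into a coarse browser family.
// Order matters: Chrome UAs also contain "Safari", so test Chrome first.
static String browserFamily(String ua) {
    if (ua.contains("MSIE") || ua.contains("Trident")) return "IE";
    if (ua.contains("Chrome")) return "Chrome";
    if (ua.contains("Firefox")) return "Firefox";
    if (ua.contains("Safari")) return "Safari";
    return "Other";
}
// In browserMapper.map: word.set(browserFamily(kpi.getHttp_user_agent()));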
KPIIP.java
package org.admln.kpi;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author admln
 */
public class KPIIP {
    // Mapper: emit (page, client IP) pairs
    public static class ipMapper extends Mapper<Object, Text, Text, Text> {
        private Text word = new Text();
        private Text ips = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            KPI kpi = KPI.parser(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getRequest());
                ips.set(kpi.getRemote_addr());
                context.write(word, ips);
            }
        }
    }

    // Reducer: count distinct IPs per page. The set must be created per call,
    // otherwise IPs leak from one key into the next.
    public static class ipReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Set<String> count = new HashSet<String>();
            for (Text val : values) {
                count.add(val.toString());
            }
            result.set(String.valueOf(count.size()));
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Path input = new Path("hdfs://hadoop:9001/fens/kpi/input/");
        Path output = new Path("hdfs://hadoop:9001/fens/kpi/ip/output");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "get KPI IP");
        job.setJarByClass(KPIIP.class);

        job.setMapperClass(ipMapper.class);
        // No combiner here: ipReducer emits a count, and feeding counts back into
        // the distinct-IP set would produce wrong results.
        job.setReducerClass(ipReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
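If shuffle volume matters, a combiner is still possible for this job; it just has to forward deduplicated IPs rather than counts. A sketch (my own addition, useful under the assumption that duplicate IPs per page are common within one map task):

// Hypothetical combiner for KPIIP: dedup IPs locally and re-emit them unchanged,
// so the real reducer still receives IPs, not counts.
public static class ipCombiner extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Set<String> seen = new HashSet<String>();
        for (Text val : values) {
            seen.add(val.toString());
        }
        for (String ip : seen) {
            context.write(key, new Text(ip));
        }
    }
}
// Wire it up with: job.setCombinerClass(ipCombiner.class);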
KPIPV.java
package org.admln.kpi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author admln
 */
public class KPIPV {

    public static class pvMapper extends Mapper<Object, Text, Text, IntWritable> {
        private Text word = new Text();
        private final static IntWritable ONE = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            KPI kpi = KPI.filterPVs(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getRequest());
                context.write(word, ONE);
            }
        }
    }

    public static class pvReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Path input = new Path("hdfs://hadoop:9001/fens/kpi/input/");
        Path output = new Path("hdfs://hadoop:9001/fens/kpi/pv/output");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "get KPI PV");

        job.setJarByClass(KPIPV.class);

        job.setMapperClass(pvMapper.class);
        job.setCombinerClass(pvReducer.class);
        job.setReducerClass(pvReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
KPISource.java
package org.admln.kpi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author admln
 */
public class KPISource {

    public static class sourceMapper extends Mapper<Object, Text, Text, IntWritable> {
        Text word = new Text();
        IntWritable ONE = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            KPI kpi = KPI.parser(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getHttp_referer());
                context.write(word, ONE);
            }
        }
    }

    public static class sourceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path input = new Path("hdfs://hadoop:9001/fens/kpi/input/");
        Path output = new Path("hdfs://hadoop:9001/fens/kpi/source/output");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "get KPI Source");

        job.setJarByClass(KPISource.class);

        job.setMapperClass(sourceMapper.class);
        job.setCombinerClass(sourceReducer.class);
        job.setReducerClass(sourceReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
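The Source KPI is described above as a per-domain statistic, but this job keys on the full referer URL. If you want domains, the mapper could reduce each referer to its host first; a sketch (hostOf is my own hypothetical helper, not in the original):

// Hypothetical helper: reduce a referer URL to its host, so Source counts per domain.
static String hostOf(String referer) {
    try {
        // KPI.parser keeps the surrounding quotes, so strip them before parsing.
        return new java.net.URL(referer.replace("\"", "")).getHost();
    } catch (java.net.MalformedURLException e) {
        return "unknown"; // e.g. the referer is "-" when absent
    }
}
// In sourceMapper.map: word.set(hostOf(kpi.getHttp_referer()));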
KPITime.java
package org.admln.kpi;

import java.io.IOException;
import java.text.ParseException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author admln
 */
public class KPITime {

    public static class timeMapper extends Mapper<Object, Text, Text, IntWritable> {
        Text word = new Text();
        IntWritable ONE = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            KPI kpi = KPI.parser(value.toString());
            if (kpi.isValid()) {
                try {
                    word.set(kpi.getTime_local_Date_Hour());
                    context.write(word, ONE); // only emit when the timestamp parsed cleanly
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class timeReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path input = new Path("hdfs://hadoop:9001/fens/kpi/input/");
        Path output = new Path("hdfs://hadoop:9001/fens/kpi/time/output");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "get KPI Time");

        job.setJarByClass(KPITime.class);

        job.setMapperClass(timeMapper.class);
        job.setCombinerClass(timeReducer.class);
        job.setReducerClass(timeReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
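The hour buckets come straight out of KPI.getTime_local_Date_Hour(); a quick check of what that conversion yields for the sample timestamp (my own snippet, not part of the project):

import java.text.SimpleDateFormat;
import java.util.Locale;

// Quick check of the hour bucketing used by KPITime.
public class HourBucketDemo {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat in = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        SimpleDateFormat out = new SimpleDateFormat("yyyyMMddHH");
        // Prints 2013091806; the wall-clock hour is preserved because parse and
        // format share the JVM default time zone (the "+0000" token is never parsed).
        System.out.println(out.format(in.parse("18/Sep/2013:06:49:57")));
    }
}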
All five MR jobs are much the same; each is a slight variation on WordCount. (The original author seems to have made a couple of small mistakes, which I spotted and fixed.) Since they differ so little, the four counting jobs could even share one driver, as sketched below.
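The Browser, Source, Time, and PV drivers differ only in job name, output path, and mapper class, so a generic helper could replace the four near-identical main() methods. A sketch of that refactoring (runSumJob is my own hypothetical helper, with the same imports as the drivers above):

// Hypothetical shared driver for the four count-style jobs (Browser, Source, Time, PV).
static void runSumJob(String name, String outPath,
                      Class<? extends Mapper> mapperClass,
                      Class<? extends Reducer> reducerClass,
                      Class<?> jarClass) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, name);
    job.setJarByClass(jarClass);
    job.setMapperClass(mapperClass);
    job.setCombinerClass(reducerClass); // safe: all four reducers just sum IntWritables
    job.setReducerClass(reducerClass);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path("hdfs://hadoop:9001/fens/kpi/input/"));
    FileOutputFormat.setOutputPath(job, new Path(outPath));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
// e.g. runSumJob("get KPI Browser", "hdfs://hadoop:9001/fens/kpi/browser/output",
//                KPIBrowser.browserMapper.class, KPIBrowser.browserReducer.class, KPIBrowser.class);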
The Hadoop environment: Hadoop 2.2.0; JDK 1.7; pseudo-distributed in a VM; IP 192.168.111.132.
The results: the original author hard-coded the set of pages to extract here; in practice, extract whichever pages your own requirements call for.
The complete code and log file: http://pan.baidu.com/s/1qW5D63M
Practice log data can also be obtained elsewhere, for example from Sogou Labs: http://www.sogou.com/labs/dl/q.html
Corrections are welcome wherever I've gotten something wrong.