首页 > 代码库 > hadoop记录topk

hadoop记录topk

lk@lk-virtual-machine:~/hadoop-1.0.1/bin$ ./hadoop jar ~/hadoop-1.0.1/to.jar top.Top input output
14/05/12 03:44:37 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
****hdfs://localhost:9000/user/lk/input
14/05/12 03:44:41 INFO input.FileInputFormat: Total input paths to process : 4
14/05/12 03:44:48 INFO mapred.JobClient: Running job: job_201405120333_0001
14/05/12 03:44:49 INFO mapred.JobClient:  map 0% reduce 0%
14/05/12 03:46:36 INFO mapred.JobClient:  map 50% reduce 0%
14/05/12 03:47:28 INFO mapred.JobClient:  map 0% reduce 0%
14/05/12 03:47:29 INFO mapred.JobClient: Task Id : attempt_201405120333_0001_m_000000_0, Status : FAILED
attempt_201405120333_0001_m_000000_0: log4j:WARN No appenders could be found for logger (org.apache.hadoop.mapred.Task).
attempt_201405120333_0001_m_000000_0: log4j:WARN Please initialize the log4j system properly.
14/05/12 03:47:44 INFO mapred.JobClient: Task Id : attempt_201405120333_0001_m_000001_0, Status : FAILED
attempt_201405120333_0001_m_000001_0: log4j:WARN No appenders could be found for logger (org.apache.hadoop.mapred.Task).
attempt_201405120333_0001_m_000001_0: log4j:WARN Please initialize the log4j system properly.
14/05/12 03:49:37 INFO mapred.JobClient:  map 50% reduce 0%
14/05/12 03:50:13 INFO mapred.JobClient: Task Id : attempt_201405120333_0001_m_000000_1, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
    at top.Top$topMap.map(Top.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

attempt_201405120333_0001_m_000000_1: log4j:WARN No appenders could be found for logger (org.apache.hadoop.mapred.Task).
attempt_201405120333_0001_m_000000_1: log4j:WARN Please initialize the log4j system properly.
14/05/12 03:50:17 INFO mapred.JobClient:  map 0% reduce 0%
14/05/12 03:50:17 INFO mapred.JobClient: Task Id : attempt_201405120333_0001_m_000001_1, Status : FAILED
attempt_201405120333_0001_m_000001_1: log4j:WARN No appenders could be found for logger (org.apache.hadoop.mapred.Task).
attempt_201405120333_0001_m_000001_1: log4j:WARN Please initialize the log4j system properly.
14/05/12 03:52:36 INFO mapred.JobClient:  map 50% reduce 0%
14/05/12 03:52:57 INFO mapred.JobClient:  map 25% reduce 0%
14/05/12 03:53:01 INFO mapred.JobClient: Task Id : attempt_201405120333_0001_m_000001_2, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
    at top.Top$topMap.map(Top.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

attempt_201405120333_0001_m_000001_2: log4j:WARN No appenders could be found for logger (org.apache.hadoop.mapred.Task).
attempt_201405120333_0001_m_000001_2: log4j:WARN Please initialize the log4j system properly.
14/05/12 03:53:09 INFO mapred.JobClient:  map 0% reduce 0%
14/05/12 03:53:37 INFO mapred.JobClient: Task Id : attempt_201405120333_0001_m_000000_2, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
    at top.Top$topMap.map(Top.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

attempt_201405120333_0001_m_000000_2: log4j:WARN No appenders could be found for logger (org.apache.hadoop.mapred.Task).
attempt_201405120333_0001_m_000000_2: log4j:WARN Please initialize the log4j system properly.
14/05/12 03:54:03 INFO mapred.JobClient: Job complete: job_201405120333_0001
14/05/12 03:54:06 INFO mapred.JobClient: Counters: 7
14/05/12 03:54:06 INFO mapred.JobClient:   Job Counters
14/05/12 03:54:06 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=830699
14/05/12 03:54:06 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/05/12 03:54:06 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/05/12 03:54:06 INFO mapred.JobClient:     Launched map tasks=8
14/05/12 03:54:06 INFO mapred.JobClient:     Data-local map tasks=8
14/05/12 03:54:06 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
14/05/12 03:54:06 INFO mapred.JobClient:     Failed map tasks=1
lk@lk-virtual-machine:~/hadoop-1.0.1/bin$ ./hadoop dfs -ls output
Found 1 items
drwxr-xr-x   - lk supergroup          0 2014-05-12 03:44 /user/lk/output/_logs

lk@lk-virtual-machine:~/hadoop-1.0.1/bin$





package top;

import java.io.IOException;
import java.util.*;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class Top {
	public static class topMap extends Mapper<Text, Text,  IntWritable,Text>{  

		          

		         private TreeMap<Integer, String> topMap = new TreeMap<Integer,String>();  

		         private int topNum = 1;  

		         public void map(Text key,Text value,Context context){  

		             topMap.put(Integer.parseInt(value.toString()), key.toString());  

		    

		             while(topMap.size()>topNum)  

		                 topMap.remove(topMap.firstKey());  

		         }  

		            

		         protected void cleanup(Context context) throws IOException, InterruptedException{  

		             for(Integer entry:topMap.keySet()){  

		                 context.write(new IntWritable(entry),new Text(topMap.get(entry)));  

		             }  

		         }  

		     }  

		        

		     //瀹炵幇闄嶅簭  

		     private static class descendComparator implements Comparator{  

		    

		         @Override 

		         public int compare(Object o1, Object o2) {  

		             // TODO Auto-generated method stub  

		             Integer a=(Integer) o1;  

		             Integer b=(Integer) o2;  

		                

		             return -a.compareTo(b);  

		         }  

		            

		     }  

		     public static class topReduce extends Reducer<IntWritable,Text , IntWritable,Text>{  

		            

		         private TreeMap<Integer, String> topMap =new TreeMap<Integer,String>(new descendComparator());  

		         private int topNum = 1;  

		         public void reduce(IntWritable key,Iterable<Text> values,Context context){  

		                

		             for(Text text:values)  

		                 topMap.put(key.get(),text.toString());  

		             while(topMap.size()>topNum){  

		                 topMap.remove(topMap.firstKey());  

		             }  

		        

		         }  

		            

		         protected void cleanup(Context context) throws IOException, InterruptedException{  

		             for(Integer integer:topMap.keySet()){  

		                context.write(new IntWritable(integer),new Text(topMap.get(integer)));  

		             }  

		         }  

		     }  
		     
		     public static void main(String[]args)throws Exception
		 	{
		 		Configuration conf =new Configuration();
		 	
		 		Job job =new Job(conf,"Top");
		 		
		 		job.setJarByClass(topMap.class);
		 		job.setMapperClass(topMap.class);
		         job.setReducerClass(topReduce.class);
		 		
		 		job.setOutputKeyClass(IntWritable.class);
		 		job.setOutputValueClass(Text.class);
		 		
		 		FileInputFormat.addInputPath(job,new Path(args[0]));
		 		FileOutputFormat.setOutputPath(job,new Path(args[1]));//setOutputPath(job,new Path(args[1]));
		 		
		 		System.exit(job.waitForCompletion(true)?0:1);
		 		
		 		
		 		
		 	}

		    



}