
Hadoop Diary Day 16 --- Running a MapReduce Program from the Command Line

1. Writing the Code

1.1 Word Count

  Let us start by revisiting our earlier word-count example, shown in Listing 1.1.

package counter;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class WordCountApp {
    static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
    static final String OUT_PATH = "hdfs://hadoop:9000/out";

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);

        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        final Job job = new Job(conf, WordCountApp.class.getSimpleName());

        FileInputFormat.setInputPaths(job, INPUT_PATH);      // 1.1 where the input files are read from

        job.setInputFormatClass(TextInputFormat.class);      // how the input is parsed: each line becomes a key/value pair

        job.setMapperClass(MyMapper.class);                  // 1.2 the custom map class
        job.setMapOutputKeyClass(Text.class);                // <k,v> types of the map output; may be omitted when <k3,v3> matches <k2,v2>
        job.setMapOutputValueClass(LongWritable.class);

        job.setPartitionerClass(HashPartitioner.class);      // 1.3 partitioning
        job.setNumReduceTasks(1);                            // run a single reduce task

        job.setReducerClass(MyReducer.class);                // 2.2 the custom reduce class
        job.setOutputKeyClass(Text.class);                   // output types of the reduce
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, outPath);        // 2.3 where the output is written

        job.setOutputFormatClass(TextOutputFormat.class);    // the output format class

        job.waitForCompletion(true);                         // submit the job to the JobTracker and wait for it to finish
    }

    /**
     * KEYIN    i.e. k1    the byte offset of the line
     * VALUEIN  i.e. v1    the text of the line
     * KEYOUT   i.e. k2    a word appearing in the line
     * VALUEOUT i.e. v2    the count of that word in the line, the fixed value 1
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException, InterruptedException {
            // final Counter helloCounter = context.getCounter("Sensitive Words", "hello");

            final String line = v1.toString();
            /* if (line.contains("hello")) {
                   // record that the sensitive word appeared in this line
                   helloCounter.increment(1L);
               } */
            final String[] splited = line.split(" ");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }

    /**
     * KEYIN    i.e. k2    a word appearing in a line
     * VALUEIN  i.e. v2    the count of that word in the line
     * KEYOUT   i.e. k3    a distinct word appearing in the text
     * VALUEOUT i.e. v3    the total count of that distinct word in the text
     */
    static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException, InterruptedException {
            long times = 0L;
            for (LongWritable count : v2s) {
                times += count.get();
            }
            ctx.write(k2, new LongWritable(times));
        }
    }
}

Listing 1.1

  Looking at the code above, notice that the input and output paths are hard-coded: the input path is INPUT_PATH = "hdfs://hadoop:9000/hello" and the output path is OUT_PATH = "hdfs://hadoop:9000/out". With the paths fixed like this, any data you want to process must already sit at exactly those locations, which greatly reduces the program's flexibility; in other words, it is not yet a general-purpose program. To make it more flexible, the input and output paths should instead be supplied as arguments on the command line at run time, which is exactly what the version in Section 1.2 does.

1.2 Word Count Run from the Command Line

  The word-count program that can be run from the command line is shown in Listing 1.2.

package cmd;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountApp extends Configured implements Tool {
    static String INPUT_PATH = "";
    static String OUT_PATH = "";

    @Override
    public int run(String[] arg0) throws Exception {
        INPUT_PATH = arg0[0];
        OUT_PATH = arg0[1];

        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        final Job job = new Job(conf, WordCountApp.class.getSimpleName());

        job.setJarByClass(WordCountApp.class);               // required when running from a packaged jar
        FileInputFormat.setInputPaths(job, INPUT_PATH);      // 1.1 where the input files are read from
        job.setInputFormatClass(TextInputFormat.class);      // how the input is parsed: each line becomes a key/value pair

        job.setMapperClass(MyMapper.class);                  // 1.2 the custom map class
        job.setMapOutputKeyClass(Text.class);                // <k,v> types of the map output; may be omitted when <k3,v3> matches <k2,v2>
        job.setMapOutputValueClass(LongWritable.class);

        job.setPartitionerClass(HashPartitioner.class);      // 1.3 partitioning
        job.setNumReduceTasks(1);                            // run a single reduce task

        job.setReducerClass(MyReducer.class);                // 2.2 the custom reduce class
        job.setOutputKeyClass(Text.class);                   // output types of the reduce
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, outPath);        // 2.3 where the output is written
        job.setOutputFormatClass(TextOutputFormat.class);    // the output format class

        job.waitForCompletion(true);                         // submit the job to the JobTracker and wait for it to finish
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new WordCountApp(), args);
    }

    /**
     * KEYIN    i.e. k1    the byte offset of the line
     * VALUEIN  i.e. v1    the text of the line
     * KEYOUT   i.e. k2    a word appearing in the line
     * VALUEOUT i.e. v2    the count of that word in the line, the fixed value 1
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException, InterruptedException {
            final String[] splited = v1.toString().split("\t");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }

    /**
     * KEYIN    i.e. k2    a word appearing in a line
     * VALUEIN  i.e. v2    the count of that word in the line
     * KEYOUT   i.e. k3    a distinct word appearing in the text
     * VALUEOUT i.e. v3    the total count of that distinct word in the text
     */
    static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException, InterruptedException {
            long times = 0L;
            for (LongWritable count : v2s) {
                times += count.get();
            }
            ctx.write(k2, new LongWritable(times));
        }
    }
}

Listing 1.2

  To make the word-count program runnable from the command line, our class extends Configured and implements the Tool interface, which requires adding a run() method. The job-configuration code that previously lived in main() moves into run(). But how does run() get invoked? It is called from main() through ToolRunner, as shown in Listing 1.3.

public static void main(String[] args) throws Exception {
    ToolRunner.run(new WordCountApp(), args);
}

Listing 1.3

  Look at the signature ToolRunner.run(Tool tool, String[] args). The first parameter is a Tool; since our class implements the Tool interface, an instance of it can be passed directly. The second parameter is the string array args. Before going further, a word about the args parameter of main: it holds the arguments supplied when the program is launched, so if your program needs them they must be given on the command line. For example, consider the following HelloWorld program that prints its first argument:

public class HelloWorld {
    public static void main(String[] args) {
        System.out.println(args[0]);
    }
}

  If you run java HelloWorld ceshi ceshi1 ceshi2, then inside HelloWorld's main method args is {"ceshi", "ceshi1", "ceshi2"}, and the printed result is ceshi.

  From this look at main, it should be clear that the second argument to run is simply the argument array of main, which is how the program receives the parameters given on the command line. Since the input and output paths are now specified by command-line arguments at run time, there is no need to hard-code them, so INPUT_PATH and OUT_PATH are initialized to empty strings and then assigned inside the run method from the arguments passed in, as shown below.

INPUT_PATH = arg0[0];    // the input path
OUT_PATH = arg0[1];      // the output path
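  As a side note, run() indexes arg0 directly, so launching the jar without both paths would fail with an ArrayIndexOutOfBoundsException. The fragment below is a small sketch, not part of the original listing, of how the start of run() could validate its arguments first; everything after the guard is the same job setup as in Listing 1.2.

    @Override
    public int run(String[] arg0) throws Exception {
        // Sketch: guard against missing arguments before indexing into arg0,
        // so a bad invocation fails with a usage message instead of an exception.
        if (arg0.length < 2) {
            System.err.println("Usage: hadoop jar <jar file> <input path> <output path>");
            return -1;    // the value returned by run() is passed back through ToolRunner.run()
        }
        INPUT_PATH = arg0[0];    // the input path
        OUT_PATH = arg0[1];      // the output path
        // ... the rest of the job configuration from Listing 1.2 is unchanged ...
        return 0;
    }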

  Finally, for the program to run from the command line we must add the line "job.setJarByClass(WordCountApp.class);", which indicates that the program runs in packaged (jar) form and lets Hadoop locate the jar that contains our classes.
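  One more detail worth knowing: the job log in Section 2.3.3 still prints the warning "Use GenericOptionsParser for parsing the arguments", because run() builds a brand-new Configuration instead of reusing the one ToolRunner has already prepared. The class below is a minimal, self-contained sketch (the class name ToolSkeleton is invented for illustration; it is not part of the word-count program) showing how a Tool can pick up ToolRunner's prepared Configuration via getConf(); generic options such as -D key=value are then parsed for you and stripped from the argument array before run() is called.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ToolSkeleton extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() returns the Configuration that ToolRunner already passed through
        // GenericOptionsParser, so -D options from the command line are visible here.
        Configuration conf = getConf();
        System.out.println("input  path     = " + args[0]);
        System.out.println("output path     = " + args[1]);
        System.out.println("fs.default.name = " + conf.get("fs.default.name"));
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic options (-D, -fs, -files, ...) and hands only
        // the remaining arguments to run(); its return value is run()'s exit code.
        int exitCode = ToolRunner.run(new ToolSkeleton(), args);
        System.exit(exitCode);
    }
}

  Applying the same idea to Listing 1.2 would simply mean replacing "Configuration conf = new Configuration();" with "Configuration conf = getConf();" inside run().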

2. How to Run the Program

2.1 Export the Program to the Desktop as a .jar File

<1> Right-click WordCountApp and choose Export, as shown in Figure 2.1.

 Figure 2.1

<2> Select JAR file and click Next, as shown in Figure 2.2.

 Figure 2.2

<3> After clicking Next, the dialog shown in Figure 2.3 appears; click Next again.

 Figure 2.3

<4> After clicking Next, the dialog shown in Figure 2.4 appears; click Browse.

 Figure 2.4

<5> After clicking Browse, click OK in the dialog that pops up, as shown in Figure 2.5.

Figure 2.5

<6> After clicking OK, simply click Finish, as shown in Figure 2.6.

Figure 2.6

2.2 Upload the jar to Linux

  Use WinSCP to upload the program's jar file to the Linux machine, as shown in Figure 2.7.

Figure 2.7

2.3 Run the jar from the Linux Command Line

2.3.1 Create the Input and Output Directories

  Run the following commands:

hadoop fs -mkdir /input
hadoop fs -mkdir /output

2.3.2 Create and Upload the Input File

  Run: vi file1

  Enter the following content (the words on each line are separated by a tab, matching the "\t" split in the mapper):

        hello    world
        hello    me

  Then run: hadoop fs -put file1 /input

2.3.3 Run the Program

Run: hadoop jar jar.jar hdfs://hadoop:9000/input hdfs://hadoop:9000/output

Console output while the job runs:

14/09/28 20:08:08 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/09/28 20:08:09 INFO input.FileInputFormat: Total input paths to process : 1
14/09/28 20:08:09 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/09/28 20:08:09 WARN snappy.LoadSnappy: Snappy native library not loaded
14/09/28 20:08:11 INFO mapred.JobClient: Running job: job_201409281916_0001
14/09/28 20:08:12 INFO mapred.JobClient:  map 0% reduce 0%
14/09/28 20:09:03 INFO mapred.JobClient:  map 100% reduce 0%
14/09/28 20:09:14 INFO mapred.JobClient:  map 100% reduce 100%
14/09/28 20:09:14 INFO mapred.JobClient: Job complete: job_201409281916_0001
14/09/28 20:09:14 INFO mapred.JobClient: Counters: 29
14/09/28 20:09:14 INFO mapred.JobClient:   Job Counters
14/09/28 20:09:14 INFO mapred.JobClient:     Launched reduce tasks=1
14/09/28 20:09:14 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=47675
14/09/28 20:09:14 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/09/28 20:09:14 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/09/28 20:09:14 INFO mapred.JobClient:     Launched map tasks=1
14/09/28 20:09:14 INFO mapred.JobClient:     Data-local map tasks=1
14/09/28 20:09:14 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=10902
14/09/28 20:09:14 INFO mapred.JobClient:   File Output Format Counters
14/09/28 20:09:14 INFO mapred.JobClient:     Bytes Written=21
14/09/28 20:09:14 INFO mapred.JobClient:   FileSystemCounters
14/09/28 20:09:14 INFO mapred.JobClient:     FILE_BYTES_READ=67
14/09/28 20:09:14 INFO mapred.JobClient:     HDFS_BYTES_READ=116
14/09/28 20:09:14 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=105834
14/09/28 20:09:14 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=21
14/09/28 20:09:14 INFO mapred.JobClient:   File Input Format Counters
14/09/28 20:09:14 INFO mapred.JobClient:     Bytes Read=21
14/09/28 20:09:14 INFO mapred.JobClient:   Map-Reduce Framework
14/09/28 20:09:14 INFO mapred.JobClient:     Map output materialized bytes=67
14/09/28 20:09:14 INFO mapred.JobClient:     Map input records=2
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce shuffle bytes=67
14/09/28 20:09:14 INFO mapred.JobClient:     Spilled Records=8
14/09/28 20:09:14 INFO mapred.JobClient:     Map output bytes=53
14/09/28 20:09:14 INFO mapred.JobClient:     CPU time spent (ms)=35140
14/09/28 20:09:14 INFO mapred.JobClient:     Total committed heap usage (bytes)=131665920
14/09/28 20:09:14 INFO mapred.JobClient:     Combine input records=0
14/09/28 20:09:14 INFO mapred.JobClient:     SPLIT_RAW_BYTES=95
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce input records=4
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce input groups=3
14/09/28 20:09:14 INFO mapred.JobClient:     Combine output records=0
14/09/28 20:09:14 INFO mapred.JobClient:     Physical memory (bytes) snapshot=181952512
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce output records=3
14/09/28 20:09:14 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=752697344
14/09/28 20:09:14 INFO mapred.JobClient:     Map output records=4

Execution results:

[root@hadoop Downloads]# hadoop fs -ls /output
Found 3 items
-rw-r--r--   1 root supergroup          0 2014-09-28 20:09 /output/_SUCCESS
drwxr-xr-x   - root supergroup          0 2014-09-28 20:08 /output/_logs
-rw-r--r--   1 root supergroup         21 2014-09-28 20:09 /output/part-r-00000
[root@hadoop Downloads]# hadoop fs -cat /output/part-r-00000
hello   2
me      1
world   1
[root@hadoop Downloads]#