首页 > 代码库 > 实现一个mapreduce的job

实现一个mapreduce的job

介绍

Hadoop安装好后,有人会想做一个mapreduce的job跑一跑,mapreduce其实是两个功能,一个是mapper,一个是reducer,废话不多说,现在开始。


正文

1 环境

1.1 部署hadoop

单机版即可,namenode,datanode,resourcemanager,nodemanager,secondarynamenode都部署在同一台机器上。

创建hadoop用户

生成ssh公钥私钥,保证ssh localhost能通

配置文件core-site.xml

<configuration>
  <!-- Base directory configured for hadoop.tmp.dir. -->
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/hadoop-2.6.0/tmp</value>
  </property>

  <!-- Default file system URI: HDFS on localhost, port 9000. -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

配置文件hdfs-site.xml

<configuration>
  <!-- Local directory configured for dfs.namenode.name.dir. -->
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/opt/hadoop-2.6.0/hdfs/name</value>
  </property>

  <!-- Local directory configured for dfs.datanode.data.dir. -->
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/opt/hadoop-2.6.0/hdfs/data</value>
  </property>

  <!-- Replication factor set to 1. -->
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

配置文件mapred-site.xml

<configuration>
<!-- Sets mapred.job.tracker to localhost:8000. -->
<!-- NOTE(review): mapred.job.tracker looks like the pre-YARN (Hadoop 1.x) property name, -->
<!-- while this article deploys Hadoop 2.6.0 with a resourcemanager/nodemanager. -->
<!-- Confirm whether mapreduce.framework.name should be configured for this deployment. -->
<property>
<name>mapred.job.tracker</name>
<value>localhost:8000</value>
</property>
</configuration>

具体步骤请参见我的博客的文章:

linux上部署hadoop集群 基础篇  

1.2 安装eclipse

这个自己想办法搞定吧,只要能启动就行了,这里就不一一赘述了。


2 写java程序

2.1 新建一个java project,再新建一个类:MaxTemperatureMapper

代码如下:

package mapreduce_maxtempature;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper extends
 Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
 public void map(LongWritable key, Text value, Context context)
 throws IOException, InterruptedException {
String line = value.toString();
 String year = line.substring(15, 19);
 int airTemperature;
 if (line.charAt(87) == ‘+‘) { // parseInt doesn‘t like leading plus
 // signs
 airTemperature = Integer.parseInt(line.substring(88, 92));
 } else {
 airTemperature = Integer.parseInt(line.substring(87, 92));
 }
 String quality = line.substring(92, 93);
 if (airTemperature != MISSING && quality.matches("[01459]")) {
 context.write(new Text(year), new IntWritable(airTemperature));
 }
 }
}

2.2 再建一个类:MaxTemperatureReducer

代码如下:

package mapreduce_maxtempature;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
 Context context)
 throws IOException, InterruptedException {
 int maxValue = Integer.MIN_VALUE;
 for (IntWritable value : values) {
 maxValue = Math.max(maxValue, value.get());
 }
 context.write(key, new IntWritable(maxValue));
}
}

2.3 再建一个类:MaxTemperatureDriver

代码如下:

package mapreduce_maxtempature;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*This class is responsible for running map reduce job*/
public class MaxTemperatureDriver extends Configured implements Tool{
public int run(String[] args) throws Exception
 {
 if(args.length !=2) {
 System.err.println("Usage: MaxTemperatureDriver <input path> <outputpath>");
 System.exit(-1);
 }
 @SuppressWarnings("deprecation")
Job job = new Job();
 job.setJarByClass(MaxTemperatureDriver.class);
 job.setJobName("Max Temperature");
 FileInputFormat.addInputPath(job, new Path(args[0]));
 FileOutputFormat.setOutputPath(job,new Path(args[1]));
 job.setMapperClass(MaxTemperatureMapper.class);
 job.setReducerClass(MaxTemperatureReducer.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(IntWritable.class);
 System.exit(job.waitForCompletion(true) ? 0:1); 
 boolean success = job.waitForCompletion(true);
 return success ? 0 : 1;
 }
public static void main(String[] args) throws Exception {
 MaxTemperatureDriver driver = new MaxTemperatureDriver();
 int exitCode = ToolRunner.run(driver, args);
 System.exit(exitCode);
 }
}

最后一个类(MaxTemperatureDriver)通过ToolRunner来运行job。请保证三个java程序都没有编译错误。


2.4 打包成jar包

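如何打jar包,这里不再赘述,不会的请自己搞定。注意:导出时请把jar包的Main-Class指定为mapreduce_maxtempature.MaxTemperatureDriver,否则执行hadoop jar时需要在jar包名之后显式写出主类名。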


2.5 生成sample.txt

新建一个文件sample.txt,内容如下:

0035227070999991902122213004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-01721+99999101221ADDGF107991999999999999999999MW1721
0029227070999991902122220004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-02001+99999101551ADDGF100991999999999999999999
0035227070999991902122306004+62167+030650FM-12+010299999V0201401N002119999999N0000001N9-01611+99999101161ADDGF100991999999999999999999MW1721
0035227070999991902122313004+62167+030650FM-12+010299999V0201801N001019999999N0000001N9-00781+99999100191ADDGF108991999999999999999999MW1721
0029227070999991902122320004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-00281+99999099601ADDGF108991999999999999999999
0029227070999991902122406004+62167+030650FM-12+010299999V0203201N004119999999N0000001N9-00111+99999098601ADDGF100991999999999999999999
0029227070999991902122413004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-00281+99999098711ADDGF108991999999999999999999
0029227070999991902122420004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-00501+99999098831ADDGF100991999999999999999999
0029227070999991902122506004+62167+030650FM-12+010299999V0201801N001019999999N0000001N9-00281+99999097351ADDGF108991999999999999999999
0029227070999991902122513004+62167+030650FM-12+010299999V0201801N015919999999N0000001N9-00221+99999095821ADDGF108991999999999999999999
0029227070999991902122520004+62167+030650FM-12+010299999V0201801N015919999999N0000001N9-00331+99999095751ADDGF108991999999999999999999
0029227070999991902122606004+62167+030650FM-12+010299999V0201801N006219999999N0000001N9-00891+99999095401ADDGF100991999999999999999999
0029227070999991902122613004+62167+030650FM-12+010299999V0202301N002119999999N0000001N9-00891+99999095281ADDGF107991999999999999999999
0029227070999991902122620004+62167+030650FM-12+010299999V0202301N001019999999N0000001N9-01331+99999095581ADDGF100991999999999999999999
0029227070999991902122706004+62167+030650FM-12+010299999V0203201N001019999999N0000001N9-01111+99999095801ADDGF108991999999999999999999
0035227070999991902122713004+62167+030650FM-12+010299999V0203601N002119999999N0000001N9-01061+99999096221ADDGF108991999999999999999999MW1721
0029227070999991902122720004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-01171+99999096711ADDGF108991999999999999999999
0029227070999991902122806004+62167+030650FM-12+010299999V0203201N004119999999N0000001N9-01171+99999097441ADDGF100991999999999999999999
0029227070999991902122813004+62167+030650FM-12+010299999V0202901N001019999999N0000001N9-00891+99999097791ADDGF108991999999999999999999
0029227070999991902122820004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-01061+99999097671ADDGF100991999999999999999999
0029227070999991902122906004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-01281+99999097601ADDGF100991999999999999999999
0029227070999991902122913004+62167+030650FM-12+010299999V0203601N004119999999N0000001N9-01221+99999097721ADDGF108991999999999999999999
0029227070999991902122920004+62167+030650FM-12+010299999V0203601N001019999999N0000001N9-01441+99999097821ADDGF100991999999999999999999
0029227070999991902123006004+62167+030650FM-12+010299999V0203601N006219999999N0000001N9-01561+99999098041ADDGF106991999999999999999999
0029227070999991902123013004+62167+030650FM-12+010299999V0200501N001019999999N0000001N9-01561+99999097981ADDGF108991999999999999999999
0029227070999991902123020004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-01501+99999098461ADDGF100991999999999999999999
0029227070999991902123106004+62167+030650FM-12+010299999V0200501N002119999999N0000001N9-01171+99999098641ADDGF108991999999999999999999
0029227070999991902123113004+62167+030650FM-12+010299999V0200501N002119999999N0000001N9-01281+99999099551ADDGF108991999999999999999999
0029227070999991902123120004+62167+030650FM-12+010299999V0200501N002119999999N0000001N9-01831+99999100111ADDGF100991999999999999999999

2.6 上传sample.txt到hdfs文件系统,即hdfs://localhost:9000/

# Upload sample.txt to the HDFS root. Uses "hdfs dfs" because the "hadoop dfs" form is deprecated in Hadoop 2.x.
hdfs dfs -put sample.txt hdfs://localhost:9000/

查看结果:

# List the HDFS root. Uses "hdfs dfs" because the "hadoop dfs" form is deprecated in Hadoop 2.x.
hdfs dfs -ls hdfs://localhost:9000/

2.7 执行job

# Runs the jar with /sample.txt as the input path and /output as the output path.
# NOTE(review): no main class is named here, so this presumably relies on the jar manifest's Main-Class; verify.
hadoop jar mapreduce_maxtempature.jar     /sample.txt /output

成功后会在hdfs文件系统中自动生成output文件夹,里面有内容,是job执行结果。

若有错误请根据具体的结果调试。

我的结果如下(注意:下面的两行与2.5中的sample.txt并不对应;sample.txt里只有1902年的记录,且气温全部为负值,只用它作为输入时输出应只有一行:1902    -11):

hdfs dfs -cat hdfs://localhost:9000/output/part-r-00000
1901    317
1902    244


本文出自 “Linux和网络” 博客,谢绝转载!

实现一个mapreduce的job