Hadoop MapReduce Programming API Primer Series: Counting Student Scores, Version 1 (Part 17)
Without further ado, let's go straight to the code.
Code
package zhouls.bigdata.myMapReduce.ScoreCount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
/**
 * Custom InputFormat for reading student score records.
 * Sample record: 19020090017 小明 90 99 100 89 95
 * @author Bertron
 */
public class ScoreInputFormat extends FileInputFormat<Text, ScoreWritable>
{
    @Override
    protected boolean isSplitable(JobContext context, Path filename)
    {
        // Return false so each file is read as a single split and
        // records are never broken across split boundaries.
        return false;
    }

    @Override
    public RecordReader<Text, ScoreWritable> createRecordReader(InputSplit inputsplit,
            TaskAttemptContext context) throws IOException, InterruptedException
    {
        return new ScoreRecordReader();
    }
    // The two type parameters of RecordReader are the key/value types we want to
    // emit: Text for the key, and ScoreWritable wrapping all of a student's scores
    // for the value.
    public static class ScoreRecordReader extends RecordReader<Text, ScoreWritable>
    {
        public LineReader in;           // line reader
        public Text lineKey;            // custom key
        public ScoreWritable lineValue; // custom value
        public Text line;               // current line of input
        @Override
        public void close() throws IOException
        {
            if (in != null)
            {
                in.close();
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException
        {
            return lineKey;
        }

        @Override
        public ScoreWritable getCurrentValue() throws IOException, InterruptedException
        {
            return lineValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException
        {
            return 0;
        }
        @Override
        public void initialize(InputSplit input, TaskAttemptContext context)
                throws IOException, InterruptedException
        {
            FileSplit split = (FileSplit) input;
            Configuration job = context.getConfiguration();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream filein = fs.open(file);
            in = new LineReader(filein, job);
            line = new Text();
            lineKey = new Text();
            lineValue = new ScoreWritable();
        }
        // Reads one line of input and populates the custom key and value.
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException
        {
            int linesize = in.readLine(line);// number of bytes read; 0 means end of input
            if (linesize == 0) return false;
            String[] pieces = line.toString().split("\\s+");// tokenize the line
            if (pieces.length != 7)
            {
                throw new IOException("Invalid record received");
            }
            // Convert each of the student's five scores to float.
            float a, b, c, d, e;
            try
            {
                a = Float.parseFloat(pieces[2].trim());
                b = Float.parseFloat(pieces[3].trim());
                c = Float.parseFloat(pieces[4].trim());
                d = Float.parseFloat(pieces[5].trim());
                e = Float.parseFloat(pieces[6].trim());
            }
            catch (NumberFormatException nfe)
            {
                throw new IOException("Error parsing floating point value in record");
            }
            lineKey.set(pieces[0] + "\t" + pieces[1]);// key: student ID + name
            lineValue.set(a, b, c, d, e);             // value: the five scores
            return true;
        }
    }
}
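To make the record layout concrete, here is a small standalone sketch of the same parsing that nextKeyValue performs. It needs no Hadoop runtime; the class name ParseDemo and the sample line are illustrative, not part of the original program.

package zhouls.bigdata.myMapReduce.ScoreCount;
// Hypothetical demo of the line-parsing logic in ScoreRecordReader.nextKeyValue().
public class ParseDemo
{
    public static void main(String[] args)
    {
        String line = "19020090017 小明 90 99 100 89 95";
        String[] pieces = line.split("\\s+");      // 7 fields: ID, name, five scores
        String key = pieces[0] + "\t" + pieces[1]; // same key the RecordReader builds
        float total = 0;
        for (int i = 2; i < pieces.length; i++)
        {
            total += Float.parseFloat(pieces[i].trim());
        }
        System.out.println(key + " -> total=" + total);// prints: 19020090017	小明 -> total=473.0
    }
}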
package zhouls.bigdata.myMapReduce.ScoreCount;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Writable holding one student's scores.
 * Sample record: 19020090017 小明 90 99 100 89 95
 * @author Bertron
 */
public class ScoreWritable implements WritableComparable<Object>
{
    private float Chinese;
    private float Math;
    private float English;
    private float Physics;
    private float Chemistry;

    public ScoreWritable()
    {
    }

    public ScoreWritable(float Chinese, float Math, float English, float Physics, float Chemistry)
    {
        this.Chinese = Chinese;
        this.Math = Math;
        this.English = English;
        this.Physics = Physics;
        this.Chemistry = Chemistry;
    }

    public void set(float Chinese, float Math, float English, float Physics, float Chemistry)
    {
        this.Chinese = Chinese;
        this.Math = Math;
        this.English = English;
        this.Physics = Physics;
        this.Chemistry = Chemistry;
    }

    public float getChinese()
    {
        return Chinese;
    }

    public float getMath()
    {
        return Math;
    }

    public float getEnglish()
    {
        return English;
    }

    public float getPhysics()
    {
        return Physics;
    }

    public float getChemistry()
    {
        return Chemistry;
    }
    public void readFields(DataInput in) throws IOException
    {
        Chinese = in.readFloat();
        Math = in.readFloat();
        English = in.readFloat();
        Physics = in.readFloat();
        Chemistry = in.readFloat();
    }

    public void write(DataOutput out) throws IOException
    {
        out.writeFloat(Chinese);
        out.writeFloat(English);
        out.writeFloat(Math);
        out.writeFloat(Physics);
        out.writeFloat(Chemistry);
    }

    public int compareTo(Object o)
    {
        // This class is only used as a map output value, so no ordering is needed.
        return 0;
    }
}
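Since write() and readFields() must mirror each other exactly for Hadoop to serialize ScoreWritable correctly, a quick round-trip check can be run outside the cluster. This is a hypothetical sanity test, not part of the original post:

package zhouls.bigdata.myMapReduce.ScoreCount;
import java.io.*;
// Hypothetical round-trip check: serialize with write(), deserialize with readFields().
public class ScoreWritableDemo
{
    public static void main(String[] args) throws IOException
    {
        ScoreWritable before = new ScoreWritable(90f, 99f, 100f, 89f, 95f);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buf));

        ScoreWritable after = new ScoreWritable();
        after.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(after.getChinese() + " " + after.getChemistry());// expect: 90.0 95.0
    }
}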
package zhouls.bigdata.myMapReduce.ScoreCount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Hadoop job for computing student score statistics.
 * Sample record: 19020090017 小明 90 99 100 89 95
 * @author HuangBQ
 */
public class ScoreCount extends Configured implements Tool
{
    public static class ScoreMapper extends Mapper<Text, ScoreWritable, Text, ScoreWritable>
    {
        // The custom InputFormat already supplies the desired key/value pair,
        // so the mapper simply passes each record through.
        @Override
        protected void map(Text key, ScoreWritable value, Context context) throws IOException, InterruptedException
        {
            context.write(key, value);
        }
    }
    public static class ScoreReducer extends Reducer<Text, ScoreWritable, Text, Text>
    {
        private Text text = new Text();

        @Override
        protected void reduce(Text key, Iterable<ScoreWritable> values, Context context)
                throws IOException, InterruptedException
        {
            float totalScore = 0.0f;
            for (ScoreWritable ss : values)
            {
                totalScore += ss.getChinese() + ss.getMath() + ss.getEnglish() + ss.getPhysics() + ss.getChemistry();
            }
            // Average over the five subjects, computed once after the total is summed.
            float averageScore = totalScore / 5;
            text.set(totalScore + "\t" + averageScore);
            context.write(key, text);
        }
    }
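    // Worked example for the sample record above:
    // 90 + 99 + 100 + 89 + 95 = 473.0 and 473.0 / 5 = 94.6,
    // so the reducer emits: 19020090017	小明	473.0	94.6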
    public int run(String[] args) throws Exception
    {
        Configuration conf = new Configuration();// load configuration
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath))// delete the output directory if it already exists, so the job can be re-run
        {
            hdfs.delete(mypath, true);
        }
        Job job = new Job(conf, "ScoreCount");// create the job
        job.setJarByClass(ScoreCount.class);// main class
        FileInputFormat.addInputPath(job, new Path(args[0]));// input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));// output path
        job.setMapperClass(ScoreMapper.class);// Mapper
        job.setReducerClass(ScoreReducer.class);// Reducer
        job.setMapOutputKeyClass(Text.class);// Mapper output key type
        job.setMapOutputValueClass(ScoreWritable.class);// Mapper output value type
        job.setInputFormatClass(ScoreInputFormat.class);// custom input format
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception
    {
        // String[] args0 =
        // {
        //     "hdfs://HadoopMaster:9000/score/score.txt",
        //     "hdfs://HadoopMaster:9000/out/score/"
        // };
        String[] args0 =
        {
            "./data/score/score.txt",
            "./out/score/"
        };
        int ec = ToolRunner.run(new Configuration(), new ScoreCount(), args0);
        System.exit(ec);
    }
}
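The paths in main() above are hard-coded for a local run. As a variation (a sketch assuming the rest of the driver stays unchanged, not code from the original post), main() could forward command-line arguments instead, so the same driver serves both local and HDFS paths:

    // Hypothetical variant of main(): take the input/output paths from the command line.
    public static void main(String[] args) throws Exception
    {
        if (args.length != 2)
        {
            System.err.println("Usage: ScoreCount <input path> <output path>");
            System.exit(2);
        }
        System.exit(ToolRunner.run(new Configuration(), new ScoreCount(), args));
    }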