首页 > 代码库 > LatAndLonRange项目记录

LatAndLonRange项目记录

Mapper:

package latandlonRange;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import au.com.bytecode.opencsv.CSVParser;

public class LatAndLonRangeMapper extends Mapper<LongWritable,Text,Text, Text>{
    protected void map(LongWritable key, Text value, Context context){
        if (key.get() > 0) {
            CSVParser parser = new CSVParser();
            try{
                String[] data = parser.parseLine(value.toString());
                //1:bus_line,2:start_station,3:end_station,4:stationnumber,5:stationname,
                //6:rawlongitude,7:rawlatitude,8:alter_line_code,9:alter_label,10:alter_flag,
                //11:line_code,12:rawlink
                if(data[5]!=null&&data[6]!=null){//两个字段都不为空时,执行
                    String outkey=data[0];
                    String outValue=data[5]+\t+data[6];
                    //只取出rawlongitude和rawlatitude两个
                    System.out.println("MapoutKey:"+outkey);
                    System.out.println("MapoutValue:"+outValue);
                    context.write(new Text(outkey), new Text(outValue));
                }
            }catch (IOException | InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

Reduce:

package latandlonRange;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class LatAndLonRangeReduce extends Reducer<Text, Text, Text, Text>{
    Float minlat=new Float(1000.0);
    Float maxlat=new Float(0.0);
    Float minlon=new Float(1000.0);
    Float maxlon=new Float(0.0);
    public void reduce(Text key,Iterable<Text> values, Context context){
        try {
            for (Text  val : values) {
                String inputValue = val.toString();
                   String[] valueData = inputValue.split("\t");
                   float lon= Float.parseFloat(valueData[0]);
                   float lat= Float.parseFloat(valueData[1]);
                   minlat=lon<minlat?lat:minlat;
                   maxlat=lon>maxlat?lat:maxlat;
                   minlon=lon<minlon?lon:minlon;
                   maxlon=lon>maxlon?lon:maxlon;
                   System.out.print("minlat:"+minlat+\t);
                   System.out.print("maxlat:"+maxlat+\t);
                   System.out.print("minlon:"+minlon+\t);
                   System.out.println("maxlon:"+maxlon);
                   String outkey_reduce=key.toString();
                   String outputStr=new String(minlat.toString()+\t+maxlat.toString()+\t+minlon.toString()+\t+maxlon.toString());
                   context.write(new Text(outkey_reduce), new Text(outputStr));
            }
        }catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

Job:

package latandlonRange;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * Driver that configures and submits the LatAndLonRange MapReduce job.
 *
 * <p>Usage: {@code LatAndLonRangeJob [inputPath [outputPath]]}. When an argument
 * is omitted the corresponding default HDFS location below is used.
 */
public class LatAndLonRangeJob {
    /** Input CSV used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH =
            "hdfs://172.18.32.177:9000/Amelie-ting/total_line_station_info_201508.csv";
    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH =
            "hdfs://172.18.32.177:9000/Amelie-ting/LatandlonRange/";

    /**
     * Configures the job, waits for it to finish and exits with status 0 on
     * success or 1 on job failure or any setup/submission error.
     *
     * @param args optional: {@code args[0]} input path, {@code args[1]} output path
     */
    public static void main(String[] args) {
        String inpath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outpath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        // Stays 1 unless the job demonstrably succeeds, so failures are never reported as 0.
        int exitCode = 1;
        try{
            // Factory method replaces the deprecated no-arg Job constructor.
            Job job=Job.getInstance();
            job.setJarByClass(LatAndLonRangeJob.class);

            FileInputFormat.addInputPath(job, new Path(inpath));
            FileOutputFormat.setOutputPath(job, new Path(outpath));

            job.setMapperClass(LatAndLonRangeMapper.class);
            job.setReducerClass(LatAndLonRangeReduce.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            exitCode = job.waitForCompletion(true) ? 0 : 1;
        }catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
        System.exit(exitCode);
    }

}

此代码运行后没有得到我想要的结果,接下来对其进行修改:

 

LatAndLonRange项目记录