首页 > 代码库 > hadoop2.5.2学习13-MR之新浪微博-DF的实现

hadoop2.5.2学习13-MR之新浪微博-DF的实现

本文接上篇hadoop2.5.2学习13-MR之新浪微博TF-IDF算法简介

上篇微博实现了第一个mappreduce, 统计的词频TF和微博总数N
本文将统计DF,即每个词条在多少个文章中出现。我们只需要对一个mapreduce的输出结果的词频数进行统计,就可以得到DF

主要是读取一个的mapreduce的四个文件, 从中区分TF数据的三个文件

通过获取Filesplit碎片段,

FileSplit fileSplit = (FileSplit) context.getInputSplit();

因为在mapreduce中原始数据会拆分成split作为map的输入数据,
反过来, 每个map都有一个split与之对应, 而且每个split都属于一个文件,

那么通过对split进行过滤,不考虑split为part-r-00003, 及微博总数统计的那个输出文件,就可以的到其他三个文件。

package com.chb.weibo2;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 *  统计idf
 *  输入数据:w+"_"+id
 */
public class SecondMapper extends Mapper<Text, Text, Text, IntWritable>{
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {

        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        /**
         * 第一个的MR的输出分四个reduce,生成四个文件,在自定义分区中,最后一个分区是计算微博总数
         * 
         */
        if (!fileSplit.getPath().getName().equals("part-0003")) {
            if (key.toString().split("_").length == 2 ) {
                String w = key.toString().split("_")[0];
                String id = key.toString().split("_")[1];
                context.write(new Text(w), new IntWritable(1));
            }
        }
    }
}

在reduce中就是进行统计求和

package com.chb.weibo2;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class SecondReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable iw : values) {
            sum += iw.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

执行代码:

package com.chb.weibo2;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondRunJob {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "chb");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance();
        job.setJobName("SecondRunJob");
        job.setJar("C:\\Users\\12285\\Desktop\\weibo.jar");
        job.setJarByClass(SecondRunJob.class);

        job.setMapperClass(SecondMapper.class);
        job.setReducerClass(SecondReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        Path in = new Path("/user/chb/output/outweibo1");
        FileInputFormat.addInputPath(job, in);
        Path out = new Path("/user/chb/output/outweibo2");
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        boolean f = job.waitForCompletion(true);
        if (f) {
            System.out.println("第二次job执行完成。。。");
        }
    }
}

DF统计结果:

统计出每个词条的DF, 即在多少篇微博中出现过。

些   1
亢奋  2
交易  1
交易日 2
交给  1
亦   1
产品  4
享   14
享受  6
享用  7
京东  1
亮   1
亮相  7
亲   10
亲子  2
亲密  1

3第三个mapreduce, 计算词条权重

经过第一个mapreduce计算出TF, N
第二个mapreduce计算出DF
技术分享
在这三个数据中,TF数据最多, 如果TF数据达到T级别, 每个分片1G, 也需要上千个mapTask,
要加载上千次N和DF数据, 这是非常耗资源的,
首先N和DF的数据很小, N只有一条数据, 词库的常用词不多, DF那就不大,

我们将小表加载到缓存中

        //将小表加载到内存中,微博总数
        job.addCacheFile(new Path("/user/chb/output/weibo1/part-r-00003").toUri());
        //df
        job.addCacheFile(new Path("/user/chb/output/weibo1/part-r-00001").toUri());

在map的setup方法中读取,setup 对每个mapTask只执行一次, 进行初始化。

//通过使用DistributedCache ,将微博总数,和DF统计的文件读入内存中,以map的形式存储
    HashMap<String, Integer> countMap = null;
    HashMap<String, Integer> dfMap = null;
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        //通过contex获取存在缓存中的文件的uri
        URI[] uris =    context.getCacheFiles();
        if (uris != null) {
            for (URI uri : uris) {
                //如何读取文件内容?
                BufferedReader br = new BufferedReader(new FileReader(uri.getPath())); 
                if (uri.getPath().endsWith("part-r-00003")) {//获取微博总数的文件
                    countMap = new HashMap<String, Integer>();
                    //统计微博总数的文件中只有一行, count  数量
                    String line = br.readLine();
                    if (line.startsWith("count")) {
                        countMap.put("count", Integer.parseInt(line.split("\t")[1]));
                    }
                }else if(uri.getPath().endsWith("part-r-00000")){//获取df文件
                    String line = br.readLine();
                    //line: word 在多少个文章中出现
                    String word = line.split("\t")[0];
                    String count = line.split("\t")[1];
                    dfMap.put(word, Integer.parseInt(count));
                }
                br.close();
            }
        }
    }

上面的代码有问题,一直报错 /user/chb/output/outweibo1/part-r-00003不存在,
我将获取path的那行单独出来, 就可以了,但是没有明白为什么。
技术分享

                //如何读取文件内容?
                if (uri.getPath().endsWith("part-r-00003")) {//获取微博总数的文件
                    Path path = new Path(uri.getPath());
                    BufferedReader br = new BufferedReader(new FileReader(path.getName())); 
                    countMap = new HashMap<String, Integer>();
                    //统计微博总数的文件中只有一行, count  数量
                    String line = br.readLine();
                    if (line.startsWith("count")) {
                        countMap.put("count", Integer.parseInt(line.split("\t")[1]));
                    }
                    br.close();
                }

reduce

reduce就简单了, 只是将每篇微博的各个词条的权重以此输出:

package com.chb.weibo3;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LastReducer extends Reducer<Text, Text, Text, Text>{
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
            InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text text : values) {
            sb.append(text.toString()+"\t");
        }
        context.write(key, new Text(sb.toString()));
    }
}

执行程序

package com.chb.weibo3;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LastRunJob {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "chb");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance();
        job.setJar("C:\\Users\\12285\\Desktop\\weibo3.jar");
        job.setJarByClass(LastRunJob.class);
        job.setJobName("LastRunJob");

        job.setMapperClass(LastMapper.class);
        job.setReducerClass(LastReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);


        //将小表加载到内存中,微博总数
        job.addCacheFile(new Path("hdfs://TEST:9000/user/chb/output/outweibo1/part-r-00003").toUri());
        //df
        job.addCacheFile(new Path("hdfs://TEST:9000/user/chb/output/outweibo2/part-r-00000").toUri());

        FileInputFormat.addInputPath(job, new Path("hdfs://TEST:9000/user/chb/output/outweibo1/"));
        Path out = new Path("hdfs://TEST:9000/user/chb/output/outweibo3/");
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        boolean f = job.waitForCompletion(true);
        if (f) {
            System.out.println("最后一个mapreduce执行完成");

        }
    }
}

结果如下:

3823890201582094    我:7.783640596221253 香喷喷:12.553286978683289:7.65728279297819:8.713417653379183 豆浆:13.183347464017316   早晨:8.525359754082631    想约:10.722584331418851   喝上:10.722584331418851   就可以:10.525380377809771:10.525380377809771:7.16703787691222  电饭煲:11.434055402812444:2.1972245773362196:14.550344638905543:13.328818040700815:8.37930948405285  自动:12.108878692538742   今天:5.780743515792329    油条:9.54136924893133:11.166992617563398    豆浆机:4.1588830833596715  一小时:10.927663610051221  几小时:13.130529940070723  自然:13.130529940070723:8.580918882296782:7.824046010856292:13.183347464017316    起床:9.436997742590188    
3823890210294392    约:3.58351893845611:3.8918202981106265    豆浆:4.394449154672439    今天:5.780743515792329:4.394449154672439 油条:9.54136924893133 
3823890235477306    一会儿:15.327754517406941:5.991464547107982 儿子:9.911654115202522:3.58351893845611  动物园:12.553286978683289:9.54136924893133:9.043577154098081 
3823890239358658    继续:11.166992617563398   支持:7.522400231387125    
3823890256464940    次:13.130529940070723    起来:7.052721049232323:3.58351893845611:11.166992617563398:5.991464547107982 
3823890264861035    约:3.58351893845611:3.8918202981106265:4.394449154672439 吃饭:9.326878188224134:7.221835825288449 
3823890281649563    和家人:11.166992617563398  一起:6.089044875446846    吃个:12.108878692538742   相约:8.788898309344878:11.166992617563398    
3823890285529671    了:4.394449154672439 今天:5.780743515792329    广场:12.553286978683289   滑旱冰:15.327754517406941  一起:6.089044875446846:3.58351893845611  
3823890294242412    九阳:2.1972245773362196:3.8918202981106265    全球:12.553286978683289   早餐:6.516193076042964:5.780743515792329:6.591673732008658 一起:6.089044875446846    首发:6.516193076042964    预约:5.545177444479562:6.8679744089702925    即将:11.434055402812444:6.8679744089702925    要约:10.187500401613525   豆浆机:4.1588830833596715  
3823890314914825    一起:6.089044875446846:3.58351893845611:5.991464547107982 逛街:10.047761041692553   姐妹:11.744235578950832   今天:5.780743515792329    天气晴好:13.94146015628705:9.043577154098081:7.700295203420117 
3823890323625419    邮:11.166992617563398    全国:12.108878692538742   jyl-:15.327754517406941:9.54136924893133  joyoung:9.656627474604603   九阳:2.1972245773362196   
3823890335901756    的:2.1972245773362196    今年:12.553286978683289   暖和:12.553286978683289   果断:15.327754517406941   出来:11.434055402812444   逛街:10.047761041692553   一天:9.780698256443507:8.37930948405285  今天是:10.722584331418851  
3823890364788305    出:11.166992617563398    来了:8.15507488781144 去去:13.94146015628705:5.991464547107982 好友:12.108878692538742   赏花:11.744235578950832   踏青:9.780698256443507:3.58351893845611  一起:6.089044875446846    春天:7.16703787691222 
3823890369489295    让:7.824046010856292:26.261059880141445    下载:13.130529940070723   九阳:2.1972245773362196:6.664409020350408:3.8918202981106265    九阴真经:13.94146015628705  免费:12.553286978683289:15.327754517406941    了吧:15.327754517406941   平湖:15.327754517406941   走火入魔:11.434055402812444 真经:10.927663610051221   小子:15.327754517406941:10.525380377809771:6.591673732008658 三叉神经:15.327754517406941:6.802394763324311 毁了:15.327754517406941:15.327754517406941    
3823890373686361    一起:6.089044875446846:3.58351893845611  理发:12.553286978683289:4.394449154672439 小伙伴:9.436997742590188:5.991464547107982 
3823890378201539    吃:6.8679744089702925    得很:13.130529940070723:8.1886891244442   今天:5.780743515792329    姐妹:11.744235578950832   开心:8.788898309344878:5.991464547107982:8.439015410352214 周末:8.317766166719343    逛街:10.047761041692553:4.394449154672439:3.58351893845611  美食:8.954673628956414    
<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    hadoop2.5.2学习13-MR之新浪微博-DF的实现