首页 > 代码库 > 通过SDK提交MapReduce作业

通过SDK提交MapReduce作业

技术分享大数据计算服务(MaxCompute)
快速、完全托管的TB/PB级数据仓库解决方案,向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全。
了解更多

通过SDK提交MR作业的步骤如下:


步骤一:
      编写MR程序,导出jar包,jar包可以不包含main方法(main方法是在本地执行)
        
步骤二:
       上传jar包及所需的资源
       (1) 通过console上传jar包到server端: add jar xxx.jar
       (2)也可以通过SDK写程序上传,参考相关方法:com.aliyun.odps.ODPS.resources().create(xxx,xxx)

 

步骤三:

   对main方法进行改进 ,主要包括两部分:
   (1)设置账户信息(accessId/accessKey/endpoint),充当console/conf/odps_conf.ini中的配置功能
   (2)设置MR中使用的资源,充当jar -resources xxx1.jar,xxx2.jar的功能
           通过方法job.setResources("test13.jar");设置
 
注:本地用户Mapper类和Reducer类方法是空的(本地并不会执行这份代码),存在的目的是保证main方法编译通过

package com.aliyun.odps.examples.mr;

import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.conf.SessionState;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/* 
 * 该示例展示了MapReduce程序中的基本结构
 * 
 */
public class WordCount {

  public static class TokenizerMapper extends MapperBase {
  }

  /**
   * A combiner class that combines map output by sum them.
   */
  public static class SumCombiner extends ReducerBase {
  }

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class SumReducer extends ReducerBase {

  }

  public static void main(String[] args) throws Exception {

    // /////////////额外添加的代码//////////
    String endpoint = "your_endpoint";
    String accessId = "your_access_id";
    String accessKey = "your_access_key";
    String project = "your_project";

    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setDefaultProject(project);
    odps.setEndpoint(endpoint);

    SessionState.get().setOdps(odps);
    SessionState.get().setLocalRun(false);
    // ///////////////////////////////

    JobConf job = new JobConf();
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(SumCombiner.class);
    job.setReducerClass(SumReducer.class);

    // /////////////额外添加的代码//////////
    // 资源名称列表,多个资源用逗号分隔
    job.setResources("test13.jar");
    // //////////////////////////////////

    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));

    InputUtils.addTable(TableInfo.builder().tableName("wc_in").build(), job);
    OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);

    RunningJob rj = JobClient.runJob(job);
    rj.waitForCompletion();
  }

}

 阅读原文请点击

通过SDK提交MapReduce作业