首页 > 代码库 > 定制版Sqoop-- VDataHub介绍

定制版Sqoop-- VDataHub介绍

1.1 产品概述

VDataHub基于Apache Sqoop,最初定位是用于将关系数据库中的数据导入Hadoop/Hive/HBaseSqoop基于HadoopMapReduce来完成数据导入导出工作,提供了很好的容错性。刚开始项目组也仅仅直接采用社区版本来完成数据导入导出。但在使用过程中,我们发现有很多地方是Sqoop现有版本没法支持的,如果不解决,是不能用于现有数据平台建设中的。

基于此,我们决定基于Cloudera的发行版sqoop-1.4.4-cdh5.1.2进行二次开发,相当于开始维护公司内部版本的Sqoop。为了进行区分,单独取名VDataHub,含义就是它是作为海量(V)数据(Data)的中转站(Hub),定位于大数据平台构建过程中的通用数据迁移工具。

在延续社区版Sqoop功能之外,VDataHub主要完善、解决了以下生产实践中的棘手问题:

    ⑴中文字符乱码:MySQL数据库存在着latin1utf8两种编码的数据表,而存入数据平台需要统一转换为utf8存储。

    ⑵带宽占用限制:在对某些线上库拉取数据时,需要严格控制所占用的网络带宽,防止对线上应用的影响。

    ⑶多数据源支持:需要提供更加灵活的数据源和目标存储位置的支持,比如可以是HTTP,FTP,HDFS,LOCAL, etc

    ⑷已有命令整合:Hive/HBase本身提供了很多有用的工具命令完成导入导出工作,可以将其无缝集成进来。

1.2 相关文档

暂无

 

1.3  和其他项目的依赖

项目 ID

项目名称

依赖描述

cdh5.1.2

Cloudera发行版

基于sqoop-1.4.4

 

 

 

 

 

2 编译开发

2.1 环境搭建

首先从代码库中下载源码,放到本地开发目录中,比如我放到D:\workspace

co https://svn.vpstore.com/VDataHub 

然后在自己的开发环境安装JDKAntJDK选用1.6版本以上,Ant选择1.7以上,并配置到PATH环境变量中。验证是否装好就是命令行下执行看会不会报错说,不存在的命令。VDataHub延续Sqoop的思路,采用ant编译打包、采用ivy作为依赖管理工具。当然,如果需要生成最终能安装部署的分发包,需要将源码放到Linux环境进行编译打包,在Windows下有些执行会有点问题(估计是可以的, 只是我老是遇到点小问题, 就难得去折腾了,反正是要在Linux下运行的,打包也在上面做省心点)。在Linux需要额外安装asciidoc/ make/ python2.5+/ xmlto/ tar/ gzip等工具。

采用命令行工具cmd进入D:\workspace\VDataHub,执行ant -p, 然后它会打印出目前VDataHub支持的ant任务,比如ant clean, ant test, ant tar等。在第一次执行的时候,ivy会首先下载项目的依赖, 将其放入lib目录下。由于很多是从国外的站点下载jar文件等,网络会比较卡,而且经常容易假死住,没办法,只有Ctrl+C强制暂停掉,然后重新执行刚才的命令,相信我,可能你需要执行4~5这样的操作后,才可能把所有的依赖包下载完成。

开发工具我们选择Eclipse LUNA,虽然已经把源码下载到本地目录了,但这时候你没法导入到eclipse中,需要在D:\workspace\VDataHub下执行ant eclipse命令成功后才可以,其实就是完成将源码生成eclipse工程。成功完成后,你就可以打开eclipse了, 然后选择[file]->[import...]->[Existing Projects into Workspace],选中源码所在位置,然后[next]->[finish]即可。

初次导入的时候,你可以看到在IDE中是会提示错误的,主要是Sqoop类等,报的错说SqoopVersion不存在。其实这个类是会在执行ant compile等命令的时候通过脚本现场生成的,所以源码肯定会报错,不过可以忽略不影响。另外有一个就是src/test的前两个目录报package路径不对,那个同样可以不用理睬。

完成导入工作后,就可以开启你源码bug fixnew feature的开发了。对了,如果你想改动本次最终打包的版本号和项目名称,需要对cdh.build.propertiesbuild.xml进行修改。第一个文件改动的地方是第7,如“version=vm1.0.0”,第二个文件改动的地方是第188,189行,如“<property name="name" value=http://www.mamicode.com/"vdatahub" />” “<property name="Name" value=http://www.mamicode.com/"VDataHub" />”。

 

2.2 远程调试

功能开发完成后,需要放到Linux机器上进行调试。主要是VDataHubSqoop一样都依赖于Hadoop的运行环境。这个时候需要用到远程调试功能。具体配置方法是:

a.在VDataHub安装的Linux机器上修改$HADOOP_HOME/bin/hadoop文件,将其中一行: HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"改为HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,address=9991,server=y,suspend=y"

b.在eclipse的“debug configuration”中新建一个“remote java application”,hostlinux机器的ip,端口填刚才配置的9991.

c.在Linux机器上执行VDataHub的相关工具命令,如sqoop version,这时候其将会开启9991端口等待客户端去连接,这时,你在eclipse中选择刚才创建的新remote java application,并在想要调试的的源码位置上打上断点便是。

 

2.3 回归测试

作为一款基础性工具,任何的源码改动都需要经过严格的测试,否则是没法拿来使用的。所以最最起码需要将所有的测试进行跑一遍,如果全部通过,才打到发包的最基本要求。此外还会从应用角度,对相关重要功能进行实际测试。

具体跑源码中测试代码的方法是,在Linux机器上VDataHub根目录上执行ant test命令。需要注意的是,执行之前需要更改某些文件的执行权限,比如testdata/hive下的某些文件,如提示无执行权限,用chmod a+x 更改下。有的时候新打的包bin/目录下几乎都是不具有执行权限的,没法用,也可以采用同样的方法改下。

跑的相关结果会放到build/test目录下,如果报错,可以去关注下面的一些文件,看到底是什么原因导致的。

3 版本使用 

3.1 对现有设计分析

目前VDataHub解决第一节提到的中文乱码和限流是直接改动的已有代码逻辑完成的,而对支持多数据源之间的导入导出和集成已有工具命令则是通过开发Sqoop插件的方式完成的。

下面通过对VDataHub开发的expand工具,来大概说下开发Sqoop插件工具的方法。

定义一个插件类,扩展ToolPlugin

public class ExpandPlugin extends ToolPlugin {

  public List<ToolDesc> getTools() {

    return Collections.singletonList(new ToolDesc("expand",

    ExpandTool.class, "Expand sqoop tools for special usage."));

  }

}

定义具体工具类,继承于BaseSqoopTool,完成参数解析、校验、方法执行入口等工作:

public class ExpandTool extends com.cloudera.sqoop.tool.BaseSqoopTool {

 

  public static final Log LOG = LogFactory.getLog(ExpandTool.class.getName());

 

  public ExpandTool() {

    super("expand");

  }

 

  @Override

  public int run(SqoopOptions options) {

    int ret = 0;

    Expandable executor = new ExpandExecutor(options);

    ret = executor.execute();

    return ret;

  }

 

  @Override

  /** Configure the command-line arguments we expect to receive */

  public void configureOptions(ToolOptions toolOptions) {

    RelatedOptions expandOpts = new RelatedOptions("expand tool arguments");

    expandOpts

        .addOption(OptionBuilder

            .withArgName("src")

            .hasArg()

            .withDescription(

                "Place from where Sqoop expand tool will extract data")

            .withLongOpt(SRC_URI_ARG).create());

    expandOpts.addOption(OptionBuilder.withArgName("des").hasArg()

        .withDescription("Place to where Sqoop expand tool will store data")

        .withLongOpt(DES_URI_ARG).create());

    expandOpts.addOption(OptionBuilder.withArgName("shell").hasArg()

        .withDescription("Execute ‘shell cmd‘ in Hadoop/Hive/HBase")

        .withLongOpt(INNER_SHELL_ARG).create());

 

    toolOptions.addUniqueOptions(expandOpts);

  }

 

  @Override

  public void applyOptions(CommandLine in, SqoopOptions out)

      throws InvalidOptionsException {

    if (in.hasOption(SRC_URI_ARG)) {

      out.setSrcUri(in.getOptionValue(SRC_URI_ARG));

    }

    if (in.hasOption(DES_URI_ARG)) {

      out.setDesUri(in.getOptionValue(DES_URI_ARG));

    }

    if (in.hasOption(INNER_SHELL_ARG)) {

      out.setInnerShell(in.getOptionValue(INNER_SHELL_ARG));

    }

  }

 

  @Override

  public void validateOptions(SqoopOptions options)

      throws InvalidOptionsException {

    validateExpandOptions(options);

  }

 

  @Override

  public void printHelp(ToolOptions toolOptions) {

    System.out.println("usage: sqoop " + getToolName()

        + " [GENERIC-ARGS] [TOOL-ARGS]");

    System.out.println("");

    toolOptions.printHelp();

  }

 

  /**

   * Validate export-specific arguments.

   * 

   * @param options

   *          the configured SqoopOptions to check

   */

  protected void validateExpandOptions(SqoopOptions options)

      throws InvalidOptionsException {

    if (options.getSrcUri() == null && options.getDesUri() == null

        && options.getInnerShell() == null) {

      throw new InvalidOptionsException(

          "Expand requires a united (--src-uri, --des-uri) or a --inner-shell argument."

              + HELP_STR);

    } else if ((options.getSrcUri() != null || options.getDesUri() != null)

        && options.getInnerShell() != null) {

      throw new InvalidOptionsException(

          "Expand can‘t permit (--src-uri, --des-uri) and --inner-shell arguments exist together."

              + HELP_STR);

    } else if (options.getSrcUri() != null && options.getDesUri() == null) {

      throw new InvalidOptionsException(

          "Expand needs --src-uri and --des-uri arguments exist together."

              + HELP_STR);

    } else if (options.getDesUri() != null && options.getSrcUri() == null) {

      throw new InvalidOptionsException(

          "Expand needs --src-uri and --des-uri arguments exist together."

              + HELP_STR);

    }

  }

}

 

conf/sqoop-site.xml中,配置一个属性,将插件类配置进去:

<property>

    <name>sqoop.tool.plugins</name>

    <value></value>

    <description>A comma-delimited list of ToolPlugin implementations

      which are consulted, in order, to register SqoopTool instances which

      allow third-party tools to be used.

    </description>

  </property>

 

3.2 工具使用介绍

a、如对从MySQLHBase导数据进行限流控制,需要加入参数,如5MB的带宽控制(单位是字节):

bin/sqoop import -Dbandwidth=5242880 --connect jdbc:mysql://192.168.1.56:3306/db_name --table t_order

 

b、从ftp拉取数据文件到hdfs,(目前支持ftp/hdfs/local/http的互相转换,local指本地文件)

bin/sqoop expand --src-uri ftp://ftptest:111111@192.168.1.56:21/app-site.xml  hdfs://hadoop1:9000/data 

 

另外的都类似,如:

bin/sqoop expand --src-uri http://www.baidu.com/ --des-uri local://usr/local/src

bin/sqoop expand --src-uri hdfs://hadoop1:9000/hbase/hbase.id --des-uri local://usr/local/src

 

通过执行bin/sqoop expand help可以打印帮助说明:

[cdh@hadoop1 sqoop-1.4.4-cdh5.1.2]$ sqoop expand help

14/11/21 16:13:23 INFO sqoop.Sqoop: Running version: vm1.0.0

Expand requires a united (--src-uri, --des-uri) or a --inner-shell argument.

Try --help for usage instructions.

usage: sqoop expand [GENERIC-ARGS] [TOOL-ARGS]

 

expand tool arguments:

   --des-uri <des>          Place to where Sqoop expand tool will store

                            data

   --inner-shell <shell>    Execute ‘shell cmd‘ in Hadoop/Hive/HBase

   --src-uri <src>          Place from where Sqoop expand tool will

                            extract data

 

c、通过VDataHub执行shell命令,需要用双引号将执行的shell命令字符串括起来才能被正确解析,如果内部还需要包含双引号用转义符号\”处理下。

bin/sqoop expand --inner-shell "hive -e \"show databases;\""

4 后期维护 

4.1 新功能开发

目前暂时放一下,前期做的额外开发已经可以满足项目需要了。但不排除会遇到新的需求,可以继续基于VDataHub1.0.0进行开发。

 

4.2 与社区同步

一旦自己维护私有版本,就是会遇到个问题,社区发展太快,自己团队人手有限,如何跟进。我采取的思路是对cloudera发布的新版本将其所打的patch都大致过一遍,如果遇到我们项目也急需的功能或者重大bug修复,就集中力量研究下相关patch,将其整合到自己的版本中,其他无关痛痒的就暂且忽略掉。

但也会碰到问题哈, 就是可能有些直接是关联的。那该怎么弄呢。如果靠眼睛一个个比对做了那些改动,太容易出问题了。可以用beyond compare工具比较自己版本和新版本的差异来做。(主要自己现在不是用的git做版本控制,比较麻烦)

 

资源:

http://archive-primary.cloudera.com/cdh5/cdh/5/

http://sqoop.apache.org/

https://github.com/apache/sqoop

 

 

定制版Sqoop-- VDataHub介绍