
Sqoop 1.99.4 Java API Operations


It seems nobody around here has written up Java code for the 1.99.4 API yet, so I'll be the first to take the plunge.


If yours is a Maven project, add the client dependency:

<dependency>
    <groupId>org.apache.sqoop</groupId>
    <artifactId>sqoop-client</artifactId>
    <version>1.99.4</version>
</dependency>

If yours is a plain Java project,

just add every jar from the lib directory under Sqoop 1.99.4's shell directory to the classpath (don't use the ones under server).
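Before creating links, it's worth checking that the client can actually reach the server and seeing which connector IDs are registered, since the examples below hardcode connector IDs 1 (HDFS) and 2 (JDBC) and those numbers depend on your repository. A minimal sketch, assuming the same server URL as the examples below and the getConnectors() call of the 1.99.4 client API:

import java.util.Collection;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MConnector;

public class ListConnectors {
    public static void main(String[] args) {
        // Same server URL as in the examples below
        SqoopClient client = new SqoopClient("http://hadoop:12000/sqoop/");

        // Print every registered connector with its persistence ID;
        // these IDs are what createLink(connectorId) expects below.
        // On this setup 1 is hdfs-connector and 2 is generic-jdbc-connector,
        // but verify against your own server rather than hardcoding blindly.
        Collection<MConnector> connectors = client.getConnectors();
        for (MConnector connector : connectors) {
            System.out.println(connector.getPersistenceId() + " : " + connector.getUniqueName());
        }
    }
}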


HDFS to MySQL

package org.admln.sqoopOperate;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

public class HDFSToMysql {
    public static void main(String[] args) {
        sqoopTransfer();
    }

    public static void sqoopTransfer() {
        // Initialize the client
        String url = "http://hadoop:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);

        // Create the source link (HDFS)
        long fromConnectorId = 1;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("HDFS connector");
        fromLink.setCreationUser("admln");
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
        Status fromStatus = client.saveLink(fromLink);
        if (fromStatus.canProceed()) {
            System.out.println("HDFS link created, ID: " + fromLink.getPersistenceId());
        } else {
            System.out.println("Failed to create HDFS link");
        }

        // Create the destination link (JDBC)
        long toConnectorId = 2;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("JDBC connector");
        toLink.setCreationUser("admln");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
        toLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        toLinkConfig.getStringInput("linkConfig.username").setValue("hive");
        toLinkConfig.getStringInput("linkConfig.password").setValue("hive");
        Status toStatus = client.saveLink(toLink);
        if (toStatus.canProceed()) {
            System.out.println("JDBC link created, ID: " + toLink.getPersistenceId());
        } else {
            System.out.println("Failed to create JDBC link");
        }

        // Create the job
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("HDFS to MySQL job");
        job.setCreationUser("admln");
        // Configure the FROM side
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/out/aboutyunLog/HiveExport/ipstatistical/data");

        // Configure the TO side
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.schemaName").setValue("aboutyunlog");
        toJobConfig.getStringInput("toJobConfig.tableName").setValue("ipstatistical");
        //toJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
        // set the driver config values
        //MDriverConfig driverConfig = job.getDriverConfig();
        //driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3"); // haven't figured this one out yet
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Job created, ID: " + job.getPersistenceId());
        } else {
            System.out.println("Failed to create job.");
        }

        // Start the job
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("Job submission status: " + submission.getStatus());
        while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
            // Report progress every three seconds
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Refresh the submission, otherwise the loop condition never changes
            submission = client.getJobStatus(jobId);
        }
        System.out.println("Job finished... ...");
        System.out.println("Hadoop task ID: " + submission.getExternalId());
        Counters counters = submission.getCounters();
        if (counters != null) {
            System.out.println("Counters:");
            for (CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for (Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
        if (submission.getExceptionInfo() != null) {
            System.out.println("Job failed with exception: " + submission.getExceptionInfo());
        }
        System.out.println("HDFS to MySQL transfer via sqoop finished");
    }
}
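Note the refresh inside the polling loop: without re-fetching the submission via getJobStatus(), the loop condition would never change. The same call is also handy on its own, for example to check on or stop a job that was started earlier from another process. A small sketch under that assumption (the jobId value here is hypothetical; use the ID printed when the job was created):

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MSubmission;

public class JobControl {
    public static void main(String[] args) {
        SqoopClient client = new SqoopClient("http://hadoop:12000/sqoop/");
        long jobId = 1; // hypothetical ID of a previously created job

        // Ask the server for the current submission status of the job
        MSubmission submission = client.getJobStatus(jobId);
        System.out.println("Status: " + submission.getStatus());

        // Stop the job if it is still running
        if (submission.getStatus().isRunning()) {
            client.stopJob(jobId);
        }
    }
}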

MySQL to HDFS

package org.admln.sqoopOperate;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

public class MysqlToHDFS {
    public static void main(String[] args) {
        sqoopTransfer();
    }

    public static void sqoopTransfer() {
        // Initialize the client
        String url = "http://hadoop:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);

        // Create the source link (JDBC)
        long fromConnectorId = 2;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("JDBC connector");
        fromLink.setCreationUser("admln");
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
        fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        fromLinkConfig.getStringInput("linkConfig.username").setValue("hive");
        fromLinkConfig.getStringInput("linkConfig.password").setValue("hive");
        Status fromStatus = client.saveLink(fromLink);
        if (fromStatus.canProceed()) {
            System.out.println("JDBC link created, ID: " + fromLink.getPersistenceId());
        } else {
            System.out.println("Failed to create JDBC link");
        }

        // Create the destination link (HDFS)
        long toConnectorId = 1;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("HDFS connector");
        toLink.setCreationUser("admln");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
        Status toStatus = client.saveLink(toLink);
        if (toStatus.canProceed()) {
            System.out.println("HDFS link created, ID: " + toLink.getPersistenceId());
        } else {
            System.out.println("Failed to create HDFS link");
        }

        // Create the job
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("MySQL to HDFS job");
        job.setCreationUser("admln");
        // Configure the FROM side
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
        // Configure the TO side
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");
        // Driver config: number of extractors (controls parallelism on the extract side)
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Job created, ID: " + job.getPersistenceId());
        } else {
            System.out.println("Failed to create job.");
        }

        // Start the job
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("Job submission status: " + submission.getStatus());
        while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
            // Report progress every three seconds
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Refresh the submission, otherwise the loop condition never changes
            submission = client.getJobStatus(jobId);
        }
        System.out.println("Job finished... ...");
        System.out.println("Hadoop task ID: " + submission.getExternalId());
        Counters counters = submission.getCounters();
        if (counters != null) {
            System.out.println("Counters:");
            for (CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for (Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
        if (submission.getExceptionInfo() != null) {
            System.out.println("Job failed with exception: " + submission.getExceptionInfo());
        }
        System.out.println("MySQL to HDFS transfer via sqoop finished");
    }
}
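Both examples create fresh links and a fresh job on every run, so repeated experiments pile up objects on the server. A minimal cleanup sketch, assuming the deleteJob/deleteLink calls of the 1.99.4 client (they mirror the shell's delete job/delete link commands) and hypothetical IDs:

import org.apache.sqoop.client.SqoopClient;

public class Cleanup {
    public static void main(String[] args) {
        SqoopClient client = new SqoopClient("http://hadoop:12000/sqoop/");

        // Hypothetical IDs: use the ones printed when the objects were created.
        // Delete the job first, since it references the links.
        client.deleteJob(1);
        client.deleteLink(1);
        client.deleteLink(2);
    }
}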

Don't ask why there's no code for moving data between MySQL and HBase or Hive: Sqoop 1.99.4 ships only the HDFS and generic JDBC connectors, so there is simply no HBase or Hive connector to call.


 

2015-01-02

 
