首页 > 代码库 > Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据

 

有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方。

好了言归正传,简单的说说背景、原理以及需要注意的地方:

1、为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。

至少在我的 0.20.203 中的org.apache.hadoop.mapreduce.lib 下是没见到 db 包,所以本文也是以老版的 API 来为例说明的。

3、运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。

添加包有两种方式:

(1)在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。

(2)a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0- bin.jar /hdfsPath/

b)在mr程序提交job前,添加语句:DistributedCache.addFileToClassPath(new Path(“/hdfsPath/mysql- connector-java- 5.1.0-bin.jar”), conf);

(3)虽然API用的是0.19的,但是使用0.20的API一样可用,只是会提示方法已过时而已。

4、测试数据:

 

Java代码  技术分享
  1. CREATE TABLE `t` (  
  2. `id` int DEFAULT NULL,  
  3. `name` varchar(10) DEFAULT NULL  
  4. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;  
  5.   
  6. CREATE TABLE `t2` (  
  7. `id` int DEFAULT NULL,  
  8. `name` varchar(10) DEFAULT NULL  
  9. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;  
  10.   
  11. insert into t values (1,"june"),(2,"decli"),(3,"hello"),  
  12.     (4,"june"),(5,"decli"),(6,"hello"),(7,"june"),  
  13.     (8,"decli"),(9,"hello"),(10,"june"),  
  14.     (11,"june"),(12,"decli"),(13,"hello");  

5、代码:

 

Java代码  技术分享
  1. package mysql2mr;  
  2.   
  3. import java.io.DataInput;  
  4. import java.io.DataOutput;  
  5. import java.io.File;  
  6. import java.io.IOException;  
  7. import java.sql.PreparedStatement;  
  8. import java.sql.ResultSet;  
  9. import java.sql.SQLException;  
  10.   
  11. import mapr.EJob;  
  12.   
  13. import org.apache.hadoop.conf.Configuration;  
  14. import org.apache.hadoop.filecache.DistributedCache;  
  15. import org.apache.hadoop.fs.Path;  
  16. import org.apache.hadoop.io.LongWritable;  
  17. import org.apache.hadoop.io.Text;  
  18. import org.apache.hadoop.io.Writable;  
  19. import org.apache.hadoop.mapred.JobConf;  
  20. import org.apache.hadoop.mapreduce.Job;  
  21. import org.apache.hadoop.mapreduce.Mapper;  
  22. import org.apache.hadoop.mapreduce.Reducer;  
  23. import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;  
  24. import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;  
  25. import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;  
  26. import org.apache.hadoop.mapreduce.lib.db.DBWritable;  
  27.   
  28. /** 
  29.  * Function: 测试 mr 与 mysql 的数据交互,此测试用例将一个表中的数据复制到另一张表中 实际当中,可能只需要从 mysql 读,或者写到 
  30.  * mysql 中。 
  31.  *  
  32.  * @author administrator 
  33.  *  
  34.  */  
  35. public class Mysql2Mr {  
  36.     public static class StudentinfoRecord implements Writable, DBWritable {  
  37.         int id;  
  38.         String name;  
  39.   
  40.         public StudentinfoRecord() {  
  41.   
  42.         }  
  43.   
  44.         public String toString() {  
  45.             return new String(this.id + " " + this.name);  
  46.         }  
  47.   
  48.         @Override  
  49.         public void readFields(ResultSet result) throws SQLException {  
  50.             this.id = result.getInt(1);  
  51.             this.name = result.getString(2);  
  52.         }  
  53.   
  54.         @Override  
  55.         public void write(PreparedStatement stmt) throws SQLException {  
  56.             stmt.setInt(1, this.id);  
  57.             stmt.setString(2, this.name);  
  58.         }  
  59.   
  60.         @Override  
  61.         public void readFields(DataInput in) throws IOException {  
  62.             this.id = in.readInt();  
  63.             this.name = Text.readString(in);  
  64.         }  
  65.   
  66.         @Override  
  67.         public void write(DataOutput out) throws IOException {  
  68.             out.writeInt(this.id);  
  69.             Text.writeString(out, this.name);  
  70.         }  
  71.   
  72.     }  
  73.   
  74.     // 记住此处是静态内部类,要不然你自己实现无参构造器,或者等着抛异常:  
  75.     // Caused by: java.lang.NoSuchMethodException: DBInputMapper.<init>()  
  76.     // http://stackoverflow.com/questions/7154125/custom-mapreduce-input-format-cant-find-constructor  
  77.     // 网上脑残式的转帖,没见到一个写对的。。。  
  78.     public static class DBInputMapper extends  
  79.             Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {  
  80.         @Override  
  81.         public void map(LongWritable key, StudentinfoRecord value,  
  82.                 Context context) throws IOException, InterruptedException {  
  83.             context.write(new LongWritable(value.id), new Text(value.toString()));  
  84.         }  
  85.     }  
  86.   
  87.       
  88.     public static class MyReducer extends Reducer<LongWritable, Text, StudentinfoRecord, Text> {  
  89.         @Override  
  90.         public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
  91.             String[] splits = values.iterator().next().toString().split(" ");  
  92.             StudentinfoRecord r = new StudentinfoRecord();  
  93.             r.id = Integer.parseInt(splits[0]);  
  94.             r.name = splits[1];  
  95.             context.write(r, new Text(r.name));  
  96.               
  97.         }  
  98.     }  
  99.       
  100.     @SuppressWarnings("deprecation")  
  101.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {  
  102.         File jarfile = EJob.createTempJar("bin");  
  103.         EJob.addClasspath("usr/hadoop/conf");  
  104.   
  105.         ClassLoader classLoader = EJob.getClassLoader();  
  106.         Thread.currentThread().setContextClassLoader(classLoader);  
  107.           
  108.         Configuration conf = new Configuration();  
  109.         // 这句话很关键  
  110.         conf.set("mapred.job.tracker", "172.30.1.245:9001");  
  111.         DistributedCache.addFileToClassPath(new Path(  
  112.                 "hdfs://172.30.1.245:9000/user/hadoop/jar/mysql-connector-java-5.1.6-bin.jar"), conf);  
  113.         DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://172.30.1.245:3306/sqooptest", "sqoop", "sqoop");  
  114.           
  115.         Job job = new  Job(conf, "Mysql2Mr");  
  116. //      job.setJarByClass(Mysql2Mr.class);  
  117.         ((JobConf)job.getConfiguration()).setJar(jarfile.toString());  
  118.         job.setMapOutputKeyClass(LongWritable.class);  
  119.         job.setMapOutputValueClass(Text.class);  
  120.           
  121.         job.setMapperClass(DBInputMapper.class);  
  122.         job.setReducerClass(MyReducer.class);  
  123.           
  124.         job.setOutputKeyClass(LongWritable.class);  
  125.         job.setOutputValueClass(Text.class);  
  126.           
  127.         job.setOutputFormatClass(DBOutputFormat.class);  
  128.         job.setInputFormatClass(DBInputFormat.class);  
  129.           
  130.         String[] fields = {"id","name"};  
  131.          // 从 t 表读数据  
  132.         DBInputFormat.setInput(job, StudentinfoRecord.class, "t", null, "id", fields);  
  133.         // mapreduce 将数据输出到 t2 表  
  134.         DBOutputFormat.setOutput(job, "t2", "id", "name");  
  135.           
  136.         System.exit(job.waitForCompletion(true)? 0:1);  
  137.     }  
  138. }  

6、结果:

执行两次后,你可以看到mysql结果:

Java代码  技术分享
  1. mysql> select * from t2;  
  2. +------+-------+  
  3. | id   | name  |  
  4. +------+-------+  
  5. |    1 | june  |  
  6. |    2 | decli |  
  7. |    3 | hello |  
  8. |    4 | june  |  
  9. |    5 | decli |  
  10. |    6 | hello |  
  11. |    7 | june  |  
  12. |    8 | decli |  
  13. |    9 | hello |  
  14. |   10 | june  |  
  15. |   11 | june  |  
  16. |   12 | decli |  
  17. |   13 | hello |  
  18. |    1 | june  |  
  19. |    2 | decli |  
  20. |    3 | hello |  
  21. |    4 | june  |  
  22. |    5 | decli |  
  23. |    6 | hello |  
  24. |    7 | june  |  
  25. |    8 | decli |  
  26. |    9 | hello |  
  27. |   10 | june  |  
  28. |   11 | june  |  
  29. |   12 | decli |  
  30. |   13 | hello |  
  31. +------+-------+  
  32. 26 rows in set (0.00 sec)  
  33.   
  34. mysql>  

Hadoop 中利用 mapreduce 读写 mysql 数据