
Exporting MySQL Data to HDFS with MapReduce

A business requirement calls for loading a batch of MySQL data into HBase. The first step is to export the data from MySQL to HDFS.

Versions: Hadoop CDH4.5, HBase 0.94.6

  1. The entity class

    YqBean is my entity class; adapt it to your own schema. The entity class must implement both Writable and DBWritable, as the sketch below illustrates.
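
    A minimal sketch of what such an entity class might look like, assuming the column names from the fields array in step 2. Only id, title, and price are shown; the field types and the tab-delimited toString() are assumptions, so adjust them to your actual table and fill in the remaining columns the same way.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class YqBean implements Writable, DBWritable {

    private long id;          // assumed types; match them to your table
    private String title;
    private double price;
    // ... remaining fields: author, quantity, description, category_id, imgUrl

    // Writable: lets Hadoop serialize the bean between tasks
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeUTF(title);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        title = in.readUTF();
        price = in.readDouble();
    }

    // DBWritable: maps the bean to/from a JDBC row
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, id);
        statement.setString(2, title);
        statement.setDouble(3, price);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        id = resultSet.getLong("id");
        title = resultSet.getString("title");
        price = resultSet.getDouble("price");
    }

    // The mapper writes value.toString() to HDFS, so emit one delimited line per row
    @Override
    public String toString() {
        return id + "\t" + title + "\t" + price;
    }
}

    DBInputFormat populates the bean through readFields(ResultSet) on the input side; the Writable pair is what lets Hadoop move the bean from the input split to the mapper.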

  2. The MapReduce implementation

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Created: Jul 24, 2014 2:09:22 AM
 * Reads rows from MySQL via DBInputFormat and writes them to HDFS as text.
 */
public class AccessData {

    public static class DataAccessMap extends Mapper<LongWritable, YqBean, Text, Text> {
        @Override
        protected void map(LongWritable key, YqBean value, Context context)
                throws IOException, InterruptedException {
            System.out.println(value.toString());   // debug: log each row
            // Emit with an empty key; the value is the bean's delimited string form
            context.write(new Text(), new Text(value.toString()));
        }
    }

    public static class DataAccessReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Pass every row through unchanged
            for (Iterator<Text> itr = values.iterator(); itr.hasNext();) {
                context.write(key, itr.next());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // MySQL JDBC driver and connection settings.
        // Note: the JDBC URL takes the database name, not a table name.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://ip:3306/dbname?useUnicode=true&characterEncoding=utf8",
                "username", "passwd");

        Job job = new Job(conf, "test mysql connection");
        job.setJarByClass(AccessData.class);

        job.setMapperClass(DataAccessMap.class);
        job.setReducerClass(DataAccessReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(DBInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://ip:9000/hdfsFile"));

        // Column names in the table, matching the fields of the entity class
        String[] fields = {"id", "title", "price", "author", "quantity",
                "description", "category_id", "imgUrl"};
        // Parameters: job, entity class, table name, WHERE conditions (null selects
        // all rows), ORDER BY column, column names
        DBInputFormat.setInput(job, YqBean.class, "tablename", null, "title", fields);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
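
Two practical notes on running the job. First, the output directory must not already exist, or FileOutputFormat fails the job with a FileAlreadyExistsException. Second, the MySQL connector jar must be visible both to the submitting client (DBInputFormat opens a connection on the client to compute splits) and to the map tasks. One way to ship it to the tasks is the distributed cache; a sketch of that, where the HDFS path and connector version are assumptions, added in main() before waitForCompletion:

// Assumption: the connector jar was uploaded to HDFS beforehand, e.g.
//   hdfs dfs -put mysql-connector-java-5.1.26.jar /lib/
// addFileToClassPath ships it to every task via the distributed cache.
job.addFileToClassPath(new Path("/lib/mysql-connector-java-5.1.26.jar"));

On the client side, put the same jar on HADOOP_CLASSPATH (or package it into the job jar's lib/ directory) so that DriverManager can find com.mysql.jdbc.Driver when the job is submitted.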