Reading data from a MySQL database with Hadoop
Reposted from: http://blog.csdn.net/qwertyu8656/article/details/6426054
This uses Hadoop 0.20.2. Some of the classes involved are deprecated, but they are still required here, because the newer API's support for database connections is not yet adequate.
In MySQL, create the School database and the teacher table below, then populate it with a few rows of your own:
DROP TABLE IF EXISTS `school`.`teacher`;
CREATE TABLE `school`.`teacher` (
`id` int(11) default NULL,
`name` char(20) default NULL,
`age` int(11) default NULL,
`departmentID` int(11) default NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
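If you want a few rows to test with, the sketch below seeds the table over plain JDBC. The names, ages, and department IDs are made-up sample data, and the URL, username, and password are the same placeholder connection settings used by the Hadoop job further down, so substitute your own.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SeedTeachers {
    public static void main(String[] args) throws Exception {
        // Same placeholder URL/user/password as the Hadoop job below.
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/school", "root", "zxcvbnm");
        PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO teacher (id, name, age, departmentID) VALUES (?, ?, ?, ?)");
        // Made-up sample rows; any test data will do.
        Object[][] rows = { {1, "Alice", 30, 2}, {2, "Bob", 45, 1} };
        for (Object[] r : rows) {
            ps.setInt(1, (Integer) r[0]);
            ps.setString(2, (String) r[1]);
            ps.setInt(3, (Integer) r[2]);
            ps.setInt(4, (Integer) r[3]);
            ps.executeUpdate();
        }
        ps.close();
        conn.close();
    }
}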
The code compiles and runs in Eclipse, provided you add the required JARs (including the MySQL Connector/J JDBC driver) and the Hadoop 0.20.2 Eclipse plugin. First, the job driver:
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

public class DBAccess2 {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(DBAccess2.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        // Read the input records from MySQL rather than from files.
        conf.setInputFormat(DBInputFormat.class);
        // Results are written as text files under /dbout on HDFS
        // (the output path must not already exist).
        FileOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:9000/dbout"));

        // JDBC driver class, connection URL, username, and password.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/school", "root", "zxcvbnm");

        // Map each row of the teacher table to a TeacherRecord,
        // selecting these columns and ordering the query by id.
        String[] fields = {"id", "name", "age", "departmentID"};
        DBInputFormat.setInput(conf, TeacherRecord.class, "teacher",
                null /* no WHERE conditions */, "id", fields);

        conf.setMapperClass(DBAccessMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        JobClient.runJob(conf);
    }
}
Note: adjust the JDBC connection URL, username, password, and so on to match your own setup.
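One more practical point: when the job runs on a real cluster (rather than inside Eclipse), the MySQL driver jar has to be on the classpath of the map tasks as well. A common way to arrange that, sketched here with a hypothetical HDFS path, is to upload the driver jar to HDFS and add it to the task classpath in main() before JobClient.runJob(conf):

import org.apache.hadoop.filecache.DistributedCache;

// Hypothetical path: first upload your copy of the driver jar, e.g.
//   hadoop fs -put mysql-connector-java-5.1.x.jar /lib/
DistributedCache.addFileToClassPath(
        new Path("/lib/mysql-connector-java-5.1.x.jar"), conf);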
Next, the Mapper, which turns each TeacherRecord into an (id, text) key/value pair:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DBAccessMapper extends MapReduceBase implements
        Mapper<LongWritable, TeacherRecord, LongWritable, Text> {

    // Emit each teacher's id as the key and the rest of the
    // record (name, age, departmentID) as a text value.
    @Override
    public void map(LongWritable key, TeacherRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        collector.collect(new LongWritable(value.id),
                new Text(value.toString()));
    }
}
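Since the job leaves the default TextOutputFormat in place, each collect call becomes one tab-separated line under /dbout. With the made-up sample rows from the seeding sketch above, part-00000 would contain lines like:

1	Alice 30 2
2	Bob 45 1

The key is the teacher's id; the value is TeacherRecord.toString(), which omits the id.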
Finally, the TeacherRecord class, which implements both Writable (Hadoop serialization) and DBWritable (JDBC mapping):
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class TeacherRecord implements Writable, DBWritable {

    int id;
    String name;
    int age;
    int departmentID;

    // Writable: deserialize from Hadoop's binary representation.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    // Writable: serialize to Hadoop's binary representation.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    // DBWritable: read one row of the query results.
    @Override
    public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
        this.age = result.getInt(3);
        this.departmentID = result.getInt(4);
    }

    // DBWritable: bind this record's values to an INSERT/UPDATE
    // statement (only needed when writing back to the database).
    @Override
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
        stmt.setInt(3, this.age);
        stmt.setInt(4, this.departmentID);
    }

    @Override
    public String toString() {
        return this.name + " " + this.age + " " + this.departmentID;
    }
}
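A final note on TeacherRecord: the two pairs of methods serve different contracts. readFields(DataInput)/write(DataOutput) implement Hadoop's Writable interface, used when records are serialized between map and reduce, while readFields(ResultSet)/write(PreparedStatement) implement DBWritable, which DBInputFormat uses to populate records from the JDBC query results. The column indexes 1 through 4 in readFields(ResultSet) must line up with the order of the fields array passed to DBInputFormat.setInput.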