首页 > 代码库 > java dbcp连接池,大数据处理循环多表操作插入事例
java dbcp连接池,大数据处理循环多表操作插入事例
基础连接池类:
package com.yl.sys.dao;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Vector;
/**
 * Minimal singleton pool of PostgreSQL JDBC connections.
 *
 * <p>The connection settings ({@code url}, {@code user}, {@code password}) are read from
 * {@code localpost.properties}, resolved relative to {@code LocalPostgisDAO}. All
 * {@link #poolSize} connections are opened eagerly when the singleton is first requested.
 * Callers check a connection out with {@link #getConnection()} and must hand it back with
 * {@link #release(Connection)}.
 */
public class ConnectionPoolTool {
    /** Idle connections that are currently available for checkout. */
    private Vector<Connection> pool;
    private String url;
    private String user;
    private String password;
    /**
     * Size of the pool, i.e. how many database connections are opened at start-up.
     */
    private int poolSize = 1;
    private static ConnectionPoolTool instance = null;

    /**
     * Private constructor (singleton): obtain the instance through {@link #getInstance()}.
     */
    private ConnectionPoolTool() {
        init();
    }

    /**
     * Initialises the pool: reads the properties file and opens the initial connections.
     *
     * @author DuChaoWei
     */
    private void init() {
        pool = new Vector<Connection>(poolSize);
        readConfig();
        addConnection();
    }

    /**
     * Returns a connection to the pool so that it can be handed out again.
     *
     * @author DuChaoWei
     * @param conn the connection to give back; {@code null} is ignored so that the pool never
     *             hands out a null entry later on
     */
    public synchronized void release(Connection conn) {
        if (conn != null) {
            pool.add(conn);
        }
    }

    /**
     * Closes every idle connection and empties the pool.
     *
     * <p>A failure to close one connection is reported and does not stop the remaining
     * connections from being closed. Connections that are checked out at the time of the call
     * are not affected. The pool is not refilled afterwards, so {@link #getConnection()}
     * returns {@code null} until connections are released back into it.
     */
    public synchronized void closePool() {
        // Close first, clear afterwards: removing by index while the index advances would
        // skip every second connection and leave it open.
        for (Connection conn : pool) {
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        pool.clear();
    }

    /**
     * Returns the single pool instance, creating it on first use.
     *
     * <p>Synchronized so that two threads racing on the first call cannot build two pools.
     */
    public static synchronized ConnectionPoolTool getInstance() {
        if (instance == null) {
            instance = new ConnectionPoolTool();
        }
        return instance;
    }

    /**
     * Checks one connection out of the pool.
     *
     * @return an idle connection, or {@code null} when the pool is currently empty
     */
    public synchronized Connection getConnection() {
        if (pool.isEmpty()) {
            return null;
        }
        return pool.remove(0);
    }

    /**
     * Opens {@link #poolSize} connections and puts them into the pool.
     *
     * <p>Failures are reported on standard error; the pool then simply holds fewer (possibly
     * zero) connections.
     *
     * @author DuChaoWei
     */
    private void addConnection() {
        try {
            // Loading the driver once is enough; without it no connection can be opened.
            Class.forName("org.postgresql.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return;
        }
        for (int i = 0; i < poolSize; i++) {
            try {
                pool.add(java.sql.DriverManager.getConnection(url, user, password));
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Reads the connection settings from {@code localpost.properties}.
     *
     * <p>When the file is missing or unreadable the problem is reported and the settings stay
     * {@code null}.
     *
     * @author DuChaoWei
     */
    private void readConfig() {
        Properties prop = new Properties();
        // try-with-resources closes the stream; a null resource is skipped on close.
        try (InputStream in = LocalPostgisDAO.class.getResourceAsStream("localpost.properties")) {
            if (in == null) {
                System.err.println("localpost.properties was not found next to LocalPostgisDAO on the classpath");
                return;
            }
            prop.load(in);
            url = prop.getProperty("url");
            user = prop.getProperty("user");
            password = prop.getProperty("password");
        } catch (java.io.IOException | IllegalArgumentException e) {
            // IllegalArgumentException: malformed unicode escape inside the properties file.
            e.printStackTrace();
        }
    }
}
调用类:
package com.yl.zhi.stand.root;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.yl.sys.dao.ConnectionPoolTool;
import com.yl.sys.dao.LocalMongoDAO;
import com.yl.zhi.stand.utils.CompressUtils;
/**
* 创建引文关系类
*
* @date 下午3:25:21
* @author DuChaoWei
* @descripte
*/
public class CreateQuoteRelation {
// 文献解析配置文件
private static final String name = "createQuoteRelation_config.js";
// 查询数据量
private static final int searchSize = 20000;
// 存放所有引文 文献的hashCode,去重
private Map<Integer, Integer> quoteRepeatMap = new HashMap<>();
// 统计错误引用文献数
private int errorCount = 0;
// 存放错误引文
private List<String> errorList = new ArrayList<>();
// 统计不能插入文献数据
private int errorWx = 0;
// 未插入文献 文件id
private List<String> errorWxList = new ArrayList<>();
private Connection con = null;
public void Start() {
// 连接mongo
LocalMongoDAO.initMongoDB();
try {
execute();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 执行录入
*
* @date 下午3:35:35
* @author DuChaoWei
* @descripte
* @throws Exception
*/
public void execute() throws Exception {
// 读取配置文件
JSONArray main = new JSONArray(getJsonConfig());
for (int i = 0; i < main.length(); i++) {
reader(main.getJSONObject(i));
}
}
private void reader(JSONObject object) throws Exception {
// mongo源数据表名
String mongoTable = object.getString("mongoTable");
// 存储 主表
String mainTable = object.getString("mainTable");
// 关系表
String relationTable = object.getString("relationTable");
// 获取mongodb源数据总数
int totalCount = getMongoDbCount(mongoTable);
int searchCount = (totalCount + searchSize - 1) / searchSize;
//初始化数据库连接池
ConnectionPoolTool pool = ConnectionPoolTool.getInstance();
for (int i = 0; i < searchCount; i++) {
int skipNum = searchSize * i;
List<Map<String, Object>> list = getMongoData(mongoTable, skipNum);
System.out.println("处理20000条文献开始时间:" + System.currentTimeMillis());
//开启事务
for (Map<String, Object> map : list) {
//long start = System.currentTimeMillis();
con = pool.getConnection();
con.setAutoCommit(false);
try {
// 插入主表(文献表),返回为数据id
Integer mainId = insertMain(map, mainTable);
// 处理引文,返回引用文献的id
List<Integer> quoteList = dealQuote(map, object);
if (quoteList != null && quoteList.size() > 0) {
// 插入关系表
insertRelation(mainId, quoteList, relationTable);
}
// 提交
con.commit();
} catch (Exception e) {
e.printStackTrace();
errorWx++;
errorWxList.add(map.get("_id").toString());
con.rollback();
con.setAutoCommit(true);
}finally{
//释放链接
pool.release(con);
}
}
System.out.println("处理20000条文献结束时间:" + System.currentTimeMillis());
}
//关闭连接池
pool.closePool();
System.out.println("未插入文献数量为:" + errorWx);
System.out.println("未插入文献篇名为:" + JSON.toJSONString(errorWxList));
System.out.println("错误格式引文数量为:" + errorCount);
}
/**
* 获取该文档的引文
*
* @date 上午9:19:41
* @author DuChaoWei
* @descripte
* @param map
* @return
* @throws SQLException
*/
private List<Integer> dealQuote(Map<String, Object> map, JSONObject object) throws SQLException {
// 返回所有引用文献id
List<Integer> list = new ArrayList<>();
// mongo源数据表名
String mongoTable = object.getString("mongoTable");
// 引文表
String quoteTable = object.getString("quoteTable");
// 获取data
byte[] str = CompressUtils.unGZip((byte[]) map.get("data"));
String obj = new String(str);
JSONObject data = http://www.mamicode.com/new JSONObject(obj);
if (data != null) {
// 获取网址 用于 区别新旧数据
String webSite = data.getString("detail_URI");
// 文献类型
String type = map.get("source_type").toString();
// 引文
String reference = data.getString("reference");
String[] quotes = getSplitQuote(reference);
// 存放所有处理后的引文
List<Map<String, Object>> quoteList = new ArrayList<>();
for (String quote : quotes) {
if (quote != null && !"".equals(quote)) {
int quoteHashCode = quote.hashCode();
if (quoteRepeatMap.containsKey(quoteHashCode)) {
// 引文id
Integer quoteId = quoteRepeatMap.get(quoteHashCode);
list.add(quoteId);
} else {
Map<String, Object> quoteMap = null;
quote = quote.replaceAll("‘", "‘‘");
if (webSite.equals("")) {
// 老数据处理引文
quoteMap = oldFormatQuote(quote, mongoTable, type);
} else {
// 处理引文
quoteMap = newFormatQuote(quote, mongoTable);
}
if (quoteMap != null) {
quoteList.add(quoteMap);
}
}
}
}
// 插入引文到postgress 返回插入对象
list= insertQuote(quoteList, quoteTable);
}
return list;
}
/**
* 拆分引文
*
* @date 下午3:19:21
* @author DuChaoWei
* @descripte
* @param str
* @return
*/
private String[] getSplitQuote(String str) {
if (str != null) {
String reg = "[\\[]{1}[0-9][\\]]{1}";
return str.split(reg);
}
return null;
}
/**
* 插入关系表
*
* @date 下午2:46:38
* @author DuChaoWei
* @descripte
* @param list
* @param relationTable
* @throws SQLException
*/
private void insertRelation(Integer mainId, List<Integer> quoteList, String relationTable) throws SQLException {
StringBuffer sql = new StringBuffer();
sql.append(" insert into " + relationTable);
sql.append(" (wx_id,yw_id) ");
sql.append(" values ");
StringBuffer valueSql = new StringBuffer();
for (Integer quoteId : quoteList) {
valueSql.append("(" + mainId + "," + quoteId + "),");
}
String valueStr = valueSql.toString();
valueStr = valueStr.substring(0, valueStr.lastIndexOf(",")) + ";";
sql.append(valueStr);
PreparedStatement ps1 = con.prepareStatement(sql.toString());
ps1.executeUpdate();
}
/**
* 插入引文到引文表
*
* @date 上午11:03:07
* @author DuChaoWei
* @descripte
* @param map
* @return
* @throws SQLException
*/
private List<Integer> insertQuote(List<Map<String, Object>> list, String quoteTable)
throws SQLException {
List<Integer> resultList = new ArrayList<>();
if (list != null && list.size() > 0) {
StringBuffer sb = new StringBuffer();
sb.append(" insert into " + quoteTable);
sb.append(" (mongo_id,title,type,author,company,dates,flag) ");
sb.append(" values ");
StringBuffer valueSb = new StringBuffer();
for (Map<String, Object> map : list) {
valueSb.append("(" + map.get("mongo_id") + ",");
valueSb.append("‘" + map.get("title").toString() + "‘,");
valueSb.append("‘" + map.get("type").toString() + "‘,");
valueSb.append("‘" + map.get("author").toString() + "‘,");
valueSb.append("‘" + map.get("company").toString() + "‘,");
valueSb.append("‘" + map.get("dates").toString() + "‘,");
valueSb.append(" 0), ");
}
String valueStr = valueSb.toString();
sb.append(valueStr.substring(0, valueStr.lastIndexOf(",")));
PreparedStatement ps1 = con.prepareStatement(sb.toString(),Statement.RETURN_GENERATED_KEYS);
ps1.executeUpdate();
ResultSet rs = ps1.getGeneratedKeys();
while(rs.next()){
resultList.add(rs.getInt(1));
}
rs.close();
}
return resultList;
}
/**
* 插入主表
*
* @date 上午9:07:26
* @author DuChaoWei
* @descripte
* @param map
* @param tableName
* @return
* @throws SQLException
*/
private Integer insertMain(Map<String, Object> map, String tableName) throws SQLException {
Integer result = null;
// 数据
byte[] str = CompressUtils.unGZip((byte[]) map.get("data"));
String obj = new String(str);
JSONObject data = http://www.mamicode.com/new JSONObject(obj);
StringBuffer sb = new StringBuffer();
sb.append(" insert into " + tableName);
sb.append(" (mongo_id,title,type,author,company,dates,flag) ");
sb.append(" values ( ");
sb.append("‘" + map.get("_id").toString() + "‘,");
sb.append("‘" + data.getString("title").replaceAll("‘", "‘‘") + "‘,");
sb.append("‘" + data.getString("type").replaceAll("‘", "‘‘") + "‘,");
sb.append("‘" + data.getString("author").replaceAll("‘", "‘‘") + "‘,");
sb.append("‘" + data.getString("company").replaceAll("‘", "‘‘") + "‘,");
sb.append("‘" + data.getString("dates").replaceAll("‘", "‘‘") + "‘,");
sb.append(" 0) ");
PreparedStatement ps1 = con.prepareStatement(sb.toString(),Statement.RETURN_GENERATED_KEYS);
ps1.executeUpdate();
ResultSet rs = ps1.getGeneratedKeys();
while(rs.next()){
result = rs.getInt(1);
}
rs.close();
// QueryRunner runner = new QueryRunner(dataSource);
// result = runner.insert(sb.toString(), new ScalarHandler<Integer>("id"));
// ArrayHandler:把结果集中的第一行数据转换成对象数组。
// ArrayListHandler:把结果集中的每一行数据都转换成一个对象数组,再存放到List中。
// BeanHandler:将结果集中的第一行数据封装到一个对应的JavaBean实例中。
// BeanListHandler:将结果集中的每一行数据都封装到一个对应的JavaBean实例中,存放到List里。
// MapHandler:将结果集中的第一行数据封装到一个Map里,key是列名,value就是对应的值。
// MapListHandler:将结果集中的每一行数据都封装到一个Map里,然后再存放到List。
// ColumnListHandler:将结果集中某一列的数据存放到List中。
// KeyedHandler(name):将结果集中的每一行数据都封装到一个Map里(List),再把这些map再存到一个map里,其key为指定的列。
// ScalarHandler:获取结果集中第一行数据指定列的值,常用来进行单值查询
// result =
// LocalPostgisDAO.getInstance().getQuery().insert(sb.toString(),
// new ResultSetHandler<Integer>() {
// @Override
// public Integer handle(ResultSet arg0) throws SQLException {
// return arg0.getInt(0);
// }});
return result;
}
/**
* 老数据处理引文
*
* @date 下午4:37:24
* @author DuChaoWei
* @descripte
* @param quote
* @param mongoTable
* @return
*/
private Map<String, Object> oldFormatQuote(String quote, String mongoTable, String type) {
Map<String, Object> rsMap = new HashMap<>();
String[] mdArray = quote.split(",");
try {
// mongoid
rsMap.put("mongo_id", 0);
// 篇名
rsMap.put("title", mdArray[1].trim());
// 类型
rsMap.put("type", type);
// 作者
rsMap.put("author", mdArray[0].trim());
// 机构
rsMap.put("company", mdArray[2].trim().replace("‘", "‘‘"));
// 时间
rsMap.put("dates", mdArray[3].trim());
} catch (Exception e) {
errorCount++;
errorList.add(quote);
return null;
}
// 获取引文 的mongoid
// Map<String, Object> map = getMongoData(mongoTable, mdArray[1]);
// if (map != null) {
// rsMap.put("mongo_id", map.get("_id"));
// } else {
// rsMap.put("mongo_id", 0);
// }
return rsMap;
}
/**
* 处理单个引用文献
*
* @date 上午10:24:37
* @author DuChaoWei
* @descripte
* @param quote
* @return
*/
private Map<String, Object> newFormatQuote(String quote, String mongoTable) {
Map<String, Object> rsMap = new HashMap<>();
String[] mdArray = quote.replaceAll(";", "").split("\\.");
try {
// mongoid
rsMap.put("mongo_id", 0);
// 篇名
rsMap.put("title", mdArray[0].trim().substring(0, mdArray[0].indexOf("[")));
// 类型
rsMap.put("type", mdArray[0].trim().substring(mdArray[0].indexOf("["), mdArray[0].length()));
// 作者
rsMap.put("author", mdArray[1].trim());
// 机构
rsMap.put("company", mdArray[2].trim());
// 时间
rsMap.put("dates", mdArray[3].trim());
} catch (Exception e) {
errorCount++;
errorList.add(quote);
return null;
}
// 获取引文 的mongoid
// Map<String, Object> map = getMongoData(mongoTable, mdArray[0].substring(0, mdArray[0].indexOf("[")));
// if (map != null) {
// rsMap.put("mongo_id", map.get("_id"));
// } else {
// rsMap.put("mongo_id", 0);
// }
rsMap.put("mongo_id", 0);
return rsMap;
}
/**
* 获取mongo文档总数
*
* @date 下午4:01:33
* @author DuChaoWei
* @descripte
* @param collectionName
* @return
*/
private int getMongoDbCount(String collectionName) {
BasicDBObject sort = new BasicDBObject();
sort.put("_id", 1);
DBCollection collection = LocalMongoDAO.getCollection(collectionName);
DBCursor cursor = collection.find().sort(sort);
return cursor.count();
}
/**
* 分页查询
*
* @date 下午5:44:57
* @author DuChaoWei
* @descripte
* @param collectionName
* @param skipNum
* @param totalCount
* @return
*/
@SuppressWarnings("unchecked")
private List<Map<String, Object>> getMongoData(String collectionName, int skipNum) {
List<Map<String, Object>> list = new ArrayList<>();
BasicDBObject sort = new BasicDBObject();
sort.put("_id", 1);// 1标识 顺序 排序
DBCollection collection = LocalMongoDAO.getCollection(collectionName);
DBCursor cursor = collection.find().sort(sort).skip(skipNum).limit(searchSize);
while (cursor.hasNext()) {
DBObject obj = cursor.next();
if (obj != null) {
list.add(obj.toMap());
}
}
return list;
}
/**
* 获取mongo数据
*
* @date 上午11:08:51
* @author DuChaoWei
* @descripte
* @param collectionName
* @param tital
* @return
*/
@SuppressWarnings("unchecked")
private Map<String, Object> getMongoData(String collectionName, String title) {
BasicDBObject queryObj = new BasicDBObject();
BasicDBObject queryObj_1 = new BasicDBObject();
queryObj.put("data", queryObj_1);
queryObj_1.put("title", title);
DBCollection collection = LocalMongoDAO.getCollection(collectionName);
DBObject obj = collection.findOne(queryObj);
if (obj != null) {
return obj.toMap();
}
return null;
}
/**
* 获取配置文件
*
* @date 下午3:37:57
* @author DuChaoWei
* @descripte
* @return
* @throws IOException
*/
private String getJsonConfig() throws IOException {
StringBuffer buffer = new StringBuffer();
// String realPath = RootStart.getRootPath();
// String subPath = "/WEB-INF/res/config/" + name;
// String filePath = realPath + subPath;
// System.out.println("科技文献标准化配置文件:" + filePath);
String filePath = "D:/ylkfSoft/workspace/IndexWeb/zhishi_config/" + name;
File file = new File(filePath);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "utf-8"));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
line = line.trim();
buffer.append(line);
}
bufferedReader.close();
return removeNotes(buffer.toString());
}
/**
* 去掉注释
*
* @date 下午3:38:10
* @author DuChaoWei
* @descripte
* @param str
* @return
*/
private String removeNotes(String str) {
int start = str.indexOf("//#");
int end = str.indexOf("#//");
if (start > -1 && end > -1) {
String oldStr = str.substring(start, end + 3);
String removeStr = str.replace(oldStr, "");
return removeNotes(removeStr);
} else {
return str;
}
}
public static void main(String[] args) {
CreateQuoteRelation cqr = new CreateQuoteRelation();
cqr.Start();
}
}
java dbcp连接池,大数据处理循环多表操作插入事例