首页 > 代码库 > ThreadLocal,LinkedBlockingQueue,线程池 获取数据库连接

ThreadLocal,LinkedBlockingQueue,线程池 获取数据库连接

<pre name="code" class="java">package com.ctl.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.ctl.util.ConfigUtils;
import com.ctl.util.LogUtil;

/**
 * Minimal JDBC connection pool backed by one bounded {@link LinkedBlockingQueue}.
 *
 * <p>Configuration is read once, during class initialization, through
 * {@code ConfigUtils.getType}: {@code databasetype} ({@code mysql} or
 * {@code oracle}), {@code threadPoolMaxNum}, {@code threadPoolMinNum} and the
 * matching {@code <type>.driver/url/username/password} keys. A background
 * thread then keeps opening connections (auto-commit disabled) and offering
 * them to the queue until {@link #setContinueCreateConnncetion(boolean)} is
 * called with {@code false}.
 *
 * <p>The queue is shared by all threads. {@link #queueHoder} is kept only for
 * backward compatibility; a strictly per-thread queue would leave every thread
 * other than the class-initializing one polling a queue nobody fills.
 *
 * <p>This class has no method for handing a connection back to the pool; a
 * connection returned by {@link #getConnection()} belongs to the caller.
 */
public class ThreadLocalBlockingQueueUtil {
	/** Not used by this class; retained so existing references still compile. */
	public static int num=0;
	/**
	 * Retained for backward compatibility. {@code getQueue()} points the calling
	 * thread's slot at the single shared pool queue.
	 */
	public static ThreadLocal<LinkedBlockingQueue<Connection>> queueHoder = new ThreadLocal<LinkedBlockingQueue<Connection>>();
	// The one queue shared by the creator thread and all consumers; created lazily in getQueue().
	private static LinkedBlockingQueue<Connection> sharedQueue;
	private static String driver;
	private static String url;
	private static String username;
	private static String password;
	private static int threadPoolMaxNum;
	private static int threadPoolMinNum;
	private static LogUtil log;
	// Whether the background thread should keep creating database connections.
	// volatile: written by arbitrary caller threads, read by the creator thread's loop.
	private static volatile boolean continueCreateConnncetion;
	// Database type: "mysql" or "oracle".
	private static String databasetype;

	/**
	 * @return {@code true} while the background creator thread is allowed to keep
	 *         opening connections
	 */
	public static boolean isContinueCreateConnncetion() {
		return continueCreateConnncetion;
	}

	/**
	 * Enables or disables background connection creation. Passing {@code false}
	 * makes the creator thread leave its loop; passing {@code true} afterwards
	 * does not start a new creator thread.
	 *
	 * @param continueCreateConnncetion new value of the flag
	 */
	public static void setContinueCreateConnncetion(
			boolean continueCreateConnncetion) {
		ThreadLocalBlockingQueueUtil.continueCreateConnncetion = continueCreateConnncetion;
	}

	static {
		log=new LogUtil();
		continueCreateConnncetion = true;
		databasetype = ConfigUtils.getType("databasetype");
		threadPoolMaxNum = Integer.parseInt(ConfigUtils
				.getType("threadPoolMaxNum"));
		threadPoolMinNum = Integer.parseInt(ConfigUtils
				.getType("threadPoolMinNum"));
		// Constant-first equals() so a missing "databasetype" key reaches the
		// explicit error below instead of a bare NullPointerException.
		if ("mysql".equals(databasetype)) {
			driver = ConfigUtils.getType("mysql.driver");
			url = ConfigUtils.getType("mysql.url");
			username = ConfigUtils.getType("mysql.username");
			password = ConfigUtils.getType("mysql.password");
		} else if ("oracle".equals(databasetype)) {
			driver = ConfigUtils.getType("oracle.driver");
			url = ConfigUtils.getType("oracle.url");
			username = ConfigUtils.getType("oracle.username");
			password = ConfigUtils.getType("oracle.password");
		} else {
			// Previously this fell through to Class.forName(null); fail with a message
			// that names the actual problem.
			throw new IllegalStateException("Unsupported databasetype: " + databasetype
					+ " (expected mysql or oracle)");
		}
		if (driver == null) {
			throw new IllegalStateException("No JDBC driver configured for databasetype "
					+ databasetype);
		}
		try {
			Class.forName(driver);
		} catch (ClassNotFoundException e) {
			log.WriteLine("static{}", "JDBC driver class not found: " + driver);
			System.out.println(e.getMessage());
		}
		if(getQueue().size()<threadPoolMinNum){
			log.WriteLine("static{}", "getQueue().size()<threadPoolMinNum"+getQueue().size()+"  "+threadPoolMinNum);
			CreateConnection cc= new CreateConnection();
			cc.start();
		}
	}

	/**
	 * Returns the shared pool queue, creating it with capacity
	 * {@code threadPoolMaxNum} on first use, and records it in the calling
	 * thread's {@link #queueHoder} slot.
	 */
	private static synchronized LinkedBlockingQueue<Connection> getQueue() {
		if (sharedQueue == null) {
			sharedQueue = new LinkedBlockingQueue<Connection>(threadPoolMaxNum);
		}
		if (queueHoder.get() != sharedQueue) {
			queueHoder.set(sharedQueue);
		}
		return sharedQueue;
	}

	/**
	 * Background thread that tops the pool up to a randomly chosen target size
	 * for as long as {@code continueCreateConnncetion} is {@code true}. It exits
	 * when that flag is cleared or when it is interrupted.
	 */
	private static class CreateConnection extends Thread {
		private final LinkedBlockingQueue<Connection> queue = getQueue();
		// One generator for the thread's lifetime instead of one per loop iteration.
		private final Random rand = new Random();

		@Override
		public void run() {
			while (continueCreateConnncetion) {
				boolean result = false;
				Connection conn = null;
				// A connection count between the minimum and maximum pool size.
				int randomNum = nextTargetSize();
				try {
					if (queue.size()<randomNum){
						try {
							conn = DriverManager.getConnection(url, username, password);
							conn.setAutoCommit(false);
							// Append to the tail of the queue, waiting up to 1 second
							// for space to become available.
							result = queue.offer(conn, 1, TimeUnit.SECONDS);
						} catch (SQLException e) {
							reportCreateFailure(e);
						} catch (RuntimeException e) {
							// Keep the creator alive if a driver throws an unchecked exception.
							reportCreateFailure(e);
						}
					}
					if (!result) {
						// Pool already at the target size, or the connection could not be
						// created/queued: release whatever was opened and back off briefly.
						closeQuietly(conn);
						Thread.sleep(50);
					} else {
						log.WriteLine("CreateConnection", "++【" + queue.size() + "】"
								+ "createConnection success:" + conn);
						System.out.println("【" + queue.size() + "】"
								+ "createConnection success:" + conn);
					}
				} catch (InterruptedException e) {
					if (!result) {
						// Interrupted before the connection reached the pool.
						closeQuietly(conn);
					}
					// Restore the interrupt status and stop creating connections.
					Thread.currentThread().interrupt();
					return;
				}
			}
		}

		/**
		 * Picks the pool size to aim for in this iteration: a value in
		 * {@code [threadPoolMinNum + 1, threadPoolMaxNum]}, or
		 * {@code threadPoolMaxNum} when max is not greater than min
		 * ({@code Random.nextInt} rejects a non-positive bound).
		 */
		private int nextTargetSize() {
			int span = threadPoolMaxNum - threadPoolMinNum;
			if (span <= 0) {
				return threadPoolMaxNum;
			}
			return rand.nextInt(span) + threadPoolMinNum + 1;
		}

		/** Logs a failed attempt to open or configure a connection, including its cause. */
		private void reportCreateFailure(Exception e) {
			String message = "DriverManager.getConnection(url, username, password) fail: " + e;
			log.WriteLine("CreateConnection", message);
			System.out.println(message);
		}

		/** Closes {@code conn} if it is non-null; a failure to close is only logged. */
		private void closeQuietly(Connection conn) {
			if (conn == null) {
				return;
			}
			try {
				conn.close();
			} catch (SQLException e) {
				log.WriteLine("CreateConnection", "conn.close() fail: " + e);
			}
		}
	}

	/**
	 * Takes a connection from the shared pool, waiting until the background
	 * creator thread has made one available.
	 *
	 * <p>Not {@code synchronized}: the queue is itself thread-safe, and holding
	 * the class monitor across a blocking {@code poll} would make every other
	 * caller wait behind a single blocked thread.
	 *
	 * @return a pooled connection, never {@code null}
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public static Connection getConnection() throws InterruptedException {
		LinkedBlockingQueue<Connection> queue = ThreadLocalBlockingQueueUtil
				.getQueue();
		Connection conn = null;
		// Retry in a loop. The earlier recursive retry threw away the connection the
		// nested call obtained and then returned null to the caller.
		while (conn == null) {
			conn = queue.poll(2, TimeUnit.SECONDS);
		}
		log.WriteLine("getConnection   ", "--【" + queue.size() + "】" + "getConnecion成功:"
				+ conn);
		System.err.println("【" + queue.size() + "】" + "getConnecion成功:"
				+ conn);
		return conn;
	}
}