首页 > 代码库 > java 连接池的简单实现

java 连接池的简单实现

  最近一个项目中需要自己写个连接池, 写了一个下午,挺辛苦的,但不知道会不会出问题, 所以,贴到博客上,欢迎各路大神指点

1. 配置信息:

/** *  */package cn.mjorcen.db.bean;import java.util.ResourceBundle;import org.apache.log4j.Logger;/** *  * 配置信息 *  * @author mjorcen * @email mjorcen@gmail.com * @dateTime Oct 5, 2014 3:02:56 PM * @version 1 */public class Configuration {    private ResourceBundle resource;    private Logger logger = Logger.getLogger(getClass());    private String driverClassName = "com.mysql.jdbc.Driver";    private String validationQuery = "SELECT 1";    private String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull";    private String user = "root";    private String password = "";    private int initialPoolSize = 3;    private int minPoolSize = 3;    private int maxPoolSize = 10;    private int maxStatements = 30;    private int maxIdleTime = 25000;    private int idleConnectionTestPeriod = 18000;    private int connectionLonger = 3600000;    public Configuration() {        super();    }    public Configuration(String _properties) {        super();        init(_properties);    }    /**     *      * @param _properties     *      * @author mjorcen     * @email mjorcen@gmail.com     * @dateTime Oct 5, 2014 3:08:54 PM     * @version 1     */    private void init(String _properties) {        resource = ResourceBundle.getBundle(_properties);        try {            String tmp = "";            setDriverClassName(resource.getString("driverClassName"));            setValidationQuery(resource.getString("validationQuery"));            setUrl(resource.getString("jdbc_url"));            setUser(resource.getString("jdbc_username"));            setPassword(resource.getString("jdbc_password"));            tmp = resource.getString("initialPoolSize");            if (tmp != null) {                setInitialPoolSize(Integer.parseInt(tmp));            }            tmp = resource.getString("minPoolSize");            if (tmp != null) {                setMinPoolSize(Integer.parseInt(tmp));            }            tmp = resource.getString("maxPoolSize");            if (tmp != null) {                setMaxPoolSize(Integer.parseInt(tmp));            }            tmp = resource.getString("maxStatements");            if (tmp != null) {                setMaxStatements(Integer.parseInt(tmp));            }            tmp = resource.getString("maxIdleTime");            if (tmp != null) {                setMaxIdleTime(Integer.parseInt(tmp));            }            tmp = resource.getString("idleConnectionTestPeriod");            if (tmp != null) {                setIdleConnectionTestPeriod(Integer.parseInt(tmp));            }            tmp = resource.getString("connectionLonger");            if (tmp != null) {                setIdleConnectionTestPeriod(Integer.parseInt(tmp));            }        } catch (Exception e) {            e.printStackTrace();            logger.error(e);        }    }    public ResourceBundle getResource() {        return resource;    }    public void setResource(ResourceBundle resource) {        this.resource = resource;    }    public String getDriverClassName() {        return driverClassName;    }    public void setDriverClassName(String driverClassName) {        this.driverClassName = driverClassName;    }    public String getValidationQuery() {        return validationQuery;    }    public void setValidationQuery(String validationQuery) {        this.validationQuery = validationQuery;    }    public String getUrl() {        return url;    }    public void setUrl(String url) {        this.url = url;    }    public String getUser() {        return user;    }    public void setUser(String user) {        this.user = user;    }    public String getPassword() {        return password;    }    public void setPassword(String password) {        this.password = password;    }    public int getInitialPoolSize() {        return initialPoolSize;    }    public void setInitialPoolSize(int initialPoolSize) {        this.initialPoolSize = initialPoolSize;    }    public int getMinPoolSize() {        return minPoolSize;    }    public void setMinPoolSize(int minPoolSize) {        this.minPoolSize = minPoolSize;    }    public int getMaxPoolSize() {        return maxPoolSize;    }    public void setMaxPoolSize(int maxPoolSize) {        this.maxPoolSize = maxPoolSize;    }    public int getMaxStatements() {        return maxStatements;    }    public void setMaxStatements(int maxStatements) {        this.maxStatements = maxStatements;    }    public int getMaxIdleTime() {        return maxIdleTime;    }    public void setMaxIdleTime(int maxIdleTime) {        this.maxIdleTime = maxIdleTime;    }    public int getIdleConnectionTestPeriod() {        return idleConnectionTestPeriod;    }    public void setIdleConnectionTestPeriod(int idleConnectionTestPeriod) {        this.idleConnectionTestPeriod = idleConnectionTestPeriod;    }    public int getConnectionLonger() {        return connectionLonger;    }    public void setConnectionLonger(int connectionLonger) {        this.connectionLonger = connectionLonger;    }}

2. connection 的包装类, 因为mysql 一个连接连接8小时就会被mysql 干掉;所以出此下策;

/** *  */package cn.mjorcen.db.bean;import java.sql.Connection;import java.sql.SQLException;import org.apache.log4j.Logger;/** *  *  * @author mjorcen * @email mjorcen@gmail.com * @dateTime Oct 5, 2014 4:27:30 PM * @version 1 */public class WarpConnection {    private Logger logger = Logger.getLogger(getClass());    private long connectionTime;    private long lastWorkTime;    private Connection connection;    public long getConnectionTime() {        return connectionTime;    }    public void setConnectionTime(long connectionTime) {        this.connectionTime = connectionTime;    }    public Connection getConnection() {        return connection;    }    public void setConnection(Connection connection) {        this.connection = connection;    }    public static WarpConnection warp(Connection connection) {        WarpConnection warpConnection = new WarpConnection();        warpConnection.setConnection(connection);        warpConnection.setConnectionTime(System.currentTimeMillis());        return warpConnection;    }    public boolean isTimeOut(long time) {        return System.currentTimeMillis() - this.connectionTime >= time;    }    public long getLastWorkTime() {        return lastWorkTime;    }    public void setLastWorkTime(long lastWorkTime) {        this.lastWorkTime = lastWorkTime;    }    @Override    public int hashCode() {        final int prime = 31;        int result = 1;        result = prime * result                + ((connection == null) ? 0 : connection.hashCode());        return result;    }    @Override    public boolean equals(Object obj) {        if (this == obj)            return true;        if (obj == null)            return false;        if (getClass() != obj.getClass())            return false;        WarpConnection other = (WarpConnection) obj;        if (connection == null) {            if (other.connection != null)                return false;        } else if (!connection.equals(other.connection))            return false;        return true;    }    /**     * 查看链接是否有效     * @param connectionLonger     *            连接最大时间     * @return     *      * @author mjorcen     * @email mjorcen@gmail.com     * @dateTime Oct 5, 2014 5:21:07 PM     * @version 1     * @throws SQLException     */    public boolean veryfiConnection(int connectionLonger) {        try {            if (this.connection == null || this.connection.isClosed()                    || isTimeOut(connectionLonger)) {                return true;            }        } catch (SQLException e) {            e.printStackTrace();        }        return false;    }}

3.连接池:

/** *  */package cn.mjorcen.db.pool.impl;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import java.util.NoSuchElementException;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import cn.mjorcen.db.bean.Configuration;import cn.mjorcen.db.bean.WarpConnection;import cn.mjorcen.db.pool.PooledDataSource;/** * 简单的线程池实现 *  * @author mjorcen * @email mjorcen@gmail.com * @dateTime Oct 5, 2014 3:24:32 PM * @version 1 */public class AbstractPooledDataSource implements PooledDataSource {    protected ConcurrentLinkedQueue<WarpConnection> idleQueue;    protected ConcurrentLinkedQueue<WarpConnection> busyQueue;    protected ThreadLocal<Connection> threadLocal;    protected AtomicInteger totalSize;    protected AtomicInteger currentSize;    protected boolean available;    protected Configuration configuration;    final Lock lock = new ReentrantLock();//// final Condition notFull = lock.newCondition(); // 实例化两个condition    final Condition notEmpty = lock.newCondition();    public AbstractPooledDataSource(Configuration configuration)            throws Exception {        super();        this.configuration = configuration;        idleQueue = new ConcurrentLinkedQueue<WarpConnection>();        busyQueue = new ConcurrentLinkedQueue<WarpConnection>();        threadLocal = new ThreadLocal<Connection>();        totalSize = new AtomicInteger(0);        currentSize = new AtomicInteger(0);        init();    }    /**     *      *      * @author mjorcen     * @email mjorcen@gmail.com     * @dateTime Oct 5, 2014 3:49:36 PM     * @version 1     * @throws ClassNotFoundException     */    private void init() throws Exception {        Class.forName("com.mysql.jdbc.Driver");        for (int i = 0; i < this.configuration.getInitialPoolSize(); i++) {            idleQueue.add(WarpConnection.warp(openConnection()));        }        this.totalSize.set(this.configuration.getInitialPoolSize());        available = true;    }    protected Connection openConnection() throws SQLException {        return DriverManager.getConnection(configuration.getUrl(),                configuration.getUser(), configuration.getPassword());    }    public Connection getConnection() throws SQLException {        Connection connection = threadLocal.get();        if (connection != null) {            return connection;        }        try {            lock.lock();            WarpConnection warpConnection = null;            try {                warpConnection = this.idleQueue.remove();            } catch (NoSuchElementException e) {                warpConnection = getWarpConnection();            }            veryfiConnection(warpConnection);            warpConnection.setLastWorkTime(System.currentTimeMillis());            this.busyQueue.add(warpConnection);            threadLocal.set(warpConnection.getConnection());            return warpConnection.getConnection();        } finally {            lock.unlock();        }    }    /**     * 检查链接状态     *      * @author mjorcen     * @email mjorcen@gmail.com     * @dateTime Oct 5, 2014 5:17:06 PM     * @version 1     * @param warpConnection     * @throws SQLException     */    private void veryfiConnection(WarpConnection warpConnection)            throws SQLException {        if (!warpConnection.veryfiConnection(this.configuration                .getConnectionLonger())) {            warpConnection.setConnection(openConnection());        }    }    /**     *      * @return     *      * @author mjorcen     * @email mjorcen@gmail.com     * @dateTime Oct 5, 2014 4:44:52 PM     * @version 1     * @throws SQLException     */    private WarpConnection getWarpConnection() throws SQLException {        WarpConnection warpConnection = null;        if (this.totalSize.get() < configuration.getMaxPoolSize()) {            warpConnection = WarpConnection.warp(openConnection());            this.totalSize.addAndGet(1);            return warpConnection;        }        while (true) {            try {                warpConnection = this.idleQueue.remove();                return warpConnection;            } catch (NoSuchElementException e) {                try {                    this.notEmpty.wait();                } catch (InterruptedException e1) {                    e1.printStackTrace();                }            }        }    }    public void destroy() {        this.available = false;        ConcurrentLinkedQueue<WarpConnection> _idleQueue = this.idleQueue;        ConcurrentLinkedQueue<WarpConnection> _busyQueue = this.busyQueue;        this.idleQueue = null;        this.busyQueue = null;        this.threadLocal = null;        for (WarpConnection connection : _idleQueue) {            closeQuiet(connection.getConnection());        }    }    private void closeQuiet(Connection connection) {        if (connection != null) {            try {                connection.close();            } catch (SQLException e) {                e.printStackTrace();            }        }    }    public void release(Connection connection) throws Exception {        try {            lock.lock();            if (this.available) {                WarpConnection warpConnection = null;                for (WarpConnection element : this.busyQueue) {                    if (element.getConnection().equals(connection)) {                        warpConnection = element;                        break;                    }                }                this.busyQueue.remove(warpConnection);                this.idleQueue.add(warpConnection);                System.out.println("busyQueue = " + busyQueue.size());                System.out.println("idleQueue = " + idleQueue.size());                threadLocal.set(null);                notEmpty.signal();// 一旦插入就唤醒取数据线程            } else {                closeQuiet(connection);            }        } catch (Exception e) {            e.printStackTrace();        } finally {            lock.unlock();        }    }    public boolean isAvailable() {        return available;    }}

 

调用类:

 

/** *  */package cn.mjorcen.db.test;import java.sql.Connection;import java.sql.SQLException;import java.util.concurrent.Executor;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import cn.mjorcen.db.bean.Configuration;import cn.mjorcen.db.pool.impl.AbstractPooledDataSource;/** *  *  * @author mjorcen * @email mjorcen@gmail.com * @dateTime Oct 5, 2014 4:00:09 PM * @version 1 */public class Client {    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration("product_db");        final AbstractPooledDataSource dataSource = new AbstractPooledDataSource(                conf);        ExecutorService executor = Executors.newFixedThreadPool(10);        Runnable r = new Runnable() {            public void run() {                try {                    for (int i = 0; i < 3; i++) {                        Connection connection = dataSource.getConnection();                        System.out.println(Thread.currentThread().getName()                                + " : " + connection);                        Thread.sleep(3000);                        dataSource.release(connection);                    }                } catch (Exception e) {                    e.printStackTrace();                }            }        };        for (int i = 0; i < 10; i++) {            executor.execute(r);        }        // Connection connection = dataSource.getConnection();        // connection = dataSource.getConnection();        // System.out.println(connection);        // dataSource.release(connection);    }}

 

配置文件:

driverClassName=com.mysql.jdbc.DrivervalidationQuery=SELECT 1jdbc_url=jdbc:mysql://115.29.36.149:3306/sai_zd?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNulljdbc_username=cjdbc_password=cinitialPoolSize=3minPoolSize=3maxPoolSize=10maxStatements=30maxIdleTime=25000idleConnectionTestPeriod=18000connectionLonger=3

 

java 连接池的简单实现