首页 > 代码库 > Java中Semaphore(信号量) 数据库连接池

Java中Semaphore(信号量) 数据库连接池

计数信号量用来控制同时访问某个特定资源的操作数或同时执行某个指定操作的数量

A counting semaphore.Conceptually, a semaphore maintains a set of permits. Each acquire blocks if necessary until a permit is available, and then takes it. Each release adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.

从概念上来说,Semaphore中维护了一组许可,许可的数量在构造函数中指定。acquire方法将获取一个可用的许可,如果没有可用的许可,该方法会被阻塞,直到Semaphore中有可用的许可。release方法释放一个许可,如果此时存在阻塞中的acqure方法,将释放一个阻塞中的acquire

事实上,Semaphore中只维护可用请求数量,并不包含实际的请求对象

示例一:数据库连接池

在初始化Semaphore时可以设置其公平性,如果为公平Semaphore,则按照请求时间获得许可,即先发送的请求先获得许可,如果为非公平Semaphore,则先发送的请求未必先获得许可,这有助于提高程序的吞吐量,但是有可能导致某些请求始终获取不到许可(tryAcquire方法不使用公平性设置)

[java] view plain copy
 
  1. import java.sql.Connection;  
  2. import java.sql.DriverManager;  
  3. import java.util.HashMap;  
  4. import java.util.LinkedList;  
  5. import java.util.Map;  
  6. import java.util.concurrent.Semaphore;  
  7.   
  8. public class MyConnPool {  
  9.     private LinkedList<Connection> unusedConns =   
  10.         new LinkedList<Connection>();  
  11.     //释放连接时对查找性能要求较高,故使用哈希表  
  12.     private Map<Connection,String> usedConns =   
  13.             new HashMap<Connection,String>();  
  14.     private final Semaphore available;  
  15.       
  16.     public MyConnPool(int size) throws Exception{  
  17.         StringBuilder builder = new StringBuilder();  
  18.         builder.append("-----pool-----\n");  
  19.         available = new Semaphore(size, true);//公平性Semaphore  
  20.         String url = "jdbc:mysql://ip:port/name?user=user&password=pwd";  
  21.         for(int i = 0 ; i < size ; i++){  
  22.             Connection conn = DriverManager.getConnection(url);  
  23.             unusedConns.add(conn);  
  24.             builder.append("conn-" + i + ":" + conn.hashCode() + "\n");  
  25.         }  
  26.         builder.append("--------------\n");  
  27.         System.out.print(builder.toString());  
  28.     }  
  29.       
  30.     public Connection getConn() throws InterruptedException{  
  31.         //获取Semaphore中的许可  
  32.         available.acquire();  
  33.         Connection conn = null;  
  34.         synchronized(this){  
  35.             conn = unusedConns.removeFirst();  
  36.             usedConns.put(conn, "");  
  37.               
  38.             System.out.println(Thread.currentThread().getName()  
  39.                     + ":" + conn.hashCode() + "[got]");  
  40.             System.out.println(display());  
  41.         }  
  42.         return conn;  
  43.     }  
  44.       
  45.     public void close(Connection conn){  
  46.         synchronized(this){  
  47.             if(usedConns.containsKey(conn)){  
  48.                 usedConns.remove(conn);  
  49.                 unusedConns.addLast(conn);  
  50.                   
  51.                 System.out.println(Thread.currentThread().getName()  
  52.                         + ":" + conn.hashCode() + "[closed]");  
  53.                 System.out.println(display());  
  54.             }  
  55.         }  
  56.         //释放线程获取的许可  
  57.         available.release();  
  58.     }  
  59.       
  60.     private final synchronized String display(){  
  61.         String str = "";  
  62.         if(unusedConns.size() > 0){  
  63.             str = "";  
  64.             for(Connection conn : unusedConns){  
  65.                 str += conn.hashCode() + "|";  
  66.             }  
  67.         }  
  68.         if(!str.equals(""))  
  69.             return str;  
  70.         else  
  71.             return "empty";  
  72.     }  
  73. }  
[java] view plain copy
 
  1. import java.sql.Connection;  
  2. import java.util.concurrent.CountDownLatch;  
  3.   
  4. public class Test implements Runnable{  
  5.     private static CountDownLatch latch   
  6.             = new CountDownLatch(1);  
  7.     private MyConnPool pool;  
  8.       
  9.     public Test(MyConnPool pool){  
  10.         this.pool = pool;  
  11.     }  
  12.       
  13.     @Override  
  14.     public void run(){   
  15.         try {  
  16.             latch.await();  
  17.             Connection conn = pool.getConn();  
  18.             Thread.sleep(1*1000);  
  19.             pool.close(conn);  
  20.         } catch (InterruptedException e) {  
  21.             e.printStackTrace();  
  22.         }  
  23.     }  
  24.       
  25.     public static void main(String[] args) throws Exception{  
  26.         MyConnPool pool = new MyConnPool(2);  
  27.         for(int i = 0 ; i < 4 ; i++){  
  28.             Thread t = new Thread(new Test(pool));  
  29.             t.start();  
  30.         }  
  31.         //保证4个线程同时运行  
  32.         latch.countDown();  
  33.     }  
  34. }  

运行结果如下:

[plain] view plain copy
 
  1. -----pool-----  
  2. conn-0:11631043  
  3. conn-1:14872264  
  4. --------------  
  5. Thread-4:11631043[got]  
  6. 14872264|  
  7. Thread-1:14872264[got]  
  8. empty  
  9. Thread-4:11631043[closed]  
  10. 11631043|  
  11. Thread-2:11631043[got]  
  12. empty  
  13. Thread-1:14872264[closed]  
  14. 14872264|  
  15. Thread-3:14872264[got]  
  16. empty  
  17. Thread-2:11631043[closed]  
  18. 11631043|  
  19. Thread-3:14872264[closed]  
  20. 11631043|14872264|  

特别注意如果getConn方法和close方法都为同步方法,将产生死锁:

[java] view plain copy
 
  1. public synchronized Connection getConn() throws InterruptedException{  
  2.     ......  
  3. }  
  4.       
  5. public synchronized void close(Connection conn){  
  6.     ......  
  7. }  

同一时刻只能有一个线程调用连接池的getConn方法或close方法,当Semaphore中没有可用的许可,并且此时恰好有一个线程成功调用连接池的getConn方法,则该线程将一直阻塞在acquire方法上,其它线程将没有办法获取连接池上的锁并调用close方法释放许可,程序将会卡死

阻塞方法上不要加锁,否则将导致锁长时间不释放,如果该锁为互斥锁,将导致程序卡住

acquire方法本身使用乐观锁实现,也不需要再加互斥锁

示例二:不可重入互斥锁

[java] view plain copy
 
  1. import java.util.concurrent.CountDownLatch;  
  2. import java.util.concurrent.Semaphore;  
  3.   
  4. public class Test implements Runnable{  
  5.     private static CountDownLatch latch =  
  6.             new CountDownLatch(1);  
  7.     private static Semaphore lock =  
  8.             new Semaphore(1, true);  
  9.       
  10.     @Override  
  11.     public void run(){   
  12.         try {  
  13.             latch.await();  
  14.             this.work();  
  15.         } catch (InterruptedException e) {  
  16.             e.printStackTrace();  
  17.         }  
  18.     }  
  19.       
  20.     private void work() throws InterruptedException{  
  21.             lock.acquire();  
  22.             System.out.println("Locking by "   
  23.                     + Thread.currentThread().getName());  
  24.             Thread.sleep(1*1000);  
  25.             lock.release();  
  26.     }  
  27.       
  28.     public static void main(String[] args) throws Exception{  
  29.         for(int i = 0 ; i < 4 ; i++){  
  30.             Thread t = new Thread(new Test());  
  31.             t.start();  
  32.         }  
  33.         //保证4个线程同时运行  
  34.         latch.countDown();  
  35.     }  
  36. }  

运行结果如下:

[plain] view plain copy
 
  1. Locking by Thread-3  
  2. Locking by Thread-0  
  3. Locking by Thread-1  
  4. Locking by Thread-2  

 

 
 

Java中Semaphore(信号量) 数据库连接池