首页 > 代码库 > 异步线程池的实现(一)-------具体实现方法

异步线程池的实现(一)-------具体实现方法

本篇是这个内容的第一篇,主要是写:遇到的问题,和自己摸索实现的方法。后面还会有一篇是总结性地写线程池的相关内容(偏理论的)。

 

一、背景介绍

    朋友的项目开发到一定程度之后,又遇到了一些问题:在某些流程中的一些节点,由于是串联执行的。上一步要等下一步执行完毕;或者提交数据之后要等待后台其他系统处理完成之后,才能返回结果。这样就会导致,请求发起方不得不一直等待结果,用户体验很不好;从项目优化来说,模块与模块之间构成了强耦合,这也是不利于以后扩展的,更不用说访问量上来之后,肯定会抓瞎的问题。所以,我就着手开始,利用异步线程池来解决这个问题。

    刚开始的时候,我准备只是在节点处另外起线程去执行异步操作。但是,考虑到以后的扩展,同时利用“池化”技术,更加高效地重复利用线程,节省资源。在这里就选定了,使用线程池的方法。

二、实现步骤

     实现总共分为四步:

第一步,在启动服务的时候初始化线程池;

第二步,建立有队列的线程池;

第三步,将业务逻辑方法与线程池联系起来;

第四步,调整原有代码逻辑结构,将可以异步的操作放入第三步的业务逻辑方法,并将请求放入线程池的队列中,等待执行。

三、具体实现

首先,第一步我们在web项目的起源之处web.xml中加入这么一行

1 <listener>2 3                    <listener-class>com.jptec.kevin.thread.listener.InitThreadPoolListener</listener-class>4 5 </listener>

这里的路径实际上就是,在启动项目之后,会加载的初始化函数。这个函数主要的作用就是:将线程池启动起来。实现代码如下:

技术分享

 1 public class InitThreadPoolListener implements ServletContextListener { 2  3     @Override 4     public void contextInitialized(ServletContextEvent sce) { 5  6         new TestThreadPool().runThread(); 7     } 8  9     @Override10     public void contextDestroyed(ServletContextEvent sce) {11     }12 13 }

好了,第一步就算完工了。

 

    然后,我们开始第二步,建立有队列的线程池(这里有很多,理论上的内容,会放在第二篇中详细说)。在这里主要是,定义了一个ArrayBlockingQueue队列(先进先出,有限阻塞),使用Executor定义了一个线程池。具体代码如下:

 1 public class TestThreadPool { 2  3     protected final static Logger log = LoggerFactory.getLogger(TestThreadPool.class); 4  5     // 线程休眠时间(秒) 6     // 存放需要发送的信息 7     public static BlockingQueue<Runnable> addressBqueue = new ArrayBlockingQueue<Runnable>( 8                 10000); 9         10     public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 200, 25, TimeUnit.SECONDS,11             addressBqueue);12 13     public TestThreadPool() {14     }15     16 17     public void runThread() {18 19         try {20             executor.prestartCoreThread();21             log.info("队列大小:" + executor.getQueue().size());22 23         } catch (Exception e) {24             log.error("启动子线程异常", e);25         }26 27     }28 29 }

 

    完成第二步之后,我们继续第三步。我们有了线程池,那么实际代码如何将请求放入其中,并等待执行呢。于是,这里分为两个类,一个是负责业务代码中调用的,负责向队列中插入请求,一个是单个线程的实现类。具体实现如下:

插入请求实现:

 1 /** 2  * 线程队列 3  */ 4 public class TestQueue { 5     protected final static Logger log = LoggerFactory.getLogger(TestQueue.class); 6  7     public static boolean put(String userId,String tradeId, String amount, String flag, String term) { 8             log.debug("添加入队列开始... 额度申请用户tradeId=[{}]", tradeId); 9             try {10                 TestThreadPool.executor.execute(new TestThread( tradeId, amount,  flag, term, userId));11             log.debug("添加入队列结束...");12         } catch (Exception e) {13             log.error("添加入队列异常...", e);14             return false;15         }16         return true;17     }18 19 }

 

单个线程实现(第八行的引用在下文细说):

 1 /** 2  * 发送信息线程处理类 3  */ 4 public class TestThread implements Runnable { 5  6     protected final static Logger log = LoggerFactory.getLogger(TestThread.class); 7  8     private TradeService tradeService = (TradeService) SpringHandle.getBean("tradeService"); 9 10     String tradeId;11     String amount;12     String flag;13     String term;14     String userId;15 16     17 18     /** 19      * <p>Title: </p>20      * <p>Description: </p>21      * @param tradeId22      * @param amount23      * @param flag24      * @param term25      * @param userId 26      */ 27     28     public TestThread(String tradeId, String amount, String flag, String term,29             String userId) {30         super();31         this.tradeId = tradeId;32         this.amount = amount;33         this.flag = flag;34         this.term = term;35         this.userId = userId;36     }37 38     @Override39     public void run() {40         log.info("线程开始tradeId={}", tradeId);41         log.info("线程名:={}", Thread.currentThread().getId());42         log.info("队列大小:" + TestThreadPool.executor.getPoolSize() + ","43                 + TestThreadPool.executor.getCompletedTaskCount());44         putTradeConfirm(userId,tradeId, amount, flag, term);45         try {46             Thread.sleep(1000L);47         } catch (InterruptedException e) {48             e.printStackTrace();49         }50     }51 52     private void putTradeConfirm(String userId,String tradeId, String amount, String flag, String term) {53 54         tradeService.getMatchFundInfo(userId,amount, tradeId, flag, term);55 56     }57 58 }

 

这里需要注意的是,我需要获得一个Service的实例来调用具体的方法。但是,注释的方法不起作用,于是在朋友的帮助下,使用了辅助类。具体实现如下:

 1 @Component 2 public final class SpringHandle implements BeanFactoryPostProcessor { 3  4     private static ConfigurableListableBeanFactory beanFactory; // Spring应用上下文环境 5  6     public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { 7         SpringHandle.beanFactory = beanFactory; 8     } 9 10     /**11      * 获取对象12      * 13      * @param name14      * @return Object 一个以所给名字注册的bean的实例15      * @throws org.springframework.beans.BeansException16      * 17      */18     @SuppressWarnings("unchecked")19     public static <T> T getBean(String name) throws BeansException {20         return (T) beanFactory.getBean(name);21     }22 23     /**24      * 获取类型为requiredType的对象25      * 26      * @param clz27      * @return28      * @throws org.springframework.beans.BeansException29      * 30      */31     public static <T> T getBean(Class<T> clz) throws BeansException {32         T result = (T) beanFactory.getBean(clz);33         return result;34     }35 36     /**37      * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true38      * 39      * @param name40      * @return boolean41      */42     public static boolean containsBean(String name) {43         return beanFactory.containsBean(name);44     }45 46     /**47      * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。48      * 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)49      * 50      * @param name51      * @return boolean52      * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException53      * 54      */55     public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {56         return beanFactory.isSingleton(name);57     }58 59     /**60      * @param name61      * @return Class 注册对象的类型62      * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException63      * 64      */65     public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {66         return beanFactory.getType(name);67     }68 69     /**70      * 如果给定的bean名字在bean定义中有别名,则返回这些别名71      * 72      * @param name73      * @return74      * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException75      * 76      */77     public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {78         return beanFactory.getAliases(name);79     }80 81 }

  最后,在具体业务逻辑中,调用插入请求的方法,即可。

TradeGetFundInfoQueue.put(userId, tradeId, quota, repaymentType, timeLimit);

四、测试函数

由于在项目中,所以我写了另外一个测试函数(这个测试函数,会在下一篇文章中再次遇到),放在这里。供大家参考:

 1 public class TestThreadPool { 2  3     public static BlockingQueue<Runnable> queue = new  ArrayBlockingQueue<Runnable>( 4             10000); 5   6     public static void main(String[] args) { 7         for (int i = 0; i < 2; i++) { 8             queue.add(new TestThread("初始化")); 9         }10  11         final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 15, TimeUnit.SECONDS, queue);12 13         executor.prestartCoreThread();14 15  16         new Thread(new Runnable() {17             @Override18             public void run() {19                 while (true) {20                     System.out.println("getActiveCount=" + executor.getActiveCount()21                                     + ";getKeepAliveTime=" + executor.getKeepAliveTime(TimeUnit.SECONDS)22                                     + ";getCompletedTaskCount=" + executor.getCompletedTaskCount()23                                     + ";getCorePoolSize=" + executor.getCorePoolSize()24                                     + ";getLargestPoolSize=" + executor.getLargestPoolSize()25                                     + ";getMaximumPoolSize=" + executor.getMaximumPoolSize()26                                     + ";getPoolSize=" + executor.getPoolSize()27                                     + ";getTaskCount=" + executor.getTaskCount()28                                     + ";getQueue().size()=" + executor.getQueue().size()29                     );30                     try {31                         Thread.currentThread().sleep(200L);32                     } catch (InterruptedException e) {33                         e.printStackTrace();34                     }35                 }36             }37         }).start();38  39         new Thread(new Runnable() {40             @Override41             public void run() {42                 int i = 0;43                 while (true) {44                     queue.add(new TestThread("生产者"));45                     try {46                         Thread.currentThread().sleep(100L);47                     } catch (InterruptedException e) {48                         e.printStackTrace();49                     }50                     i++;51                     if (i > 100) break;52                 }53             }54         }).start();55     }56 }57  58 class TestThread implements Runnable {59     public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");60     private String name;        //创建者61     private Date addDate;       //添加到队列的日期62  63     TestThread(String name) {64         this.name = name;65         this.addDate = new Date();66     }67  68     @Override69     public void run() {70         System.out.println(Thread.currentThread().getName() +71                 ":创建者=" + name + ",创建时间=" + sdf.format(addDate) + ",执行时间=" + sdf.format(new Date()) + ",当前队列大小=" + TestThreadPool.queue.size());72  73         System.out.println(TestThreadPool.queue.peek());74         try {75             Thread.currentThread().sleep(1000L);76         } catch (InterruptedException e) {77             e.printStackTrace();78         }79     }80 }

 

测试的结果大致是这个样子的:

技术分享

最后,希望这篇文章对你有帮助,感谢朋友的帮助!

 

 

异步线程池的实现(一)-------具体实现方法