首页 > 代码库 > Java多线程编程模式实战指南一:Active Object模式(上)

Java多线程编程模式实战指南一:Active Object模式(上)

Active Object模式简介

Active Object模式是一种异步编程模式。它通过对方法的调用与方法的执行进行解耦来提高并发性。若以任务的概念来说,Active Object模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。这有点类似于System.gc()这个方法:客户端代码调用完gc()后,一个进行垃圾回收的任务被提交,但此时JVM并不一定进行了垃圾回收,而可能是在gc()方法调用返回后的某段时间才开始执行任务——回收垃圾。我们知道,System.gc()的调用方代码是运行在自己的线程上(通常是main线程派生的子线程),而JVM的垃圾回收这个动作则由专门的线程(垃圾回收线程)来执行的。换言之,System.gc()这个方法所代表的动作(其所定义的功能)的调用方和执行方是运行在不同的线程中的,从而提高了并发性。

再进一步介绍Active Object模式,我们可先简单地将其核心理解为一个名为ActiveObject的类,该类对外暴露了一些异步方法,如图1所示。

图 1. ActiveObject对象示例

技术分享

doSomething方法的调用方和执行方运行在各自的线程上。在并发的环境下,doSomething方法会被多个线程调用。这时所需的线程安全控制封装在doSomething方法背后,使得调用方代码无需关心这点,从而简化了调用方代码:从调用方代码来看,调用一个Active Object对象的方法与调用普通Java对象的方法并无太大差别。如清单1所示。

清单 1. Active Object方法调用示例

ActiveObject ao=...;
Future future = ao.doSomething("data");
//执行其它操作
String result = future.get();
System.out.println(result);

Active Object模式的架构

当Active Object模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的操作)、调用方代码所传递的参数等,会被封装成一个对象。该对象被称为方法请求(Method Request)。方法请求对象会被存入Active Object模式所维护的缓冲区(Activation Queue)中,并由专门的工作线程负责根据其包含的上下文信息执行相应的操作。也就是说,方法请求对象是由运行调用方代码的线程通过调用Active Object模式对外暴露的异步方法生成的,而方法请求所代表的操作则由专门的线程来执行,从而实现了方法的调用与执行的分离,产生了并发。

Active Object模式的主要参与者有以下几种。其类图如图2所示。

图 2. Active Object模式的类图

技术分享

  • Proxy:负责对外暴露异步方法接口。当调用方代码调用该参与者实例的异步方法doSomething时,该方法会生成一个相应的MethodRequest实例并将其存储到Scheduler所维护的缓冲区中。doSomething方法的返回值是一个表示其执行结果的外包装对象:Future参与者的实例。异步方法doSomething运行在调用方代码所在的线程中。
  • MethodRequest:负责将调用方代码对Proxy实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及调用方代码传递的参数等上下文信息。它使得将Proxy的异步方法的调用和执行分离成为可能。其call方法会根据其所包含上下文信息调用Servant实例的相应方法。
  • ActivationQueue:负责临时存储由Proxy的异步方法被调用时所创建的MethodRequest实例的缓冲区。
  • Scheduler:负责将Proxy的异步方法所创建的MethodRequest实例存入其维护的缓冲区中。并根据一定的调度策略,对其维护的缓冲区中的MethodRequest实例进行执行。其调度策略可以根据实际需要来定,如FIFO、LIFO和根据MethodRequest中包含的信息所定的优先级等。
  • Servant:负责对Proxy所暴露的异步方法的具体实现。
  • Future:负责存储和返回Active Object异步方法的执行结果。

Active Object模式的序列图如图3所示。

技术分享

第1步:调用方代码调用Proxy的异步方法doSomething。

第2~7步:doSomething方法创建Future实例作为该方法的返回值。并将调用方代码对该方法的调用封装为MethodRequest对象。然后以所创建的MethodRequest对象作为参数调用Scheduler的enqueue方法,以将MethodRequest对象存入缓冲区。Scheduler的enqueue方法会调用Scheduler所维护的ActivationQueue实例的enqueue方法,将MethodRequest对象存入缓冲区。

第8步:doSomething返回其所创建的Future实例。

第9步:Scheduler实例采用专门的工作线程运行dispatch方法。

第10~12步:dispatch方法调用ActivationQueue实例的dequeue方法,获取一个MethodRequest对象。然后调用MethodRequest对象的call方法

第13~16步:MethodRequest对象的call方法调用与其关联的Servant实例的相应方法doSomething。并将Servant.doSomething方法的返回值设置到Future实例上。

第17步:MethodRequest对象的call方法返回。

上述步骤中,第1~8步是运行在Active Object的调用者线程中的,这几个步骤实现了将调用方代码对Active Object所提供的异步方法的调用封装成对象(Method Request),并将其存入缓冲区。这几个步骤实现了任务的提交。第9~17步是运行在Active Object的工作线程中,这些步骤实现从缓冲区中读取Method Request,并对其进行执行,实现了任务的执行。从而实现了Active Object对外暴露的异步方法的调用与执行的分离。

如果调用方代码关心Active Object的异步方法的返回值,则可以在其需要时,调用Future实例的get方法来获得异步方法的真正执行结果。

Active Object模式实战案例

某电信软件有一个彩信短号模块。其主要功能是实现手机用户给其它手机用户发送彩信时,接收方号码可以填写为对方的短号。例如,用户13612345678给其同事13787654321发送彩信时,可以将接收方号码填写为对方的短号,如776,而非其真实的号码。

该模块处理其接收到的下发彩信请求的一个关键操作是查询数据库以获得接收方短号对应的真实号码(长号)。该操作可能因为数据库故障而失败,从而使整个请求无法继续被处理。而数据库故障是可恢复的故障,因此在短号转换为长号的过程中如果出现数据库异常,可以先将整个下发彩信请求消息缓存到磁盘中,等到数据库恢复后,再从磁盘中读取请求消息,进行重试。为方便起见,我们可以通过Java的对象序列化API,将表示下发彩信的对象序列化到磁盘文件中从而实现请求缓存。下面我们讨论这个请求缓存操作还需要考虑的其它因素,以及Active Object模式如何帮助我们满足这些考虑。

首先,请求消息缓存到磁盘中涉及文件I/O这种慢的操作,我们不希望它在请求处理的主线程(即Web服务器的工作线程)中执行。因为这样会使该模块的响应延时增大,降低系统的响应性。并使得Web服务器的工作线程因等待文件I/O而降低了系统的吞吐量。这时,异步处理就派上用场了。Active Object模式可以帮助我们实现请求缓存这个任务的提交和执行分离:任务的提交是在Web服务器的工作线程中完成,而任务的执行(包括序列化对象到磁盘文件中等操作)则是在Active Object工作线程中执行。这样,请求处理的主线程在侦测到短号转长号失败时即可以触发对当前彩信下发请求进行缓存,接着继续其请求处理,如给客户端响应。而此时,当前请求消息可能正在被Active Object线程缓存到文件中。如图4所示。

图 4 .异步实现缓存

技术分享

其次,每个短号转长号失败的彩信下发请求消息会被缓存为一个磁盘文件。但我们不希望这些缓存文件被存在同一个子目录下。而是希望多个缓存文件会被存储到多个子目录中。每个子目录最多可以存储指定个数(如2000个)的缓存文件。若当前子目录已存满,则新建一个子目录存放新的缓存文件,直到该子目录也存满,依此类推。当这些子目录的个数到达指定数量(如100个)时,最老的子目录(连同其下的缓存文件,如果有的话)会被删除。从而保证子目录的个数也是固定的。显然,在并发环境下,实现这种控制需要一些并发访问控制(如通过锁来控制),但是我们不希望这种控制暴露给处理请求的其它代码。而Active Object模式中的Proxy参与者可以帮助我们封装并发访问控制。

下面,我们看该案例的相关代码通过应用Active Object模式在实现缓存功能时满足上述两个目标。首先看请求处理的入口类。该类就是本案例的Active Object模式的客调用方代码。如清单2所示。

清单 2. 彩信下发请求处理的入口类

技术分享
public class MMSDeliveryServlet extends HttpServlet {

    private static final long serialVersionUID = 5886933373599895099L;

    @Override
    public void doPost(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        //将请求中的数据解析为内部对象
        MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream());
        Recipient shortNumberRecipient = mmsDeliverReq.getRecipient();
        Recipient originalNumberRecipient = null;

        try {
            // 将接收方短号转换为长号
            originalNumberRecipient = convertShortNumber(shortNumberRecipient);
        } catch (SQLException e) {

            // 接收方短号转换为长号时发生数据库异常,触发请求消息的缓存
            AsyncRequestPersistence.getInstance().store(mmsDeliverReq);

            // 继续对当前请求的其它处理,如给客户端响应
            resp.setStatus(202);
        }

    }

    private MMSDeliverRequest parseRequest(InputStream reqInputStream) {
        MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest();
        //省略其它代码
        return mmsDeliverReq;
    }

    private Recipient convertShortNumber(Recipient shortNumberRecipient)
            throws SQLException {
        Recipient recipent = null;
        //省略其它代码
        return recipent;
    }

}
View Code

清单2中的doPost方法在侦测到短号转换过程中发生的数据库异常后,通过调用AsyncRequestPersistence类的store方法触发对彩信下发请求消息的缓存。这里,AsyncRequestPersistence类相当于Active Object模式中的Proxy参与者。尽管本案例涉及的是一个并发环境,但从清单2中的代码可见,AsyncRequestPersistence类的调用方代码无需处理多线程同步问题。这是因为多线程同步问题被封装在AsyncRequestPersistence类之后。

AsyncRequestPersistence类的代码如清单3所示。

清单 3. 彩信下发请求缓存入口类(Active Object模式的Proxy)

技术分享
// ActiveObjectPattern.Proxy
public class AsyncRequestPersistence implements RequestPersistence {
    private static final long ONE_MINUTE_IN_SECONDS = 60;
    private final Logger logger;
    private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0);
    private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0);
    
    // ActiveObjectPattern.Servant
    private final DiskbasedRequestPersistence 
                        delegate = new DiskbasedRequestPersistence();
    // ActiveObjectPattern.Scheduler
    private final ThreadPoolExecutor scheduler;

    private static class InstanceHolder {
        final static RequestPersistence INSTANCE = new AsyncRequestPersistence();
    }

    private AsyncRequestPersistence() {
        logger = Logger.getLogger(AsyncRequestPersistence.class);
        scheduler = new ThreadPoolExecutor(1, 3, 
                60 * ONE_MINUTE_IN_SECONDS,
                TimeUnit.SECONDS,
                // ActiveObjectPattern.ActivationQueue
                new LinkedBlockingQueue(200), 
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t;
                        t = new Thread(r, "AsyncRequestPersistence");
                        return t;
                    }

                });

        scheduler.setRejectedExecutionHandler(
                new ThreadPoolExecutor.DiscardOldestPolicy());

        // 启动队列监控定时任务
        Timer monitorTimer = new Timer(true);
        monitorTimer.scheduleAtFixedRate(
            new TimerTask() {

            @Override
            public void run() {
                if (logger.isInfoEnabled()) {

                    logger.info("task count:" 
                            + requestSubmittedPerIterval
                            + ",Queue size:" 
                            + scheduler.getQueue().size()
                            + ",taskTimeConsumedPerInterval:"
                            + taskTimeConsumedPerInterval.get() 
                            + " ms");
                }

                taskTimeConsumedPerInterval.set(0);
                requestSubmittedPerIterval.set(0);

            }
        }, 0, ONE_MINUTE_IN_SECONDS * 1000);
    }

    public static RequestPersistence getInstance() {
        return InstanceHolder.INSTANCE;
    }

    @Override
    public void store(final MMSDeliverRequest request) {
        /*
         * 将对store方法的调用封装成MethodRequest对象, 并存入缓冲区。
         */
        // ActiveObjectPattern.MethodRequest
        Callable methodRequest = new Callable() {
            @Override
            public Boolean call() throws Exception {
                long start = System.currentTimeMillis();
                try {
                    delegate.store(request);
                } finally {
                    taskTimeConsumedPerInterval.addAndGet(
                            System.currentTimeMillis() - start);
                }

                return Boolean.TRUE;
            }

        };
        scheduler.submit(methodRequest);

        requestSubmittedPerIterval.incrementAndGet();
    }

}
View Code

AsyncRequestPersistence类所实现的接口RequestPersistence定义了Active Object对外暴露的异步方法:store方法。由于本案例不关心请求缓存的结果,故该方法没有返回值。其代码如清单4所示。

清单 4. RequestPersistence接口源码

public interface RequestPersistence {
     void store(MMSDeliverRequest request);
}

 

AsyncRequestPersistence类的实例变量scheduler相当于Active Object模式中的Scheduler参与者实例。这里我们直接使用了JDK1.5引入的Executor Framework中的ThreadPoolExecutor。在ThreadPoolExecutor类的实例化时,其构造器的第5个参数(BlockingQueue<Runnable> workQueue)我们指定了一个有界阻塞队列:new LinkedBlockingQueue<Runnable>(200)。该队列相当于Active Object模式中的ActivationQueue参与者实例。

AsyncRequestPersistence类的实例变量delegate相当于Active Object模式中的Servant参与者实例。

AsyncRequestPersistence类的store方法利用匿名类生成一个java.util.concurrent.Callable实例methodRequest。该实例相当于Active Object模式中的MethodRequest参与者实例。利用闭包(Closure),该实例封装了对store方法调用的上下文信息(包括调用参数、所调用的方法对应的操作信息)。AsyncRequestPersistence类的store方法通过调用scheduler的submit方法,将methodRequest送入ThreadPoolExecutor所维护的缓冲区(阻塞队列)中。确切地说,ThreadPoolExecutor是Scheduler参与者的一个“近似”实现。ThreadPoolExecutor的submit方法相对于Scheduler的enqueue方法,该方法用于接纳MethodRequest对象,以将其存入缓冲区。当ThreadPoolExecutor当前使用的线程数量小于其核心线程数量时,submit方法所接收的任务会直接被新建的线程执行。当ThreadPoolExecutor当前使用的线程数量大于其核心线程数时,submit方法所接收的任务才会被存入其维护的阻塞队列中。不过,ThreadPoolExecutor的这种任务处理机制,并不妨碍我们将它用作Scheduler的实现。

methodRequest的call方法会调用delegate的store方法来真正实现请求缓存功能。delegate实例对应的类DiskbasedRequestPersistence是请求消息缓存功能的真正实现者。其代码如清单5所示。

清单 5. DiskbasedRequestPersistence类的源码

技术分享
public class DiskbasedRequestPersistence implements RequestPersistence {
    // 负责缓存文件的存储管理
    private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage();
    private final Logger logger = Logger
                                     .getLogger(DiskbasedRequestPersistence.class);

    @Override
    public void store(MMSDeliverRequest request) {
        // 申请缓存文件的文件名
        String[] fileNameParts = storage.apply4Filename(request);
        File file = new File(fileNameParts[0]);
        try {
            ObjectOutputStream objOut = new ObjectOutputStream(
            new FileOutputStream(file));
            try {
                objOut.writeObject(request);
            } finally {
                objOut.close();
            }
        } catch (FileNotFoundException e) {
            storage.decrementSectionFileCount(fileNameParts[1]);
            logger.error("Failed to store request", e);
        } catch (IOException e) {
            storage.decrementSectionFileCount(fileNameParts[1]);
            logger.error("Failed to store request", e);
        }

    }

    class SectionBasedDiskStorage {
        private Deque sectionNames = new LinkedList();
        /*
         * Key->value: 存储子目录名->子目录下缓存文件计数器
         */
        private Map sectionFileCountMap 
                        = new HashMap();
        private int maxFilesPerSection = 2000;
        private int maxSectionCount = 100;
        private String storageBaseDir = System.getProperty("user.dir") + "/vpn";

        private final Object sectionLock = new Object();

        public String[] apply4Filename(MMSDeliverRequest request) {
            String sectionName;
            int iFileCount;
            boolean need2RemoveSection = false;
            String[] fileName = new String[2];
            synchronized (sectionLock) {
                //获取当前的存储子目录名
                sectionName = this.getSectionName();
                AtomicInteger fileCount;
                fileCount = sectionFileCountMap.get(sectionName);
                iFileCount = fileCount.get();
                //当前存储子目录已满
                if (iFileCount >= maxFilesPerSection) {
                    if (sectionNames.size() >= maxSectionCount) {
                        need2RemoveSection = true;
                    }
                    //创建新的存储子目录
                    sectionName = this.makeNewSectionDir();
                    fileCount = sectionFileCountMap.get(sectionName);

                }
                iFileCount = fileCount.addAndGet(1);

            }

            fileName[0] = storageBaseDir + "/" + sectionName + "/"
                + new DecimalFormat("0000").format(iFileCount) + "-"
                + request.getTimeStamp().getTime() / 1000 + "-" 
                               + request.getExpiry()
                + ".rq";
            fileName[1] = sectionName;

            if (need2RemoveSection) {
                //删除最老的存储子目录
                String oldestSectionName = sectionNames.removeFirst();
                this.removeSection(oldestSectionName);
            }

            return fileName;
        }

        public void decrementSectionFileCount(String sectionName) {
            AtomicInteger fileCount = sectionFileCountMap.get(sectionName);
            if (null != fileCount) {
                fileCount.decrementAndGet();
            }
        }

        private boolean removeSection(String sectionName) {
            boolean result = true;
            File dir = new File(storageBaseDir + "/" + sectionName);
            for (File file : dir.listFiles()) {
                result = result && file.delete();
            }
            result = result && dir.delete();
            return result;
        }

        private String getSectionName() {
            String sectionName;

            if (sectionNames.isEmpty()) {
                sectionName = this.makeNewSectionDir();

            } else {
                sectionName = sectionNames.getLast();
            }

            return sectionName;
        }

        private String makeNewSectionDir() {
            String sectionName;
            SimpleDateFormat sdf = new SimpleDateFormat("MMddHHmmss");
            sectionName = sdf.format(new Date());
            File dir = new File(storageBaseDir + "/" + sectionName);
            if (dir.mkdir()) {
                sectionNames.addLast(sectionName);
                sectionFileCountMap.put(sectionName, new AtomicInteger(0));
            } else {
                throw new RuntimeException(
                                 "Cannot create section dir " + sectionName);
            }

            return sectionName;
        }
    }
}
View Code

methodRequest的call方法的调用者代码是运行在ThreadPoolExecutor所维护的工作者线程中,这就保证了store方法的调用方和真正的执行方是分别运行在不同的线程中:服务器工作线程负责触发请求消息缓存,ThreadPoolExecutor所维护的工作线程负责将请求消息序列化到磁盘文件中。

DiskbasedRequestPersistence类的store方法中调用的SectionBasedDiskStorage类的apply4Filename方法包含了一些多线程同步控制代码(见清单5)。这部分控制由于是封装在DiskbasedRequestPersistence的内部类中,对于该类之外的代码是不可见的。因此,AsyncRequestPersistence的调用方代码无法知道该细节,这体现了Active Object模式对并发访问控制的封装。

小结

本篇介绍了Active Object模式的意图及架构,并以一个实际的案例展示了该模式的代码实现。下篇将对Active Object模式进行评价,并结合本文案例介绍实际运用Active Object模式时需要注意的一些事项。

 

Java多线程编程模式实战指南一:Active Object模式(上)