首页 > 代码库 > Motan在服务provider端用于处理request的线程池
Motan在服务provider端用于处理request的线程池
最近开始重新看motan的源码,打算花一年的时间来分析每个模块每个功能的代码实现,坚持写一个motan分析系列。
因为没有思路,只能一个片段一个片段的看,等到有了一定的积累,再将看过的代码串起来一起分析,形成完整的思路。
第一篇是要回答自己的一个疑问,motan在服务provider端的线程模型是什么?request到达服务provider端之后,被哪个线程处理?
motan的tcp通信框架用的是netty,netty的线程模型是reactor模型。由一个Acceptor线程负责channel的接入,然后交给reactor线程来负责这个channel的读写事件。
所以一个request到达provider端,首先是由reactor线程来处理,进行解码,解码成java的request对象。这个时候就有一个问题,一般我们的服务都是要访问数据库等资源的,会存在IO的阻塞,如果我们直接在reactor线程处理request请求,就会阻塞住reactor线程,使得reactor无法处理其他channel的读写事件,也就无法达到高的并发。因此我们使用一个线程池来处理request。
motan的代码:
public class NettyChannelHandler extends SimpleChannelHandler { private ThreadPoolExecutor threadPoolExecutor; ...... private void processRequest(final ChannelHandlerContext ctx, MessageEvent e) { final Request request = (Request) e.getMessage(); request.setAttachment(URLParamType.host.getName(), NetUtils.getHostName(ctx.getChannel().getRemoteAddress())); final long processStartTime = System.currentTimeMillis(); // 使用线程池方式处理 try { threadPoolExecutor.execute(new Runnable() { @Override public void run() { processRequest(ctx, request, processStartTime); } }); } catch (RejectedExecutionException rejectException) { DefaultResponse response = new DefaultResponse(); response.setRequestId(request.getRequestId()); response.setException(new MotanServiceException("process thread pool is full, reject", MotanErrorMsgConstant.SERVICE_REJECT)); response.setProcessTime(System.currentTimeMillis() - processStartTime); e.getChannel().write(response); LoggerUtil .debug("process thread pool is full, reject, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}", threadPoolExecutor.getActiveCount(), threadPoolExecutor.getPoolSize(), threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getTaskCount(), request.getRequestId()); } }
它的流程是这样:
Motan在处理request的时候并没有直接使用JDK的线程,而是继承ThreadPoolExecutor进行了自定义实现。
StandardThreadExecutor 的实现代码:
public class StandardThreadExecutor extends ThreadPoolExecutor { public static final int DEFAULT_MIN_THREADS = 20; public static final int DEFAULT_MAX_THREADS = 200; public static final int DEFAULT_MAX_IDLE_TIME = 60 * 1000; // 1 minutes protected AtomicInteger submittedTasksCount; // 正在处理的任务数 private int maxSubmittedTaskCount; // 最大允许同时处理的任务数 ....... public StandardThreadExecutor(int coreThreads, int maxThreads, long keepAliveTime, TimeUnit unit, int queueCapacity, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(coreThreads, maxThreads, keepAliveTime, unit, new ExecutorQueue(), threadFactory, handler); ((ExecutorQueue) getQueue()).setStandardThreadExecutor(this); submittedTasksCount = new AtomicInteger(0); // 最大并发任务限制: 队列buffer数 + 最大线程数 maxSubmittedTaskCount = queueCapacity + maxThreads; } public void execute(Runnable command) { int count = submittedTasksCount.incrementAndGet(); // 超过最大的并发任务限制,进行 reject // 依赖的LinkedTransferQueue没有长度限制,因此这里进行控制 if (count > maxSubmittedTaskCount) { submittedTasksCount.decrementAndGet(); getRejectedExecutionHandler().rejectedExecution(command, this); } try { super.execute(command); } catch (RejectedExecutionException rx) { // there could have been contention around the queue if (!((ExecutorQueue) getQueue()).force(command)) { submittedTasksCount.decrementAndGet(); getRejectedExecutionHandler().rejectedExecution(command, this); } } } public int getSubmittedTasksCount() { return this.submittedTasksCount.get(); } public int getMaxSubmittedTaskCount() { return maxSubmittedTaskCount; } protected void afterExecute(Runnable r, Throwable t) { submittedTasksCount.decrementAndGet(); } }
这里是重写了ThreadPoolExecutor的execute和afterExecute方法。一个需要非常注意的地方是使用了一个ExecutorQueue作为BlockingQueue.
我们来看ExecutorQueue的代码实现:
/** * LinkedTransferQueue 能保证更高性能,相比与LinkedBlockingQueue有明显提升 * * 1) 不过LinkedTransferQueue的缺点是没有队列长度控制,需要在外层协助控制 */ class ExecutorQueue extends LinkedTransferQueue<Runnable> { private static final long serialVersionUID = -265236426751004839L; StandardThreadExecutor threadPoolExecutor; public ExecutorQueue() { super(); } public void setStandardThreadExecutor(StandardThreadExecutor threadPoolExecutor) { this.threadPoolExecutor = threadPoolExecutor; } // 注:代码来源于 tomcat public boolean force(Runnable o) { if (threadPoolExecutor.isShutdown()) { throw new RejectedExecutionException("Executor not running, can‘t force a command into the queue"); } // forces the item onto the queue, to be used if the task is rejected return super.offer(o); } // 注:tomcat的代码进行一些小变更 public boolean offer(Runnable o) { int poolSize = threadPoolExecutor.getPoolSize(); // we are maxed out on threads, simply queue the object if (poolSize == threadPoolExecutor.getMaximumPoolSize()) { return super.offer(o); } // we have idle threads, just add it to the queue // note that we don‘t use getActiveCount(), see BZ 49730 if (threadPoolExecutor.getSubmittedTasksCount() <= poolSize) { return super.offer(o); } // if we have less threads than maximum force creation of a new // thread if (poolSize < threadPoolExecutor.getMaximumPoolSize()) { return false; } // if we reached here, we need to add it to the queue return super.offer(o); } }
ExecutorQueue实现了LinkedTransferQueue,主要是LinkedTransferQueue相比LinkedBlockingQueue等的队列有很大的性能提高。它的缺点是没有队列长度控制,容易发生内存溢出。所以motan的代码中,在execute(Runnable r)中对提交的任务数加一,在afterExecute中对提交的任务数减一,维护了一个正在运行的任务数,同时有一个最大任务数的限制。在提交任务的时候,如果任务数超过了最大任务数,对这个任务执行拒绝策略,以此实现了队列长度的控制。
另外在ExecutorQueue的实现中重写了offer方法,因为LinkedBlockingQueue的offer方法总是返回true,所以线程池中的线程数不会超过minThread。Motan的改动是在提交的任务数超过poolSize,而poolSize小于最大任务数的时候返回false,让executor创建线程。
最后用两张图来总结JDK的线程池与Motan线程池的执行流程:
Motan在服务provider端用于处理request的线程池