首页 > 代码库 > Spring AOP中提供的种种Aspects - 并发控制
Spring AOP中提供的种种Aspects - 并发控制
本文继续讨论ConcurrencyThrottleInterceptor(基于Spring 4.3.7)。以及上一篇文章中遗留的一个关于SimpleAsyncTaskExecutor类中属性concurrencyLimit的问题。
这些都和并发控制相关。但是这里需要事先说明的一点是,这些类和实现的年代都比较久远了,比如ConcurrencyThrottleInterceptor是在2004年的Spring 1.x中就存在了,那个年代还没有JDK中的java.util.concurrent并发包。因此这里更多地是学习和讨论一个解决特定问题的思想,而不是鼓励大家去使用它。对于并发控制的问题,利用并发包中的相关类可以更好地解决。
首先还是按照惯例画出关键类型之间的示意图:
并发控制是如何实现的
ConcurrencyThrottleInterceptor
/**
* Interceptor that throttles concurrent access, blocking invocations if a specified concurrency
* limit is reached.
*
* <p>
* Can be applied to methods of local services that involve heavy use of system resources, in a
* scenario where it is more efficient to throttle concurrency for a specific service rather than
* restricting the entire thread pool (e.g. the web container‘s thread pool).
*
* <p>
* The default concurrency limit of this interceptor is 1. Specify the "concurrencyLimit" bean
* property to change this value.
*
* @author Juergen Hoeller
* @since 11.02.2004
* @see #setConcurrencyLimit
*/
@SuppressWarnings("serial")
public class ConcurrencyThrottleInterceptor extends ConcurrencyThrottleSupport
implements MethodInterceptor, Serializable {
public ConcurrencyThrottleInterceptor() {
setConcurrencyLimit(1);
}
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
beforeAccess();
try {
return methodInvocation.proceed();
} finally {
afterAccess();
}
}
}
这个类型的实现也挺简洁的,它继承了ConcurrencyThrottleSupport,实现了MethodInterceptor和Serializable接口。而其中的invoke方法主要就是为了实现MethodInterceptor接口。
它想要完成的功能注释中也说的比较明白了:对于目标方法的调用实现并发控制,通过concurrencyLimit来定义并发度。
从上述invoke方法的实现来看,主要的控制逻辑应该都在beforeAccess这个方法的实现中,它定义在父类ConcurrencyThrottleSupport:
protected void beforeAccess() {
if (this.concurrencyLimit == NO_CONCURRENCY) {
throw new IllegalStateException(
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
}
if (this.concurrencyLimit > 0) {
boolean debug = logger.isDebugEnabled();
synchronized (this.monitor) {
boolean interrupted = false;
while (this.concurrencyCount >= this.concurrencyLimit) {
if (interrupted) {
throw new IllegalStateException(
"Thread was interrupted while waiting for invocation access, "
+ "but concurrency limit still does not allow for entering");
}
if (debug) {
logger.debug("Concurrency count " + this.concurrencyCount + " has reached limit "
+ this.concurrencyLimit + " - blocking");
}
try {
this.monitor.wait();
} catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
interrupted = true;
}
}
if (debug) {
logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
}
this.concurrencyCount++;
}
}
}
首先会判断并发度是否被设置成了NO_CONCURRENCY(0),即不允许任何执行。如果是这样的话就会直接抛出异常进行提示。
当concurrencyLimit(也就是并发度)大于0的时候,会在monitor这个对象上设置一个同步代码块。这个同步代码块应用的是最底层的wait/nofity机制来实现并发控制,也算是一个wait/notify工作机制的参考实例吧,它非常具体的说明了在使用这个机制的时候所需要注意的几个点:
- 需要在一个while循环中进行wait操作,当然这个不是必要的,不过是一种最佳实践。while的循环条件是因具体的业务而异的,这个条件的作用是保证当前被阻塞的线程真的能够再次开始执行,放到上面这个例子中,就是只有在当前的并发量小于阈值(也就是concurrencyLimit)的时候,才能够唤醒被阻塞的线程。因此在多线程环境中,一切皆有可能,某次唤醒后,在其他运行线程的影响下,本来被满足的条件再次变为不满足的状态。
- 在同步代码块中进行wait操作。从上面的实现来看,while循环确实也处于一个同步代码块中。这样做的目的同样是为了保证唤醒消息的准确性。
- 同步代码块的监视器对象应该和wait方法的调用对象一致,如上述代码中的monitor对象。
在方法的最后,增加了计数器concurrencyCount的值,用来表示当前的并发量。
完成了beforeAccess方法的调用后,会执行目标方法:return methodInvocation.proceed();
。执行完毕后在finally代码块中调用afterAccess方法:
protected void afterAccess() {
if (this.concurrencyLimit >= 0) {
synchronized (this.monitor) {
this.concurrencyCount--;
if (logger.isDebugEnabled()) {
logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
}
this.monitor.notify();
}
}
}
这个方法的作用很简单,就是在目标方法执行完毕之后减少当前的并发计数器,并唤醒被阻塞的线程。这里需要注意的是,唤醒被阻塞线程的notify操作也是在同一个监视器对象的同步代码块中实现的。
了解了重要方法的实现之后,这个Interceptor的作用也就非常清晰了。比如设置了并发度为3,那么目标方法就最多只有三个线程能够同时访问,当第四个线程尝试进行访问的时候会在wait处被阻塞,直到前面的三个线程中有一个执行完毕才会唤醒一个被阻塞的线程。
应用实例
Advisor的定义
下面写一个简单的例子来应用ConcurrencyThrottleInterceptor,首先是定义Pointcut以及Interceptor本身:
@Bean
public Advisor throttleAdvisor() {
AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
pointcut.setExpression(
"execution(* (@org.springframework.stereotype.Service *).doThrottleThings(..))");
return new DefaultPointcutAdvisor(pointcut, concurrencyThrottleInterceptor());
}
@Bean
public ConcurrencyThrottleInterceptor concurrencyThrottleInterceptor() {
ConcurrencyThrottleInterceptor interceptor = new ConcurrencyThrottleInterceptor();
interceptor.setConcurrencyLimit(3);
return interceptor;
}
这里完全使用了JavaConfig的方式进行配置。第一个方法throttleAdvisor声明的实际上是一个完整的Aspect,包含了两部分:
- Pointcut
- Interceptor
目标方法的定义
public void doThrottleThings() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + ": is doing something, needing 5s");
Thread.sleep(5000);
}
为了验证并发控制是否生效,首先会打印出当前线程的名称,然后睡眠5秒。
启动方法的定义
直接使用JUnit帮助方法的启动:
@Test
public void testConcurrencyThrottle() throws InterruptedException {
IntStream.range(0, 5).forEach(i -> {
new Thread(() -> {
try {
service.doThrottleThings();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
});
Thread.sleep(10000);
}
在最后睡眠10s是为了让该测试方法不会退出。因为它毕竟不是main方法,JUnit会在测试方法完成之后结束掉所有线程,然后是关闭JVM。
最后的打印结果是这样的:
[Thread-4] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 0
[Thread-5] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 1
[Thread-6] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 2
[Thread-3] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Concurrency count 3 has reached limit 3 - blocking
[Thread-2] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Concurrency count 3 has reached limit 3 - blocking
Thread-4: is doing something, needing 5s
Thread-5: is doing something, needing 5s
Thread-6: is doing something, needing 5s
[Thread-4] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 2
[Thread-3] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 2
Thread-3: is doing something, needing 5s
[Thread-6] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 2
[Thread-5] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 1
[Thread-2] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 1
Thread-2: is doing something, needing 5s
可以清晰地发现当并发量达到3之后,剩下的两个线程会被阻塞。等待5s之后这两个线程被唤醒。
SimpleAsyncTaskExecutor中的并发控制
清楚了ConcurrencyThrottleInterceptor是如何处理并发控制之后,让我们转过头来看看SimpleAsyncTaskExecutor中的并发控制,在这个类中有这么一个成员对象和两个常量:
/**
* Permit any number of concurrent invocations: that is, don‘t throttle concurrency.
*/
public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;
/**
* Switch concurrency ‘off‘: that is, don‘t allow any concurrent invocations.
*/
public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;
/** Internal concurrency throttle used by this executor */
private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();
也就是说在内部它使用了一个名为ConcurrencyThrottleAdapter的并发控制对象,并且复用了ConcurrencyThrottleSupport中定义的两个常量,用来表示不限制并发度和完全不允许任何执行(哪怕串行也不允许了):
/**
* Subclass of the general ConcurrencyThrottleSupport class, making {@code beforeAccess()} and
* {@code afterAccess()} visible to the surrounding class.
*/
private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
@Override
protected void beforeAccess() {
super.beforeAccess();
}
@Override
protected void afterAccess() {
super.afterAccess();
}
}
这个类的目的就是为了让SimpleAsyncTaskExecutor能够访问到定义在ConcurrencyThrottleSupport中的beforeAccess以及afterAccess两个方法。
那么在什么时候会用到这个adapter呢?
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
} else {
doExecute(taskToUse);
}
}
上述代码是定义在SimpleAsyncTaskExecutor中的execute方法。这里面有三个问题值得探究:
- taskDecorator是个啥?
- 何时执行并发控制以及什么任务会被归并发控制?
- ConcurrencyThrottlingRunnable是干嘛用的?
TaskDecorator
首先第一个问题:
TaskDecorator是个接口,抽象了装饰器模式的概念:
public interface TaskDecorator {
/**
* Decorate the given {@code Runnable}, returning a potentially wrapped
* {@code Runnable} for actual execution.
* @param runnable the original {@code Runnable}
* @return the decorated {@code Runnable}
*/
Runnable decorate(Runnable runnable);
}
Spring中并没有这个接口的任何实现,因此这个想象空间就留个各位开发人员了,不过从Javadoc中的说明可以大概明白它的意图,比如:
- 设置任务执行的上下文环境
- 进行某些监控以及统计方面的工作
并发控制的执行时机
首先需要满足的条件就是isThrottleActive()返回true:
public boolean isThrottleActive() {
return (this.concurrencyLimit > 0);
}
即设定的并发度大于0时就被认为是开启了Throttling功能。
同时还需要满足的条件是startTimeout > TIMEOUT_IMMEDIATE
。后面的常量的值是0。也就是说如何任务的timeout值被设置成TIMEOUT_IMMEDIATE的话,这种任务是不属于并发控制的范畴的。(另外这个timeout除了和TIMEOUT_IMMEDIATE进行了比较外,没有其它的用途了,这一点我觉得有点莫名其妙。)
ConcurrencyThrottlingRunnable
最终,会将任务封装在一个ConcurrencyThrottlingRunnable对象中,然后执行该wrapper对象。
/**
* This Runnable calls {@code afterAccess()} after the target Runnable has finished its execution.
*/
private class ConcurrencyThrottlingRunnable implements Runnable {
private final Runnable target;
public ConcurrencyThrottlingRunnable(Runnable target) {
this.target = target;
}
@Override
public void run() {
try {
this.target.run();
} finally {
concurrencyThrottle.afterAccess();
}
}
}
这样做也仅仅是为了在finally中执行afterAccess配合并发控制的资源释放这一过程。
总结
本文讨论两个方面的内容:
- ConcurrencyThrottleInterceptor的实现原理
- SimpleAsyncTaskExecutor是如何实现并发控制的
至此,Spring AOP中的几个比较实用的Aspects(Interceptors)就都介绍完毕了。
Spring AOP中提供的种种Aspects - 并发控制