首页 > 代码库 > Spring AOP中提供的种种Aspects - 并发控制

Spring AOP中提供的种种Aspects - 并发控制

本文继续讨论ConcurrencyThrottleInterceptor(基于Spring 4.3.7)。以及上一篇文章中遗留的一个关于SimpleAsyncTaskExecutor类中属性concurrencyLimit的问题。

这些都和并发控制相关。但是这里需要事先说明的一点是,这些类和实现的年代都比较久远了,比如ConcurrencyThrottleInterceptor是在2004年的Spring 1.x中就存在了,那个年代还没有JDK中的java.util.concurrent并发包。因此这里更多地是学习和讨论一个解决特定问题的思想,而不是鼓励大家去使用它。对于并发控制的问题,利用并发包中的相关类可以更好地解决。

首先还是按照惯例画出关键类型之间的示意图:

技术分享

并发控制是如何实现的

ConcurrencyThrottleInterceptor

/**
 * Interceptor that throttles concurrent access, blocking invocations if a specified concurrency
 * limit is reached.
 *
 * <p>
 * Can be applied to methods of local services that involve heavy use of system resources, in a
 * scenario where it is more efficient to throttle concurrency for a specific service rather than
 * restricting the entire thread pool (e.g. the web container‘s thread pool).
 *
 * <p>
 * The default concurrency limit of this interceptor is 1. Specify the "concurrencyLimit" bean
 * property to change this value.
 *
 * @author Juergen Hoeller
 * @since 11.02.2004
 * @see #setConcurrencyLimit
 */
@SuppressWarnings("serial")
public class ConcurrencyThrottleInterceptor extends ConcurrencyThrottleSupport
    implements MethodInterceptor, Serializable {

  public ConcurrencyThrottleInterceptor() {
    setConcurrencyLimit(1);
  }

  @Override
  public Object invoke(MethodInvocation methodInvocation) throws Throwable {
    beforeAccess();
    try {
      return methodInvocation.proceed();
    } finally {
      afterAccess();
    }
  }

}

这个类型的实现也挺简洁的,它继承了ConcurrencyThrottleSupport,实现了MethodInterceptor和Serializable接口。而其中的invoke方法主要就是为了实现MethodInterceptor接口。

它想要完成的功能注释中也说的比较明白了:对于目标方法的调用实现并发控制,通过concurrencyLimit来定义并发度。

从上述invoke方法的实现来看,主要的控制逻辑应该都在beforeAccess这个方法的实现中,它定义在父类ConcurrencyThrottleSupport:

protected void beforeAccess() {
  if (this.concurrencyLimit == NO_CONCURRENCY) {
    throw new IllegalStateException(
        "Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
  }
  if (this.concurrencyLimit > 0) {
    boolean debug = logger.isDebugEnabled();
    synchronized (this.monitor) {
      boolean interrupted = false;
      while (this.concurrencyCount >= this.concurrencyLimit) {
        if (interrupted) {
          throw new IllegalStateException(
              "Thread was interrupted while waiting for invocation access, "
                  + "but concurrency limit still does not allow for entering");
        }
        if (debug) {
          logger.debug("Concurrency count " + this.concurrencyCount + " has reached limit "
              + this.concurrencyLimit + " - blocking");
        }
        try {
          this.monitor.wait();
        } catch (InterruptedException ex) {
          // Re-interrupt current thread, to allow other threads to react.
          Thread.currentThread().interrupt();
          interrupted = true;
        }
      }
      if (debug) {
        logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
      }
      this.concurrencyCount++;
    }
  }
}

首先会判断并发度是否被设置成了NO_CONCURRENCY(0),即不允许任何执行。如果是这样的话就会直接抛出异常进行提示。

当concurrencyLimit(也就是并发度)大于0的时候,会在monitor这个对象上设置一个同步代码块。这个同步代码块应用的是最底层的wait/nofity机制来实现并发控制,也算是一个wait/notify工作机制的参考实例吧,它非常具体的说明了在使用这个机制的时候所需要注意的几个点:

  1. 需要在一个while循环中进行wait操作,当然这个不是必要的,不过是一种最佳实践。while的循环条件是因具体的业务而异的,这个条件的作用是保证当前被阻塞的线程真的能够再次开始执行,放到上面这个例子中,就是只有在当前的并发量小于阈值(也就是concurrencyLimit)的时候,才能够唤醒被阻塞的线程。因此在多线程环境中,一切皆有可能,某次唤醒后,在其他运行线程的影响下,本来被满足的条件再次变为不满足的状态。
  2. 在同步代码块中进行wait操作。从上面的实现来看,while循环确实也处于一个同步代码块中。这样做的目的同样是为了保证唤醒消息的准确性。
  3. 同步代码块的监视器对象应该和wait方法的调用对象一致,如上述代码中的monitor对象。

在方法的最后,增加了计数器concurrencyCount的值,用来表示当前的并发量。

完成了beforeAccess方法的调用后,会执行目标方法:return methodInvocation.proceed();。执行完毕后在finally代码块中调用afterAccess方法:

protected void afterAccess() {
  if (this.concurrencyLimit >= 0) {
    synchronized (this.monitor) {
      this.concurrencyCount--;
      if (logger.isDebugEnabled()) {
        logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
      }
      this.monitor.notify();
    }
  }
}

这个方法的作用很简单,就是在目标方法执行完毕之后减少当前的并发计数器,并唤醒被阻塞的线程。这里需要注意的是,唤醒被阻塞线程的notify操作也是在同一个监视器对象的同步代码块中实现的。

了解了重要方法的实现之后,这个Interceptor的作用也就非常清晰了。比如设置了并发度为3,那么目标方法就最多只有三个线程能够同时访问,当第四个线程尝试进行访问的时候会在wait处被阻塞,直到前面的三个线程中有一个执行完毕才会唤醒一个被阻塞的线程。

应用实例

Advisor的定义

下面写一个简单的例子来应用ConcurrencyThrottleInterceptor,首先是定义Pointcut以及Interceptor本身:

@Bean
public Advisor throttleAdvisor() {
  AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
  pointcut.setExpression(
      "execution(* (@org.springframework.stereotype.Service *).doThrottleThings(..))");

  return new DefaultPointcutAdvisor(pointcut, concurrencyThrottleInterceptor());
}

@Bean
public ConcurrencyThrottleInterceptor concurrencyThrottleInterceptor() {
  ConcurrencyThrottleInterceptor interceptor = new ConcurrencyThrottleInterceptor();
  interceptor.setConcurrencyLimit(3);
  return interceptor;
}

这里完全使用了JavaConfig的方式进行配置。第一个方法throttleAdvisor声明的实际上是一个完整的Aspect,包含了两部分:

  • Pointcut
  • Interceptor

目标方法的定义

public void doThrottleThings() throws InterruptedException {
    System.out.println(Thread.currentThread().getName() + ": is doing something, needing 5s");
    Thread.sleep(5000);
  }

为了验证并发控制是否生效,首先会打印出当前线程的名称,然后睡眠5秒。

启动方法的定义

直接使用JUnit帮助方法的启动:

@Test
public void testConcurrencyThrottle() throws InterruptedException {
  IntStream.range(0, 5).forEach(i -> {
    new Thread(() -> {
      try {
        service.doThrottleThings();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }).start();
  });

  Thread.sleep(10000);
}

在最后睡眠10s是为了让该测试方法不会退出。因为它毕竟不是main方法,JUnit会在测试方法完成之后结束掉所有线程,然后是关闭JVM。

最后的打印结果是这样的:

[Thread-4] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 0
[Thread-5] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 1
[Thread-6] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 2
[Thread-3] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Concurrency count 3 has reached limit 3 - blocking
[Thread-2] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Concurrency count 3 has reached limit 3 - blocking
Thread-4: is doing something, needing 5s
Thread-5: is doing something, needing 5s
Thread-6: is doing something, needing 5s
[Thread-4] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 2
[Thread-3] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 2
Thread-3: is doing something, needing 5s
[Thread-6] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 2
[Thread-5] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 1
[Thread-2] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 1
Thread-2: is doing something, needing 5s

可以清晰地发现当并发量达到3之后,剩下的两个线程会被阻塞。等待5s之后这两个线程被唤醒。

SimpleAsyncTaskExecutor中的并发控制

清楚了ConcurrencyThrottleInterceptor是如何处理并发控制之后,让我们转过头来看看SimpleAsyncTaskExecutor中的并发控制,在这个类中有这么一个成员对象和两个常量:

/**
 * Permit any number of concurrent invocations: that is, don‘t throttle concurrency.
 */
public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;

/**
 * Switch concurrency ‘off‘: that is, don‘t allow any concurrent invocations.
 */
public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;

/** Internal concurrency throttle used by this executor */
private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

也就是说在内部它使用了一个名为ConcurrencyThrottleAdapter的并发控制对象,并且复用了ConcurrencyThrottleSupport中定义的两个常量,用来表示不限制并发度和完全不允许任何执行(哪怕串行也不允许了):

/**
 * Subclass of the general ConcurrencyThrottleSupport class, making {@code beforeAccess()} and
 * {@code afterAccess()} visible to the surrounding class.
 */
private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

  @Override
  protected void beforeAccess() {
    super.beforeAccess();
  }

  @Override
  protected void afterAccess() {
    super.afterAccess();
  }
}

这个类的目的就是为了让SimpleAsyncTaskExecutor能够访问到定义在ConcurrencyThrottleSupport中的beforeAccess以及afterAccess两个方法。

那么在什么时候会用到这个adapter呢?

@Override
public void execute(Runnable task, long startTimeout) {
  Assert.notNull(task, "Runnable must not be null");
  Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
  if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
    this.concurrencyThrottle.beforeAccess();
    doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
  } else {
    doExecute(taskToUse);
  }
}

上述代码是定义在SimpleAsyncTaskExecutor中的execute方法。这里面有三个问题值得探究:

  1. taskDecorator是个啥?
  2. 何时执行并发控制以及什么任务会被归并发控制?
  3. ConcurrencyThrottlingRunnable是干嘛用的?

TaskDecorator

首先第一个问题:

TaskDecorator是个接口,抽象了装饰器模式的概念:

public interface TaskDecorator {

    /**
     * Decorate the given {@code Runnable}, returning a potentially wrapped
     * {@code Runnable} for actual execution.
     * @param runnable the original {@code Runnable}
     * @return the decorated {@code Runnable}
     */
    Runnable decorate(Runnable runnable);

}

Spring中并没有这个接口的任何实现,因此这个想象空间就留个各位开发人员了,不过从Javadoc中的说明可以大概明白它的意图,比如:

  • 设置任务执行的上下文环境
  • 进行某些监控以及统计方面的工作

并发控制的执行时机

首先需要满足的条件就是isThrottleActive()返回true:

public boolean isThrottleActive() {
  return (this.concurrencyLimit > 0);
}

即设定的并发度大于0时就被认为是开启了Throttling功能。

同时还需要满足的条件是startTimeout > TIMEOUT_IMMEDIATE。后面的常量的值是0。也就是说如何任务的timeout值被设置成TIMEOUT_IMMEDIATE的话,这种任务是不属于并发控制的范畴的。(另外这个timeout除了和TIMEOUT_IMMEDIATE进行了比较外,没有其它的用途了,这一点我觉得有点莫名其妙。)

ConcurrencyThrottlingRunnable

最终,会将任务封装在一个ConcurrencyThrottlingRunnable对象中,然后执行该wrapper对象。

/**
 * This Runnable calls {@code afterAccess()} after the target Runnable has finished its execution.
 */
private class ConcurrencyThrottlingRunnable implements Runnable {

  private final Runnable target;

  public ConcurrencyThrottlingRunnable(Runnable target) {
    this.target = target;
  }

  @Override
  public void run() {
    try {
      this.target.run();
    } finally {
      concurrencyThrottle.afterAccess();
    }
  }
}

这样做也仅仅是为了在finally中执行afterAccess配合并发控制的资源释放这一过程。

总结

本文讨论两个方面的内容:

  1. ConcurrencyThrottleInterceptor的实现原理
  2. SimpleAsyncTaskExecutor是如何实现并发控制的

至此,Spring AOP中的几个比较实用的Aspects(Interceptors)就都介绍完毕了。

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    Spring AOP中提供的种种Aspects - 并发控制