首页 > 代码库 > 线程安全的缓存机制 - AOP设计与实现

线程安全的缓存机制 - AOP设计与实现

     最近几天由于工作原因,需要设计实现一个线程安全的缓存机制,拿出来和大家分享交流一下。

应用背景:

     缓存是在实际工作中经常用到的,主要作用呢?1. 提高响应速度 2. 降低cpu压力或者数据库压力。
     在此,我的应用背景是拦截一些RFC请求(不要求获取实时数据),以降低数据库及自身应用的访问压力。

目标:

     高可扩展性:可以方便配置需要使用缓存的方法。
     线程安全性:在并发情况下,要求线程安全,且尽可能高效。

使用技术:

  • 使用AOP的插件性质来降低缓存与原系统的耦合性,即在切面层做缓存的处理。
  • 使用Annotation来对需要做缓存处理的函数进行标记,并可以对缓存时间个性化
  • 针对缓存过期问题,对放入缓存的数据封装一层,并打上时间戳
示意图:
     

设计难点:

     针对某一时刻并发数较多且缓存失效的情况下,我们应该保证的是只有一个线程会去执行数据的读取并设置的操作,那么其他线程应该是等待该线程完成操作再一起返回还是直接返回null值?
     答:在并发数较多且数据准备时间过长的情况下,如果线程采取等待策略,那么将引起很大的资源浪费:占用RFC连接(一般数量是有限制的),占用服务器cpu时间等等问题。所以,最好是代码中提供两种策略,对于执行时间较长的读数据操作,我们应该将线程直接返回,而非一直等待。    

代码实现:   

Annotation:
/**
 * 表示一个方法是否启用本地缓存,可以指定本地缓存的时间间隔,默认为一个小时
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LocalCacheOperation {

    long localCacheInterval() default 1000 * 60 * 60;
    String localCacheKey() default "";
}
CacheObject:
//为缓存对象包上一层时间戳
public class CacheObject implements Serializable {
    private static final long serialVersionUID = 4873268779348802945L;
    private long timestamp;
    private Object object;
    .....
}          
标记要使用缓存的方法:
public interface ConfigService {
     public BloomFilter<String> getAllPassengerNames();
     public BloomFilter<Long> getAllTrades();
}

public class ConfigServiceImpl implements ConfigService {
     private PassengerManager passengerManager;
     private TradeManager tradeManager;

     @LocalCacheOperation(localCacheInterval=1000*60*60*24)
     public BloomFilter<String> getAllPassengerNames() {
          return passengerManager.getAllPassengerNames();
     }
     @LocalCacheOperation 
     public BloomFilter<Long> getAllTrades() {
          return tradeManager.getAllTrades();
     }
}

核心:切面实现

     我在这里使用了加锁和不加锁两种方式来实现对应的两种策略:缓存不存在线程直接返回或等待。
/**
 * 本地缓存切面实现
 */
@Aspect
public class LocalCacheAspect {

    private static final Log log = LogFactory.getLog(LocalCacheAspect.class);
    
    private final ConcurrentMap<String, Future<CacheObject>> 
     localCache = new ConcurrentHashMap<String, Future<CacheObject>>();
    
    //控制台
    private ConsoleBean consoleBean;
    
    private final ConcurrentMap<String, SoftReference<CacheObject>> 
     concurrentLocalCache = new ConcurrentHashMap<String, SoftReference<CacheObject>>();

    /**
     * 对于执行时间较长的读数据操作,需要在这里相应的添加锁,对于操作添加锁后的函数的线程
     * 如果本地缓存为空,且读数据的锁已被其他线程占据,将直接返回null
     */
    
    private final static Map<String, Lock> localCacheLocks = new HashMap<String, Lock>();
    static {
        localCacheLocks.put("getAllTrades", new ReentrantLock());
    }

    /**
     * Advice aound audit operations
     * 
     * @param pjpParam
     * @return
     */
    @Around("execution(@LocalCacheOperation * *(..))")
    public Object doCache(ProceedingJoinPoint pjpParam) throws Throwable {
        if(!getConsoleBean().isLocalCacheSwitchOn()) {
            log.warn("localCache not switch on, please pay attention");
            return pjpParam.proceed(pjpParam.getArgs());
        }
        final ProceedingJoinPoint pjp = pjpParam;
        Signature sig = pjp.getSignature();
        if (sig instanceof MethodSignature) {
            MethodSignature mSig = (MethodSignature) sig;
            LocalCacheOperation co = mSig.getMethod().getAnnotation(
             LocalCacheOperation.class);
            long localCacheInterval = 0;
            String localCacheKey = null;
            /**
             * AOP在拦截子类的Annotataion时,无法获取该Annotation,导致co可能为空
             * @author chenlei.cl
             */
            if( co == null ){
             localCacheInterval = consoleBean.getLocalCacheInterval();
             localCacheKey = mSig.getName();
            } else {
             localCacheInterval = co.localCacheInterval();
             localCacheKey = StringUtils.isNotBlank(co.localCacheKey()) ? 
             co.localCacheKey() : mSig.getName();
            }
            if (localCacheLocks.containsKey(mSig.getMethod().getName())) { //使用本地互斥锁
             return doConcurrentLocalCache(pjp, localCacheInterval, localCacheKey);
            }

            while (true) {// 等待某个线程将数据获取到本地缓存
                Future<CacheObject> f = localCache.get(localCacheKey);
                try {
                    long currentTime = System.currentTimeMillis();
                    if (f != null && f.get() != null && currentTime - f.get().getTimestamp() 
                    > localCacheInterval) {
                     localCache.remove(localCacheKey, f);
                        f = null;
                    }

                    if (f == null) {
                        Callable<CacheObject> eval = new Callable<CacheObject>() {
                            public CacheObject call() throws InterruptedException {
                                Object res;
                                try {
                                    res = pjp.proceed(pjp.getArgs());
                                }
                                catch (Throwable e) {
                                    log.error("Fail to process method", e);
                                    throw new  ServiceException(e.getMessage());
                                }
                                CacheObject cacheObject = new CacheObject();
                                cacheObject.setObject(res);
                                cacheObject.setTimestamp(System.currentTimeMillis());
                                return cacheObject;
                            }
                        };
                        
                        FutureTask<CacheObject> ft = 
                         new FutureTask<CacheObject>(eval);
                        f = localCache.putIfAbsent(localCacheKey, ft);
                        if (f == null) {
                            f = ft;
                            ft.run();
                        }
                    }

                    CacheObject obj = f.get();
                    if (obj != null)
                        return obj.getObject();
                }
                catch (CancellationException e) {
                    localCache.remove(localCacheKey, f);
                }
                catch (ExecutionException e) {
                    throw new ServiceException(e.getMessage());
                }
            }
        }
        return pjp.proceed(pjp.getArgs());
    }

    @SuppressWarnings("static-access")
    public Object doConcurrentLocalCache(ProceedingJoinPoint pjp, 
        long localCacheInterval, String localCacheKey) throws Throwable {
        try {
            long currentTime = System.currentTimeMillis();
            SoftReference<CacheObject> weakRefCacheObj = 
            concurrentLocalCache.get(localCacheKey);
            if (weakRefCacheObj != null && weakRefCacheObj.get() != null && 
                currentTime - weakRefCacheObj.get().getTimestamp() > localCacheInterval) {
                // 缓存过期
                weakRefCacheObj.get().setObject(null); // 清空引用
                concurrentLocalCache.remove(localCacheKey, weakRefCacheObj);
                weakRefCacheObj = null;
            } else if (weakRefCacheObj != null && weakRefCacheObj.get() != null) {
                return weakRefCacheObj.get().getObject();
           }
            
            if (this.localCacheLocks.get(localCacheKey).tryLock()) {
             weakRefCacheObj = concurrentLocalCache.get(localCacheKey);
             if (weakRefCacheObj != null && weakRefCacheObj.get() != null) { 
             // double check
                    return weakRefCacheObj.get().getObject();
             }
             try {
                 Object res = pjp.proceed(pjp.getArgs());
                 CacheObject cacheObject = new CacheObject();
                 cacheObject.setObject(res);
                 cacheObject.setTimestamp(System.currentTimeMillis());
                 weakRefCacheObj = new SoftReference<CacheObject>(
                 cacheObject);
                 concurrentLocalCache.put(localCacheKey, weakRefCacheObj);
                 return res;
             } finally {
                 this.localCacheLocks.get(localCacheKey).unlock();
             }
            } else {
             return null; // make the other part wait
            }
        } catch (Exception e) {
            throw new ServiceException(e.getMessage(), e);
        }
    }
}

线程安全的缓存机制 - AOP设计与实现