首页 > 代码库 > 一个缓存容灾写的样例

一个缓存容灾写的样例

背景     

     有时我们能够使用缓存进行容灾的处理。场景例如以下:我们当前有一个专门提供各种数据的应用DataCore,该应用开放多个RFC方法供其它应用使用。

技术分享
技术分享     我们平时在读写数据时,会在Cache备份一份(为平时DataCore提高响应速度、减少DB、CPU压力所用),当DB挂掉的时候。Cache还能够用来容灾。使用缓存容灾的优点是:性能足够好,坏处是缓存可比数据库成本高多了。
     让我们想象得更猛烈些,当DataCore整个挂掉的时候,A、B、C、D方怎么才干安然的执行下去?
     我们能够在A、B、C、D端上提供DataCore的缓存容灾服务。这样。即使在DataCore整个挂掉的情况下,其它应用也不会受影响。
技术分享
技术分享

要考虑的几个问题

  1. 容灾读的部分不必说。对象原本在存入缓存时就依据类型分了区域,读的时候直接在对应的区域取出就可以。

    容灾写的话,针对同一类型对象的写操作,怎样将这些对象集合在一块,待DataCore恢复后,再将容灾写过的对象覆盖到DB。

  2. 缓存的写操作必须是线程安全的

具体设计

     缓存容灾写的一种可能策略是:针对每种类型的对象在缓存中开辟一大段储存空间(数组方式或者数组链接结合方式),然后把每一个容灾写的对象塞进这段空间内,在要覆写回DB时,直接从头到尾在缓存里把对象取出来就可以。示意图例如以下:
技术分享
技术分享
      上面这样的设计缺点是须要一大片的连续的储存空间,对于缓存来说,这是要命的。

缓存的底层储存机制就是基于分散的hash。

     上面设计的一种改进方案是。我们仅仅在连续空间中储存UserDO的唯一标示符。比方id或者key什么的。

这样我们的就不须要那么大的连续空间了。示意图例如以下:

技术分享
技术分享
     更进一步,我们能够把UserDO的id也分散储存。能够利用一个DisasterIndexDO储存每个类型的容灾写的信息,利用beginIndex以及currentIndex字段为全部容灾写对象打上一个序号。在缓存中储存该序号与对象id的相应关系,然后我们就能够通过序号检索出id,再通过id检索出对象。

示意图例如以下:

技术分享
技术分享
     在多个调用方在对某一类型的对象进行容灾写操作时,仅仅须要对DisasterIndexDO进行安全的并发訪问就可以,抢占currentIndex,然后再进行缓存的写操作。这样。我们的容灾写就实现了。

     

范例实现

//index对象
public class DisasterIndexDO implements Serializable {
     
private static final long serialVersionUID = -8688243351154917184L;
     public int namespace;
     public int beginIndex;
     public int currentIndex;
     public long expireLockTime;
     public static final long DEFAULT_EXPIRE_TIME = 50; // 当序列被锁时间超时,防止死锁
     
     public DisasterIndexDO(int namespace, int bIndex, int cIndex, long expireLockTime) {
        this.namespace = namespace;
        this.beginIndex = bIndex;
        this.currentIndex = cIndex;
        this.expireLockTime = expireLockTime;
     }
}

//容灾实现类
public class DisasterCacheHandler {
     RemoteCache remoteCache;

     private  final int DS_CACHE_NAMESPACE = 67; 
     private  final int DS_WRITE_REPETECOUNT = 3;
     private  final int DISASTER_INDEX = 250;
     private  final String DISASTER_KEYS = "disaster_keys";
//容灾读
    public Object dsGetRemoteData(int namespace, String key){
         return remoteCache.get(DS_CACHE_NAMESPACE, namespace+key);
    }
    
    //本地同步的namespace
     private Map<Integer, Object> synNamespace = new 
           HashMap<Integer, Object>();
     //容灾写
     protected boolean dsWriteRemoteData(int namespace, 
          String key, Serializable value) {

          //先把数据写入缓存
          remoteCache.put(this.DS_CACHE_NAMESPACE, namespace+key, value);
          //本地同步的NameSpace
          if (!this.synNamespace.containsKey(Integer.valueOf(namespace))) {
                this.synNamespace.put(Integer.valueOf(namespace), namespace);
          }
          // update the Namespace Index and namespace disaster key queue
          synchronized (this.synNamespace.get(namespace)){
               int count = this.DS_WRITE_REPETECOUNT;
               DisasterIndexDO index = null;
               do {
                    count--;
                     // try to lock Namespace Index
                     int rc = remoteCache.lock(this.DISASTER_INDEX, 
                         String.valueOf(namespace));
                    // Namespace Index not exist
                    if ( rc == 2 ) {
                         // Initialize Namespace Index
                         index = new DisasterIndexDO(namespace, 1, 1, 
                              System.currentTimeMillis());
                         remoteCache.put(this.DISASTER_INDEX, 
                              String.valueOf(namespace), index);
                 // for each namespace should handle disaster, keep the only index object
                         remoteCache.put(namespace, 
                              this.DISASTER_KEYS + index.currentIndex, key);
                         return true;
                    } else if (rc == 0) { // lock failure
                         index = (DisasterIndexDO)remoteCache.
                                   get(this.DISASTER_INDEX, String.valueOf(namespace));
                    // 假设鎖已經超時。則解开,避免在訪问缓存时死住的情况
                         if (System.currentTimeMillis() - index.expireLockTime >           
                                   DisasterIndexDO.DEFAULT_EXPIRE_TIME) {
                                        remoteCache.unLock(this.DISASTER_INDEX, 
                                             String.valueOf(namespace));
                                   continue;
                          }
                         continue;
                   } else if (rc == 1) { // lock success
                         try {
                              index = (DisasterIndexDO)remoteCache.get(this.DISASTER_INDEX,  
                                        String.valueOf(namespace));
                              // update locked Namespace Index
                              int curIdx = index.currentIndex + 1;
                              remoteCache.delete(this.DISASTER_INDEX, 
                                        String.valueOf(namespace));
                              remoteCache.put(this.DISASTER_INDEX, String.valueOf(namespace), 
                                   new DisasterIndexDO(namespace, index.beginIndex, curIdx, 
                                        System.currentTimeMillis()));
                              // keep key of this Namespace with current index
                              remoteCache.put(namespace, this.DISASTER_KEYS + curIdx, key);
                              return true;
                         }  catch (Throwable e ) {
                         } finally {
                              // unlock and handle unlock failure 
                              remoteCache.unLock(this.DISASTER_INDEX, 
                                   String.valueOf(namespace));
                         }
                   }
             } while (count >= 0);
            if (count <= 0) {
                 return false;
            }
        }
    }
}

一个缓存容灾写的样例