首页 > 代码库 > DataNode节点的数据块管理 FSVolumeSet、FSVolume

DataNode节点的数据块管理 FSVolumeSet、FSVolume

在分布式文件系统HDFS中,DataNode节点被用来存储文件的数据,确切的来说就是HDFS中的每一个文件是分块来存储的,一个文件可能有多个数据块,每一个数据块有多个副本,而且数据块的不同副本存储在不同的DataNode节点上,所以如果把整个HDFS集群看做一台机器的话,那么每一个DataNode节点就可以看做是一块存储磁盘。实际上,HDFS也正是这么干的。前面说过,每一个DataNode节点我们都可以为它配置多个本地存储路径,如果把这些本地存储路径统一看做一块磁盘的话,那每个存储路径就可以看做是这个磁盘上的一个分区。HDFS设计了FSVolumeSet类来表示磁盘,FSVolume类来表示分区,这两个类都是作为org.apache.Hadoop.hdfs.server.datanode.FSDataset的内部类而只能被FSDataset类引用。

      先来仔细的看看这两个类的基本信息:

1.FSVolume

 

[java]
  1.  private FSDir dataDir;  //存储有效的数据块的最终位置(current/)   
  2.  private File tmpDir;    //存储数据块的中间位置(tmp/)   
  3.  private File detachDir; //存储数据块的copy on write(detach/)   
  4.  private DF usage;       //获取当前存储目录的空间使用信息   
  5.  private DU dfsUsage;    //获取当前存储目录所在的磁盘分区空间信息   
  6.  private long reserved;  //预留存储空间大小  

      DataNode节点配置的每一个存储路径最终被抽象成了一个FSVolume对象,它主要负责为数据块分配存储空间,并且定期的更新这个“分区”的空间使用信息,为了统计的准确性,它使用了DF、DU类。处于一致性的考虑,当有一个数据块达到DataNode节点时,就需要为这个数据块分配存储空间,但是FSVolume并不是马上就在它的最终存储位置上为它创建一个对应的存储文件,而是在“分区”的tmpDir(对应存储路径下的tmp/目录)中为它建立一个临时文件,当成功接收这个数据块并写入这个临时文件之后,再把这个临时文件移动到真正存储这个数据块的位置下面。当我们为DataNode节点的数据进行备份/升级时,DataNode节点会把每个"分区"中current/下的所有数据块移动到previous/下面,然后为了提高会在current/建立每一个文件的硬链接,它们分别指向previous/下的对应文件。那么当我们对某一个数据块进行更新的时候(因为HDFS不支持随机写,所以这里的更新主要是指追加操作),就需要对相应的数据块文件进行detach操作(至于这一操作的原因,我在前面详细提到过):先将这个数据块对应文件(物理位置在previous/下)复制到detach/下,然后用这个副本代替数据块在current/下的硬链接。上面的两个操作(分配存储空间、数据块detach)可以近似看作是采用了两段提交协议,因此每一个“分区”在初始化的时候都进行了恢复操作,它会尽量恢复可能由于DataNode所在节点宕机而造成影响。这个恢复操作是:

 

1).对于detach/下的所有数据块文件(detach/下不存在目录,只有文件),如果该文件在current/下不存在,则把它移动到current/下,最后清空detach/目录

2).如果DataNode节点被设置为支持append操作(对应的配置项为dfs.support.apend),那么对于tmp/下的所有数据块文件(tmp/下不存在目录,只有文件),如果该文件在current/下不存在,则把它移动到current/下,最后清空tmp/目录;否则清空tmp/目录。

笔者认为,这里的恢复操作,在Hadoop-0.20.0版本中有点问题或者是实现的有点牵强(大家可以详细的参考我贴出来的代码)。哦,对了,差点忘了“分区”的空间预留值reserved,它可以通过配置文件中的dfs.datanode.du.reserved项来配置。

FSVolume的启动恢复操作:

 

[java]
  1.       this.detachDir = new File(parent, "detach");  
  2.       if (detachDir.exists()) {  
  3.         recoverDetachedBlocks(currentDir, detachDir);  
  4.       }  
  5.   
  6.       this.tmpDir = new File(parent, "tmp");  
  7.       if (tmpDir.exists()) {  
  8.         if (supportAppends) {  
  9.           recoverDetachedBlocks(currentDir, tmpDir);  
  10.         } else {  
  11.             LOG.debug("clear directory: "+ tmpDir.getAbsolutePath());  
  12.           FileUtil.fullyDelete(tmpDir);  
  13.         }  
  14.       }  
  15.   
  16. ...  
  17.   
  18.     private void recoverDetachedBlocks(File dataDir, File dir) throws IOException {  
  19.       File contents[] = dir.listFiles();  
  20.       if (contents == null) {  
  21.         return;  
  22.       }  
  23.         
  24.       for (int i = 0; i < contents.length; i++) {  
  25.         if (!contents[i].isFile()) {  
  26.           throw new IOException ("Found " + contents[i] + " in " + dir + " but it is not a file.");  
  27.         }  
  28.         // If the original block file still exists, then no recovery is needed.   
  29.         File blk = new File(dataDir, contents[i].getName());  
  30.         if (!blk.exists()) {  
  31.             LOG.debug("try to move file["+contents[i].getAbsolutePath()+"] to file["+blk.getAbsolutePath()+"]");  
  32.           if (!contents[i].renameTo(blk)) {  
  33.             throw new IOException("Unable to recover detached file " + contents[i]);  
  34.           }  
  35.           continue;  
  36.         }  
  37.           
  38.         LOG.debug("try to delete file["+contents[i].getAbsolutePath()+"]");  
  39.         if (!contents[i].delete()) {  
  40.             throw new IOException("Unable to cleanup detached file " + contents[i]);  
  41.         }  
  42.           
  43.       }  
  44.     }  

FSVolume中的重要方法:

 

 

[java]
  1.  /*获取分区的存储空间容量,考虑预留值*/   
  2.  long getCapacity() throws IOException {  
  3.    if (reserved > usage.getCapacity()) {  
  4.      return 0;  
  5.    }  
  6.   
  7.    return usage.getCapacity()-reserved;  
  8.  }  
  9.    
  10.  /*获取分区的可用空间*/  
  11.  long getAvailable() throws IOException {  
  12.    long remaining = getCapacity()-getDfSUSEd();  
  13.    long available = usage.getAvailable();  
  14.    if (remaining>available) {  
  15.      remaining = available;  
  16.    }  
  17.    return (remaining > 0) ? remaining : 0;  
  18.  }  
  19.   
  20. File createTmpFile(Block b) throws IOException {  
  21.    File f = new File(tmpDir, b.getBlockName());  
  22.    return createTmpFile(b, f);  
  23.  }  
  24.   
  25. File createDetachFile(Block b, String filename) throws IOException {  
  26.    File f = new File(detachDir, filename);  
  27.    return createTmpFile(b, f);  
  28.  }  
  29.   
  30.  private File createTmpFile(Block b, File f) throws IOException {  
  31.    if (f.exists()) {  
  32.      throw new IOException("Unexpected problem in creating temporary file for "+ b + ".  File " + f + " should not be present, but is.");  
  33.    }  
  34.    // Create the zero-length temp file   
  35.    //   
  36.    boolean fileCreated = false;  
  37.    try {  
  38.      fileCreated = f.createNewFile();  
  39.    } catch (IOException ioe) {  
  40.      throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);  
  41.    }  
  42.    if (!fileCreated) {  
  43.      throw new IOException("Unexpected problem in creating temporary file for "+  
  44.                            b + ".  File " + f + " should be creatable, but is already present.");  
  45.    }  
  46.      
  47.    return f;  
  48.  }  
  49.   
  50.  /*将一个接受成功的数据块写入current/中*/    
  51.  File addBlock(Block b, File f) throws IOException {  
  52.    File blockFile = dataDir.addBlock(b, f);  
  53.    File metaFile = getMetaFile( blockFile , b);  
  54.    dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());  
  55.    return blockFile;  
  56.  }  

 

2. FSVolumeSet

    DataNode节点配置的每一个存储路径最终被抽象成了一个FSVolume对象,因此,FSVolumeSet对所有的存储路径进行管理,实际上就是对所有的FSVolume对象进行管理。FSVolumeSet主要为上层(DataNode进程)提供存储数据块选择一个的存储路径(分区),说白了就是为该数据块创建一个对应的本地磁盘文件,同时也负载统计它的存储空间的状态信息和收集所有的数据块信息。在FSVolumeSet中唯一需要重点明确的是它如何为一个数据块选择存储??径(分区)。这个过程实际上很简单,它采用循环队列的策略来实现负载均衡(参看它的getNextVolume()方法,一看便知)。

 

[java]
  1.    /*为一个数据块选择一个存储分区*/  
  2.    synchronized FSVolume getNextVolume(long blockSize) throws IOException {  
  3.       int startVolume = curVolume;  
  4.       while (true) {  
  5.         FSVolume volume = volumes[curVolume];  
  6.         curVolume = (curVolume + 1) % volumes.length;  
  7.         //检查分区剩余可用空间是否满足数据块的大小   
  8.         if (volume.getAvailable() > blockSize) { return volume; }  
  9.         if (curVolume == startVolume) {  
  10.            throw new DiskOutOfSpaceException("Insufficient space for an additional block");  
  11.         }  
  12.       }  
  13.     }  
  14.       
  15.     /*获取磁盘已使用空间*/    
  16.     long getDfsUsed() throws IOException {  
  17.       long dfsUsed = 0L;  
  18.       for (int idx = 0; idx < volumes.length; idx++) {//统计每一个分区的已使用空间   
  19.         dfsUsed += volumes[idx].getDfsUsed();  
  20.       }  
  21.       return dfsUsed;  
  22.     }  
  23.   
  24.     /*获取磁盘的总空间容量*/  
  25.     synchronized long getCapacity() throws IOException {  
  26.       long capacity = 0L;  
  27.       for (int idx = 0; idx < volumes.length; idx++) {//统计每一个分区的空间容量   
  28.         capacity += volumes[idx].getCapacity();  
  29.       }  
  30.       return capacity;  
  31.     }  
  32.         
  33.     /*获取磁盘的剩余空间*/  
  34.     synchronized long getRemaining() throws IOException {  
  35.       long remaining = 0L;  
  36.       for (int idx = 0; idx < volumes.length; idx++) {//统计每一个分区的剩余空间   
  37.         remaining += volumes[idx].getAvailable();  
  38.       }  
  39.       return remaining;  
  40.     }  
  41.         
  42.     /*获取磁盘中所有数据块信息*/  
  43.     synchronized void getBlockInfo(TreeSet<Block> blockSet) {  
  44.       for (int idx = 0; idx < volumes.length; idx++) {//统计每一个分区下的所有数据块信息   
  45.         volumes[idx].getBlockInfo(blockSet);  
  46.       }  
  47.     }  
  48.         
  49.     /*获取磁盘中所有数据块的分区位置信息*/  
  50. <pre name="code" class="java"><pre name="code" class="java">    synchronized void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {  
  51.       for (int idx = 0; idx < volumes.length; idx++) {//统计每一个分区下的所有数据块的分区位置信息    
  52.         volumes[idx].getVolumeMap(volumeMap);  
  53.       }  
  54.     }  

DataNode节点的数据块管理 FSVolumeSet、FSVolume