
HDFS Block Reads and Writes: The ${dfs.data.dir} Selection Strategy

  I recently had to read the HDFS block read/write code for work. Unlike most posts on this topic, this article focuses on the ${dfs.data.dir} selection strategy, that is, how a block is placed across a DataNode's ${dfs.data.dir} directories. I read this part of the code from the angle our work required.

  

hdfs-site.xml:

<property>
  <name>dfs.data.dir</name>
  <value>/mnt/datadir1/data,/mnt/datadir2/data,/mnt/datadir3/data</value>
</property>

 

  The so-called ${dfs.data.dir} selection strategy answers this question: when a DataNode is configured with multiple ${dfs.data.dir} directories (as in the configuration above), which directory should be chosen to store a given block? Typically each ${dfs.data.dir} is a mount point for a different disk, so choosing a directory for a block really means deciding which disk it lands on.
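  As a quick illustration, here is a minimal sketch (a hypothetical standalone tool, not DataNode code; it only assumes the hdfs-site.xml above is on the classpath) of how the comma-separated dfs.data.dir value is read with the standard Configuration API. In Hadoop 1.x, each entry in this list becomes one volume on the DataNode, as we will see below.

import org.apache.hadoop.conf.Configuration;

public class PrintDataDirs {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // getStrings() splits the comma-separated value into individual directories;
    // the second argument is a fallback used when the key is unset.
    String[] dirs = conf.getStrings("dfs.data.dir", "/tmp/hadoop/dfs/data");
    for (String dir : dirs) {
      System.out.println("data dir: " + dir);
    }
  }
}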

  Creating a file involves two steps:

  1. Before any block is written, the client talks to the NameNode to create the file (an INodeFile / INodeFileUnderConstruction). The write request starts in the DFSClient create() method; via RPC, the NameNode eventually calls FSNamesystem's startFileInternal() method to create the file.

  private void startFileInternal(String src,
                                 PermissionStatus permissions,
                                 String holder,
                                 String clientMachine,
                                 boolean overwrite,
                                 boolean append,
                                 boolean createParent,
                                 short replication,
                                 long blockSize
                                 ) throws IOException {
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* startFile: src=" + src
          + ", holder=" + holder
          + ", clientMachine=" + clientMachine
          + ", createParent=" + createParent
          + ", replication=" + replication
          + ", overwrite=" + overwrite
          + ", append=" + append);
    }

    FSPermissionChecker pc = getPermissionChecker();
    synchronized (this) {
      if (isInSafeMode())
        throw new SafeModeException("Cannot create " + src, safeMode);
      if (!DFSUtil.isValidName(src)) {
        throw new IOException("Invalid name: " + src);
      }

      // Verify that the destination does not exist as a directory already.
      boolean pathExists = dir.exists(src);
      if (pathExists && dir.isDir(src)) {
        throw new IOException("Cannot create "+ src + "; already exists as a directory");
      }

      if (isPermissionEnabled) {
        if (append || (overwrite && pathExists)) {
          checkPathAccess(pc, src, FsAction.WRITE);
        } else {
          checkAncestorAccess(pc, src, FsAction.WRITE);
        }
      }

      if (!createParent) {
        verifyParentDir(src);
      }

      try {
        INode myFile = dir.getFileINode(src); // look up the INode for this path
        recoverLeaseInternal(myFile, src, holder, clientMachine, false);

        try {
          verifyReplication(src, replication, clientMachine);
        } catch (IOException e) {
          throw new IOException("failed to create " + e.getMessage());
        }
        if (append) { // append operation
          if (myFile == null) {
            throw new FileNotFoundException("failed to append to non-existent "
                + src + " on client " + clientMachine);
          } else if (myFile.isDirectory()) {
            throw new IOException("failed to append to directory " + src
                + " on client " + clientMachine);
          }
        } else if (!dir.isValidToCreate(src)) {
          if (overwrite) { // overwriting the existing file is allowed
            delete(src, true);
          } else {
            throw new IOException("failed to create file " + src
                + " on client " + clientMachine
                + " either because the filename is invalid or the file exists");
          }
        }

        DatanodeDescriptor clientNode = host2DataNodeMap
            .getDatanodeByHost(clientMachine);

        if (append) {
          //
          // Replace current node with a INodeUnderConstruction.
          // Recreate in-memory lease record.
          //
          INodeFile node = (INodeFile) myFile;
          INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
              node.getLocalNameBytes(), node.getReplication(),
              node.getModificationTime(), node.getPreferredBlockSize(),
              node.getBlocks(), node.getPermissionStatus(), holder,
              clientMachine, clientNode);
          dir.replaceNode(src, node, cons);
          leaseManager.addLease(cons.clientName, src);

        } else {
          // Now we can add the name to the filesystem. This file has no
          // blocks associated with it.
          //
          checkFsObjectLimit();

          // increment global generation stamp
          long genstamp = nextGenerationStamp();
          INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
              replication, blockSize, holder, clientMachine, clientNode,
              genstamp);
          if (newNode == null) {
            throw new IOException("DIR* startFile: Unable to add to namespace");
          }
          leaseManager.addLease(newNode.clientName, src);
          if (NameNode.stateChangeLog.isDebugEnabled()) {
            NameNode.stateChangeLog.debug("DIR* startFile: "
                                       +"add "+src+" to namespace for "+holder);
          }
        }
      } catch (IOException ie) {
        NameNode.stateChangeLog.warn("DIR* startFile: "
                                     +ie.getMessage());
        throw ie;
      }
    }
  }
startFileInternal()

  The method does the following:

  1) Run a series of checks (safe mode, permissions, whether the path already exists as a directory, and so on).

  2) If this is not an append:

    generate a generation stamp (one per file); construct an INodeFileUnderConstruction object (which records the preferredBlockSize); add the file to the filesystem; and add a lease (in effect a write lock with a time limit).

        If it is an append:

    replace the INodeFile at src with an INodeFileUnderConstruction, and add a lease. (A client-side sketch of both paths follows.)
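  To make the two paths concrete, here is a minimal client-side sketch using the standard Hadoop 1.x FileSystem API, assuming a reachable cluster (the path, sizes, and payloads are made up for illustration). fs.create() is what ultimately reaches startFileInternal() with append=false, and fs.append() reaches it with append=true:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateVsAppend {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // create: the NameNode builds an INodeFileUnderConstruction and adds a lease
    FSDataOutputStream out = fs.create(new Path("/demo/file.txt"),
                                       true,               // overwrite
                                       4096,               // buffer size
                                       (short) 3,          // replication
                                       64 * 1024 * 1024L); // preferred block size
    out.write("hello".getBytes());
    out.close();

    // append: the NameNode replaces the INodeFile with an INodeFileUnderConstruction
    FSDataOutputStream appendOut = fs.append(new Path("/demo/file.txt"));
    appendOut.write(" world".getBytes());
    appendOut.close();
  }
}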

 

  2. Once the file exists on the NameNode, the client requests blocks from the NameNode and writes them to DataNodes. After the work above completes, the DataStreamer thread is started to write blocks to the DataNodes. The overall flow is:

  1) Some preliminary checks

  2) Request a block from the NameNode (one round trip to the NameNode):

    a. using the replica placement policy, choose N DataNodes to host the block;

    b. randomly generate a blockID that is not already in use;

    c. attach the block to the corresponding file.

  3) Organize the target DataNodes into a pipeline and send packets to the first DataNode (see the toy sketch after this list).
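  Hadoop's actual pipeline (DFSOutputStream and its DataStreamer) is considerably more involved, but a toy model conveys the idea of step 3: the client ships a packet to the first DataNode, and each node stores its local copy and forwards the packet downstream. Everything below is illustrative, not Hadoop code; the node addresses are invented:

import java.util.Arrays;
import java.util.List;

public class PipelineSketch {
  // Each node "stores" the packet, then forwards it to the next node in line.
  static void sendPacket(byte[] packet, List<String> targets, int i) {
    if (i >= targets.size()) return;            // end of the pipeline
    System.out.println(targets.get(i) + " stores " + packet.length + " bytes");
    sendPacket(packet, targets, i + 1);         // forward downstream
  }

  public static void main(String[] args) {
    // hypothetical targets, in the order chosen by the placement policy
    List<String> targets = Arrays.asList("dn1:50010", "dn2:50010", "dn3:50010");
    sendPacket(new byte[64 * 1024], targets, 0); // one 64 KB packet
  }
}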

   Let's examine a few of the more important methods:

  /**
   * The client would like to obtain an additional block for the indicated
   * filename (which is being written-to).  Return an array that consists
   * of the block, plus a set of machines.  The first on this list should
   * be where the client writes data.  Subsequent items in the list must
   * be provided in the connection to the first datanode.
   *
   * Make sure the previous blocks have been reported by datanodes and
   * are replicated.  Will return an empty 2-elt array if we want the
   * client to "try again later".
   */
  // the client requests an additional block from the NameNode
  public LocatedBlock getAdditionalBlock(String src,
                                         String clientName,
                                         HashMap<Node, Node> excludedNodes
                                         ) throws IOException {
    long fileLength, blockSize;
    int replication;
    DatanodeDescriptor clientNode = null;
    Block newBlock = null;

    NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
                                  +src+" for "+clientName);

    synchronized (this) {
      if (isInSafeMode()) { // check safemode first for failing-fast
        throw new SafeModeException("Cannot add block to " + src, safeMode);
      }
      // have we exceeded the configured limit of fs objects.
      checkFsObjectLimit();

      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);

      //
      // If we fail this, bad things happen!
      //
      if (!checkFileProgress(pendingFile, false)) {
        throw new NotReplicatedYetException("Not replicated yet:" + src);
      }
      fileLength = pendingFile.computeContentSummary().getLength();
      blockSize = pendingFile.getPreferredBlockSize();
      clientNode = pendingFile.getClientNode();
      replication = (int)pendingFile.getReplication();
    }

    // choose targets for the new block to be allocated.
    // i.e. choose where the replicas will be placed
    DatanodeDescriptor targets[] = replicator.chooseTarget(src,
                                                           replication,
                                                           clientNode,
                                                           excludedNodes,
                                                           blockSize);
    if (targets.length < this.minReplication) {
      throw new IOException("File " + src + " could only be replicated to " +
                            targets.length + " nodes, instead of " +
                            minReplication);
    }

    // Allocate a new block and record it in the INode.
    synchronized (this) {
      if (isInSafeMode()) { // make sure it is not in safemode again.
        throw new SafeModeException("Cannot add block to " + src, safeMode);
      }
      INode[] pathINodes = dir.getExistingPathINodes(src);
      int inodesLen = pathINodes.length;
      checkLease(src, clientName, pathINodes[inodesLen-1]);
      INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction)
                                                pathINodes[inodesLen - 1];

      if (!checkFileProgress(pendingFile, false)) {
        throw new NotReplicatedYetException("Not replicated yet:" + src);
      }

      // allocate new block record block locations in INode.
      // i.e. allocate the block, randomly generating a blockID that is not
      // already in use, then record the block in the INode
      newBlock = allocateBlock(src, pathINodes);
      pendingFile.setTargets(targets);

      for (DatanodeDescriptor dn : targets) {
        dn.incBlocksScheduled();
      }
      dir.persistBlocks(src, pendingFile);
    }
    if (persistBlocks) {
      getEditLog().logSync();
    }

    // Create next block
    LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
    if (isAccessTokenEnabled) {
      b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
    }
    return b;
  }
getAdditionalBlock

  The method above also touches on the replica placement policy, which I will save for the next post. The figure below summarizes the call hierarchy of the methods discussed above:

  [figure: call hierarchy of the methods above]

 

  Finally, the main focus: the strategy for storing blocks on a DataNode. Its call hierarchy is shown below:

  [figure: call hierarchy of block placement on the DataNode]

  First, the data structures involved:

class FSVolume {                      // volume info; represents one ${dfs.data.dir}
    private File currentDir;          // holds finalized blocks, i.e. ${dfs.data.dir}/current
    private FSDir dataDir;            // tracks which block files live under currentDir
    private File tmpDir;              // holds temporary files, i.e. ${dfs.data.dir}/tmp
    private File blocksBeingWritten;  // holds blocks being written, i.e. ${dfs.data.dir}/blocksBeingWritten
    private File detachDir;           // copy-on-write detach area, i.e. ${dfs.data.dir}/detach
    private DF usage;
    private DU dfsUsage;
    private long reserved;
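  To make these fields concrete, this is roughly the on-disk layout of a single volume in Hadoop 1.x (a sketch; the block ID is made up, and bookkeeping files such as VERSION are omitted):

/mnt/datadir1/data/              <- one ${dfs.data.dir}, i.e. one FSVolume
├── current/                     <- finalized blocks (currentDir / dataDir)
│   ├── blk_1073741825           <- block data file (made-up ID)
│   └── blk_1073741825_1001.meta <- checksum metadata for the block
├── blocksBeingWritten/          <- blocks still being written
├── tmp/                         <- temporary files
└── detach/                      <- copy-on-write detach area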

 

  static class FSVolumeSet {   // the set of volumes; represents all ${dfs.data.dir} directories
    FSVolume[] volumes = null; // the FSVolumes, organized as an array
    int curVolume = 0;         // index of the volume currently in use

  FSVolumeSet represents the collection of ${dfs.data.dir} directories: it organizes them into the volumes array and uses curVolume to track which ${dfs.data.dir} directory is currently in use. The ${dfs.data.dir} selection strategy works as follows:

  When multiple ${dfs.data.dir} directories are configured, the DataNode walks the volumes array in order and picks one FSVolume to store each block (the block first lands in that volume's blocksBeingWritten directory and is moved to current once the write completes); each selection advances curVolume by one, so writes rotate round-robin across the ${dfs.data.dir} directories. The strategy is implemented in the getNextVolume() method of FSDataset.FSVolumeSet:

    synchronized FSVolume getNextVolume(long blockSize) throws IOException {

      if(volumes.length < 1) {
        throw new DiskOutOfSpaceException("No more available volumes");
      }

      // since volumes could've been removed because of the failure
      // make sure we are not out of bounds
      if(curVolume >= volumes.length) {
        curVolume = 0;
      }

      int startVolume = curVolume;

      while (true) {
        FSVolume volume = volumes[curVolume];
        curVolume = (curVolume + 1) % volumes.length;    // advance by one
        if (volume.getAvailable() > blockSize) { return volume; }
        if (curVolume == startVolume) {
          throw new DiskOutOfSpaceException("Insufficient space for an additional block");
        }
      }
    }
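  To see the rotation, and the way full volumes are skipped, in isolation, here is a self-contained toy model of the same logic (not Hadoop code; the free-space numbers are invented):

import java.util.Arrays;

public class RoundRobinDemo {
  static long[] available = {10L, 0L, 10L}; // free bytes per simulated volume
  static int curVolume = 0;

  // mirrors getNextVolume(): round-robin, skipping volumes without enough space
  static int getNextVolume(long blockSize) {
    int startVolume = curVolume;
    while (true) {
      int v = curVolume;
      curVolume = (curVolume + 1) % available.length; // advance by one
      if (available[v] > blockSize) return v;
      if (curVolume == startVolume)
        throw new RuntimeException("Insufficient space for an additional block");
    }
  }

  public static void main(String[] args) {
    System.out.println("volumes: " + Arrays.toString(available));
    // volume 1 has no free space, so the rotation goes 0, 2, 0, 2, ...
    for (int i = 0; i < 4; i++) {
      System.out.println("block " + i + " -> volume " + getNextVolume(1L));
    }
  }
}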

 

  This article is based on Hadoop 1.2.1.

  If you spot any mistakes, corrections are welcome.

  Reference: 《Hadoop技术内幕:深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: An In-depth Study of MapReduce Architecture Design and Implementation), by Dong Xicheng (董西成).

  Please credit the source when reposting:

HDFS Block Reads and Writes: The ${dfs.data.dir} Selection Strategy