HDFS Block Reads and Writes: The ${dfs.data.dir} Selection Strategy

I recently had to dig into how HDFS reads and writes data blocks for work. Unlike most posts on this topic, this article focuses on the ${dfs.data.dir} selection strategy, that is, how a block's location on a DataNode is chosen. I read this part of the code from the angle of what we needed at work.
hdfs-site.xml:

<property>
  <name>dfs.data.dir</name>
  <value>/mnt/datadir1/data,/mnt/datadir2/data,/mnt/datadir3/data</value>
</property>
The ${dfs.data.dir} selection strategy is the policy for deciding, when a DataNode is configured with multiple ${dfs.data.dir} directories (as in the configuration above), which directory a block should be stored in. Typically each ${dfs.data.dir} directory is a mount point of a separate disk, so storing a block means deciding which disk it lands on.
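As a rough sketch of how a comma-separated ${dfs.data.dir} value maps to a list of candidate storage directories, consider the following (the class and method names here are illustrative, not Hadoop's own):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: split a dfs.data.dir-style value into directory paths,
// mirroring how the DataNode ends up with one storage location per disk.
public class DataDirParser {
    public static List<String> parse(String dfsDataDir) {
        // Split on commas, tolerating stray whitespace around each entry.
        return Arrays.asList(dfsDataDir.trim().split("\\s*,\\s*"));
    }

    public static void main(String[] args) {
        String value = "/mnt/datadir1/data,/mnt/datadir2/data,/mnt/datadir3/data";
        System.out.println(DataDirParser.parse(value));
    }
}
```

Each resulting path becomes one volume; the round-robin policy discussed at the end of this post then picks among them.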
Creating a file takes two steps:

1. Before any block is written, the client talks to the NameNode to create the file (an INodeFile / INodeFileUnderConstruction). The write request starts in the DFSClient's create() method, then travels over RPC until the NameNode ultimately calls FSNamesystem's startFileInternal() method to create the file.
private void startFileInternal(String src,
                               PermissionStatus permissions,
                               String holder,
                               String clientMachine,
                               boolean overwrite,
                               boolean append,
                               boolean createParent,
                               short replication,
                               long blockSize
                               ) throws IOException {
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("DIR* startFile: src=" + src
        + ", holder=" + holder
        + ", clientMachine=" + clientMachine
        + ", createParent=" + createParent
        + ", replication=" + replication
        + ", overwrite=" + overwrite
        + ", append=" + append);
  }

  FSPermissionChecker pc = getPermissionChecker();
  synchronized (this) {
    if (isInSafeMode())
      throw new SafeModeException("Cannot create " + src, safeMode);
    if (!DFSUtil.isValidName(src)) {
      throw new IOException("Invalid name: " + src);
    }

    // Verify that the destination does not exist as a directory already.
    boolean pathExists = dir.exists(src);
    if (pathExists && dir.isDir(src)) {
      throw new IOException("Cannot create " + src + "; already exists as a directory");
    }

    if (isPermissionEnabled) {
      if (append || (overwrite && pathExists)) {
        checkPathAccess(pc, src, FsAction.WRITE);
      } else {
        checkAncestorAccess(pc, src, FsAction.WRITE);
      }
    }

    if (!createParent) {
      verifyParentDir(src);
    }

    try {
      INode myFile = dir.getFileINode(src); // look up the file's INode by path
      recoverLeaseInternal(myFile, src, holder, clientMachine, false);

      try {
        verifyReplication(src, replication, clientMachine);
      } catch (IOException e) {
        throw new IOException("failed to create " + e.getMessage());
      }
      if (append) { // this is an append operation
        if (myFile == null) {
          throw new FileNotFoundException("failed to append to non-existent "
              + src + " on client " + clientMachine);
        } else if (myFile.isDirectory()) {
          throw new IOException("failed to append to directory " + src
              + " on client " + clientMachine);
        }
      } else if (!dir.isValidToCreate(src)) {
        if (overwrite) { // overwriting the existing file is allowed
          delete(src, true);
        } else {
          throw new IOException("failed to create file " + src
              + " on client " + clientMachine
              + " either because the filename is invalid or the file exists");
        }
      }

      DatanodeDescriptor clientNode = host2DataNodeMap
          .getDatanodeByHost(clientMachine);

      if (append) {
        //
        // Replace current node with a INodeUnderConstruction.
        // Recreate in-memory lease record.
        //
        INodeFile node = (INodeFile) myFile;
        INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
            node.getLocalNameBytes(), node.getReplication(),
            node.getModificationTime(), node.getPreferredBlockSize(),
            node.getBlocks(), node.getPermissionStatus(), holder,
            clientMachine, clientNode);
        dir.replaceNode(src, node, cons);
        leaseManager.addLease(cons.clientName, src);

      } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
        //
        checkFsObjectLimit();

        // increment global generation stamp
        long genstamp = nextGenerationStamp();
        INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
            replication, blockSize, holder, clientMachine, clientNode,
            genstamp);
        if (newNode == null) {
          throw new IOException("DIR* startFile: Unable to add to namespace");
        }
        leaseManager.addLease(newNode.clientName, src);
        if (NameNode.stateChangeLog.isDebugEnabled()) {
          NameNode.stateChangeLog.debug("DIR* startFile: "
              + "add " + src + " to namespace for " + holder);
        }
      }
    } catch (IOException ie) {
      NameNode.stateChangeLog.warn("DIR* startFile: " + ie.getMessage());
      throw ie;
    }
  }
}
The method does the following:

1) Run some checks first (safe mode, permissions, whether the path already exists as a directory, etc.)

2) If this is not an append:
generate a generation stamp (one is generated per file); construct an INodeFileUnderConstruction object (carrying preferredBlockSize); add the file to the filesystem; and add a lease (essentially a write lock with a time limit).

If it is an append:
replace the INodeFile at src with an INodeFileUnderConstruction, and add a lease.
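To make the "lease as a time-limited write lock" idea concrete, here is a minimal sketch. This is not Hadoop's LeaseManager; the class name, method shapes, and expiry policy are simplified assumptions for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration of a lease: at most one writer per path,
// and a stale lease can be taken over once the lease period elapses.
public class SimpleLeaseManager {
    private final long leasePeriodMs;
    private final Map<String, Long> leaseStart = new HashMap<>(); // path -> grant time
    private final Map<String, String> holder = new HashMap<>();   // path -> client name

    public SimpleLeaseManager(long leasePeriodMs) {
        this.leasePeriodMs = leasePeriodMs;
    }

    // Grant (or renew) the lease if the path is free, already ours,
    // or the previous holder's lease has expired.
    public synchronized boolean addLease(String client, String path, long now) {
        String h = holder.get(path);
        if (h == null || h.equals(client) || now - leaseStart.get(path) > leasePeriodMs) {
            holder.put(path, client);
            leaseStart.put(path, now);
            return true;
        }
        return false; // another client still holds a live lease
    }
}
```

The real LeaseManager also tracks per-client lease renewal and drives lease recovery, but the core invariant (one live writer per path) is the same.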
2. After the file exists on the NameNode, the client asks the NameNode for blocks and writes them to DataNodes. Once the work above completes, the DataStreamer thread is started to write blocks to the DataNodes. The whole flow is:

1) Some preliminary checks

2) Ask the NameNode for a block (one round trip to the NameNode)

a. According to the replica placement policy, pick N DataNodes as the block's locations;

b. Randomly generate a blockID that is not already in use;

c. Add the block to the corresponding file;

3) Organize the target DataNodes into a pipeline and send packets to the first DataNode

Let's look at a few of the more important methods:
/**
 * The client would like to obtain an additional block for the indicated
 * filename (which is being written-to). Return an array that consists
 * of the block, plus a set of machines. The first on this list should
 * be where the client writes data. Subsequent items in the list must
 * be provided in the connection to the first datanode.
 *
 * Make sure the previous blocks have been reported by datanodes and
 * are replicated. Will return an empty 2-elt array if we want the
 * client to "try again later".
 */
// Ask the NameNode for a new block
public LocatedBlock getAdditionalBlock(String src,
                                       String clientName,
                                       HashMap<Node, Node> excludedNodes
                                       ) throws IOException {
  long fileLength, blockSize;
  int replication;
  DatanodeDescriptor clientNode = null;
  Block newBlock = null;

  NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
      + src + " for " + clientName);

  synchronized (this) {
    if (isInSafeMode()) { // check safemode first for failing-fast
      throw new SafeModeException("Cannot add block to " + src, safeMode);
    }
    // have we exceeded the configured limit of fs objects.
    checkFsObjectLimit();

    INodeFileUnderConstruction pendingFile = checkLease(src, clientName);

    //
    // If we fail this, bad things happen!
    //
    if (!checkFileProgress(pendingFile, false)) {
      throw new NotReplicatedYetException("Not replicated yet:" + src);
    }
    fileLength = pendingFile.computeContentSummary().getLength();
    blockSize = pendingFile.getPreferredBlockSize();
    clientNode = pendingFile.getClientNode();
    replication = (int) pendingFile.getReplication();
  }

  // choose targets for the new block to be allocated.
  // pick the DataNodes that will hold the replicas
  DatanodeDescriptor targets[] = replicator.chooseTarget(src,
                                                         replication,
                                                         clientNode,
                                                         excludedNodes,
                                                         blockSize);
  if (targets.length < this.minReplication) {
    throw new IOException("File " + src + " could only be replicated to " +
                          targets.length + " nodes, instead of " +
                          minReplication);
  }

  // Allocate a new block and record it in the INode.
  synchronized (this) {
    if (isInSafeMode()) { // make sure it is not in safemode again.
      throw new SafeModeException("Cannot add block to " + src, safeMode);
    }
    INode[] pathINodes = dir.getExistingPathINodes(src);
    int inodesLen = pathINodes.length;
    checkLease(src, clientName, pathINodes[inodesLen - 1]);
    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)
        pathINodes[inodesLen - 1];

    if (!checkFileProgress(pendingFile, false)) {
      throw new NotReplicatedYetException("Not replicated yet:" + src);
    }

    // allocate new block record block locations in INode.
    // allocate the block with a fresh, non-colliding blockID, then record it in the INode
    newBlock = allocateBlock(src, pathINodes);
    pendingFile.setTargets(targets);

    for (DatanodeDescriptor dn : targets) {
      dn.incBlocksScheduled();
    }
    dir.persistBlocks(src, pendingFile);
  }
  if (persistBlocks) {
    getEditLog().logSync();
  }

  // Create next block
  LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
  if (isAccessTokenEnabled) {
    b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
        EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
  }
  return b;
}
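Step 2b above (drawing a random, non-colliding blockID) can be sketched as follows. The real allocateBlock() checks candidate IDs against the NameNode's block map; here that map is replaced by a plain Set, and the class name is made up for illustration:

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Illustrative: draw random 64-bit IDs until one is not already in use,
// mirroring the retry loop the NameNode uses when allocating a block.
public class BlockIdAllocator {
    private final Set<Long> usedIds = new HashSet<>();
    private final Random rand;

    public BlockIdAllocator(Random rand) {
        this.rand = rand;
    }

    public synchronized long allocate() {
        long id = rand.nextLong();
        while (usedIds.contains(id)) { // collision with an existing block: retry
            id = rand.nextLong();
        }
        usedIds.add(id);
        return id;
    }
}
```

With 64-bit IDs, collisions are rare, so the loop almost always terminates on the first draw; the check simply guarantees uniqueness.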
The method above also touches on the replica target selection policy, which I'll leave for the next post. The following diagram summarizes the call hierarchy of the methods above:

Finally, the main point: how a block's storage location on the DataNode is chosen. The call hierarchy is as follows:

First, the data structures involved:
class FSVolume {                   // one volume, i.e. one ${dfs.data.dir}
  private File currentDir;         // finalized blocks: ${dfs.data.dir}/current
  private FSDir dataDir;           // index of the block files under currentDir
  private File tmpDir;             // temporary files: ${dfs.data.dir}/tmp
  private File blocksBeingWritten; // blocks still being written: ${dfs.data.dir}/blocksBeingWritten
  private File detachDir;          // copy-on-write detach area: ${dfs.data.dir}/detach
  private DF usage;
  private DU dfsUsage;
  private long reserved;

static class FSVolumeSet {         // the set of volumes, i.e. all ${dfs.data.dir} entries
  FSVolume[] volumes = null;       // the FSVolumes, organized as an array
  int curVolume = 0;               // index of the volume currently in use
FSVolumeSet represents the collection of ${dfs.data.dir} directories: it organizes them into the volumes array and uses curVolume to track which ${dfs.data.dir} directory is currently in use. The ${dfs.data.dir} selection strategy is:

When there are multiple ${dfs.data.dir} directories, the DataNode picks an FSVolume from volumes in order to store each block (the block is first written under the blocksBeingWritten directory, then moved to current once complete); after each block is written, curVolume is advanced by one. This writes to the ${dfs.data.dir} directories in round-robin fashion. The policy is implemented in FSDataset.FSVolumeSet's getNextVolume() method:
synchronized FSVolume getNextVolume(long blockSize) throws IOException {

  if (volumes.length < 1) {
    throw new DiskOutOfSpaceException("No more available volumes");
  }

  // since volumes could've been removed because of the failure
  // make sure we are not out of bounds
  if (curVolume >= volumes.length) {
    curVolume = 0;
  }

  int startVolume = curVolume;

  while (true) {
    FSVolume volume = volumes[curVolume];
    curVolume = (curVolume + 1) % volumes.length; // advance the cursor by one
    if (volume.getAvailable() > blockSize) { return volume; }
    if (curVolume == startVolume) {
      throw new DiskOutOfSpaceException("Insufficient space for an additional block");
    }
  }
}
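Stripped of the FSVolume internals, the round-robin policy boils down to the following self-contained sketch. Volumes are reduced to their free-byte counts and the class name is invented for illustration; the control flow mirrors getNextVolume() above:

```java
// Simplified round-robin volume chooser: advance a cursor over the volumes,
// return the first one with enough free space, and fail only after a full
// cycle finds nothing.
public class RoundRobinChooser {
    private final long[] available; // free bytes per volume
    private int curVolume = 0;

    public RoundRobinChooser(long[] available) {
        this.available = available;
    }

    public synchronized int getNextVolume(long blockSize) {
        if (available.length < 1) {
            throw new IllegalStateException("No more available volumes");
        }
        if (curVolume >= available.length) {
            curVolume = 0;
        }
        int startVolume = curVolume;
        while (true) {
            int candidate = curVolume;
            curVolume = (curVolume + 1) % available.length; // advance the cursor
            if (available[candidate] > blockSize) {
                return candidate;
            }
            if (curVolume == startVolume) {
                throw new IllegalStateException("Insufficient space for an additional block");
            }
        }
    }
}
```

Note the cursor advances even when a volume is returned, so consecutive blocks land on different disks; a volume that is too full is simply skipped on that pass.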
This article is based on Hadoop 1.2.1.

Corrections are welcome.

Reference: 《Hadoop技术内幕:深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: In-Depth Understanding of MapReduce Architecture Design and Implementation), by Dong Xicheng