首页 > 代码库 > HDFS小文件合并问题的优化:copyMerge的改进

HDFS小文件合并问题的优化:copyMerge的改进

1.问题分析

用fsck命令统计 查看HDFS上在某一天日志的大小,分块情况以及平均的块大小,即

[hduser@da-master jar]$ hadoop fsck /wcc/da/kafka/report/2015-01-11
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

15/01/13 18:57:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connecting to namenode via http://da-master:50070
FSCK started by hduser (auth:SIMPLE) from /172.21.101.30 for path /wcc/da/kafka/report/2015-01-11 at Tue Jan 13 18:57:24 CST 2015
....................................................................................................
....................................................................................................
........................................Status: HEALTHY
 Total size:	9562516137 B
 Total dirs:	1
 Total files:	240
 Total symlinks:		0
 Total blocks (validated):	264 (avg. block size 36221652 B)
 Minimally replicated blocks:	264 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		0 (0.0 %)
 Default replication factor:	2
 Average block replication:	2.0
 Corrupt blocks:		0
 Missing replicas:		0 (0.0 %)
 Number of data-nodes:		5
 Number of racks:		1
FSCK ended at Tue Jan 13 18:57:24 CST 2015 in 14 milliseconds


The filesystem under path '/wcc/da/kafka/report/2015-01-11' is HEALTHY

用表格整理出来:

 

Date Time

Total(GB)

Total blocks

AveBlockSize(MB)

2014/12/21

9.39

268

36

2014/12/20

9.5

268

36

2014/12/19

8.89

268

34

2014/11/5

8.6

266

33

2014/10/1

9.31

268

36


分析问题的存在性:从表中可以看出,每天日志量的分块情况:总共大概有268左右的块数,平均块大小为36MB左右,远远不足128MB,这潜在的说明了一个问题。日志产生了很多小文件,大多数都不足128M,严重影响集群的扩展性和性能:首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1000 0000个小文件,每个文件占用一个block,则namenode大约需要2G空间。如果存储1亿个文件,则namenode需要20G空间,这样namenode内存容量严重制约了集群的扩展; 其次,访问大量小文件速度远远小于访问几个大文件;HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个datanode跳到另一个datanode,严重影响性能;最后,处理大量小文件速度远远小于处理同等大小的大文件的速度,因为每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上,累计起来的总时长必然增加。我们采取的策略是先合并小文件,比如整理日志成user_report.tsv,client_report.tsv,AppLog_UserDevice.tsv, 再运行job。


2.解决方案

可以调用API的 FileUtil工具类的方法copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource,Configuration conf, String addString);

但是此方法并不适用,因为某一天日志存在着三种类型的日志,即:

技术分享

要分别合并成三个文件user_report.tsv,client_report.tsv和AppLog_UserDevice.tsv,故必须重新实现copyMerge函数,先分析copyMerge源码:

  /** Copy all files in a directory to one output file (merge). */
  public static boolean copyMerge(FileSystem srcFS, Path srcDir, 
                                  FileSystem dstFS, Path dstFile, 
                                  boolean deleteSource,
                                  Configuration conf, String addString) throws IOException {
	//生成合并后的目标文件路径dstFile,文件名为srcDir.getName(),即源路径的目录名,因此这里我们不能自定义生成的日志文件名,非常不方便							  
    dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
	//判断源路径是否为文件目录
    if (!srcFS.getFileStatus(srcDir).isDirectory())
      return false;
   //创建输出流,准备写文件
    OutputStream out = dstFS.create(dstFile);
    
    try {
	 // 得到每个源路径目录下的每个文件
      FileStatus contents[] = srcFS.listStatus(srcDir);
	  //排序操作
      Arrays.sort(contents);
      for (int i = 0; i < contents.length; i++) {
        if (contents[i].isFile()) {
			//创建输入流,读文件
          InputStream in = srcFS.open(contents[i].getPath());
          try {
		  //执行复制操作,写入到目标文件中
            IOUtils.copyBytes(in, out, conf, false);
            if (addString!=null)
              out.write(addString.getBytes("UTF-8"));
                
          } finally {
            in.close();
          } 
        }
      }
    } finally {
      out.close();
    }
    
	//若deleteSource为true,删除源路径目录下的每个文件
    if (deleteSource) {
      return srcFS.delete(srcDir, true);
    } else {
      return true;
    }
  }  

改进后:(这种方式只需要打开关闭输出流out 三次)

	/** Copy corresponding files in a directory to related output file (merge). */
	@SuppressWarnings("unchecked")
	public static boolean merge(FileSystem hdfs, Path srcDir, Path dstFile,
			boolean deleteSource, Configuration conf) throws IOException {
		if (!hdfs.getFileStatus(srcDir).isDirectory())
			return false;
		// 得到每个源目录下的每个文件;
		FileStatus[] fileStatus = hdfs.listStatus(srcDir);
		// 三种不同类型的文件各自的文件路径存入不同的list;
		ArrayList<Path> userPaths = new ArrayList<Path>();
		ArrayList<Path> clientPaths = new ArrayList<Path>();
		ArrayList<Path> appPaths = new ArrayList<Path>();
		for (FileStatus fileStatu : fileStatus) {
			Path filePath = fileStatu.getPath();
			if (filePath.getName().startsWith("user_report")) {
				userPaths.add(filePath);
			} else if (filePath.getName().startsWith("client_report")) {
				clientPaths.add(filePath);
			} else if (filePath.getName().startsWith("AppLog_UserDevice")) {
				appPaths.add(filePath);
			}
		}
		// 分别写入到目标文件:user_report.tsv中
		if (userPaths.size() > 0) {
			Path userDstFile = new Path(dstFile.toString() + "/user_report.tsv");
			OutputStream out = hdfs.create(userDstFile);
			Collections.sort(userPaths);
			try {
				Iterator<Path> iterator = userPaths.iterator();
				while (iterator.hasNext()) {
					InputStream in = hdfs.open(iterator.next());
					try {
						IOUtils.copyBytes(in, out, conf, false);
					} finally {
						in.close();
					}
				}
			} finally {
				out.close();
			}
		}
		// 分别写入到目标文件:client_report.tsv中
		if (clientPaths.size() > 0) {
			Path clientDstFile = new Path(dstFile.toString()
					+ "/client_report.tsv");
			OutputStream out = hdfs.create(clientDstFile);
			Collections.sort(clientPaths);
			try {
				Iterator<Path> iterator = clientPaths.iterator();
				while (iterator.hasNext()) {
					InputStream in = hdfs.open(iterator.next());
					try {
						IOUtils.copyBytes(in, out, conf, false);
					} finally {
						in.close();
					}
				}
			} finally {
				out.close();
			}
		}
		// 分别写入到目标文件:AppLog_UserDevice.tsv中
		if (appPaths.size() > 0) {
			Path appDstFile = new Path(dstFile.toString()
					+ "/AppLog_UserDevice.tsv");
			OutputStream out = hdfs.create(appDstFile);
			Collections.sort(appPaths);
			try {
				Iterator<Path> iterator = appPaths.iterator();
				while (iterator.hasNext()) {
					InputStream in = hdfs.open(iterator.next());
					try {
						IOUtils.copyBytes(in, out, conf, false);
					} finally {
						in.close();
					}
				}
			} finally {
				out.close();
			}
		}
		if (deleteSource) {
			return hdfs.delete(srcDir, true);
		}
		return true;
	}

当然你也可以这样:

	public static boolean mergeFiles(FileSystem hdfs, Path srcDir,
			Path dstFile, boolean deleteSource, Configuration conf)
			throws IOException {
		if (!hdfs.getFileStatus(srcDir).isDirectory())
			return false;
		// 得到每个源目录下的每个文件;
		FileStatus[] fileStatus = hdfs.listStatus(srcDir);
		// 三种不同类型的文件各自合并

		for (FileStatus fileStatu : fileStatus) {
			Path filePath = fileStatu.getPath();
			Path dstPath = new Path("");
			if (filePath.getName().startsWith("user_report")) {
				dstPath = new Path(dstFile.toString() + "/user_report.tsv");
			} else if (filePath.getName().startsWith("client_report")) {
				dstPath = new Path(dstFile.toString() + "/client_report.tsv");
			} else if (filePath.getName().startsWith("AppLog_UserDevice")) {
				dstPath = new Path(dstFile.toString() + "/client_report.tsv");
			}else{
				dstPath=new Path( "/error.tsv");
			}
			
			OutputStream out = hdfs.create(dstPath);
			try {
				InputStream in = hdfs.open(filePath);
				try {
					IOUtils.copyBytes(in, out, conf, false);
				} finally {
					in.close();
				}

			} finally {
				out.close();
			}
			
			
		}
		if (deleteSource) {
			return hdfs.delete(srcDir, true);
		}
		return true;
	}

3.总结

根据不同业务逻辑的需求,你可以自定义实现API接口函数。对于解决小文件合并问题,如果你有更好的策略,欢迎交流!



HDFS小文件合并问题的优化:copyMerge的改进