首页 > 代码库 > Hadoop学习笔记之二 文件操作
Hadoop学习笔记之二 文件操作
HDFS分布式文件系统:
优点:支持超大文件存储、流式访问、一次写入多次读取。
缺点:不适应大量小文件、不适应低时延的数据访问、不适应多用户访问任意修改文件。
1.hadoop用于大数据处理,在数据量较小时,并不适用于实时性强的任务,并不是所有的job放到hadoop上,性能都会提升。
2.大量小文件的情况下会极大的降低系统的性能,所以处理前需要先将少文件聚合成大文件,map的输出也应该首先combine在传输给reduce。
3.数据传输时的IO开销,存储在内存中还是硬盘中,节点之间共享数据的分发,都是实践中需要考虑的问题。
涉及的节点:Namenode、 secondaryNameNode、 DataNode
HDFS将大文件切分成小文件块(64M)分布式存储在DataNode节点上,Namenode上仅仅记录元数据的信息。
文件的常用操作:
创建、删除、读、写、追加、重命名、查看属性、更改属性、创建文件夹、移动文件、遍历所有文件、删除文件夹、重命名、判断是否是目录、判断是否存在。
1.shell下提供的文件操作:
./hadoop fs -help
1.1 列出文件
./hadoop fs -list 文件路径
1.2 上传文件
./hadoop fs -put <srcpath> <destpath>
1.3 下载文件
./hadoo fs -get <srcpath> <destpath>
1.4 删除文件或文件夹
./hadoop fs -rmr 文件路径
1.5 查看文本文件
./hadoop fs -cat 文件路径
1.6 文件存储负载均衡
./start-balanser.sh
1.7 查看文件系统的统计信息
./hadoop dfsadmin -report
2.代码中的文件操作(API):
一般是使用java创建本地的文件流, 使用fs创建hdfs上的文件流, 然后使用相关的Utils在两者之间进行传递。与外界有交互的时候一定要首先创建流。
hdfs 文件相关的操作都是通过FileSystem进行的, 如文件的(create、open、delete、copyFromLocalFile、exists、listStatus、rename、getFileBlockLocations、
getDatanodeStatus
fs是HDFS文件系统对外的接口,同样的还有LocalFileSystem、DistributedFileSystem
创建fs的时候URI.create()这句话要加上,不然会说Wrong FS: expected file:\\\
2.1 创建(写):
import java.io.BufferedInputStream;import java.io.FileInputStream;import java.io.InputStream;import java.io.OutputStream;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;public class FileCopy { public static void main(String[] args)throws Exception{ if (args.length != 2){ System.err.println("Usage : filecopy <source> <dest>"); System.exit(2); } Configuration conf = new Configuration(); InputStream in = new BufferedInputStream(new FileInputStream(args[0])); FileSystem fs = FileSystem.get(URI.create(args[1]), conf); OutputStream out = fs.create(new Path(args[1])); IOUtils.copyBytes(in, out, 512,true); }}
2.2 打开(读):
Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(args[0]), conf); InputStream in = null; try{ in = fs.open(new Path(args[0])); IOUtils.copyBytes(in, System.out, 4096,false); }finally{ IOUtils.closeStream(in); }
2.3 将数组里的内容写入文件
Configruation conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(args[0]),conf);Path path = new Path("文件路径");byte[] buf = "hello world!".getBytes();FSDataOutputStream out = fs.create(path);out.write(buf, 0, buf,length);
2.4 直接将本地文件上传
Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create("hdfs文件路径"),conf);Path src = new Path("本地文件路径");Path dest = new Path("hdfs文件路径");fs.copyFromLocalFiles(src, dest);
2.5 删除:
Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(args[0]),conf); fs.delete(new Path(args[0]), false);
2.6 判断文件是否存在
Configuration conf = new Configuration();FileSystem fs = FileSystem.get(conf);Path path = new Path("你的文件路径");boolean isExists = fs.exists(path);
2.7 重命名
Configuration conf = new Configuration();FileSystem fs = FileSystem.get(conf);Path src = new Path("路径/源文件名");Path dst = new Path("路径/目的文件名");boolean isRename = fs.rename(src, dst);
2.8 显示属性:
Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(args[0]), conf); FileStatus stat = fs.getFileStatus(new Path(args[0])); System.out.println("路径:\t"+stat.getPath()); System.out.println("长度:\t"+stat.getLen()); System.out.println("用户:\t"+stat.getOwner()); System.out.println("组:\t"+stat.getGroup()); System.out.println("块大小:\t"+stat.getBlockSize()); System.out.println("修改时间:\t"+stat.getModificationTime()); System.out.println("权限:\t"+stat.getPermission().toString()); System.out.println("备份:\t"+stat.getReplication()); System.out.println("文件夹?:\t"+stat.isDir());
2.9 遍历文件:
Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(args[0]), conf); FileStatus[] status = fs.listStatus(new Path(args[0])); Path[] listedPaths = FileUtil.stat2Paths(status); for (Path p : listedPaths){ System.out.println(p); }
2.10查找文件在hdfs集群中的位置
package cc.test.fileLocation;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.BlockLocation;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;public class FileLocation { public static void main(String[] args)throws Exception{ Configuration conf = new Configuration(); String s = "hdfs://MASTERPC:9000/home/Fea.txt"; FileSystem fs = FileSystem.get(URI.create(s),conf); FileStatus stat = fs.getFileStatus(new Path(s)); BlockLocation[] blkLocations = fs.getFileBlockLocations(stat, 0, stat.getLen()); for (int i=0; i<blkLocations.length; i++){ System.out.println("block:"+String.valueOf(i)+blkLocations[i].getHosts(); } }}
2.11 查找所有的datanode节点:
package cc.test.datenodeName;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.hdfs.DistributedFileSystem;import org.apache.hadoop.hdfs.protocol.DatanodeInfo;public class DatanodeName { public static void main(String[] args)throws Exception{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(/*URI.create("hdfs://MASTERPC:9000/"),*/conf); DistributedFileSystem dfs = (DistributedFileSystem)fs; DatanodeInfo[] dnstats = dfs.getDataNodeStats(); for (int i=0; i<dnstats.length; i++){ System.out.println(dnstats[i].getHostName()); } }}
如果在建立fs的时候没有加URI.create(), 会报一下错误:
Exception in thread "main" java.lang.ClassCastException: org.apache.hadoop.fs.LocalFileSystem cannot be cast to org.apache.hadoop.hdfs.DistributedFileSystem
这时由于在eclipse中运行的hadoop程序,默认并不是放到云端执行的,而是eclipse的虚拟云中运行,没有job的调度,也没还有配置文件的读取
路径中如果不加前缀hdfs://MASTERPC:9000则默认是PC的本地文件。(我的猜测)
采用writer将小文件数据聚合成sequence 字节流从而在hdfs中形成大文件。
package cc.test.filesequence;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.SequenceFile.CompressionType;import org.apache.hadoop.io.Text;public class FileSequence { private static String[] temValue =http://www.mamicode.com/ { "hello world", "hello china", "hello home", "hello hadoop" }; public static void main(String[] args)throws Exception{ if (args.length != 2){ System.err.println("Usage: fileSeq: <src> <dest>"); System.exit(2); } Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(args[1]), conf); Path path = new Path(args[1]); Text key = new Text(); BytesWritable value = new BytesWritable(); SequenceFile.Writer writer = null; try{ writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass(), CompressionType.BLOCK); for (int i = 0; i< 5000; i++){ key.set(String.valueOf(i)); value.set(new BytesWritable(temValue[i%(temValue.length)].getBytes())); writer.append(key, value); } }finally{ IOUtils.closeStream(writer); } System.out.println("done"); }}
package cc.test.serializeTest;import java.io.ByteArrayInputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import org.apache.commons.io.output.ByteArrayOutputStream;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.util.StringUtils;public class SerializeTest { public static byte[] serialize(Writable w)throws IOException{ ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputStream dataout = new DataOutputStream(out); w.write(dataout); dataout.close(); return out.toByteArray(); } public static byte[] deserialize(Writable w, byte[] bytes)throws IOException{ ByteArrayInputStream in = new ByteArrayInputStream(bytes); DataInputStream datain = new DataInputStream(in); w.readFields(datain); datain.close(); return bytes; } public static void main(String[] args)throws Exception{ IntWritable intw = new IntWritable(9); byte[] bytes = serialize(intw); String bytes_str = StringUtils.byteToHexString(bytes); System.out.println(bytes_str); IntWritable intw2 = new IntWritable(); deserialize(intw2, bytes); System.out.println(intw2); }}000000099
Hadoop学习笔记之二 文件操作