首页 > 代码库 > Hadoop学习笔记之二 文件操作

Hadoop学习笔记之二 文件操作

 

 

HDFS分布式文件系统:
优点:支持超大文件存储、流式访问、一次写入多次读取。
缺点:不适应大量小文件、不适应低时延的数据访问、不适应多用户访问任意修改文件。

 

1.hadoop用于大数据处理,在数据量较小时,并不适用于实时性强的任务,并不是所有的job放到hadoop上,性能都会提升。

2.大量小文件的情况下会极大的降低系统的性能,所以处理前需要先将少文件聚合成大文件,map的输出也应该首先combine在传输给reduce。

3.数据传输时的IO开销,存储在内存中还是硬盘中,节点之间共享数据的分发,都是实践中需要考虑的问题。

 

涉及的节点:Namenode、  secondaryNameNode、 DataNode

HDFS将大文件切分成小文件块(64M)分布式存储在DataNode节点上,Namenode上仅仅记录元数据的信息。

 

文件的常用操作:

创建、删除、读、写、追加、重命名、查看属性、更改属性、创建文件夹、移动文件、遍历所有文件、删除文件夹、重命名、判断是否是目录、判断是否存在。

1.shell下提供的文件操作:

 ./hadoop fs -help

1.1 列出文件

./hadoop fs -list 文件路径

1.2 上传文件

./hadoop fs -put <srcpath> <destpath>

1.3 下载文件

./hadoo fs -get <srcpath> <destpath>

1.4 删除文件或文件夹

./hadoop fs -rmr 文件路径

1.5 查看文本文件

./hadoop fs -cat 文件路径

1.6 文件存储负载均衡

./start-balanser.sh

1.7 查看文件系统的统计信息

./hadoop dfsadmin -report

 

2.代码中的文件操作(API):

一般是使用java创建本地的文件流, 使用fs创建hdfs上的文件流, 然后使用相关的Utils在两者之间进行传递。与外界有交互的时候一定要首先创建流。

hdfs 文件相关的操作都是通过FileSystem进行的, 如文件的(create、open、delete、copyFromLocalFile、exists、listStatus、rename、getFileBlockLocations、

getDatanodeStatus

fs是HDFS文件系统对外的接口,同样的还有LocalFileSystem、DistributedFileSystem

创建fs的时候URI.create()这句话要加上,不然会说Wrong FS:  expected file:\\\

2.1 创建(写):

import java.io.BufferedInputStream;import java.io.FileInputStream;import java.io.InputStream;import java.io.OutputStream;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;public class FileCopy {    public static void main(String[] args)throws Exception{        if (args.length != 2){            System.err.println("Usage : filecopy <source> <dest>");            System.exit(2);        }        Configuration conf = new Configuration();        InputStream in = new BufferedInputStream(new FileInputStream(args[0]));        FileSystem fs = FileSystem.get(URI.create(args[1]), conf);        OutputStream out = fs.create(new Path(args[1]));        IOUtils.copyBytes(in, out, 512,true);            }}

2.2 打开(读):

        Configuration conf = new Configuration();        FileSystem fs = FileSystem.get(URI.create(args[0]), conf);        InputStream in = null;        try{            in = fs.open(new Path(args[0]));            IOUtils.copyBytes(in, System.out, 4096,false);        }finally{            IOUtils.closeStream(in);        }

2.3 将数组里的内容写入文件

Configruation conf = new Configuration();FileSystem fs  = FileSystem.get(URI.create(args[0]),conf);Path path = new Path("文件路径");byte[] buf = "hello world!".getBytes();FSDataOutputStream out = fs.create(path);out.write(buf, 0, buf,length);

2.4  直接将本地文件上传

Configuration conf = new Configuration();FileSystem fs  = FileSystem.get(URI.create("hdfs文件路径"),conf);Path src = new Path("本地文件路径");Path dest = new Path("hdfs文件路径");fs.copyFromLocalFiles(src, dest);

2.5 删除:

        Configuration conf = new Configuration();        FileSystem fs = FileSystem.get(URI.create(args[0]),conf);        fs.delete(new Path(args[0]), false);

2.6 判断文件是否存在

Configuration conf = new Configuration();FileSystem fs = FileSystem.get(conf);Path path = new Path("你的文件路径");boolean isExists = fs.exists(path);

2.7 重命名

Configuration conf = new Configuration();FileSystem fs  = FileSystem.get(conf);Path src = new Path("路径/源文件名");Path dst = new Path("路径/目的文件名");boolean isRename = fs.rename(src, dst);

2.8 显示属性:

        Configuration conf  = new Configuration();        FileSystem fs = FileSystem.get(URI.create(args[0]), conf);        FileStatus stat = fs.getFileStatus(new Path(args[0]));                System.out.println("路径:\t"+stat.getPath());        System.out.println("长度:\t"+stat.getLen());        System.out.println("用户:\t"+stat.getOwner());        System.out.println("组:\t"+stat.getGroup());        System.out.println("块大小:\t"+stat.getBlockSize());        System.out.println("修改时间:\t"+stat.getModificationTime());        System.out.println("权限:\t"+stat.getPermission().toString());        System.out.println("备份:\t"+stat.getReplication());        System.out.println("文件夹?:\t"+stat.isDir());        

2.9 遍历文件:

     Configuration conf = new Configuration();        FileSystem fs  = FileSystem.get(URI.create(args[0]), conf);        FileStatus[] status = fs.listStatus(new Path(args[0]));        Path[] listedPaths = FileUtil.stat2Paths(status);        for (Path p : listedPaths){            System.out.println(p);        }

2.10查找文件在hdfs集群中的位置

 

package cc.test.fileLocation;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.BlockLocation;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;public class FileLocation {    public static void main(String[] args)throws Exception{        Configuration conf = new Configuration();        String s = "hdfs://MASTERPC:9000/home/Fea.txt";        FileSystem fs  = FileSystem.get(URI.create(s),conf);        FileStatus stat = fs.getFileStatus(new Path(s));                BlockLocation[] blkLocations = fs.getFileBlockLocations(stat, 0, stat.getLen());                for (int i=0; i<blkLocations.length; i++){            System.out.println("block:"+String.valueOf(i)+blkLocations[i].getHosts();        }    }}

2.11 查找所有的datanode节点:

package cc.test.datenodeName;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.hdfs.DistributedFileSystem;import org.apache.hadoop.hdfs.protocol.DatanodeInfo;public class DatanodeName {        public static void main(String[] args)throws Exception{        Configuration conf  = new  Configuration();        FileSystem fs = FileSystem.get(/*URI.create("hdfs://MASTERPC:9000/"),*/conf);        DistributedFileSystem dfs = (DistributedFileSystem)fs;        DatanodeInfo[] dnstats = dfs.getDataNodeStats();        for (int i=0; i<dnstats.length; i++){            System.out.println(dnstats[i].getHostName());        }    }}

如果在建立fs的时候没有加URI.create(), 会报一下错误:

Exception in thread "main" java.lang.ClassCastException: org.apache.hadoop.fs.LocalFileSystem cannot be cast to org.apache.hadoop.hdfs.DistributedFileSystem
这时由于在eclipse中运行的hadoop程序,默认并不是放到云端执行的,而是eclipse的虚拟云中运行,没有job的调度,也没还有配置文件的读取

路径中如果不加前缀hdfs://MASTERPC:9000则默认是PC的本地文件。(我的猜测)

 

 

 采用writer将小文件数据聚合成sequence 字节流从而在hdfs中形成大文件。

package cc.test.filesequence;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.SequenceFile.CompressionType;import org.apache.hadoop.io.Text;public class FileSequence {    private static String[] temValue =http://www.mamicode.com/ {        "hello world",         "hello china",        "hello home",        "hello hadoop"    };    public static void main(String[] args)throws Exception{        if (args.length != 2){            System.err.println("Usage: fileSeq: <src> <dest>");            System.exit(2);        }        Configuration conf  = new Configuration();        FileSystem fs = FileSystem.get(URI.create(args[1]), conf);        Path path = new Path(args[1]);        Text key = new Text();        BytesWritable value = new BytesWritable();            SequenceFile.Writer writer = null;        try{            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass(), CompressionType.BLOCK);                        for (int i = 0; i< 5000; i++){                key.set(String.valueOf(i));                value.set(new BytesWritable(temValue[i%(temValue.length)].getBytes()));                writer.append(key, value);            }        }finally{                IOUtils.closeStream(writer);        }        System.out.println("done");    }}

 

 

package cc.test.serializeTest;import java.io.ByteArrayInputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import org.apache.commons.io.output.ByteArrayOutputStream;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.util.StringUtils;public class SerializeTest {    public static byte[] serialize(Writable w)throws IOException{        ByteArrayOutputStream out = new ByteArrayOutputStream();        DataOutputStream dataout = new DataOutputStream(out);                w.write(dataout);        dataout.close();        return out.toByteArray();    }    public static byte[] deserialize(Writable w, byte[] bytes)throws IOException{        ByteArrayInputStream in = new ByteArrayInputStream(bytes);        DataInputStream datain = new DataInputStream(in);        w.readFields(datain);        datain.close();        return bytes;    }    public static void main(String[] args)throws Exception{        IntWritable intw = new IntWritable(9);        byte[] bytes = serialize(intw);        String bytes_str = StringUtils.byteToHexString(bytes);        System.out.println(bytes_str);                IntWritable intw2 = new IntWritable();        deserialize(intw2, bytes);        System.out.println(intw2);            }}000000099

 

Hadoop学习笔记之二 文件操作