首页 > 代码库 > Hadoop HDFS (3) JAVA访问HDFS

Hadoop HDFS (3) JAVA访问HDFS

现在我们来深入了解一下Hadoop的FileSystem类。这个类是用来跟Hadoop的文件系统进行交互的。虽然我们这里主要是针对HDFS,但是我们还是应该让我们的代码只使用抽象类FileSystem,这样我们的代码就可以跟任何一个Hadoop的文件系统交互了。在写测试代码时,我们可以用本地文件系统测试,部署时使用HDFS,只需配置一下,不需要修改代码了。
在Hadoop 1.x以后的版本中引入了一个新的文件系统接口叫FileContext,一个FileContext实例可以处理多种文件系统,而且接口更加清晰和统一。

用Hadoop URL来读取HDFS里的文件

在讨论Java API之前,先来看一个用Hadoop URL来读文件数据的方式。这种方式从严格意义上讲不能算是Hadoop给Java的接口,应该说是用Java Net的方式来向HDFS发送一个网络请求然后读取返回流。
与通过URL去读取一个HTTP服务拿到一个流相似
InputStream in = null;
try {
    in = new URL("hdfs://host/path").openStream();
    //操作输入流in,可以读取到文件的内容
} finally {
    IOUtils.closeStream(in);
}
这个方式有个小问题,Java虚拟机默认是不认识hdfs这个协议的,要想让它知道,需要通过
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
来设置一个UrlStreamHandlerFactory,以便使其认识hdfs协议。
但是这个方法的调用是对整个虚拟机生效的,所以,如果程序中有其它部分,尤其是第三方框架,设置了这个工厂,那就会出现问题。因此这就成为了使用这种方式来访问HDFS的限制。
完整代码如下:
import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class URLCat {
    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }
    public static void main(String[] args) throws Exception {
        InputStream in = null;
        try { 
            in = new URL(args[0]).openStream(); 
            IOUtils.copyBytes(in, System.out, 4096, false); 
        } finally { 
            IOUtils.closeStream(in); 
        }
    }
}
在这个程序里,先设置JVM的URLStreamHandlerFactory,然后通过url打开一个流,读取流,就得到了文件的内容,通过IOUtils.copyBytes()把读到的内容写出到标准输出流里,也就是控制台上,就实现了类似于Linux里的cat命令的功能。
最后把输入流关闭。
IOUtils是hadoop提供的一个工具类。copyBytes的后两个参数分别表示buffer大小和结束后是否关闭流,我们在后面手动关闭流,所以传false。
将程序打成jar包放到hadoop上去,执行:
$hadoop jar urlcat.jar hdfs://localhost/user/norris/weatherdata.txt
可将前一节放到hdfs上去的weatherdata.txt里的内容显示到控制台上。
执行hadoop jar的方法好像之前忘记写博客了,简单说一句,就是把程序打成jar包,可以指定一个可执行类,然后执行:
$hadoop jar xxx.jar
或者如果不打jar包,把class文件直接放上去,然后执行hadoop XxxClass也行,其实说白了hadoop命令就是启动一个虚拟机,跟执行java XxxClass或者java -jar xxx.jar一样,只不过用hadoop命令启动虚拟机,在启动前,hadoop命令自动把需要的类库加入到了CLASSPATH中,因此省去了自己去设置环境变量的麻烦。
值得注意的是,如果你把class放在了/home/norris/data/hadoop/bin/目录下,需要在$HADOOP_INSTALL/etc/hadoop/hadoop-env.sh最后一行添加
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/norris/data/hadoop/bin/
把自己的class存放位置变成hadoop搜索class位置的一个选项。

用FileSystem(org.apache.hadoop.fs.FileSystem)类来读取HDFS里的文件

有个类叫Path(org.apache.hadoop.fs.Path),这个类的实例代表HDFS里的一个文件(或目录),可以把它理解成java.io.File,跟这个File类是一样的,只不过File这个名字听起来就像是一个本地文件系统的类,所以为了区别,取名为Path,也可以把Path理解成一个HDFS上的URI,比如hdfs://localhost/user/norris/weatherdata.txt。
FileSystem类是一个抽象类,代表所有可以运行在Hadoop上的文件系统,我们这里虽然在讨论HDFS,但其实FileSystem类可以代表如本地文件系统等的任何可以运行于Hadoop上的文件系统。我们写程序也应该针对FileSystem来写,而不是org.apache.hadoop.hdfs.DistributedFileSystem,以便方便移植到其他文件系统上去。

第一步,先要拿到一个FileSystem实例,通过下面API可以获取到实例:
public static FileSystem get(Configuration conf) throws IOException;
public static FileSystem get(URI uri, Configuration conf) throws IOException;
public static FileSystem get(final URI uri, final Configuration conf,  final String user) throws IOException, InterruptedException;
其中,
Configuration(org.apache.hadoop.conf.Configuration)是指Hadoop的配置,如果用默认的构造函数构造出Configuration,就是指core-site.xml里的配置,前面《配置Hadoop》(http://blog.csdn.net/norriszhang/article/details/38659321)一节讲到了该配置,把fs配置成了hdfs,所以根据这个配置Hadoop就知道该取到一个DistributedFileSystem(org.apache.hadoop.hdfs.DistributedFileSystem)实例。
URI是指文件在HDFS里存放的路径。
String user这个参数,将在《安全》章节讨论。
即使在操作HDFS,你可能也同时希望访问本地文件系统,可以通过下面API方便地获取:
public static LocalFileSystem getLocal(Configuration conf)  throws IOException;

通过调用FileSystem实例的open方法可以得到一个输入流:
public FSDataInputStream open(Path f) throws IOException;
public abstract FSDataInputStream open(Path f, int bufferSize)  throws IOException;
其中的int bufferSize是缓冲区大小,如果不传这个参数,默认大小是4K。
下面是使用FileSystem读取HDFS中文件内容的完整程序:
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class FileSystemCat {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        //System.out.println(fs.getClass().getName()); //这里可以看到得到的实例是DistributedFileSystem,因为core-site.xml里配的是hdfs
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
打成jar包放到hadoop上去运行:
$hadoop jar filesystemcat.jar hdfs://localhost/user/norris/weatherdata.txt
在控制台输出了weatherdata.txt文件中的内容。

大家看一下API,fs.open方法的返回值是FSDataInputStream,而不是java.io包里的流。
这个流继承自java.io.DataInputStream,同时实现Seekable(org.apache.hadoop.fs.Seekable),也就是说可以调用seek()方法读取文件的任意位置数据。
例如上面程序加两行:
in.seek(0);
IOUtils.copyBytes(in, System.out, 4096, false);
就可以实现把文件内容打印两遍。
另外,FSDataInputStream类同时也实现了PositionedReadable(org.apache.hadoop.fs.PositionedReadable)接口,接口中定义的三个方法允许在任意位置读取文件内容:
public int read(long position, byte[] buffer, int offset, int length) throws IOException;
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
public void readFully(long position, byte[] buffer) throws IOException;
并且,这些方法是线程安全的,但是FSDataInputStream不是线程安全的,只是说不同的FSDataInputStream实例在不同的线程里读文件内容是安全的,但是一个FSDataInputStream实例被不同的线程调用是不安全的。所以,应该为每个线程创建各自的FSDataInputStream实例。
最后,调用seek()方法的开销是相当巨大的,应该尽量少调用,你的程序应该设计成尽量流式获取文件内容。

用FileSystem类来向HDFS里写文件

FileSystem类里提供了一大堆API用来在创建文件,其中最简单的一个是:
public FSDataOutputStream create(Path f) throws IOException;
创建一个Path类代表的文件,并返回一个输出流。
这个方法有一大堆的重载方法,可以用来设置是否覆盖已有文件,该文件复制的份数,写入时的缓冲区大小,文件块大小(block),权限等。
如果该Path代表的文件的父目录(甚至爷爷目录)不存在,这些目录会被自动创建,这样确时在很多情况下提供了方便,但是有时却与我们的需求不相符,如果你确实希望当父目录不存在时不要创建,应该自己判断其父目录是否存在。原来的API中有个createNonRecursive方法,如果父目录不存在会失败。但现在这一组方法已经被废弃,不建议使用了。
create方法的一个重要重载方法是:
public FSDataOutputStream create(Path f, Progressable progress) throws IOException;
Progressable接口的progress方法可以用来当有数据写入时回调。
public interface Progressable {
    public void progress();
}
如果不想新建一个文件,而是希望向已有文件追加内容,可以调用:
public FSDataOutputStream append(Path f) throws IOException;
该方法是可选实现方法,也就是说不是所有的Hadoop FileSystem都实现了该方法,例如,HDFS实现了,而S3文件系统就没有实现。并且,HDFS在1.x以后的版本中的实现才是稳定的实现,之前的实现是有问题的。
下面这个程序把一个本地文件上传到HDFS上,并且在每次progress方法被调用时输出一个点(.)
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class FileCopyWithProgress {
    public static void main(String[] args) throws Exception {
        String localSrc = http://www.mamicode.com/args[0];>
程序中的progress什么条件下被调用呢?这个API并没有说明,也就是说你不能假设它在某时会被调用,实际的测试结果是大概写入64K时会被调用一次,但多次测试也不一致,尤其在文件小时,更加不可预知。
如果打开上面程序的Thread.sleep()的注释,会发现,其实在progress在被调用时,主程序是等待的。如果把主线程的ID和调用progress方法的线程ID打印出来发现,每次调用progress的线程都是同一个线程,但不是主线程。但是,主线程的"end."确实是在所有点(.)都输出后才输出的。
我复制的文件是4743680字节,即大概4.5M,打出73个点。平均64K一个点。
到目前为止,只有HDFS在写入文件时回调progress,其它文件系统都没有实现回调progress,在后面做MapReduce程序时会发现,这一回调非常有用。

FileSystem里的create方法返回的FSDataOutputStream类有一个方法:
public long getPos() throws IOException;
可以查询当前在往文件的哪个位置写,一个写入的偏移量。
但是与FSDataInputStream不同,FSDataOutputStream没有seek方法,因为HDFS只允许顺序写,对于一个打开的文件,只允许向尾部追加,不允许在任意位置写,因此也就没有必要提供seek方法了。

创建目录

FileSystem中的创建目录的方法:
public boolean mkdirs(Path f) throws IOException;
与java.io.File.mkdirs方法一样,创建目录,并同时创建缺失的父目录。
我们一般不需要创建目录,因为创建文件时,默认就把所需的目录都创建好了。

查询文件元信息:FileStatus(org.apache.hadoop.fs.FileStatus)

FileSystem类中的getFileStatus()方法返回一个FileStatus实例,该FileStatus实例中包含了该Path(文件或目录)的元信息:文件大小,block大小,复制的份数,最后修改时间,所有者,权限等。
程序简单,先上代码,再解释一下
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ShowFileStatusTest {
    private static final String SYSPROP_KEY = "test.build.data";
    /** MiniDFSCluster类在hadoop-hdfs-2.4.1-tests.jar中,是一个专门用于测试的in-process HDFS集群 */
    private MiniDFSCluster cluster;
    private FileSystem fs;
    @Before
    public void setUp() throws IOException {
        Configuration conf = new Configuration();
        String sysprop = System.getProperty(SYSPROP_KEY);
        if (sysprop == null) {
            System.setProperty(SYSPROP_KEY, "/tmp");
        }
        cluster = new MiniDFSCluster(conf, 1, true, null);
        fs = cluster.getFileSystem();
        OutputStream out = fs.create(new Path("/dir/file"));
        out.write("content".getBytes("UTF-8"));
        out.close();
    }
    @After
    public void tearDown() throws IOException {
        if (fs != null) {
            fs.close();
        }
        if (cluster != null) {
            cluster.shutdown();
        }
    }
    @Test(expected = FileNotFoundException.class)
    public void throwsFileNotFoundForNonExistentFile() throws IOException {
        fs.getFileStatus(new Path("no-such-file"));
    }
    @Test
    public void fileStatusForFile() throws IOException {
        Path file = new Path("/dir/file");
        FileStatus stat = fs.getFileStatus(file);
       
        assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
        assertThat(stat.isDirectory(), is(false));
        assertThat(stat.getLen(), is(7L));
        assertTrue(stat.getModificationTime() <= System.currentTimeMillis());
        assertThat(stat.getReplication(), is((short)1));
        assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
        assertThat(stat.getOwner(), is("norris"));
        assertThat(stat.getGroup(), is("supergroup"));
        assertThat(stat.getPermission().toString(), is("rw-r--r--"));
    }
    @Test
    public void fileStatusForDirectory() throws IOException {
        Path dir = new Path("/dir");
        FileStatus stat = fs.getFileStatus(dir);
        assertThat(stat.getPath().toUri().getPath(), is("/dir"));
        assertThat(stat.isDirectory(), is(true));
        assertThat(stat.getLen(), is(0L));
        assertTrue(stat.getModificationTime() <= System.currentTimeMillis());
        assertThat(stat.getReplication(), is((short)0));
        assertThat(stat.getBlockSize(), is(0L));
        assertThat(stat.getOwner(), is("norris"));
        assertThat(stat.getGroup(), is("supergroup"));
        assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
    }
}
程序编译需要引入hadoop-hdfs-2.4.1-tests.jar和hadoop-hdfs-2.4.1.jar,在Hadoop的安装包中可以找到。
MiniDFSCluster类在hadoop-hdfs-2.4.1-tests.jar中,是一个专门用于测试的内存HDFS集群。
其它代码看名称都可以理解,只是这个JUnit的assertThat方法让我整了好半天,不知道is方法是哪个类里的静态方法,找了网上都直接这样写,好像写了就能用似的,但是我却编译不过,后来才知道,是org.hamcrest.CoreMatchers类里的静态方法,需要静态引入该类。另外有个lessThanOrEqualTo方法,死活没找到在哪个类里,只好用assertTrue方法代替了。

程序运行需要用JUnit启动,因为这是一个test case。
把junit.jar和org.hamcrest.core_1.3.0.v201303031735.jar放到服务器上,修改hadoop-env.sh,把我们刚才修改的最后一行:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/norris/data/hadoop/bin/
再追加上这两个jar,即:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/norris/data/hadoop/bin/:/home/norris/data/hadoop/lib/junit.jar:/home/norris/data/hadoop/lib/org.hamcrest.core_1.3.0.v201303031735.jar

然后运行时不是运行我们写的类,而是运行junit,然后让junit去test我们写的类:
$hadoop org.junit.runner.JUnitCore ShowFileStatusTest
运行结果发现,进行了三个测试,两个成功了,一个失败了,失败在
assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));这行
期望值是:67108864,也就是64M,实际值是:134217728,也就是128M。也就是说HDFS的block size的默认值是128M,这个我还没搞明白是因为新版的Hadoop修改了block size的默认值还是因为我重新在64位系统下编译了Hadoop,如果在hdfs-site.xml中设置:
<property>
    <name>dfs.block.size</name>
    <value>67108864</value>
</property>
把block size设成64M,就三个都成功了。

Hadoop HDFS (3) JAVA访问HDFS