首页 > 代码库 > Hadoop HDFS (3) JAVA访问HDFS
Hadoop HDFS (3) JAVA访问HDFS
现在我们来深入了解一下Hadoop的FileSystem类。这个类是用来跟Hadoop的文件系统进行交互的。虽然我们这里主要是针对HDFS,但是我们还是应该让我们的代码只使用抽象类FileSystem,这样我们的代码就可以跟任何一个Hadoop的文件系统交互了。在写测试代码时,我们可以用本地文件系统测试,部署时使用HDFS,只需配置一下,不需要修改代码了。
在Hadoop 1.x以后的版本中引入了一个新的文件系统接口叫FileContext,一个FileContext实例可以处理多种文件系统,而且接口更加清晰和统一。
用Hadoop URL来读取HDFS里的文件
在讨论Java API之前,先来看一个用Hadoop URL来读文件数据的方式。这种方式从严格意义上讲不能算是Hadoop给Java的接口,应该说是用Java Net的方式来向HDFS发送一个网络请求然后读取返回流。
与通过URL去读取一个HTTP服务拿到一个流相似
InputStream in = null; try { in = new URL("hdfs://host/path").openStream(); //操作输入流in,可以读取到文件的内容 } finally { IOUtils.closeStream(in); }
这个方式有个小问题,Java虚拟机默认是不认识hdfs这个协议的,要想让它知道,需要通过
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
来设置一个UrlStreamHandlerFactory,以便使其认识hdfs协议。
但是这个方法的调用是对整个虚拟机生效的,所以,如果程序中有其它部分,尤其是第三方框架,设置了这个工厂,那就会出现问题。因此这就成为了使用这种方式来访问HDFS的限制。
完整代码如下:
import java.io.InputStream; import java.net.URL; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IOUtils; public class URLCat { static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[] args) throws Exception { InputStream in = null; try { in = new URL(args[0]).openStream(); IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } } }
在这个程序里,先设置JVM的URLStreamHandlerFactory,然后通过url打开一个流,读取流,就得到了文件的内容,通过IOUtils.copyBytes()把读到的内容写出到标准输出流里,也就是控制台上,就实现了类似于Linux里的cat命令的功能。
最后把输入流关闭。
IOUtils是hadoop提供的一个工具类。copyBytes的后两个参数分别表示buffer大小和结束后是否关闭流,我们在后面手动关闭流,所以传false。
将程序打成jar包放到hadoop上去,执行:
$hadoop jar urlcat.jar hdfs://localhost/user/norris/weatherdata.txt
可将前一节放到hdfs上去的weatherdata.txt里的内容显示到控制台上。
执行hadoop jar的方法好像之前忘记写博客了,简单说一句,就是把程序打成jar包,可以指定一个可执行类,然后执行:
$hadoop jar xxx.jar
或者如果不打jar包,把class文件直接放上去,然后执行hadoop XxxClass也行,其实说白了hadoop命令就是启动一个虚拟机,跟执行java XxxClass或者java -jar xxx.jar一样,只不过用hadoop命令启动虚拟机,在启动前,hadoop命令自动把需要的类库加入到了CLASSPATH中,因此省去了自己去设置环境变量的麻烦。
值得注意的是,如果你把class放在了/home/norris/data/hadoop/bin/目录下,需要在$HADOOP_INSTALL/etc/hadoop/hadoop-env.sh最后一行添加
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/norris/data/hadoop/bin/
把自己的class存放位置变成hadoop搜索class位置的一个选项。
用FileSystem(org.apache.hadoop.fs.FileSystem)类来读取HDFS里的文件
有个类叫Path(org.apache.hadoop.fs.Path),这个类的实例代表HDFS里的一个文件(或目录),可以把它理解成java.io.File,跟这个File类是一样的,只不过File这个名字听起来就像是一个本地文件系统的类,所以为了区别,取名为Path,也可以把Path理解成一个HDFS上的URI,比如hdfs://localhost/user/norris/weatherdata.txt。
FileSystem类是一个抽象类,代表所有可以运行在Hadoop上的文件系统,我们这里虽然在讨论HDFS,但其实FileSystem类可以代表如本地文件系统等的任何可以运行于Hadoop上的文件系统。我们写程序也应该针对FileSystem来写,而不是org.apache.hadoop.hdfs.DistributedFileSystem,以便方便移植到其他文件系统上去。
第一步,先要拿到一个FileSystem实例,通过下面API可以获取到实例:
public static FileSystem get(Configuration conf) throws IOException; public static FileSystem get(URI uri, Configuration conf) throws IOException; public static FileSystem get(final URI uri, final Configuration conf, final String user) throws IOException, InterruptedException;
其中,
Configuration(org.apache.hadoop.conf.Configuration)是指Hadoop的配置,如果用默认的构造函数构造出Configuration,就是指core-site.xml里的配置,前面《配置Hadoop》(http://blog.csdn.net/norriszhang/article/details/38659321)一节讲到了该配置,把fs配置成了hdfs,所以根据这个配置Hadoop就知道该取到一个DistributedFileSystem(org.apache.hadoop.hdfs.DistributedFileSystem)实例。
URI是指文件在HDFS里存放的路径。
String user这个参数,将在《安全》章节讨论。
即使在操作HDFS,你可能也同时希望访问本地文件系统,可以通过下面API方便地获取:
public static LocalFileSystem getLocal(Configuration conf) throws IOException;
通过调用FileSystem实例的open方法可以得到一个输入流:
public FSDataInputStream open(Path f) throws IOException; public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;
其中的int bufferSize是缓冲区大小,如果不传这个参数,默认大小是4K。
下面是使用FileSystem读取HDFS中文件内容的完整程序:
import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class FileSystemCat { public static void main(String[] args) throws Exception { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); //System.out.println(fs.getClass().getName()); //这里可以看到得到的实例是DistributedFileSystem,因为core-site.xml里配的是hdfs FSDataInputStream in = null; try { in = fs.open(new Path(uri)); IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } } }
打成jar包放到hadoop上去运行:
$hadoop jar filesystemcat.jar hdfs://localhost/user/norris/weatherdata.txt
在控制台输出了weatherdata.txt文件中的内容。
大家看一下API,fs.open方法的返回值是FSDataInputStream,而不是java.io包里的流。
这个流继承自java.io.DataInputStream,同时实现Seekable(org.apache.hadoop.fs.Seekable),也就是说可以调用seek()方法读取文件的任意位置数据。
例如上面程序加两行:
in.seek(0); IOUtils.copyBytes(in, System.out, 4096, false);
就可以实现把文件内容打印两遍。
另外,FSDataInputStream类同时也实现了PositionedReadable(org.apache.hadoop.fs.PositionedReadable)接口,接口中定义的三个方法允许在任意位置读取文件内容:
public int read(long position, byte[] buffer, int offset, int length) throws IOException; public void readFully(long position, byte[] buffer, int offset, int length) throws IOException; public void readFully(long position, byte[] buffer) throws IOException;
并且,这些方法是线程安全的,但是FSDataInputStream不是线程安全的,只是说不同的FSDataInputStream实例在不同的线程里读文件内容是安全的,但是一个FSDataInputStream实例被不同的线程调用是不安全的。所以,应该为每个线程创建各自的FSDataInputStream实例。
最后,调用seek()方法的开销是相当巨大的,应该尽量少调用,你的程序应该设计成尽量流式获取文件内容。
用FileSystem类来向HDFS里写文件
FileSystem类里提供了一大堆API用来在创建文件,其中最简单的一个是:
public FSDataOutputStream create(Path f) throws IOException;
创建一个Path类代表的文件,并返回一个输出流。
这个方法有一大堆的重载方法,可以用来设置是否覆盖已有文件,该文件复制的份数,写入时的缓冲区大小,文件块大小(block),权限等。
如果该Path代表的文件的父目录(甚至爷爷目录)不存在,这些目录会被自动创建,这样确时在很多情况下提供了方便,但是有时却与我们的需求不相符,如果你确实希望当父目录不存在时不要创建,应该自己判断其父目录是否存在。原来的API中有个createNonRecursive方法,如果父目录不存在会失败。但现在这一组方法已经被废弃,不建议使用了。
create方法的一个重要重载方法是:
public FSDataOutputStream create(Path f, Progressable progress) throws IOException;
Progressable接口的progress方法可以用来当有数据写入时回调。
public interface Progressable { public void progress(); }
如果不想新建一个文件,而是希望向已有文件追加内容,可以调用:
public FSDataOutputStream append(Path f) throws IOException;
该方法是可选实现方法,也就是说不是所有的Hadoop FileSystem都实现了该方法,例如,HDFS实现了,而S3文件系统就没有实现。并且,HDFS在1.x以后的版本中的实现才是稳定的实现,之前的实现是有问题的。
下面这个程序把一个本地文件上传到HDFS上,并且在每次progress方法被调用时输出一个点(.)
import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Progressable; public class FileCopyWithProgress { public static void main(String[] args) throws Exception { String localSrc = http://www.mamicode.com/args[0];>
程序中的progress什么条件下被调用呢?这个API并没有说明,也就是说你不能假设它在某时会被调用,实际的测试结果是大概写入64K时会被调用一次,但多次测试也不一致,尤其在文件小时,更加不可预知。
如果打开上面程序的Thread.sleep()的注释,会发现,其实在progress在被调用时,主程序是等待的。如果把主线程的ID和调用progress方法的线程ID打印出来发现,每次调用progress的线程都是同一个线程,但不是主线程。但是,主线程的"end."确实是在所有点(.)都输出后才输出的。
我复制的文件是4743680字节,即大概4.5M,打出73个点。平均64K一个点。
到目前为止,只有HDFS在写入文件时回调progress,其它文件系统都没有实现回调progress,在后面做MapReduce程序时会发现,这一回调非常有用。
FileSystem里的create方法返回的FSDataOutputStream类有一个方法:
public long getPos() throws IOException;
可以查询当前在往文件的哪个位置写,一个写入的偏移量。
但是与FSDataInputStream不同,FSDataOutputStream没有seek方法,因为HDFS只允许顺序写,对于一个打开的文件,只允许向尾部追加,不允许在任意位置写,因此也就没有必要提供seek方法了。
创建目录
FileSystem中的创建目录的方法:
public boolean mkdirs(Path f) throws IOException;
与java.io.File.mkdirs方法一样,创建目录,并同时创建缺失的父目录。
我们一般不需要创建目录,因为创建文件时,默认就把所需的目录都创建好了。
查询文件元信息:FileStatus(org.apache.hadoop.fs.FileStatus)
FileSystem类中的getFileStatus()方法返回一个FileStatus实例,该FileStatus实例中包含了该Path(文件或目录)的元信息:文件大小,block大小,复制的份数,最后修改时间,所有者,权限等。
程序简单,先上代码,再解释一下
import static org.junit.Assert.*; import static org.hamcrest.CoreMatchers.*; import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.Before; import org.junit.Test; public class ShowFileStatusTest { private static final String SYSPROP_KEY = "test.build.data"; /** MiniDFSCluster类在hadoop-hdfs-2.4.1-tests.jar中,是一个专门用于测试的in-process HDFS集群 */ private MiniDFSCluster cluster; private FileSystem fs; @Before public void setUp() throws IOException { Configuration conf = new Configuration(); String sysprop = System.getProperty(SYSPROP_KEY); if (sysprop == null) { System.setProperty(SYSPROP_KEY, "/tmp"); } cluster = new MiniDFSCluster(conf, 1, true, null); fs = cluster.getFileSystem(); OutputStream out = fs.create(new Path("/dir/file")); out.write("content".getBytes("UTF-8")); out.close(); } @After public void tearDown() throws IOException { if (fs != null) { fs.close(); } if (cluster != null) { cluster.shutdown(); } } @Test(expected = FileNotFoundException.class) public void throwsFileNotFoundForNonExistentFile() throws IOException { fs.getFileStatus(new Path("no-such-file")); } @Test public void fileStatusForFile() throws IOException { Path file = new Path("/dir/file"); FileStatus stat = fs.getFileStatus(file); assertThat(stat.getPath().toUri().getPath(), is("/dir/file")); assertThat(stat.isDirectory(), is(false)); assertThat(stat.getLen(), is(7L)); assertTrue(stat.getModificationTime() <= System.currentTimeMillis()); assertThat(stat.getReplication(), is((short)1)); assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L)); assertThat(stat.getOwner(), is("norris")); assertThat(stat.getGroup(), is("supergroup")); assertThat(stat.getPermission().toString(), is("rw-r--r--")); } @Test public void fileStatusForDirectory() throws IOException { Path dir = new Path("/dir"); FileStatus stat = fs.getFileStatus(dir); assertThat(stat.getPath().toUri().getPath(), is("/dir")); assertThat(stat.isDirectory(), is(true)); assertThat(stat.getLen(), is(0L)); assertTrue(stat.getModificationTime() <= System.currentTimeMillis()); assertThat(stat.getReplication(), is((short)0)); assertThat(stat.getBlockSize(), is(0L)); assertThat(stat.getOwner(), is("norris")); assertThat(stat.getGroup(), is("supergroup")); assertThat(stat.getPermission().toString(), is("rwxr-xr-x")); } }
程序编译需要引入hadoop-hdfs-2.4.1-tests.jar和hadoop-hdfs-2.4.1.jar,在Hadoop的安装包中可以找到。
MiniDFSCluster类在hadoop-hdfs-2.4.1-tests.jar中,是一个专门用于测试的内存HDFS集群。
其它代码看名称都可以理解,只是这个JUnit的assertThat方法让我整了好半天,不知道is方法是哪个类里的静态方法,找了网上都直接这样写,好像写了就能用似的,但是我却编译不过,后来才知道,是org.hamcrest.CoreMatchers类里的静态方法,需要静态引入该类。另外有个lessThanOrEqualTo方法,死活没找到在哪个类里,只好用assertTrue方法代替了。
程序运行需要用JUnit启动,因为这是一个test case。
把junit.jar和org.hamcrest.core_1.3.0.v201303031735.jar放到服务器上,修改hadoop-env.sh,把我们刚才修改的最后一行:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/norris/data/hadoop/bin/
再追加上这两个jar,即:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/norris/data/hadoop/bin/:/home/norris/data/hadoop/lib/junit.jar:/home/norris/data/hadoop/lib/org.hamcrest.core_1.3.0.v201303031735.jar
然后运行时不是运行我们写的类,而是运行junit,然后让junit去test我们写的类:
$hadoop org.junit.runner.JUnitCore ShowFileStatusTest
运行结果发现,进行了三个测试,两个成功了,一个失败了,失败在
assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));这行
期望值是:67108864,也就是64M,实际值是:134217728,也就是128M。也就是说HDFS的block size的默认值是128M,这个我还没搞明白是因为新版的Hadoop修改了block size的默认值还是因为我重新在64位系统下编译了Hadoop,如果在hdfs-site.xml中设置:
<property>
<name>dfs.block.size</name>
<value>67108864</value>
</property>
把block size设成64M,就三个都成功了。
Hadoop HDFS (3) JAVA访问HDFS
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。