
Java Operations on Hadoop HDFS

This article was published on my blog.

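    This time, let's have our client connect to HDFS through a URL. We have already set up a pseudo-distributed environment, so we know the NameNode's address. Let's read a file on HDFS, for example hdfs://hadoop-master:9000/data/test.txt, with the code below: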

    static final String PATH = "hdfs://hadoop-master:9000/data/test.txt";

    public static void Test() throws Exception {
        URL url = new URL(PATH);
        InputStream in = url.openStream();
        IOUtils.copyBytes(in, System.out, 1024, true);
    }

Since Hadoop supports the hdfs protocol, let's first try accessing the file through a plain URL. Running the code above fails with:

unknown protocol: hdfs

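This message shows that java.net.URL does not understand hdfs:// URLs out of the box. A friend suggested registering a URL stream handler factory to fix this; the code is as follows: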

    public static void Test() throws Exception {
        URL url = new URL(PATH);
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        InputStream in = url.openStream();
        IOUtils.copyBytes(in, System.out, 1024, true);
    }

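Compiling and running it still fails. The problem is the ordering: the factory is registered only after the URL object has been created, and it is the URL constructor itself that rejects the unknown protocol. Let me move the registration before the URL is created: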

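    public static void Test() throws Exception {
        // register the factory first, so the URL constructor can find an hdfs handler
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        URL url = new URL(PATH);
        InputStream in = url.openStream();
        try {
            // close = false: passing true would close System.out as well as in
            IOUtils.copyBytes(in, System.out, 1024, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }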
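FsUrlStreamHandlerFactory needs Hadoop on the classpath, so the sketch below swaps in a hypothetical toy factory (FakeHdfsFactory, which serves a fixed string instead of contacting a NameNode) to reproduce the JDK behaviour behind the last two attempts. Before any factory is registered, the URL constructor itself throws "unknown protocol: hdfs"; once one is registered, the same URL opens; and URL.setURLStreamHandlerFactory accepts only one factory per JVM, so a second call throws an Error. That last point means calling Test() twice in one program would fail, which is why a common pattern is to register the factory once in a static initializer.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
import java.nio.charset.StandardCharsets;

public class UrlFactoryDemo {
    static final String PATH = "hdfs://hadoop-master:9000/data/test.txt";

    // Hypothetical stand-in for Hadoop's FsUrlStreamHandlerFactory:
    // answers "hdfs" URLs with a fixed text instead of real HDFS data.
    static class FakeHdfsFactory implements URLStreamHandlerFactory {
        @Override
        public URLStreamHandler createURLStreamHandler(String protocol) {
            if (!"hdfs".equals(protocol)) {
                return null; // null: fall back to the JDK's built-in handlers
            }
            return new URLStreamHandler() {
                @Override
                protected URLConnection openConnection(URL u) {
                    return new URLConnection(u) {
                        @Override
                        public void connect() {
                            // nothing to connect to in this toy handler
                        }

                        @Override
                        public InputStream getInputStream() {
                            byte[] body = ("contents of " + u.getPath()).getBytes(StandardCharsets.UTF_8);
                            return new ByteArrayInputStream(body);
                        }
                    };
                }
            };
        }
    }

    /** Runs the three steps once and returns one log line per step. */
    static String demo() {
        StringBuilder log = new StringBuilder();
        // 1. No factory yet: the URL constructor rejects the scheme.
        try {
            new URL(PATH);
        } catch (MalformedURLException e) {
            log.append(e.getMessage()).append('\n');
        }
        // 2. Register first, then construct and open the URL.
        URL.setURLStreamHandlerFactory(new FakeHdfsFactory());
        try (InputStream in = new URL(PATH).openStream()) {
            log.append(new String(in.readAllBytes(), StandardCharsets.UTF_8)).append('\n');
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // 3. Only one factory per JVM: a second registration throws an Error.
        try {
            URL.setURLStreamHandlerFactory(new FakeHdfsFactory());
        } catch (Error e) {
            log.append("second registration rejected: ").append(e.getMessage());
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

With the real library, the equivalent would be a `static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); }` block in the class that builds the hdfs:// URLs.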

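This time the HDFS file is read correctly. So far we are only reading, and we rely on Hadoop's own IOUtils utility to do it. Since we are using Hadoop's tools anyway, is there a simpler, more convenient API? Of course: Hadoop provides the FileSystem class for managing and operating on HDFS. Its source shows methods for HDFS operations such as mkdirs(Path f) and copyFromLocalFile(boolean delSrc, Path src, Path dst). Here is an example, a new class HdfsTest: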

    import java.io.File;
    import java.net.URI;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsTest {
        static final String ROOT = "hdfs://hadoop-master:9000/";
        static final Configuration config = new Configuration();

        /**
         * Create a directory (missing parent directories are created too).
         * @param path  path relative to the HDFS root, e.g. "/test/"
         * @return      whether the directory was created
         * @throws Exception
         */
        public static boolean Mkdir(String path) throws Exception {
            FileSystem fileSystem = getFileSystem();
            return fileSystem.mkdirs(new Path(path));
        }

        /**
         * Delete a directory together with its contents.
         * @param path  directory path
         * @return      whether the directory was deleted
         */
        public static boolean Removedir(String path) {
            boolean result = false;
            Path strPath = new Path(path);
            try {
                FileSystem fileSystem = getFileSystem();
                if (fileSystem.isDirectory(strPath)) {
                    // recursive = true, so a non-empty directory can be deleted
                    result = fileSystem.delete(strPath, true);
                } else {
                    throw new IllegalArgumentException("Not a directory, nothing deleted; use deleteFile(String path) to delete a file");
                }
            } catch (Exception e) {
                System.out.println("Exception: " + e.getMessage());
            }
            return result;
        }

        /**
         * Delete a file.
         * @param path  file path
         * @return      whether the file was deleted
         */
        public static boolean deleteFile(String path) {
            boolean result = false;
            Path strPath = new Path(path);
            try {
                FileSystem fileSystem = getFileSystem();
                if (fileSystem.isFile(strPath)) {
                    result = fileSystem.delete(strPath, false);
                } else {
                    throw new IllegalArgumentException("Not a file, nothing deleted; use Removedir(String path) to delete a directory");
                }
            } catch (Exception e) {
                System.out.println("Exception: " + e.getMessage());
            }
            return result;
        }

        /**
         * List the first level of a directory.
         * @param path  file or directory path
         * @return      FileStatus[]
         */
        public static FileStatus[] List(String path) {
            return List(path, false);
        }

        /**
         * List a directory.
         * @param path    file or directory path
         * @param isFull  true: recurse into subdirectories; false: first level only
         * @return        FileStatus[]
         */
        public static FileStatus[] List(String path, boolean isFull) {
            List<FileStatus> arrayList = new ArrayList<FileStatus>();
            try {
                FileSystem fileSystem = getFileSystem();
                for (FileStatus item : fileSystem.listStatus(new Path(path))) {
                    arrayList.add(item);
                    // descend only when a recursive listing was requested
                    if (isFull && item.isDir()) {
                        for (FileStatus child : List(item.getPath().toString(), true)) {
                            arrayList.add(child);
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Exception: " + e.getMessage());
            }
            return arrayList.toArray(new FileStatus[arrayList.size()]);
        }

        /**
         * Upload a file.
         * @param src  local file
         * @param tar  destination path on HDFS
         * @return     whether the upload succeeded
         */
        public static boolean PutFile(String src, String tar) {
            boolean result = false;
            try {
                FileSystem fileSystem = getFileSystem();
                fileSystem.copyFromLocalFile(new Path(src), new Path(tar));
                result = true;
            } catch (Exception e) {
                System.out.println("Exception: " + e.getMessage());
            }
            return result;
        }

        /**
         * Download a file to the local working directory.
         * @param path  HDFS path
         * @return      local path, named <yyyyMMddHHmmss><extension>
         */
        public static String DownFile(String path) {
            // keep the extension (e.g. ".txt") only if the last path segment has one
            int index = path.lastIndexOf('.');
            String name = index > path.lastIndexOf('/') ? path.substring(index) : "";
            SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
            // File inserts the platform separator; Java's String.format has no {0}-style placeholders
            String target = new File(System.getProperty("user.dir"), format.format(new Date()) + name).getPath();
            try {
                FileSystem fileSystem = getFileSystem();
                fileSystem.copyToLocalFile(new Path(path), new Path(target));
            } catch (Exception e) {
                System.out.println("Exception: " + e.getMessage());
            }
            return target;
        }

        /**
         * Get a FileSystem instance for ROOT.
         * @return FileSystem
         * @throws Exception
         */
        public static FileSystem getFileSystem() throws Exception {
            return FileSystem.get(new URI(ROOT), config);
        }
    }
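Java's String.format uses %s-style conversions and copies .NET-style placeholders such as {0} through literally, so the local download name is easiest to build explicitly. The sketch below isolates DownFile's naming rule (a yyyyMMddHHmmss timestamp plus the HDFS file's extension, placed in the working directory) as plain JDK code that runs without a cluster. DownloadName, targetFileName and localTarget are hypothetical names, and treating a dot as an extension only when it sits in the last path segment is an assumption.

```java
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DownloadName {
    /** Hypothetical helper: "<yyyyMMddHHmmss><extension>" for an HDFS path. */
    static String targetFileName(String hdfsPath, Date when) {
        int dot = hdfsPath.lastIndexOf('.');
        // Assumption: a dot counts as an extension only inside the last path segment.
        String ext = dot > hdfsPath.lastIndexOf('/') ? hdfsPath.substring(dot) : "";
        String stamp = new SimpleDateFormat("yyyyMMddHHmmss").format(when);
        return String.format("%s%s", stamp, ext);
    }

    /** Hypothetical helper: the same name placed in the current working directory. */
    static String localTarget(String hdfsPath, Date when) {
        // File inserts the platform's separator (backslash on Windows, slash elsewhere).
        return new File(System.getProperty("user.dir"), targetFileName(hdfsPath, when)).getPath();
    }

    public static void main(String[] args) {
        System.out.println(localTarget("/data/test.txt", new Date()));
        // {0}-style placeholders are not format specifiers in Java; they pass through unchanged.
        System.out.println(String.format("{0}\\{1}{2}", "a", "b", "c"));
    }
}
```

Because the timestamp has one-second resolution, two downloads with the same extension in the same second would map to the same local name; appending the original file name is one way to avoid that.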

In the main method:

    public static void main(String[] args) throws Exception {
        String strDir = "/data/";
        String strDirback = "/data/back";
        String strFile = "F:\\05笔记.txt";
        String strPutFilePath = "/data/05笔记.txt";
        // create /data, upload the local file, create /data/back, then list /data
        if (HdfsTest.Mkdir(strDir)) {
            if (HdfsTest.PutFile(strFile, strPutFilePath)) {
                if (HdfsTest.Mkdir(strDirback)) {
                    FileStatus[] list = HdfsTest.List(strDir);
                    for (FileStatus fileStatus : list) {
                        final String path = fileStatus.getPath().toString();
                        final String dir = fileStatus.isDir() ? "d" : "-";
                        final short replication = fileStatus.getReplication();
                        final String permission = fileStatus.getPermission().toString();
                        final String group = fileStatus.getGroup();
                        final String owner = fileStatus.getOwner();
                        // one line per entry: type+permission, replication, group, owner, path
                        System.out.println(dir + permission + "\t" + replication + "\t" + group + "\t" + owner + "\t" + path);
                    }
                }
            }
        }
    }
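HdfsTest.List(path, isFull) is meant to return a directory's entries and, only when isFull is true, descend depth-first into each subdirectory. That traversal can be tried without a cluster: the sketch below applies the same pattern to the local disk, with java.nio.file's Files.newDirectoryStream standing in for FileSystem.listStatus. LocalListing and list are hypothetical names, and the temporary layout in main only mimics /data and /data/back from the example above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class LocalListing {
    /**
     * Local-disk analogue of HdfsTest.List(path, isFull): returns the entries of dir;
     * when isFull is true, each subdirectory's entries are added right after it.
     */
    static List<Path> list(Path dir, boolean isFull) throws IOException {
        List<Path> out = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                out.add(entry);
                // descend only when a recursive listing was requested
                if (isFull && Files.isDirectory(entry)) {
                    out.addAll(list(entry, true));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        // temporary stand-in for /data containing back/old.txt and notes.txt (left on disk)
        Path root = Files.createTempDirectory("data");
        Files.createDirectories(root.resolve("back"));
        Files.createFile(root.resolve("back").resolve("old.txt"));
        Files.createFile(root.resolve("notes.txt"));
        System.out.println(list(root, false).size()); // 2: back, notes.txt
        System.out.println(list(root, true).size());  // 3: also back/old.txt
    }
}
```

The directory stream returns entries in no guaranteed order, so only the counts, not the sequence, are fixed.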

That's all for this time. Keep recording every little step!
