首页 > 代码库 > java读写HDFS

java读写HDFS

HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。下面记录一下使用JAVA API对HDFS中的文件进行操作的过程。
  对分HDFS中的文件操作主要涉及一下几个类:
  Configuration类:该类的对象封转了客户端或者服务器的配置。
  FileSystem类:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作。FileSystem fs = FileSystem.get(conf);通过FileSystem的静态方法get获得该对象。

  FSDataInputStreamFSDataOutputStream:这两个类是HDFS中的输入输出流。分别通过FileSystem的open方法和create方法获得。

在eclipse中通过java api操作HDFS时,需要引用一些jar包,日下:

技术分享

package hadoop;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class OperaHDFS {
    /***
     * 加载配置文件
     * **/
    private static Configuration conf = new Configuration();

    private static FileSystem fs = null;

    static {
        try {
            conf.set("fs.defaultFS", "hdfs://host:port");
            conf.setInt("dfs.replication", 2);
            // 加载配置项
            fs = FileSystem.get(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /***
     * 上传本地文件到 HDFS上
     * 
     * **/
    public static void uploadFile(String src, String dst) throws Exception {
        // 本地文件路径
        Path srcPath = new Path(src);
        // HDFS文件路径
        Path dstPath = new Path(dst);
        try {
            // 调用文件系统的文件复制函数,前面参数是指是否删除原文件,true为删除,默认为false
            fs.copyFromLocalFile(false, srcPath, dstPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("上传成功!");
        fs.close();// 释放资源
    }

    /**
     * 在HDFS上创建一个文件夹
     * 
     * **/
    public static void createDirOnHDFS(String path) throws Exception {
        Path p = new Path(path);
        boolean done = fs.mkdirs(p);
        if (done) {
            System.out.println("创建文件夹成功!");
        } else {
            System.out.println("创建文件夹成功!");
        }
        fs.close();// 释放资源
    }

    /**
     * 在HDFS上创建一个文件
     * 
     * **/
    public static void createFileOnHDFS(String dst, String content) throws Exception {
        Path dstPath = new Path(dst);
        // 打开一个输出流
        FSDataOutputStream outputStream = fs.create(dstPath);
        outputStream.write(content.getBytes());
        outputStream.close();
        fs.close();// 释放资源
        System.out.println("创建文件成功!");

    }

    /**
     * 在HDFS上删除一个文件或文件夹
     * 
     * **/
    public static void deleteFileOrDirOnHDFS(String path) throws Exception {
        Path p = new Path(path);
        boolean done = fs.deleteOnExit(p);
        if (done) {
            System.out.println("删除成功!");
        } else {
            System.out.println("删除失败!");
        }
        fs.close();// 释放资源

    }

    /**
     * 重名名一个文件夹或者文件
     * 
     * **/
    public static void renameFileOrDirOnHDFS(String oldName, String newName) throws Exception {
        Path oldPath = new Path(oldName);
        Path newPath = new Path(newName);
        boolean done = fs.rename(oldPath, newPath);
        if (done) {
            System.out.println("重命名文件夹或文件成功!");
        } else {
            System.out.println("重命名文件夹或文件失败!");
        }
        fs.close();// 释放资源
    }

    /***
     * 
     * 读取HDFS中某个文件
     * 
     * **/
    public static void readFile(String filePath) throws IOException {
        Path srcPath = new Path(filePath);
        InputStream in = null;
        try {
            in = fs.open(srcPath);
            IOUtils.copyBytes(in, System.out, 4096, false); // 复制到标准输出流
        } finally {
            IOUtils.closeStream(in);
        }
    }

    /***
     * 
     * 读取HDFS某个文件夹的所有 文件,并打印
     * 
     * **/
    public static void readAllFile(String path) {
        // 打印文件路径下的所有文件名
        Path dstPath = new Path(path);
        FileStatus[] fileStatus = null;
        try {
            fileStatus = fs.listStatus(dstPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
        for (FileStatus status : fileStatus) {
            if (status.isDirectory()) {
                readAllFile(status.getPath().toString());
            } else {
                System.out.println("文件: " + status.getPath());
                try {
                    readFile(status.getPath().toString());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 从HDFS上下载文件或文件夹到本地
     * 
     * **/
    public static void downloadFileorDirectoryOnHDFS(String src, String dst) throws Exception {
        Path srcPath = new Path(src);
        Path dstPath = new Path(dst);
        fs.copyToLocalFile(false, srcPath, dstPath);
        fs.close();// 释放资源
        System.out.println("下载文件夹或文件成功!");
    }

    public static void main(String[] args) throws Exception {
        uploadFile("D:\\text.txt", "/path/file");
        deleteFileOrDirOnHDFS("/path/file");
        createDirOnHDFS("/path");
        createFileOnHDFS("/path/file", "context");
        renameFileOrDirOnHDFS("/path", "/path");
        readFile("/path/file");
        readAllFile("/path/file");
        downloadFileorDirectoryOnHDFS("/path/file", "D:\\text2.txt");
    }
}


java读写HDFS