首页 > 代码库 > hadoop的dfs工具类一个

hadoop的dfs工具类一个

  开始没搞定插件问题,就弄了个dfs操作类,后面搞定了插件问题,这玩意也就聊胜于无了,还是丢这里算了。

  首先是一个配置,ztool.hadoop.properties

hadoop.home.dir=G:/hadoop/hadoop-2.4.1
hadoop.user.name=hadoop
hadoop.server.ip=192.168.117.128
hadoop.server.hdfs.port=9000

  前面两个属性后面代码会有说明的。

  属性文件的读取,方法多了,一般用commons-configuration包,我是自己把这个再整了一次,加了些自动处理,这个代码中可以无视,直接把代码中的那部分改成普通引用就好了。

  logger部分,用了logback,也是处理了一下,处理了其在linux下会莫名其妙找不到配置文件的问题。这里就不放出代码了,直接把代码中的那部分改成普通引用就好了,我就不改了。

  工具类代码如下

package com.cnblogs.zxub.hadoop.dfs;import java.io.IOException;import java.net.URI;import org.apache.commons.configuration.PropertiesConfiguration;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.mapred.JobConf;import org.slf4j.Logger;import com.cnblogs.zxub.util.logger.ZLoggerFactory;import com.cnblogs.zxub.util.properties.PropertiesLoader;public class DfsUtil {    private static final Logger logger = ZLoggerFactory            .getLogger(DfsUtil.class);    private final PropertiesConfiguration props = PropertiesLoader            .getConfiguration("ztool.hadoop");    private Configuration config = null;    private String hdfsPath = null;    private String baseDir = null;    public DfsUtil(String hdfs, Configuration config) {        // windows下设置HADOOP_HOME后,还可能找不到winutils.exe,直接自己写进程序算了        System.setProperty("hadoop.home.dir",                this.props.getString("hadoop.home.dir"));        // 设置与dfs服务通信的用户名,省得换当前用户名,也不改配置关闭权限控制了        System.setProperty("HADOOP_USER_NAME",                this.props.getString("hadoop.user.name"));        this.hdfsPath = (hdfs == null) ? 
"hdfs://"                + this.props.getString("hadoop.server.ip") + ":"                + this.props.getString("hadoop.server.hdfs.port") + "/" : hdfs;        if (config == null) {            JobConf conf = new JobConf(DfsUtil.class);            conf.setJobName("HdfsDAO");            config = conf;        }        this.config = config;    }    public DfsUtil(Configuration conf) {        this(null, conf);    }    public DfsUtil() {        this(null, null);    }    public String getBaseDir() {        return this.baseDir;    }    public void setBaseDir(String baseDir) {        this.baseDir = baseDir;    }    public String getHdfsPath() {        return this.hdfsPath;    }    public Configuration getConfig() {        return this.config;    }    private String standardPath(String path) {        if (this.baseDir == null) {            this.baseDir = "/";        }        if (this.baseDir.indexOf("/") != 0) {            this.baseDir = "/" + this.baseDir;        }        if (this.baseDir.lastIndexOf("/") == this.baseDir.length() - 1) {            this.baseDir = this.baseDir.replaceFirst("/$", "");        }        if (path.indexOf("/") != 0) {            path = "/" + path;        }        path = this.baseDir + path;        if (path.lastIndexOf("/") == path.length() - 1) {            path = path.replaceFirst("/$", "");        }        return path;    }    public void ll(String folder) throws IOException {        folder = this.standardPath(folder);        Path path = new Path(folder);        FileSystem fs = FileSystem.get(URI.create(this.getHdfsPath()),                this.getConfig());        FileStatus[] list = fs.listStatus(path);        System.out.println("ll: " + folder);        for (FileStatus f : list) {            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(),                    f.isDirectory(), f.getLen());        }        fs.close();    }    public void mkdirs(String folder) throws IOException {        folder = this.standardPath(folder);        
Path path = new Path(folder);        FileSystem fs = FileSystem.get(URI.create(this.getHdfsPath()),                this.getConfig());        if (!fs.exists(path)) {            fs.mkdirs(path);            logger.info("create: {}.", folder);        } else {            logger.warn("folder [{}] already exists, mkdirs failed.", folder);        }        fs.close();    }    public void rm(String file) throws IOException {        file = this.standardPath(file);        Path path = new Path(file);        FileSystem fs = FileSystem.get(URI.create(this.getHdfsPath()),                this.getConfig());        fs.deleteOnExit(path);        logger.info("delete: {}.", file);        fs.close();    }    public void newFile(String file, String content) throws IOException {        file = this.standardPath(file);        FileSystem fs = FileSystem.get(URI.create(this.getHdfsPath()),                this.getConfig());        byte[] buff = content.getBytes();        FSDataOutputStream os = null;        try {            os = fs.create(new Path(file));            os.write(buff, 0, buff.length);            logger.info("create: {}.", file);        } finally {            if (os != null) {                os.close();            }        }        fs.close();    }    public void scp(String local, String remote) throws IOException {        remote = this.standardPath(remote);        FileSystem fs = FileSystem.get(URI.create(this.getHdfsPath()),                this.getConfig());        fs.copyFromLocalFile(new Path(local), new Path(remote));        logger.info("copy: from [{}] to [{}]", local, remote);        fs.close();    }    public void download(String remote, String local) throws IOException {        remote = this.standardPath(remote);        Path path = new Path(remote);        FileSystem fs = FileSystem.get(URI.create(this.getHdfsPath()),                this.getConfig());        fs.copyToLocalFile(path, new Path(local));        logger.info("download: from [{}] to [{}]", remote, local);        
fs.close();    }    public void cat(String remote) throws IOException {        remote = this.standardPath(remote);        Path path = new Path(remote);        FileSystem fs = FileSystem.get(URI.create(this.getHdfsPath()),                this.getConfig());        FSDataInputStream fsdis = null;        System.out.println("cat: " + remote);        try {            fsdis = fs.open(path);            IOUtils.copyBytes(fsdis, System.out, 4096, false);        } finally {            IOUtils.closeStream(fsdis);            fs.close();        }    }    public static void main(String[] args) throws IOException {        DfsUtil hdfs = new DfsUtil();        // hdfs.setBaseDir("/test");        // hdfs.mkdirs("/debug_in");        // hdfs.newFile("/test.txt", "测试");        // hdfs.rm("/test.txt");        // hdfs.rm("/test");        // hdfs.scp("c:/q.txt", "/");        // hdfs.ll("/");        // hdfs.download("/test.txt", "c:/t.txt");        // hdfs.cat("q.txt");        hdfs.scp("c:/din/f1.txt", "debug_in");        hdfs.scp("c:/din/f2.txt", "debug_in");    }}