首页 > 代码库 > Hadoop集群上使用JNI,调用资源文件

Hadoop集群上使用JNI,调用资源文件

Hadoop是基于Java的数据计算平台,引入第三方库(例如用C语言实现的开发包)将会大大增强数据分析的效率和能力。通常在使用一些工具的时候都要用到一些配置文件、资源文件等。接下来,借一个例子来说明在Hadoop上面如何使用JNI以及调用资源文件。

首先介绍一下ICTClass。ICTClass是中国科学院开发的一个分词软件(ICTClass官网),该套软件采用C/C++编写。ICTClass虽然支持Java,但是必须使用JNI技术。因此,在使用ICTClass之前需要配置好JNI资源以及ICTClass运行所需的一些资源文件,这使得将ICTClass直接移植到Hadoop之上有点困难。不管怎样,我们先来探讨一下使用ICTClass的整个流程吧。

1. 封装Wrapper类

首先,要使用JNI技术创建ICTClass的Wrapper类ICTCLAS50.java:

package ICTCLAS.I3S.AC;

import java.io.*;
import java.net.URI;
import java.net.URL;

/**
 * JNI wrapper around the native ICTCLAS 5.0 segmentation library.
 *
 * <p>Every method is declared {@code native}; the implementations come from
 * {@code libICTCLAS50.so}, which the static initializer loads from the
 * {@code ICTClassLib} directory under the current working directory.
 *
 * <p>JNI resolves native symbols from the fully-qualified class name, so this class is
 * expected to stay in package {@code ICTCLAS.I3S.AC} and keep its method names unchanged.
 *
 * <p>NOTE(review): the exact semantics of the {@code int} flags and return codes are defined
 * by the native library and are not visible here; the descriptions below are inferred from
 * the method names and should be confirmed against the ICTCLAS documentation.
 */
public class ICTCLAS50 {

    /* Load the native library once, when the class is initialized. */
    static {
        // new File("") resolves against the process working directory.
        File workingDirectory = new File("");
        System.load(workingDirectory.getAbsolutePath() + "/ICTClassLib/libICTCLAS50.so");
    }

    /**
     * Presumably initializes the segmenter.
     *
     * @param sPath encoded bytes of a path (presumably the directory holding the ICTCLAS
     *              resources; confirm)
     * @return a success flag (presumably {@code true} on success)
     */
    public native boolean ICTCLAS_Init(byte[] sPath);

    /**
     * Presumably releases the resources held by the segmenter.
     *
     * @return a success flag (presumably {@code true} on success)
     */
    public native boolean ICTCLAS_Exit();

    /**
     * Presumably imports a user dictionary file.
     *
     * @param sPath     encoded bytes of the dictionary file path
     * @param eCodeType encoding selector understood by the native library
     * @return a native result code (meaning not visible here)
     */
    public native int ICTCLAS_ImportUserDictFile(byte[] sPath, int eCodeType);

    /**
     * Presumably persists the user dictionary.
     *
     * @return a native result code (meaning not visible here)
     */
    public native int ICTCLAS_SaveTheUsrDic();

    /**
     * Presumably selects the part-of-speech tag map.
     *
     * @param nPOSmap tag-map selector understood by the native library
     * @return a native result code (meaning not visible here)
     */
    public native int ICTCLAS_SetPOSmap(int nPOSmap);

    /**
     * Presumably segments a whole file and writes the result to another file.
     *
     * @param sSrcFilename  encoded bytes of the input file name
     * @param eCodeType     encoding selector understood by the native library
     * @param bPOSTagged    flag understood by the native library (presumably toggles POS tags)
     * @param sDestFilename encoded bytes of the output file name
     * @return a success flag (presumably {@code true} on success)
     */
    public native boolean ICTCLAS_FileProcess(
            byte[] sSrcFilename, int eCodeType, int bPOSTagged, byte[] sDestFilename);

    /**
     * Presumably segments a paragraph of text.
     *
     * @param sSrc       encoded bytes of the text to process
     * @param eCodeType  encoding selector understood by the native library
     * @param bPOSTagged flag understood by the native library (presumably toggles POS tags)
     * @return encoded bytes of the processed text
     */
    public native byte[] ICTCLAS_ParagraphProcess(byte[] sSrc, int eCodeType, int bPOSTagged);

    /**
     * Native paragraph-processing entry point with the same parameter shape as
     * {@link #ICTCLAS_ParagraphProcess(byte[], int, int)}.
     *
     * @param sSrc       encoded bytes of the text to process
     * @param eCodeType  encoding selector understood by the native library
     * @param bPOStagged flag understood by the native library (presumably toggles POS tags)
     * @return encoded bytes of the processed text
     */
    public native byte[] nativeProcAPara(byte[] sSrc, int eCodeType, int bPOStagged);
}

这里要注意,ICTCLAS50这个类必须在package ICTCLAS.I3S.AC下,否则将会出错(本地库中导出的JNI函数名包含了完整的包名和类名,包名不一致就无法找到对应的本地方法)。

2. 打包ICTCLAS以及资源文件

这里要打包两个jar文件。ICTClass.jar中打包了ICTCLAS50类作为即将被调用的API。ICTClassLib.jar打包了ICTCLAS所需的所有资源。

看一下两个jar的清单:

**********************************
ICTClass.jar
ICTClass
    |_I3S
        |_AC
            |_ICTCLAS50.class
___________________________________
ICTClassLib.jar
ICTClassLib
    |_libICTCLAS50.so
    |_userdict.txt
    |_user.lic
    |_Configure.xml
    |_Data
        |_BiWord.big
        |_character.idx
        |_character.type
        |_CoreDict.pdat
        |_CoreDict.pos
        |_CoreDict.unig
        |_FieldDict.pdat
        |_FieldDict.pos
        |_GranDict.pdat
        |_GranDict.pos
        |_ICTCLAS30.ctx
        |_ICTCLAS_First.map
        |_ICTPOS.map
        |_nr.ctx
        |_nr.fsa
        |_nr.role
        |_PKU_First.map
        |_PKU.map
        |_temp
        |_UserDict.map
        |_UserDict.pdat
        |_UserDict.pos

3. 在hadoop上使用JNI和资源

Hadoop有很特殊的文件系统,这里笔者针对性地介绍一下DistributedCache的机制。Hadoop可以将HDFS上的一些文件分发到运行任务的机器的工作目录下,并按照一定的逻辑解压。通过以下API实现:

URI uri = null;
try {
    // "filepath" stands for a directory on the Hadoop file system (HDFS).
    uri = new URI("hdfs://filepath/ICTClassLib.jar#ICTClassLib");
} catch (URISyntaxException e) {
    logger.error(e.getMessage(), e);
}
// Only register the archive when the URI was built successfully.
if (uri != null) {
    DistributedCache.addCacheArchive(uri, jobConf);
}

上面的API将ICTClassLib.jar(当然该文件必须位于Hadoop集群的HDFS文件系统上)分发到tasknode上,并解压到工作目录下的link目录中(link目录的名字即URI中“#”后面指定的ICTClassLib)。ICTClassLib.jar包含ICTClass相关的资源文件。在tasknode上,每个task工作目录下的文件是:

从上面可以看出,通过该机制hadoop能够把需要的文件下发到指定的task的工作目录下。

4. Hadoop程序源码(笔者是hadoop菜鸟,限于能力这里涉及到的资源访问使用的都是绝对路径。笔者试过相对路径但是总会抛出错误,无奈选择了这个折衷的办法)。

package com.ict.hadoop;import java.io.File;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.util.ArrayList;import java.util.HashSet;import java.util.Iterator;import java.util.Set;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.filecache.DistributedCache;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.InputSplit;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.mapred.lib.MultipleOutputs;import org.apache.hadoop.util.GenericOptionsParser;import org.apache.log4j.Logger;public class WxSegment{    private static Logger logger = Logger.getLogger(WxSegment.class);        public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text>    {            public ICTCLAS50 testICTCLAS50;        public static Set<String> listAll(String dir) throws IOException {            //获取文件夹内所有文件的操作,可以忽略        }            @Override        public void configure(JobConf jobConf){            testICTCLAS50 = new ICTCLAS50();            String absolutePath = new File("").getAbsolutePath()+"/ICTClassLib/";            if(testICTCLAS50.ICTCLAS_Init(absolutePath.getBytes("GB2312")) == false){                System.out.println("Init Fail!");                  System.exit(1);            }            super.configure(jobConf);                    }        public String getSplitFileName(InputSplit 
inputSplit){            return ((FileSplit)inputSplit).getPath().getName();        }                @Override        public void map(Text key, Text value,                OutputCollector<Text,Text> output, Reporter reporter)                throws IOException {            logger.info("AbsolutePath:" + new File("./ICTClassLib").getAbsolutePath());                        String lineString = value.toString();            byte nativeBytes[] = testICTCLAS50.ICTCLAS_ParagraphProcess(                          lineString.getBytes("GB2312"), 0, 0);              String nativeStr = new String(nativeBytes, 0,                          nativeBytes.length, "GB2312");                  // System.out.println("The result is :" + nativeStr);                  String[] rest = nativeStr.split("\\s+");                  for (int i = 0; i < rest.length; i++)                      sb.append(rest[i] + " ");              lineString = sb.toString();            logger.info(lineString);            output.collect(key, new Text(lineString));                    }        @Override        public void close() throws IOException {            //保存用户字典            testICTCLAS50.ICTCLAS_SaveTheUsrDic();            //释放分词组件资源            testICTCLAS50.ICTCLAS_Exit();        }    }            public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, NullWritable>{        private MultipleOutputs mos;        private OutputCollector<Text,NullWritable> collector;        @Override        public void configure(JobConf conf) {            mos = new MultipleOutputs(conf);        }                @Override        public void reduce(Text key, Iterator<Text> values,                OutputCollector<Text,NullWritable> output, Reporter reporter)                throws IOException {            System.out.print("This is reduce");            collector = mos.getCollector(key.toString(), reporter);            output.collect( new Text(values.hasNext()?"has values":"no values"),NullWritable.get());            
int count = 0;            while(values.hasNext()){                Text tmp = values.next();                collector.collect(tmp,NullWritable.get());                output.collect( tmp,NullWritable.get());                logger.info(tmp);                count ++;            }                        logger.info("The total count is "+ count);                                }                @Override        public void close() throws IOException {            mos.close();        }            }    public static Set<String> listAll(String dir) throws IOException {        Configuration conf = new Configuration();        FileSystem fs = FileSystem.get(conf);        FileStatus[] stats = fs.listStatus(new Path(dir));        Set<String> list = new HashSet<String>();        for (int i = 0; i < stats.length; ++i)        {            if ( !stats[i].isDir() )            {                String filename = stats[i].getPath().getName();                filename = filename.replaceAll("[^a-zA-Z0-9]", "");                list.add(filename);                //System.out.println("File: "+ stats[i].getPath().getName());                logger.info("File: "+ filename);            }            else                logger.warn(stats[i].getPath().getName() +" is a directory.");        }        fs.close();        return list;    }        public static void main(String [] args) throws IOException, URISyntaxException{        Configuration conf = new Configuration();        String[] remainingArgs = new GenericOptionsParser(conf,args).getRemainingArgs();        //    new GenericOptionsParser(conf, args).getRemainingArgs();                if (remainingArgs.length != 2) {            System.err.println("Error!");            System.exit(1);        }                JobConf jobConf = new JobConf(conf,WxSegment.class);                        DistributedCache.createSymlink(jobConf);        logger.info("AbsolutePath:" + new File("").getAbsolutePath());        URI uri = null;        try {            uri = new 
URI("hdfs://filepath/ICTClassLib.jar#ICTClassLib");        } catch (URISyntaxException e) {            logger.error(e.getMessage(),e);        }        if( uri != null)            logger.info(uri.getPath());        DistributedCache.addCacheArchive(uri, jobConf);                                Path in = new Path(remainingArgs[0]);        Path out = new Path(remainingArgs[1]);        FileInputFormat.setInputPaths(jobConf, in);        FileOutputFormat.setOutputPath(jobConf, out);                jobConf.setJobName("WxSegment");        jobConf.setMapperClass(Map.class);        jobConf.setReducerClass(Reduce.class);        jobConf.setMapOutputKeyClass(Text.class);        jobConf.setMapOutputValueClass(Text.class);        jobConf.setInputFormat(MsgSplitInputFormat.class);                jobConf.setOutputKeyClass(Text.class);        jobConf.setOutputValueClass(NullWritable.class);                Set<String> set = listAll(remainingArgs[0]);        for( String filename : set){            MultipleOutputs.addNamedOutput(jobConf,                    filename,                    TextOutputFormat.class,                    Text.class,                    NullWritable.class);        }        jobConf.setJarByClass(WxSegment.class);                JobClient.runJob(jobConf);    }}

 另外一篇值得参考的文章:《如何在Hadoop集群运行JNI程序》