首页 > 代码库 > java编写spark程序

java编写spark程序

转载:http://blog.csdn.net/qiaojialin/article/details/53942028

importjava.net.URI; 
import java.util.Arrays; 
import java.io.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.fs.FileSystem; 
 
public class mysparktest { 
    public static void main(String[] args) throws IOException{ 
        String uri = "hdfs://192.168.217.132:9000/unit/xferlog"; 
        String uro = "hdfs://192.168.217.132:9000/unit/xferlogoutput" ; 
        Configuration conf = new Configuration(); 
        try { 
             //打开文件系统
            FileSystem fs = FileSystem. get(URI.create (uri), conf); 
            //打开文件输入流
            FSDataInputStream in = fs.open( new Path(uri)); 
            //文件读取 
            byte[] ioBuffer = new byte[1024];   
            int readLen = in.read(ioBuffer);       
            while(readLen!=-1)   
            {   
                readLen = in.read(ioBuffer);   
            }   
            String str = new String(ioBuffer); 
            int cnt=0; 
            while(( int)(str.charAt(cnt)) != 0)cnt++; 
            str=str.substring(0, cnt); 
            System. out.println(str); 
            in.close();   
            //文件的删除 
            fs.delete( new Path(uri), true); 
            //写入到新文件中
            FSDataOutputStream out = fs.create( new Path(uro));           
            out.write(( "new1"+str).getBytes( "UTF-8")); 
            out.flush(); 
            out. sync(); 
            out.close(); 
            fs.close(); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
         
    } 
}
 

 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import scala.Tuple2; 
 
 
import java.util.Arrays; 
import java.util.List; 
import java.util.regex.Pattern; 
 
 
public final class mysparktest { 
  private static final Pattern SPACE = Pattern. compile(" "); 
 
 
  public static void main(String[] args) throws Exception { 
    String uri = "hdfs://Master:9000/unit/xferlog";
    String uro = "hdfs://Master:9000/unit/xferlog1";
      
//    if (args.length < 1) { 
//      System.err.println("Usage: JavaWordCount <file>"); 
//      System.exit(1); 
//    } 
 
 
    //创建SparkConf,包含application的相关信息 
    SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount"); 
    //创建一个JavaSparkContext对象 
    JavaSparkContext sc = new JavaSparkContext(sparkConf); 
    //textFile()方法可将本地文件或HDFS文件转换成RDD,读取本地文件需要各节点上都存在,或者通过网络共享该文件 
    //读取一行 
    JavaRDD<String> lines = sc.textFile(uri, 1); 


    //flatMap与map的区别是,对每个输入,flatMap会生成一个或多个的输出,而map只是生成单一的输出 
    //用空格分割各个单词,输入一行,输出多个对象,所以用flatMap 
    JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>() { 
      @Override 
      public Iterable<String> call(String s) { 
        return Arrays.asList(SPACE.split(s)); 
      } 
    }); 
    //对每个单词生成key-value对,PairFunction<T,K,V> 
    //表示输入类型为T,生成的key-value对中的key类型为k,value类型为v,对本例,T=String, K=String, V=Integer(计数) 
    //重写scala的Tupple2方法 
    JavaPairRDD<String, Integer> ones = words.mapToPair( new PairFunction<String, String, Integer>() { 
      @Override 
      //scala.Tuple2<K,V> call(T t) 
      //Tuple2为scala中的一个对象,call方法的输入参数为T,即输入一个单词s,新的Tuple2对象的key为这个单词,计数为1 
      public Tuple2<String, Integer> call(String s) { 
        return new Tuple2<String, Integer>(s, 1); 
      } 
    }); 
    //调用reduceByKey方法,按key值进行reduce 
    //调用Function2对象,Function2<T1,T2,R> 
    //输入两个参数,T1,T2,返回R 
    //若ones有<"one", 1>, <"one", 1>,会根据"one"将相同的pair单词个数进行统计,输入为Integer,输出也为Integer 
    //输出<"one", 2> 
    JavaPairRDD<String, Integer> counts = ones. reduceByKey(new Function2<Integer, Integer, Integer>() { 
      @Override 
      public Integer call(Integer i1, Integer i2) { 
        return i1 + i2; 
      } 
    }); 
    //将结果保存到HDFS中 
    counts.saveAsTextFile(uro); 
    //collect返回一个包含RDD内所有元素的Array 
    List<Tuple2<String, Integer>> output = counts.collect(); 
    for (Tuple2<?, ?> tuple : output) { 
      System.out.println(tuple._1() + ": " + tuple._2()); 
    } 
    sc.stop(); 
  } 
}
 

 

importorg.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
 
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
 
 
public final class mysparktest {
  private static final Pattern SPACE = Pattern. compile(" ");
 
 
  public static void main(String[] args) throws Exception {
        String uri = "hdfs://Master:9000/unit/xferlog";
      String uro = "hdfs://Master:9000/unit/xferlog1" ;

      SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount");
      JavaSparkContext ctx = new JavaSparkContext(sparkConf);


      //也可以使用ctx获取环境变量,例如下面的语句 
      System.out.println("spark home:"+ctx.getSparkHome()); 

       //一次一行,String类型    ,还有 hadoopfile,sequenceFile什么的  ,可以直接用sc.textFile("path") 
      JavaRDD<String> lines = ctx.textFile(uri, 1);  //java.lang.String path, int minSplits 
      lines.cache();   //cache,暂时放在缓存中,一般用于哪些可能需要多次使用的RDD,据说这样会减少运行时间 


      //collect方法,用于将RDD类型转化为java基本类型,如下 
      List<String> line = lines.collect(); 
      for(String val:line) 
              System. out.println(val); 


     //下面这些也是RDD的常用函数 
     // lines.collect();  List<String> 
     // lines.union();     javaRDD<String> 
     // lines.top(1);     List<String> 
     // lines.count();      long 
     // lines.countByValue(); 


      /**
       *   filter test
       *   定义一个返回 bool类型的函数,spark运行filter的时候会过滤掉那些返回只为false的数据
       *   String s,中的变量s可以认为就是变量lines(lines可以理解为一系列的String类型数据)的每一条数据
       */  
      JavaRDD<String> contaninsE = lines.filter( new Function<String, Boolean>() { 
      
          public Boolean call(String s) throws Exception { 
             return (s.contains( "passwd")); 
          } 
      }); 
     
      System.out.println("--------------next filter‘s  result------------------"); 
      line = contaninsE.collect(); 
      for(String val:line) 
          System. out.println(val); 


      /**
       * sample test
       * sample函数使用很简单,用于对数据进行抽样
       * 参数为:withReplacement: Boolean, fraction: Double, seed: Int
       *
       */  


      JavaRDD<String> sampletest = lines.sample( false,0.1,5); 
      System.out.println("-------------next sample-------------------"); 
      line = sampletest.collect(); 
      for(String val:line) 
          System. out.println(val); 

      /**
       *
       * new FlatMapFunction<String, String>两个string分别代表输入和输出类型
       * Override的call方法需要自己实现一个转换的方法,并返回一个 Iterable的结构
       *
       * flatmap属于一类非常常用的spark函数,简单的说作用就是将一条 rdd数据使用你定义的函数给分解成多条 rdd数据
       * 例如,当前状态下,lines这个 rdd类型的变量中,每一条数据都是一行String,我们现在想把他拆分成1个个的词的话,
       * 可以这样写 :
       */  


      JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>() { 
          @Override 
          public Iterable<String> call(String s) { 
               String[] words=s.split( " "); 
                return Arrays. asList(words); 
          } 
      }); 

      /**
       * map 键值对 ,类似于MR的map方法
       * pairFunction<T,K,V>: T:输入类型;K,V:输出键值对
       * 需要重写call方法实现转换
       */  
      JavaPairRDD<String, Integer> ones = words.mapToPair( new PairFunction<String, String, Integer>() { 
          @Override 
          public Tuple2<String, Integer> call(String s) { 
              return new Tuple2<String, Integer>(s, 1); 
          } 
      }); 


      //A two-argument function that takes arguments 
      // of type T1 and T2 and returns an R. 
      /**
       *  reduceByKey方法,类似于MR的reduce
       *  要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算
       */  
      JavaPairRDD<String, Integer> counts = ones.reduceByKey( new Function2<Integer, Integer, Integer>() { 
          @Override 
          public Integer call(Integer i1, Integer i2) {  //reduce阶段,key相同的value怎么处理的问题 
              return i1 + i2; 
          } 
      }); 


      //备注:spark也有reduce方法,输入数据是RDD类型就可以,不需要键值对, 
      // reduce方法会对输入进来的所有数据进行两两运算 

      /**
       * sort,顾名思义,排序
       */  
      JavaPairRDD<String,Integer> sort = counts.sortByKey(); 
      System.out.println("----------next sort----------------------"); 



      /**
       * collect方法其实之前已经出现了多次,该方法用于将spark的RDD类型转化为我们熟知的java常见类型
       */  
      List<Tuple2<String, Integer>> output = sort.collect(); 
      for (Tuple2<?,?> tuple : output) { 
          System. out.println(tuple. _1 + ": " + tuple._2()); 
       
      } 


      /**
       * 保存函数,数据输出,spark为结果输出提供了很多接口
       */  
//      sort.saveAsTextFile( uro); 

     // sort.saveAsNewAPIHadoopFile(); 
    //  sort.saveAsHadoopFile();
     
      System.exit(0); 
  }
}
 

 

 

java编写spark程序