首页 > 代码库 > java编写spark程序
java编写spark程序
转载:http://blog.csdn.net/qiaojialin/article/details/53942028
importjava.net.URI; import java.util.Arrays; import java.io.*; import org.apache.hadoop.io.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; public class mysparktest { public static void main(String[] args) throws IOException{ String uri = "hdfs://192.168.217.132:9000/unit/xferlog"; String uro = "hdfs://192.168.217.132:9000/unit/xferlogoutput" ; Configuration conf = new Configuration(); try { //打开文件系统 FileSystem fs = FileSystem. get(URI.create (uri), conf); //打开文件输入流 FSDataInputStream in = fs.open( new Path(uri)); //文件读取 byte[] ioBuffer = new byte[1024]; int readLen = in.read(ioBuffer); while(readLen!=-1) { readLen = in.read(ioBuffer); } String str = new String(ioBuffer); int cnt=0; while(( int)(str.charAt(cnt)) != 0)cnt++; str=str.substring(0, cnt); System. out.println(str); in.close(); //文件的删除 fs.delete( new Path(uri), true); //写入到新文件中 FSDataOutputStream out = fs.create( new Path(uro)); out.write(( "new1"+str).getBytes( "UTF-8")); out.flush(); out. sync(); out.close(); fs.close(); } catch (Exception e) { e.printStackTrace(); } } }
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; public final class mysparktest { private static final Pattern SPACE = Pattern. compile(" "); public static void main(String[] args) throws Exception { String uri = "hdfs://Master:9000/unit/xferlog"; String uro = "hdfs://Master:9000/unit/xferlog1"; // if (args.length < 1) { // System.err.println("Usage: JavaWordCount <file>"); // System.exit(1); // } //创建SparkConf,包含application的相关信息 SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount"); //创建一个JavaSparkContext对象 JavaSparkContext sc = new JavaSparkContext(sparkConf); //textFile()方法可将本地文件或HDFS文件转换成RDD,读取本地文件需要各节点上都存在,或者通过网络共享该文件 //读取一行 JavaRDD<String> lines = sc.textFile(uri, 1); //flatMap与map的区别是,对每个输入,flatMap会生成一个或多个的输出,而map只是生成单一的输出 //用空格分割各个单词,输入一行,输出多个对象,所以用flatMap JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { return Arrays.asList(SPACE.split(s)); } }); //对每个单词生成key-value对,PairFunction<T,K,V> //表示输入类型为T,生成的key-value对中的key类型为k,value类型为v,对本例,T=String, K=String, V=Integer(计数) //重写scala的Tupple2方法 JavaPairRDD<String, Integer> ones = words.mapToPair( new PairFunction<String, String, Integer>() { @Override //scala.Tuple2<K,V> call(T t) //Tuple2为scala中的一个对象,call方法的输入参数为T,即输入一个单词s,新的Tuple2对象的key为这个单词,计数为1 public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); //调用reduceByKey方法,按key值进行reduce //调用Function2对象,Function2<T1,T2,R> //输入两个参数,T1,T2,返回R //若ones有<"one", 1>, <"one", 1>,会根据"one"将相同的pair单词个数进行统计,输入为Integer,输出也为Integer //输出<"one", 2> JavaPairRDD<String, Integer> counts = ones. reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); //将结果保存到HDFS中 counts.saveAsTextFile(uro); //collect返回一个包含RDD内所有元素的Array List<Tuple2<String, Integer>> output = counts.collect(); for (Tuple2<?, ?> tuple : output) { System.out.println(tuple._1() + ": " + tuple._2()); } sc.stop(); } }
importorg.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; public final class mysparktest { private static final Pattern SPACE = Pattern. compile(" "); public static void main(String[] args) throws Exception { String uri = "hdfs://Master:9000/unit/xferlog"; String uro = "hdfs://Master:9000/unit/xferlog1" ; SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); //也可以使用ctx获取环境变量,例如下面的语句 System.out.println("spark home:"+ctx.getSparkHome()); //一次一行,String类型 ,还有 hadoopfile,sequenceFile什么的 ,可以直接用sc.textFile("path") JavaRDD<String> lines = ctx.textFile(uri, 1); //java.lang.String path, int minSplits lines.cache(); //cache,暂时放在缓存中,一般用于哪些可能需要多次使用的RDD,据说这样会减少运行时间 //collect方法,用于将RDD类型转化为java基本类型,如下 List<String> line = lines.collect(); for(String val:line) System. out.println(val); //下面这些也是RDD的常用函数 // lines.collect(); List<String> // lines.union(); javaRDD<String> // lines.top(1); List<String> // lines.count(); long // lines.countByValue(); /** * filter test * 定义一个返回 bool类型的函数,spark运行filter的时候会过滤掉那些返回只为false的数据 * String s,中的变量s可以认为就是变量lines(lines可以理解为一系列的String类型数据)的每一条数据 */ JavaRDD<String> contaninsE = lines.filter( new Function<String, Boolean>() { public Boolean call(String s) throws Exception { return (s.contains( "passwd")); } }); System.out.println("--------------next filter‘s result------------------"); line = contaninsE.collect(); for(String val:line) System. out.println(val); /** * sample test * sample函数使用很简单,用于对数据进行抽样 * 参数为:withReplacement: Boolean, fraction: Double, seed: Int * */ JavaRDD<String> sampletest = lines.sample( false,0.1,5); System.out.println("-------------next sample-------------------"); line = sampletest.collect(); for(String val:line) System. out.println(val); /** * * new FlatMapFunction<String, String>两个string分别代表输入和输出类型 * Override的call方法需要自己实现一个转换的方法,并返回一个 Iterable的结构 * * flatmap属于一类非常常用的spark函数,简单的说作用就是将一条 rdd数据使用你定义的函数给分解成多条 rdd数据 * 例如,当前状态下,lines这个 rdd类型的变量中,每一条数据都是一行String,我们现在想把他拆分成1个个的词的话, * 可以这样写 : */ JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { String[] words=s.split( " "); return Arrays. asList(words); } }); /** * map 键值对 ,类似于MR的map方法 * pairFunction<T,K,V>: T:输入类型;K,V:输出键值对 * 需要重写call方法实现转换 */ JavaPairRDD<String, Integer> ones = words.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); //A two-argument function that takes arguments // of type T1 and T2 and returns an R. /** * reduceByKey方法,类似于MR的reduce * 要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算 */ JavaPairRDD<String, Integer> counts = ones.reduceByKey( new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { //reduce阶段,key相同的value怎么处理的问题 return i1 + i2; } }); //备注:spark也有reduce方法,输入数据是RDD类型就可以,不需要键值对, // reduce方法会对输入进来的所有数据进行两两运算 /** * sort,顾名思义,排序 */ JavaPairRDD<String,Integer> sort = counts.sortByKey(); System.out.println("----------next sort----------------------"); /** * collect方法其实之前已经出现了多次,该方法用于将spark的RDD类型转化为我们熟知的java常见类型 */ List<Tuple2<String, Integer>> output = sort.collect(); for (Tuple2<?,?> tuple : output) { System. out.println(tuple. _1 + ": " + tuple._2()); } /** * 保存函数,数据输出,spark为结果输出提供了很多接口 */ // sort.saveAsTextFile( uro); // sort.saveAsNewAPIHadoopFile(); // sort.saveAsHadoopFile(); System.exit(0); } }
java编写spark程序
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。