首页 > 代码库 > Stream数据流

Stream数据流

1、Collection接口的改进

在Iterable接口里面定义有一个简单的输出:default void forEach(Consumer<? super T> action)。 也就是说如果要想进行迭代处理,没有必要去强制使用Iterator完成了。

使用Lamda操作forEach()方法和直接采用方法引用

范例:

package cn.demo;import java.util.ArrayList;import java.util.List;import java.util.stream.Stream;public class Test {    public static void main(String[] args) throws Exception {        List<String> all = new ArrayList<String>();        all.add("hello");        all.add("word");        all.forEach((e) -> {System.out.println(e);});        all.forEach(System.out :: println);    }}

结果:

hello
word
hello
word

***************************************************************************************************************************

在整个Collection接口里面有一个取得Stream接口对象的方法:default Stream<E> stream();

范例:

 1 package cn.demo; 2  3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.stream.Stream; 6  7 public class Test { 8     public static void main(String[] args) throws Exception { 9 10         List<String> all = new ArrayList<String>();11         all.add("hello");12         all.add("word");13         Stream<String> stream = all.stream();14         System.out.println(stream.count());15     }16 }

结果:2

总结:此时取得了一个长度。功能和size()相同,但是它是直接取得了内存中的要分析的数据量。

2、Stream操作

Stream相当于是所有数据的流式数据分析工具,既然是数据的分析,那么就需要对数据执行一些简单的操作,例如:可能需要一些过滤,即:满足一些条件之后的数据可以进行分析。

   · 过滤方法:public Stream<T> filter(Predicate<? super T> predicate);

范例:执行过滤

 1 package cn.demo; 2  3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.stream.Stream; 6  7 public class Test { 8     public static void main(String[] args) throws Exception { 9 10         List<String> all = new ArrayList<String>();11         all.add("java");12         all.add("jsp");13         all.add("servlet");14         all.add("ajax");15         all.add("jquery");16         Stream<String> stream = all.stream().filter((e) -> e.contains("j"));17         System.out.println(stream.count());18     }19 }

结果:4

那么只是过滤后显示个数,一点意义都没有,那么最好的做法是将这些过滤后的数据收集起来变为一个新的集合出现,那么就可以使用收集器完成:

     public <R,A> R collect(Collector<? super T,A,R> collector);

这个方法需要使用一个Collector类型,这个类型可以通过Collectors类取得:

     · 收集的结果为List集合:public static<T> Collector<T,?,List<T>> toList()。

 1 package cn.demo; 2  3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.stream.Collectors; 6 import java.util.stream.Stream; 7  8 public class Test { 9     public static void main(String[] args) throws Exception {10 11         List<String> all = new ArrayList<String>();12         all.add("java");13         all.add("jsp");14         all.add("servlet");15         all.add("ajax");16         all.add("jquery");17         Stream<String> stream = all.stream().filter((e) -> e.contains("j"));18         List<String> result = stream.collect(Collectors.toList());19         System.out.println(result);20     }21 }

结果:

[java, jsp, ajax, jquery]

既然流式处理之中主要是为大数据而生的概念,那么就需要考虑一个数据的分页问题,提供有两个方法:

· 跳过数据量:public Stream<T> skip(long n);

· 取出的数据量:public Stream<T> limit(long maxSize)。

范例:执行分页控制

 1 package cn.demo; 2  3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.stream.Collectors; 6 import java.util.stream.Stream; 7  8 public class Test { 9     public static void main(String[] args) throws Exception {10 11         List<String> all = new ArrayList<String>();12         all.add("java");13         all.add("jsp");14         all.add("servlet");15         all.add("ajax");16         all.add("jquery");17         all.add("javaScript");18         all.add("json");19         all.add("jdbc");20         Stream<String> stream = all.stream()21         .skip(3).limit(3)22         .map((e) -> e.toLowerCase())23         .filter((e) -> e.contains("j"));24         List<String> result = stream.collect(Collectors.toList());25         System.out.println(result);26     }27 }

结果:

[ajax, jquery, javascript]

Stream的操作数据流可以结合Lamda更简单处理,但是难度也高。

3、MapReduce(概念核心

大数据最本质的做法就是MapReduce,属于数据的两个处理阶段:

· Map阶段:对要参与运算的数据进行提前处理;

  |- 方法:public <R> Stream<R> map(Function<? super T,? extends R> mapper);

· Reduce阶段:对处理后的数据进行统计。

  |- 方法:public <U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)。

下面做一个订单操作的类,这个订单的操作类之中保存有商品的名称、商品单价、商品的购买数量。

范例:实现统计处理

 1 package cn.demo; 2  3 import java.util.ArrayList; 4 import java.util.DoubleSummaryStatistics; 5 import java.util.List; 6  7 class Orders{ 8     private String ptitle; 9     private double price;10     private int amount;11     public Orders(String ptitle,double price,int amount){12         super();13         this.ptitle = ptitle;14         this.price =price;15         this.amount = amount;16     }17     public String getPtitle() {18         return ptitle;19     }20     public void setPtitle(String ptitle) {21         this.ptitle = ptitle;22     }23     public double getPrice() {24         return price;25     }26     public void setPrice(double price) {27         this.price = price;28     }29     public int getAmount() {30         return amount;31     }32     public void setAmount(int amount) {33         this.amount = amount;34     }35 }36 public class Test {37     public static void main(String[] args) throws Exception {38        List<Orders> all = new ArrayList<Orders>();39        all.add(new Orders("平板电脑",4999,20));40        all.add(new Orders("手机",1099,200));41        all.add(new Orders("笔记本",9,80));42        all.add(new Orders("U盘",99,180));43        all.add(new Orders("鼠标",49,15));44        double allPrice = all.stream().map((obj) -> obj.getPrice() * obj.getAmount()).reduce((sum,x) -> sum + x).get();45        System.out.println("商品总销量" + allPrice);46        DoubleSummaryStatistics dss = all.stream().mapToDouble((obj) -> obj.getPrice() * obj.getAmount()).summaryStatistics();47     System.out.println("销售总销量" + dss.getCount());48     System.out.println("最高销量" + dss.getMax());49     System.out.println("最低销量" + dss.getMin());50     System.out.println("平均销量" + dss.getAverage());51     System.out.println("总销量" + dss.getSum());52     }53 }

结果:

商品总销量339055.0
销售总销量5
最高销量219800.0
最低销量720.0
平均销量67811.0
总销量339055.0

总结:大数据的开发也会采用类似的Lamda函数式处理模式完成。

Stream数据流