首页 > 代码库 > RxJava操作符(二) __变换操作

RxJava操作符(二) __变换操作

RxJava变换操作符

这周真心比较累,一直都在加班,今天才有点自己的时间来学习新的内容,外包工作苦啊! 上周学习了各种创建操作符,像create,from,Just,Defer….等等,这周中也工作中也用了不少,有时间也需要总结一下自己在工作中使用的操作符。好了,现在来开始学习一个变换操作符吧,不知道什么意思没关系,一个一个去试错吧。

map

官方的翻译是对于Observable发射的每一项数据,都会应用一个函数,执行变换操作,然后返回一个发射这些结果的Observable。 还是举个例子吧,Observable会发射0-5之间的数字,我需要判断这些数字中,哪些是奇数,哪些是偶数。

照例子写一个全局函数,来打印和输出Observer:

    private static void print(Object obj) {
        System.out.println(obj);
    }

    private <T> Observer<T> getDefaultObserver() {
        return new Observer<T>() {

            @Override
            public void onCompleted() {
                print("onCompleted method");
            }

            @Override
            public void onError(Throwable e) {
                print("onError method : " + e);
            }

            @Override
            public void onNext(T t) {
                print("onNext:" + t) ;
            }
        };
    }

那么我们计算0-5之间是否为偶数的代码为:

        Observable.range(0, 5)
        .map(new Func1<Integer, Boolean>() {

            @Override
            public Boolean call(Integer t) {
                //在这里判断每一个发射值 是否为偶数
                return (t & 1) == 0;
            }
        }).subscribe(getDefaultObserver()) ;

结果为:
技术分享
可以看出,map函数非常简洁,利用map操作符可以完成我们平时用一般方法较难完成的作业。举个例子,在我自己的项目中,服务器返回的数据中,我需要计算每个item中money是否大于某个值,从而决定是否进行某项操作(比如标红,标绿,标蓝色),在涉及到网络请求线程转换中,map这个函数非常好的解决了这个问题。

当然还存在一个特殊的操作符cast,是map的特殊版本,意思是将发射的数据都转换成特定的类型,个人觉得用的比较少,举个例子:Student类继承自Person类,那么下面代码为:

@Test
    public void castFunction() {
        Student s1 = new Student("tom" , 11 , "90") ;
        Student s2 = new Student("merry" , 12 , "92") ;

        Observable.just(s1, s2).
        cast(Person.class).
        subscribe(getDefaultObserver());

    }

flatMap

flatMap是将一个发射数据的Observable变换为多个Observables,然后将他们发射的数据合并后放进一个单独的Observable,举个例子:

    @Test
    public void flatMapFunction() {
        Observable.
        range(1, 5).
        flatMap(new Func1<Integer, Observable<String>>() {

            @Override
            public Observable<String> call(Integer t) {
                print("create func1 method : " + t);        
                return Observable.just(String.valueOf(t));
            }
        }).subscribe(getDefaultObserver());
    }

Observable发射5个从1开始的连续数据,每一个发射的数据都将被转化为一个Observable,然后分发给订阅者,那么结果为:
技术分享

可以看到我们的每一个数据的分发中,都会被转化为一个新的Observable了。
那么,这个map和flatMap到底有什么区别呢? 也不先急着说明它们之间的不同,用个项目中的例子,来说说它们之间的不同吧。
技术分享

这是项目中一个很平常的例子,我们需要从服务器和本地分别获取歌曲信息,如上图,此时我们需要遍历这两种数据来满足某项要求,那么就意味着我们需要合并这两种数据,就像下图:
技术分享

代码表现如下:

@Test
    public void flatMapFunction2() {
        List<String> networkList = new ArrayList<String>();
        networkList.add("network:演员");
        networkList.add("network:暧昧");

        List<String> localList = new ArrayList<String>() ;
        localList.add("local:等一分钟");
        localList.add("local:最浪漫的事");

        Observable.
        just(networkList,localList).
        flatMap(new Func1<List<String>, Observable<String>>() {
            @Override
            public Observable<String> call(List<String> t) {
                return Observable.from(t);
            }
        }).subscribe(getDefaultObserver()) ;

    }

结果为:
技术分享

很显然,使用map操作符不能满足我们的要求,因为map操作符此时只能操作List,并不能操作List的中的所有Item,所以flatmap操作符比map操作符控制粒度更精细,更加牛逼,而且flatmap相当于数据结构的降维,讲复杂的数据结构变成统一的有效的处理结构。嗯,这个函数我用得真心比较多,这主要是我司的PM经常改需求有关,经常Observable.zip请求多个接口,然后使用这个flatmap统一处理某些业务,所以啊上班难混啊。

当然它还有一个升级版,使用flatmap操作符对发射操作的每一个操作[onNext,onError,onCompleted]进行处理,如下:

@Test
    public void flatMapFunction3() {
        Observable.
        just(1, 2, 3).
        flatMap(new Func1<Integer,Observable<String>>(){  //onNext

            @Override
            public Observable<String> call(Integer t) {
                return Observable.just("call onNext : " + String.valueOf(t));
            }

        },new Func1<Throwable, Observable<String>>() {  //onError

            @Override
            public Observable<String> call(Throwable t) {
                return Observable.just("call one rror " + t);
            }
        },new Func0<Observable<String>>(){  //onCompleted

            @Override
            public Observable<String> call() {
                return Observable.just("call completed ...");
            }

        }).subscribe(getDefaultObserver());
    }

concatMap

ReactiveX文档中介绍,它类似最简单版本的flatmap,但是它是按照次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。所以根据此说明,写了一个例子,来试图说明一下flatMap与concatMap的区别:
先使用线程创建一个Obervable,使用多线程的例子就是为了试图解决flatMap是否为合并数据,concatMap是否为连接数据:

private Observable<String> getRandomObservable(int index) {
        return  Observable.create(new OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> t) {
                new Thread(){
                    Random random = new Random() ;

                    public void run() {
                        for(int i = 0 ; i < 3 ; i++) {
                            try {
                                //线程休息随机秒数
                                sleep(random.nextInt(1000)) ;           
                                t.onNext(index + "--" + i);

                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }                           
                        } 
                        //onCompleted()方法必须调用,因为conactMap主要知道Observable是否发射完成 
                        //才能去连接下一个Observable的数据
                        t.onCompleted();
                    };
                }.start();
            }
        }) ;    
    }

那么在主程序中,先看flatMap的表现:

@Test
    public void concatMapFunction2() throws IOException {
        Observable.
        just(getRandomObservable(1), getRandomObservable(2)).
        flatMap(new Func1<Observable<String>, Observable<String>>() {
            @Override
            public Observable<String> call(Observable<String> t) {
                return t;
            }
        }).subscribe(getDefaultObserver()) ;

        System.in.read() ;
    }

再使用concatMap:

    @Test
    public void concatMapFunction2() throws IOException {
        Observable.
        just(getRandomObservable(1), getRandomObservable(2)).
        concatMap(new Func1<Observable<String>, Observable<String>>() {
            @Override
            public Observable<String> call(Observable<String> t) {
                return t;
            }
        }).subscribe(getDefaultObserver()) ;

        System.in.read() ;
    }

那么我们看一下结果,上图的是flatmap,下图的是concatMap:
技术分享 技术分享

很显然在多线程的情况下,flatmap的数据是混合的,那么就意味着其处理数据的方式是随机合并的[因为采用的是random方式,所以得出的结果也可能是随机的,这里我只是说明flatmap与concatmap之间的区别,但是它们也有相同结果的表现形式,比如它们是同一线程从上到下的执行顺序],而我们的conactMap是按照顺序的,先执行getRandomObservable(1),然再去执行getRandomObservable(2)。那么从这里我们就能非常清晰的知道了flatmap鱼concatmap的区别了。

switchMap

使用方式和flatmap一致,除了一点:当原始的Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视之前的那个Observable,只监视这一个,好吧,还是上面的代码,重新贴一下:

        Observable.
        just(getRandomObservable(1), getRandomObservable(2)).
        switchMap(new Func1<Observable<String>, Observable<String>>() {
            @Override
            public Observable<String> call(Observable<String> t) {
                return t;
            }
        }).subscribe(getDefaultObserver()) ;

        System.in.read() ;

结果为:
技术分享

可以看出,它只执行了onNext2,并没有执行onNext1方法,就执行了onCompleted方法,那么就说明switchMap放弃监视getRandomObservable(1),转而监视getRandomObservable(2)方法。

scan

连续地对数据列的每一项应用一个函数,然后连续发射结果
不扯了,还是来个例子吧,小时候我们数学课上经常计算这个结果为5050的表达式:

1 + 2 + 3 + 4 + 5 + ... + 100 = 5050;

那么使用scan操作符计算,就是这样的:

    @Test
    public void scanFunction() {
        Observable.
        range(1, 100).
        scan(new Func2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer t1, Integer t2) {
                return t1 + t2;
            }
        }).subscribe(getDefaultObserver()) ;
    }

来看看结果吧,比较好玩:
技术分享

好了我们看到了最终的结果是 5050了,那么看看上面的除了5050之外的onNext方法,再看一下下面的解释过程,就可以知道了scan的计算方式了:
技术分享

scan操作符对原始的Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据,持续这个过程,直到发射数据的结束。

GroupBy

将一个Observable分拆为Observables的集合,当然这个分拆的规则是我们自己通过函数去定义的。通过groupby操作符,返回的是一个GroupedObservable。GroupObservable继承了Observable,并且拥有一个额外的方法getKey,这个key值指定了数据分组指定的Observable。
好了,咋们来试一下这个操作符吧。现在我们想从tom,tommy,merry各位小学生里面取出名字第一个字母为m的,那么我们先需要分组,然后再输出,来,给个代码证明一下啊:

@Test
    public void groupByFunction() {
        Observable.
        just("tom", "tommy" , "merry").
        groupBy(new Func1<String, Integer>() {
            @Override
            public Integer call(String t) { 
                //我这里是计算字符 ‘m‘的数值
                return t.charAt(0) - ‘a‘;
            }
        }).flatMap(new Func1<GroupedObservable<Integer, String>, Observable<String>>() {

            @Override
            public Observable<String> call(GroupedObservable<Integer, String> t) {
                //分组完成之后,在这里判断根据key去判断我们的目标值
                //当key==19 时,输出此时分组的Observable

                if(t.getKey() == 19) {
                    return t ;
                }

                //当key为其它值时,我们可以根据自己的需要 这里直接输入never【就是什么都不操作的意思】
                return Observable.never();
            }

        }).subscribe(getDefaultObserver()) ;
    }

好了,我们来看一下结果吧:
技术分享
好了,我们的目标完成了,这个grouby操作符,基本也就差不多扯到这里了,具体项目中自己用吧,据说这玩意还有个内存泄漏的什么bug,估计是我们操作不好造成的吧。

Buffer

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
1. 发射1-15这15个数字,每4个数字发送一次:

@Test
    public void bufferFunction1() {
        Observable.
        range(1, 15).
        buffer(4).
        subscribe(getDefaultObserver());
    }

得到的结果为:
技术分享

可以看到结果是以List的形式,每次为4位的形式展现出来的。

  1. 发射1-15这15个数字,每次取值时,剔除8个发射值,然后取4个数字发送一次:
@Test
    public void bufferFunction2() {
        Observable.
        range(1, 15).
        buffer(4,8).
        subscribe(getDefaultObserver());
    }

那么结果为:
技术分享
可以看出第二项onNext中1-8的数据被剔除了。

  1. buffer除了定量取值之外,还可以定时取值,下面的例子,buffer定时3秒获取发射值:
@Test
    public void bufferFunction3() {
        Observable.create(new Observable.OnSubscribe<String>() {  
               @Override  
               public void call(Subscriber<? super String> subscriber) {  
                   if (subscriber.isUnsubscribed()) return;  
                   while (true) {  
                       subscriber.onNext("消息" + System.currentTimeMillis());  
                       try {
                         //每隔2s发送消息
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }  
                   }  
               }  
           }).  
        //每隔3秒 取出消息
        buffer(3, TimeUnit.SECONDS).  
        subscribe(getDefaultObserver());  
    }

那么得出的结果为:
技术分享
每3秒获取一次数据,然后将获取的所有数据统一发射出来。

好了,基本上能说能写的,就写成这样了,有新的东西需要及时补上啊,写得真心时间长啊。。。。

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    RxJava操作符(二) __变换操作