首页 > 代码库 > RxJava响应式编程之初级了解

RxJava响应式编程之初级了解

据说现在流行的开发模式是 Retrofit+RxJava+MVP+ButterKnife

如果想要简单学习ButterKnife、MVP模式,可以参考我以前的例子
使用butterknife注解框架
Android—MVP设计模式高级(三)

今天我就简单来学习下RxJava的相关知识
以前我也只是听说过RxJava,RxJava这个到底是什么东西呢?
呵呵,它其实是一个库,所以我们使用里面的方法,得需要下载库,所以我们需要在AS中进行配置

1.RxJava 地址以及添加

github地址:
https://github.com/ReactiveX/RxJava
或者
https://github.com/ReactiveX/RxAndroid

依赖库添加:
compile ‘io.reactivex:rxjava:1.1.6’
或者
compile ‘io.reactivex:rxandroid:1.2.1’

2.RxJava是什么类型的库?它的原理是什么?

RxJava 在 GitHub 主页上的自我介绍是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

Rx:函数响应式编程,也许这个词对你我都很只可意会,不可言传,先抛开Rx不说,我们接触到的类似的这样的思路,大概有接口回调、Handler通讯、广播通讯、还有一个开源的EventBus、以及ContentPorivider里面的观察者模式、AsyncTask 我们朦胧中也许就大概了解RxJava是怎么个东西了。

RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。
至于观察者我就拿ContentProvider来说吧,比如我们在一个ContentProvider中有一个insert方法,插入完毕后,去通知某个监听该URI变化的界面

getContext().getContentResolver().notifyChange(URI, null);

比如在MainActivity中去registerContentObserver注册一个内容观察者

private static final Uri URI = Uri  
            .parse("content://com.example.contentprovider.MyContentProvider/student");  
 contentResolver.registerContentObserver(uri, true, observer);
 private ContentObserver observer = new ContentObserver(null) {  
        public void onChange(boolean selfChange) {  
            // 说明数据有改变,重新查询一直所有记录  
            Uri uri = Uri.parse("content://com.example.contentprovider.MyContentProvider/student");  
            Cursor cursor = contentResolver.query(uri, null, null, null, null);  
            Log.e("TAG", "onChange()  count=" + cursor.getCount());  
        };  
    };  

那么对于RxJava 的观察者模式呢?
RxJava 有四个基本概念:Observable (被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。
对比ContentProvider,RxJava里面的被观察者就是某个Uri(也就是某个数据库),观察者就是某个界面(比如MainActivity),订阅就是registerContentObserver,事件就是insert方法

我们来看看RxJava的察者模式流程交互图:
技术分享

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 insert)之外,还定义了两个特殊的事件:onCompleted() 和 one rror()。

我们来看下Observable是如何定义的?

     Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
                subscriber.onStart();
                subscriber.onNext("Hello World!");
                subscriber.onCompleted();
            }
        })

我们看看create源码里面做了什么?

   public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用一次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

然后在看Observer中做了什么操作?

subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void one rror(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });

onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 one rror() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 one rror() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

接下来我们将全部的代码贴出来,看看Log日志打印:

同步方式

package com.example.administrator.myapplication;
import android.app.Activity;
import android.os.Bundle;
import android.util.Log;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;

public class MainActivity extends Activity {
    private String TAG = "MainActivity";
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
                subscriber.onStart();
                subscriber.onNext("Hello World!");
                subscriber.onCompleted();
            }
        })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });
    }
}
call: threadId:1
onNext: threadId:1
onNext: s = Hello World!
onCompleted: threadId:1

从上可以看出,事件的处理和结果的接收都是在同一个线程里面处理的。但是,Rxjava的意义何在,异步呢?别急,看以下代码的处理,你就会发现了,异步原来是这么的简单。
异步方式
我们将上面的代码稍微改下,增加2行代码

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
                subscriber.onStart();
                subscriber.onNext("Hello World!");
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void one rror(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });

我们添上如下2行代码

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

我们看下Log输出

03-08 07:20:18.101 22734-22734/? I/MainActivity: testFunction: threadId:1
03-08 07:20:18.123 22734-22755/? D/MainActivity: call: threadId:180
03-08 07:20:18.142 22734-22734/? D/MainActivity: onNext: threadId:1
03-08 07:20:18.142 22734-22734/? I/MainActivity: onNext: s = Hello World!
03-08 07:20:18.143 22734-22734/? D/MainActivity: onCompleted: threadId:1

看见了没,第二行log日志threadId与其它的threadId很明显的不一样啊,说明我们在处理事件的时候,发生在了一个新的线程里面,而结果的接收,还是在主线程里面操作的。怎么样,只要添加两句话,异步立马就实现了,异步处理耗时操作,就是这么easy。

我们简单看下源码

 public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

什么意思呢?
Scheduler scheduler参数就是执行订阅操作,返回一个源观察到的修改,使其订阅发生在指定的线程
observeOn(AndroidSchedulers.mainThread())
句话说,observeOn() 指定的是它之后的操作所在的线程。

打印字符串数组 from和just方式

以上是RxJava的很基础很简单的一个用法,那么我们接着往下看,比如我们有一组需求把一个String数组的字符串,单个打印出来,我们用Rxjava怎么实现呢?看代码:

Log.i(TAG, "testFunction: threadId:" + Thread.currentThread().getId());
        Observable.from(new String[]{"one","two","three","four"})
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });

看Log输出日志如下:

testFunction: threadId:1
onNext: s = one
onNext: s = two
onNext: s = three
onNext: s = four
onCompleted: threadId:1

From操作符用来将某个对象转化为Observable对象,并且依次将其内容发射出去。这个类似于just,但是just会将这个对象整个发射出去。比如说一个含有3个字符串的数组,使用from就会发射4次,每次发射一个数字,而使用just会发射一次来将整个的数组发射出去。

 Log.i(TAG, "testFunction: threadId:"+Thread.currentThread().getId());
        Observable.just("one", "two", "three", "four")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void one rror(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });
03-08 08:09:25.743 32155-32155/? I/MainActivity: testFunction: threadId:1
03-08 08:09:25.784 32155-32155/? I/MainActivity: onNext: s = one
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = two
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = three
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = four
03-08 08:09:25.785 32155-32155/? D/MainActivity: onCompleted: threadId:1

一对一转换

 Log.i(TAG, "testFunction: threadId:"+Thread.currentThread().getId());
        Observable.just("1", "2", "3", "4")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<String, Integer>() {

                    @Override
                    public Integer call(String s) {
                        Log.i(TAG, "call: s = "+s);
                        return Integer.parseInt(s);
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.i(TAG, "call: integer = "+integer);
                    }
                });
testFunction: threadId:1
call: s = 1
call: integer = 1
call: s = 2
call: integer = 2
call: s = 3
call: integer = 3
call: s = 4
call: integer = 4

简单说一下Func1,其中的T表示传入的参数类型,R表示方法返回的参数类型。源码如下:

public interface Func1<T, R> extends Function {
    R call(T t);
}

上例中还有一个叫做 Action1的类。也是 RxJava 的一个接口,用于包装含有无参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。

可以看到,map() 方法将参数中的 String 对象转换成一个 Integer对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Integer。这种直接变换对象并返回的,是最常见的也最容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。

封装Observable一对多转换
map转换,是一对一的转换,像示例当中,我们把string转成int,但是当我们需要一对多的转换,该怎么做呢?比如说,定义一个学生类:

package com.example.administrator.myapplication;

import java.util.List;

public class Student {

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    //////////////////////////
    private List<String> courses;


    public List<String> getCourses() {
        return courses;
    }

    public void setCourses(List<String> courses) {
        this.courses = courses;
    }
}

        Student student1 = new Student();
        student1.setName("safly");
        List<String> courses = new ArrayList<>();
        courses.add("语文");
        courses.add("数学");
        courses.add("英语");
        student1.setCourses(courses);

        Student student2 = new Student();
        student2.setName("wyf");
        List<String> courses2 = new ArrayList<>();
        courses2.add("化学");
        courses2.add("地理");
        courses2.add("政治");
        student2.setCourses(courses2);



        Observable.just(student1,student2)
                .subscribe(new Action1<Student>() {
                    @Override
                    public void call(Student student) {
                        Log.i(TAG, "call: name = "+student.getName());
                        List<String> course = student.getCourses();
                        for(String str:course){
                            Log.i(TAG, "call: str = "+str);
                        }
                    }
                });

这里我们没有进行转换,直接just发送过来,没有用到转换,然后在call中进行直接输出了

call: name = safly
call: str = 语文
call: str = 数学
call: str = 英语
call: name = wyf
call: str = 化学
call: str = 地理
call: str = 政治

我们用转换来试试看?
这里我们用到了flatmap这一函数,按通俗的一点理解:我们首先把Student转成了Observable,然后呢,又把student.getCourses()转成string挨个打印出来,结果如下:

 Observable.just(student1,student2)
                  .flatMap(new Func1<Student, Observable<String>>() {
                      @Override
                      public Observable<String> call(Student student) {
                       Log.i(TAG, "Observable " );
                          return Observable.from(student.getCourses());
                      }
                  })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i(TAG, "call: s = "+s);     
                    }
                });

输出

 Observable
call: s = 语文
call: s = 数学
call: s = 英语
 Observable
call: s = 化学
call: s = 地理
call: s = 政治

我们还记得Observable.from嘛?
From操作符用来将某个对象转化为Observable对象

  public static <T> Observable<T> from(Iterable<? extends T> iterable) {
        return create(new OnSubscribeFromIterable<T>(iterable));
    }
<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    RxJava响应式编程之初级了解