首页 > 代码库 > 一篇博客让你了解RxJava
一篇博客让你了解RxJava
RxJava可以说是2016年最流行的项目之一了,最近也接触了一下RxJava,于是想写一篇博客,希望能通过这篇博客让大家能对其进行了解,本篇博客是基于RxJava2.0,跟RxJava1.0还是有很多不同的
基础知识
RxJava的核心就是“异步”两个字,其最关键的东西就是两个:
Observable(被观察者)
Observer/Subscriber(观察者)
Observable可以发出一系列的 事件,这里的事件可以是任何东西,例如网络请求、复杂计算处理、数据库操作、文件操作等等,事件执行结束后交给 Observer回调处理。
Observable可以理解为事件的发送者,就好像快递的寄出者,而这些事件就好比快递
Observer可以理解为事件的接收者,就好像快递的接收者
那他们之间是如何进行联系的呢?答案就是通过subscribe()方法,下面的代码就是RXJAVA中Observable与Observer进行关联的典型方式:
//创建一个被观察者 Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(5);
e.onNext(6);
e.onNext(7);
e.onNext(8);
e.onComplete();
}
});
//创建观察者observer
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, value.toString());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
//建立关联
observable.subscribe(observer);
运行项目,我们可以看到,数字已经打印出来
这里需要强调的是: 只有当观察者和被观察者建立连接之后, 被观察者才会开始发送事件. 也就是调用了subscribe()方法之后才开始发送事件.
上面我们看到观察者和被观察者的逻辑是分开写的,那能不能合在一起写呢?答案是肯定的,这也是RxJava比较突出的优点,那就是链式操作,代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(5);
e.onNext(6);
e.onNext(7);
e.onNext(8);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, value.toString());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
有时候,你可能觉得,我就打印几个数,还要把Observable写的那么麻烦,能不能简便一点呢?答案是肯定的,RxJava内置了很多简化创建Observable对象的函数,比如Observable.just就是用来创建只发出一个事件就结束的Observable对象,上面创建Observable对象的代码可以简化为一行
Observable<String> observable = Observable.just("hello");
同样对于Observer,这个例子中,我们其实并不关心OnComplete和OnError,我们只需要在onNext的时候做一些处理,这时候就可以使用Consumer类。
Observable<String> observable = Observable.just("hello");
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
};
observable.subscribe(consumer);
其实在RxJava中,我们可以为 Observer中的三种状态根据自身需要分别创建一个回调动作,通过Action 来替代onComplete():,通过Consumer来替代 one rror(Throwable t)和onNext(T t)
Observable<String> observable = Observable.just("hello");
Action onCompleteAction = new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "complete");
}
};
Consumer<String> onNextConsumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, s);
}
};
Consumer<Throwable> one rrorConsumer = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.i(TAG, "error");
}
};
observable.subscribe(onNextConsumer, one rrorConsumer, onCompleteAction);
}
Observable.just同样可以发送多个参数
Observable observable = Observable.just("you", "are", "beautiful");
Consumer<String> onNextConsumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, s);
}
};
observable.subscribe(onNextConsumer);
例子:来一个简单的例子来了解事件的产生到消费、订阅的过程:从res/mipmap中取出一张图片,显示在ImageView上。
final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<Drawable>() {
@Override
public void subscribe(ObservableEmitter<Drawable> e) throws Exception {
// 从mipmap取出一张图片作为Drawable对象
Drawable drawable = ContextCompat.getDrawable(MainActivity.this, R.mipmap.ic_launcher);
// 把Drawable对象发送出去
e.onNext(drawable);
e.onComplete();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Drawable value) {
ivLogo.setImageDrawable(value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
这样就完成了一个简单的图片的设置
ObservableEmitter和Disposable
ObservableEmitter: ObservableEmitter可以理解为发射器,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
注意:但是事件的发送是有一定的规定的,就好比寄快递也要有一定要求,不是什么都能寄的:
1.被观察者可以发送无限个onNext, 观察者也可以接收无限个onNext.
2.当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送, 而Observer收到onComplete事件之后将不再继续接收事件.
3.当Observable发送了一个onError后, Observable中onError之后的事件将继续发送, 而Observer收到onError事件之后将不再继续接收事件.
4.Observable可以不发送onComplete或onError.
5.最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然
注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, 并不一定会导致程序崩溃. 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.当我们写多个onComplete时,不会报错
当我们又有onComplete又有onError时,发现在调用onComplete后会爆出异常
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(5);
e.onNext(6);
e.onNext(7);
e.onNext(8);
e.onError(new NullPointerException());
e.onComplete();
}
})
这是onComplete在onError前调用的情况
当我们写两个onError时,会先接受前面的所有事件,最后才报错
介绍了ObservableEmitter, 接下来介绍Disposable, 当调用dispose()方法时, 它就会将观察者和被观察者的联系切断, 从而导致观察者收不到事件.
注意: 调用dispose()并不会导致Observable不再继续发送事件, Observable会继续发送剩余的事件.
看一下下面这个例子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emitter 1");
emitter.onNext(1);
Log.d(TAG, "emitter 2");
emitter.onNext(2);
Log.d(TAG, "emitter 3");
emitter.onNext(3);
Log.d(TAG, "emitter complete");
emitter.onComplete();
Log.d(TAG, "emitter 4");
emitter.onNext(4);
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable;
private int i;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
mDisposable = d;
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
i++;
if (i == 2) {
Log.d(TAG, "dispose");
mDisposable.dispose();
Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
}
}
@Override
public void one rror(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
}
打印如下:
在收到onNext 2这个事件后, 我们中断了联系, 但是Observable
仍然发送了3, complete, 4这几个事件, 而且Observable
并没有因为发送了onComplete而停止. 同时可以看到Observer的onSubscribe()方法是最先调用的.
subscribe()有多个重载的方法:
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> one rror) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> one rror, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> one rror, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
?不带任何参数的subscribe() 表示Observer不关心任何事件,Observable发送什么数据都随你
?带有一个Consumer参数的方法表示Observer只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emitter 1");
emitter.onNext(1);
Log.d(TAG, "emitter 2");
emitter.onNext(2);
Log.d(TAG, "emitter 3");
emitter.onNext(3);
Log.d(TAG, "emitter complete");
emitter.onComplete();
Log.d(TAG, "emitter 4");
emitter.onNext(4);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
其他方式也是类似的方式
线程调度
正常情况下, Observer和Observable是工作在同一个线程中的, 也就是说Observable在哪个线程发事件, Observer就在哪个线程接收事件.
RxJava中, 当我们在主线程中去创建一个Observable来发送事件, 则这个Observable默认就在主线程发送事件.
当我们在主线程去创建一个Observer来接收事件, 则这个Observer默认就在主线程中接收事件,但其实在现实工作中我们更多的是需要进行线程切换的,最常见的例子就是在子线程中请求网络数据,在主线程中进行展示
要达到这个目的, 我们需要先改变Observable发送事件的线程, 让它去子线程中发送事件, 然后再改变Observer的线程, 让它去主线程接收事件. 通过RxJava内置的线程调度器可以很轻松的做到这一点. 接下来看一段代码:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emitter 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
可以看到, observable发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-1的线程中发送的事件, 而consumer 仍然在主线程中接收事件, 这说明我们的目的达成了, 接下来看看是如何做到的.
这段代码只不过是增加了两行代码:
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
简单的来说, subscribeOn() 指定的是Observable发送事件的线程, observeOn() 指定的是Observer接收事件的线程.
多次指定Observable的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
多次指定Observer的线程是可以的, 也就是说每调用一次observeOn() , Observer的线程就会切换一次.例如:
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(consumer);
这段代码中指定了两次上游发送事件的线程, 分别是newThread和IO线程, 下游也指定了两次线程,分别是main和IO线程. 运行结果为:
可以看到, Observable虽然指定了两次线程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler线程中, 而Observer则跑到了RxCachedThreadScheduler 中, 这个CacheThread其实就是IO线程池中的一个.
在 RxJava 中,提供了一个名为 Scheduler 的线程调度器,RxJava 内部提供了4个调度器,分别是:
Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程;
Schedulers.newThread(): 开启新线程操作;
Schedulers.immediate(): 默认指定的线程,也就是当前线程;
Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU;
AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;
这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.
例子:还是用之前设置图片的例子,这次我们在子线程中进行网络请求获取图片,在主线程中对图片进行设置
final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<Drawable>() {
@Override
public void subscribe(ObservableEmitter<Drawable> e) throws Exception {
try {
Drawable drawable = Drawable.createFromStream(new URL("https://ss2.baidu.com/6ONYsjip0QIZ8tyhnq/it/u=2502144641,437990411&fm=80&w=179&h=119&img.JPEG").openStream(), "src");
e.onNext(drawable);
} catch (IOException error) {
e.onError(error);
}
}
})// 指定 subscribe() 所在的线程,也就是上面subscribe()方法调用的线程
.subscribeOn(Schedulers.io())
// 指定 Observer 回调方法所在的线程,也就是onCompleted, one rror, onNext回调的线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Drawable>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Drawable value) {
ivLogo.setImageDrawable(value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
这段代码就做一件事,在 io 线程加载一张网络图片,加载完毕之后在主线程中显示到ImageView上。
操作符的使用
在了解基本知识和线程调度后,我们来学习一下RxJava各种神奇的操作符
Map
Map是RxJava中最简单的一个变换操作符了, 它的作用就是对Observable发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
在Observable我们发送的是数字类型, 而在Observer我们接收的是String类型, 中间起转换作用的就是Map操作符, 运行结果为:
通过Map, 可以将Observable发来的事件转换为任意的类型, 可以是一个Object, 也可以是一个集合,功能非常强大
例子:还是以图片加载的例子,我们传进来一个图片的路径,然后通过Map进行转换成drawble再发送给观察者
final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");
}
}).map(new Function<String, Drawable>() {
@Override
public Drawable apply(String url) throws Exception {
try {
Drawable drawable = Drawable.createFromStream(new URL(url).openStream(), "src");
return drawable;
} catch (IOException e) {
}
return null;
}
}) .subscribeOn(Schedulers.io())
// 指定 Observer 回调方法所在的线程,也就是onCompleted, one rror, onNext回调的线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Drawable>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Drawable value) {
if (value != null) {
ivLogo.setImageDrawable(value);
}
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.toString());
}
@Override
public void onComplete() {
}
});
效果如下:
经过改写代码后,有什么变化呢? Observable 创建了一个 String 事件,也就是产生一个url,通过 map 操作符进行变换,返回Drawable对象,这个变换指的就是通过url进行网络图片请求,返回一个Drawable。所以简单的来说就是把String事件,转换为Drawable事件。逻辑表示就是
Observable –> map变换 –> Observable
FlatMap
FlatMap将一个发送事件的Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.
Observable每发送一个事件, flatMap都将对其进行转换, 然后发送转换之后的新的事件, Observer接收到的就是转换后发送的数据. 这里需要注意的是, flatMap并不保证事件的顺序, 如果需要保证顺序则需要使用concatMap.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
效果如下:
Map 与 flatMap 这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:
1.flatMap 返回的是一个Observable对象,而 map 返回的是一个普通转换后的对象;
2.flatMap 返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件;而 map 变换后返回的对象直接发到Subscriber回调中;
3.flatMap 变换后产生的每一个Observable对象发送的事件,最后都汇入同一个Observable,进而发送给Subscriber回调;
4.map返回类型 与 flatMap 返回的Observable事件类型,可以与原来的事件类型一样;
5.可以对一个Observable多次使用 map 和 flatMap;
鉴于 flatMap 自身强大的功能,这常常被用于 嵌套的异步操作,例如嵌套网络请求。传统的嵌套请求,一般都是在前一个请求的 onSuccess() 回调里面发起新的请求,这样一旦嵌套多个的话,缩进就是大问题了,而且严重的影响代码的可读性。而RxJava嵌套网络请求仍然通过链式结构,保持代码逻辑的清晰!举个栗子:
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}
接着创建一个Retrofit客户端:
private static Retrofit create() {
OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
builder.readTimeout(10, TimeUnit.SECONDS);
builder.connectTimeout(9, TimeUnit.SECONDS);
if (BuildConfig.DEBUG) {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
builder.addInterceptor(interceptor);
}
return new Retrofit.Builder().baseUrl(ENDPOINT)
.client(builder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}
发起请求就很简单了:
Api api = retrofit.create(Api.class);
api.login(request)
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
.subscribe(new Observer<LoginResponse>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(LoginResponse value) {}
@Override
public void onError(Throwable e) {
Toast.makeText(mContext, "登录失败", Toast.LENGTH_SHORT).show();
}
@Override
public void onComplete() {
Toast.makeText(mContext, "登录成功", Toast.LENGTH_SHORT).show();
}
});
concatMap
这里也简单说一下concatMap吧, 它和flatMap的作用几乎一模一样, 只是它的结果是严格按照上游发送的顺序来发送的, 来看个代码吧:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
只是将之前的flatMap改为了concatMap, 其余原封不动, 运行结果如下:
可以看到, 结果仍然是有序的.
ZIP
Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emitter 1");
emitter.onNext(1);
Log.d(TAG, "emitter 2");
emitter.onNext(2);
Log.d(TAG, "emitter 3");
emitter.onNext(3);
Log.d(TAG, "emitter 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emitter A");
emitter.onNext("A");
Log.d(TAG, "emitter B");
emitter.onNext("B");
Log.d(TAG, "emitter C");
emitter.onNext("C");
Log.d(TAG, "emitter complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void one rror(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
我们分别创建了observable, 一个发送1,2,3,4,Complete, 另一个发送A,B,C,Complete, 接着用Zip把发出的事件组合, 来看看运行结果吧:
观察发现observable1发送事件后,observable2才发送
这是因为我们两个observable都是运行在同一个线程里, 同一个线程里执行代码肯定有先后顺序呀.
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void one rror(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
好了, 这次我们让事件都在IO线程里发送事件, 再来看看运行结果:
第一个observable明明发送了四个数据+一个Complete, 之前明明还有的, 为啥到这里没了呢?
这是因为我们之前说了, zip发送的事件数量跟observable中发送事件最少的那一个的事件数量是有关的, 在这个例子里我们observable2只发送了三个事件然后就发送了Complete, 这个时候尽管observable1还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢?
from
在RxJava的from操作符到2.0已经被拆分成了3个,fromArray, fromIterable, fromFuture接收一个集合作为输入,然后每次输出一个元素给subscriber。
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "number:" + integer);
}
});
注意:如果from()里面执行了耗时操作,即使使用了subscribeOn(Schedulers.io()),仍然是在主线程执行,可能会造成界面卡顿甚至崩溃,所以耗时操作还是使用Observable.create(…);
filter
条件过滤,去除不符合某些条件的事件。举个栗子:
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
// 偶数返回true,则表示剔除奇数,留下偶数
return integer % 2 == 0;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "number:" + integer);
}
});
take
最多保留的事件数。
Observable.just("1", "2", "6", "3", "4", "5").take(2).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
Log.d(TAG,value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
可以发现我们发送了6个String,最后只打印了前两个,这就是take过滤掉的结果
doOnNext
如果你想在处理下一个事件之前做某些事,就可以调用该方法
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
// 偶数返回true,则表示剔除奇数
return integer % 2 == 0;
}
})// 最多保留三个,也就是最后剩三个偶数
.take(3).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 在输出偶数之前输出它的hashCode
Log.i(TAG, "hahcode = " + integer.hashCode() + "");
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.i(TAG, "number = " + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
debounce
debounce也是用于事件的过滤,可以指定过滤事件的时间间隔
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
int i = 0;
int[] times = new int[]{100, 1000};
while (true) {
i++;
if (i >= 100)
break;
e.onNext(i);
try {
// 注意!!!!
// 当i为奇数时,休眠1000ms,然后才发送i+1,这时i不会被过滤掉
// 当i为偶数时,只休眠100ms,便发送i+1,这时i会被过滤掉
Thread.sleep(times[i % 2]);
} catch (InterruptedException error) {
error.printStackTrace();
}
}
e.onComplete();
}
})// 间隔400ms以内的事件将被丢弃
.debounce(400, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onError(Throwable e) {
Log.e(TAG, e.toString());
}
@Override
public void onComplete() {
Log.i(TAG, "complete");
}
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "integer = " + integer);
}
});
compose
与 flatMap 类似,都是进行变换,返回Observable对象,激活并发送事件。
1.compose 是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用 compose 来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap 中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。
2.compose 是对 Observable 整体的变换,换句话说, flatMap 转换Observable里的每一个事件,而 compose 转换的是整个Observable数据流。
3.flatMap 每发送一个事件都创建一个 Observable,所以效率较低。而 compose 操作符只在主干数据流上执行操作。
4.建议使用 compose 代替 flatMap。
First
只发送符合条件的第一个事件。可以与contact操作符,做网络缓存。
例子:依次检查Disk与Network,如果Disk存在缓存,则不做网络请求,否则进行网络请求。
// 从缓存获取
Observable<BookList> fromDisk = Observable.create(new Observable.OnSubscribe<BookList>() {
@Override
public void call(Subscriber<? super BookList> subscriber) {
BookList list = getFromDisk();
if (list != null) {
subscriber.onNext(list);
} else {
subscriber.onCompleted();
}
}
});
// 从网络获取
Observable<BookList> fromNetWork = bookApi.getBookDetailDisscussionList();
Observable.concat(fromDisk, fromNetWork)
// 如果缓存不为null,则不再进行网络请求。反之
.first()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<BookList>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(BookList discussionList) {
}
});
Single
Single与Observable类似,相当于是他的精简版。订阅者回调的不是OnNext/OnError/onCompleted,而是回调OnSuccess/OnError。
Single.create(new SingleOnSubscribe<Object>() {
@Override
public void subscribe(SingleEmitter<Object> e) throws Exception {
e.onSuccess("hello world");
}
}).subscribe(new SingleObserver<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Object value) {
Log.i(TAG, value.toString());
}
@Override
public void onError(Throwable e) {
}
});
可以配合debounce,避免SearchEditText频繁请求。
final Subject subject = PublishSubject.create();
subject.debounce(400, TimeUnit.MILLISECONDS)
.subscribe(new Observer() {
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
// request
}
});
edittext.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) { }
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
subject.onNext(s.toString());
}
@Override
public void afterTextChanged(Editable s) { }
});
RxJava的一些使用场景
场景1:
取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的
final Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
if (memoryCache != null) {
emitter.onNext(memoryCache);
} else {
emitter.onComplete();
}
}
});
final Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() {
String cachePref = getSharedPreferences("rxdeni",MODE_PRIVATE).getString("cache",null);
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
if (cachePref != null) {
emitter.onNext(cachePref);
} else {
emitter.onComplete();
}
}
});
Observable<String> network = Observable.just("network");
//主要就是靠concat operator来实现
Observable.concat(memory, disk, network).firstElement()
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("--------------subscribe: " + s);
}
});
场景2:界面需要等到多个接口并发取完数据,再更新
Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("haha");
}
}).subscribeOn(Schedulers.newThread());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hehe");
}
}).subscribeOn(Schedulers.newThread());
Observable.merge(observable1, observable2)
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
Log.d(TAG,value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
场景3:界面按钮需要防止连续点击的情况
RxView.clicks(button)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
Log.i(TAG, "do clicked!");
}
});
场景4:响应式的界面
比如勾选了某个checkbox,自动更新对应的preference
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);
CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
.subscribe(checked.asAction());
场景5:复杂的数据变换
Observable.just("1", "2", "6", "3", "4", "5")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
}).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer.intValue()%2 == 0;
}
}).distinct().take(2).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer.intValue() + integer2.intValue();
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,integer.toString());
}
});
一篇博客让你了解RxJava