首页 > 代码库 > 一篇博客让你了解RxJava

一篇博客让你了解RxJava

RxJava可以说是2016年最流行的项目之一了,最近也接触了一下RxJava,于是想写一篇博客,希望能通过这篇博客让大家能对其进行了解,本篇博客是基于RxJava2.0,跟RxJava1.0还是有很多不同的

基础知识

RxJava的核心就是“异步”两个字,其最关键的东西就是两个:

  1. Observable(被观察者)

  2. Observer/Subscriber(观察者)

Observable可以发出一系列的 事件,这里的事件可以是任何东西,例如网络请求、复杂计算处理、数据库操作、文件操作等等,事件执行结束后交给 Observer回调处理。

Observable可以理解为事件的发送者,就好像快递的寄出者,而这些事件就好比快递
Observer可以理解为事件的接收者,就好像快递的接收者

那他们之间是如何进行联系的呢?答案就是通过subscribe()方法,下面的代码就是RXJAVA中Observable与Observer进行关联的典型方式:

//创建一个被观察者 Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(5);
            e.onNext(6);
            e.onNext(7);
            e.onNext(8);
            e.onComplete();
        }
    });

    //创建观察者observer
    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "subscribe");
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, value.toString());
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "error");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "complete");
        }
    };
    //建立关联
    observable.subscribe(observer);

运行项目,我们可以看到,数字已经打印出来
技术分享

这里需要强调的是: 只有当观察者和被观察者建立连接之后, 被观察者才会开始发送事件. 也就是调用了subscribe()方法之后才开始发送事件.

上面我们看到观察者和被观察者的逻辑是分开写的,那能不能合在一起写呢?答案是肯定的,这也是RxJava比较突出的优点,那就是链式操作,代码如下:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(5);
        e.onNext(6);
        e.onNext(7);
        e.onNext(8);
        e.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, value.toString());
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "error");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "complete");
    }
});

有时候,你可能觉得,我就打印几个数,还要把Observable写的那么麻烦,能不能简便一点呢?答案是肯定的,RxJava内置了很多简化创建Observable对象的函数,比如Observable.just就是用来创建只发出一个事件就结束的Observable对象,上面创建Observable对象的代码可以简化为一行

Observable<String> observable = Observable.just("hello");

同样对于Observer,这个例子中,我们其实并不关心OnComplete和OnError,我们只需要在onNext的时候做一些处理,这时候就可以使用Consumer类。

Observable<String> observable = Observable.just("hello");
   Consumer<String> consumer = new Consumer<String>() {
       @Override
       public void accept(String s) throws Exception {
           System.out.println(s);
       }
   };
    observable.subscribe(consumer);

技术分享

其实在RxJava中,我们可以为 Observer中的三种状态根据自身需要分别创建一个回调动作,通过Action 来替代onComplete():,通过Consumer来替代 one rror(Throwable t)和onNext(T t)

Observable<String> observable = Observable.just("hello");
    Action onCompleteAction = new Action() {
        @Override
        public void run() throws Exception {
            Log.i(TAG, "complete");
        }
    };
    Consumer<String> onNextConsumer = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.i(TAG, s);
        }
    };
    Consumer<Throwable> one rrorConsumer = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.i(TAG, "error");
        }
    };
    observable.subscribe(onNextConsumer, one rrorConsumer, onCompleteAction);

}

技术分享

Observable.just同样可以发送多个参数

Observable observable = Observable.just("you", "are", "beautiful");
Consumer<String> onNextConsumer = new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, s);
    }
};
observable.subscribe(onNextConsumer);

技术分享

例子:来一个简单的例子来了解事件的产生到消费、订阅的过程:从res/mipmap中取出一张图片,显示在ImageView上。

final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<Drawable>() {


    @Override
    public void subscribe(ObservableEmitter<Drawable> e) throws Exception {
        // 从mipmap取出一张图片作为Drawable对象
        Drawable drawable = ContextCompat.getDrawable(MainActivity.this, R.mipmap.ic_launcher);

        // 把Drawable对象发送出去
        e.onNext(drawable);
        e.onComplete();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Drawable value) {
        ivLogo.setImageDrawable(value);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

这样就完成了一个简单的图片的设置
技术分享

ObservableEmitter和Disposable

ObservableEmitter: ObservableEmitter可以理解为发射器,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
注意:但是事件的发送是有一定的规定的,就好比寄快递也要有一定要求,不是什么都能寄的:

1.被观察者可以发送无限个onNext, 观察者也可以接收无限个onNext.
2.当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送, 而Observer收到onComplete事件之后将不再继续接收事件.
3.当Observable发送了一个onError后, Observable中onError之后的事件将继续发送, 而Observer收到onError事件之后将不再继续接收事件.
4.Observable可以不发送onComplete或onError.
5.最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, 并不一定会导致程序崩溃. 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.当我们写多个onComplete时,不会报错

当我们又有onComplete又有onError时,发现在调用onComplete后会爆出异常

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(5);
        e.onNext(6);
        e.onNext(7);
        e.onNext(8);

        e.onError(new NullPointerException());
        e.onComplete();
    }
})

这是onComplete在onError前调用的情况
技术分享

当我们写两个onError时,会先接受前面的所有事件,最后才报错
技术分享

介绍了ObservableEmitter, 接下来介绍Disposable, 当调用dispose()方法时, 它就会将观察者和被观察者的联系切断, 从而导致观察者收不到事件.

注意: 调用dispose()并不会导致Observable不再继续发送事件, Observable会继续发送剩余的事件.
看一下下面这个例子:

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
            Log.d(TAG, "emitter 2");
            emitter.onNext(2);
            Log.d(TAG, "emitter 3");
            emitter.onNext(3);
            Log.d(TAG, "emitter complete");
            emitter.onComplete();
            Log.d(TAG, "emitter 4");
            emitter.onNext(4);
        }
    }).subscribe(new Observer<Integer>() {
        private Disposable mDisposable;
        private int i;

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "subscribe");
            mDisposable = d;
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "onNext: " + value);
            i++;
            if (i == 2) {
                Log.d(TAG, "dispose");
                mDisposable.dispose();
                Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
            }
        }

        @Override
        public void one rror(Throwable e) {
            Log.d(TAG, "error");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "complete");
        }
    });
}

打印如下:
技术分享

在收到onNext 2这个事件后, 我们中断了联系, 但是Observable
仍然发送了3, complete, 4这几个事件, 而且Observable
并没有因为发送了onComplete而停止. 同时可以看到Observer的onSubscribe()方法是最先调用的.

subscribe()有多个重载的方法:

 public final Disposable subscribe() {}
 public final Disposable subscribe(Consumer<? super T> onNext) {}
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> one rror) {} 
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> one rror, Action onComplete) {}
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> one rror, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
 public final void subscribe(Observer<? super T> observer) {}

?不带任何参数的subscribe() 表示Observer不关心任何事件,Observable发送什么数据都随你
?带有一个Consumer参数的方法表示Observer只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
            Log.d(TAG, "emitter 2");
            emitter.onNext(2);
            Log.d(TAG, "emitter 3");
            emitter.onNext(3);
            Log.d(TAG, "emitter complete");
            emitter.onComplete();
            Log.d(TAG, "emitter 4");
            emitter.onNext(4);
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "onNext: " + integer);
        }
    });

其他方式也是类似的方式

线程调度

正常情况下, Observer和Observable是工作在同一个线程中的, 也就是说Observable在哪个线程发事件, Observer就在哪个线程接收事件.
RxJava中, 当我们在主线程中去创建一个Observable来发送事件, 则这个Observable默认就在主线程发送事件.
当我们在主线程去创建一个Observer来接收事件, 则这个Observer默认就在主线程中接收事件,但其实在现实工作中我们更多的是需要进行线程切换的,最常见的例子就是在子线程中请求网络数据,在主线程中进行展示

要达到这个目的, 我们需要先改变Observable发送事件的线程, 让它去子线程中发送事件, 然后再改变Observer的线程, 让它去主线程接收事件. 通过RxJava内置的线程调度器可以很轻松的做到这一点. 接下来看一段代码:

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
        }
    });

    Consumer<Integer> consumer = new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
            Log.d(TAG, "onNext: " + integer);
        }
    };

    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer);
}

技术分享
可以看到, observable发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-1的线程中发送的事件, 而consumer 仍然在主线程中接收事件, 这说明我们的目的达成了, 接下来看看是如何做到的.

这段代码只不过是增加了两行代码:

.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())

简单的来说, subscribeOn() 指定的是Observable发送事件的线程, observeOn() 指定的是Observer接收事件的线程.
多次指定Observable的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
多次指定Observer的线程是可以的, 也就是说每调用一次observeOn() , Observer的线程就会切换一次.例如:

observable.subscribeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io())
        .subscribe(consumer);

这段代码中指定了两次上游发送事件的线程, 分别是newThread和IO线程, 下游也指定了两次线程,分别是main和IO线程. 运行结果为:
技术分享

可以看到, Observable虽然指定了两次线程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler线程中, 而Observer则跑到了RxCachedThreadScheduler 中, 这个CacheThread其实就是IO线程池中的一个.

在 RxJava 中,提供了一个名为 Scheduler 的线程调度器,RxJava 内部提供了4个调度器,分别是:

Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程;

Schedulers.newThread(): 开启新线程操作;

Schedulers.immediate(): 默认指定的线程,也就是当前线程;

Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU;

AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;

这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.

例子:还是用之前设置图片的例子,这次我们在子线程中进行网络请求获取图片,在主线程中对图片进行设置

final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<Drawable>() {


    @Override
    public void subscribe(ObservableEmitter<Drawable> e) throws Exception {
        try {
            Drawable drawable = Drawable.createFromStream(new URL("https://ss2.baidu.com/6ONYsjip0QIZ8tyhnq/it/u=2502144641,437990411&fm=80&w=179&h=119&img.JPEG").openStream(), "src");
            e.onNext(drawable);
        } catch (IOException error) {
            e.onError(error);
        }
    }
})// 指定 subscribe() 所在的线程,也就是上面subscribe()方法调用的线程
        .subscribeOn(Schedulers.io())
        // 指定 Observer 回调方法所在的线程,也就是onCompleted, one rror, onNext回调的线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Drawable>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Drawable value) {
        ivLogo.setImageDrawable(value);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

这段代码就做一件事,在 io 线程加载一张网络图片,加载完毕之后在主线程中显示到ImageView上。

操作符的使用

在了解基本知识和线程调度后,我们来学习一下RxJava各种神奇的操作符

Map
Map是RxJava中最简单的一个变换操作符了, 它的作用就是对Observable发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化.

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "This is result " + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, s);
        }
    });

在Observable我们发送的是数字类型, 而在Observer我们接收的是String类型, 中间起转换作用的就是Map操作符, 运行结果为:
技术分享

通过Map, 可以将Observable发来的事件转换为任意的类型, 可以是一个Object, 也可以是一个集合,功能非常强大

例子:还是以图片加载的例子,我们传进来一个图片的路径,然后通过Map进行转换成drawble再发送给观察者

final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");
    }
}).map(new Function<String, Drawable>() {
    @Override
    public Drawable apply(String url) throws Exception {
        try {
            Drawable drawable = Drawable.createFromStream(new URL(url).openStream(), "src");
            return drawable;
        } catch (IOException e) {

        }
        return null;
    }
})  .subscribeOn(Schedulers.io())
        // 指定 Observer 回调方法所在的线程,也就是onCompleted, one rror, onNext回调的线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Drawable>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Drawable value) {
                if (value != null) {
                    ivLogo.setImageDrawable(value);
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.toString());
            }

            @Override
            public void onComplete() {

            }
        });

效果如下:
技术分享

经过改写代码后,有什么变化呢? Observable 创建了一个 String 事件,也就是产生一个url,通过 map 操作符进行变换,返回Drawable对象,这个变换指的就是通过url进行网络图片请求,返回一个Drawable。所以简单的来说就是把String事件,转换为Drawable事件。逻辑表示就是
Observable –> map变换 –> Observable

FlatMap
FlatMap将一个发送事件的Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.

Observable每发送一个事件, flatMap都将对其进行转换, 然后发送转换之后的新的事件, Observer接收到的就是转换后发送的数据. 这里需要注意的是, flatMap并不保证事件的顺序, 如果需要保证顺序则需要使用concatMap.

 Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("I am value " + integer);
            }
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, s);
        }
    });

效果如下:
技术分享

Map 与 flatMap 这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:

1.flatMap 返回的是一个Observable对象,而 map 返回的是一个普通转换后的对象;
2.flatMap 返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件;而 map 变换后返回的对象直接发到Subscriber回调中;
3.flatMap 变换后产生的每一个Observable对象发送的事件,最后都汇入同一个Observable,进而发送给Subscriber回调;
4.map返回类型 与 flatMap 返回的Observable事件类型,可以与原来的事件类型一样;
5.可以对一个Observable多次使用 map 和 flatMap;

鉴于 flatMap 自身强大的功能,这常常被用于 嵌套的异步操作,例如嵌套网络请求。传统的嵌套请求,一般都是在前一个请求的 onSuccess() 回调里面发起新的请求,这样一旦嵌套多个的话,缩进就是大问题了,而且严重的影响代码的可读性。而RxJava嵌套网络请求仍然通过链式结构,保持代码逻辑的清晰!举个栗子:

public interface Api {
    @GET
    Observable<LoginResponse> login(@Body LoginRequest request);

    @GET
    Observable<RegisterResponse> register(@Body RegisterRequest request);

}

接着创建一个Retrofit客户端:

private static Retrofit create() {
    OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
    builder.readTimeout(10, TimeUnit.SECONDS);
    builder.connectTimeout(9, TimeUnit.SECONDS);

    if (BuildConfig.DEBUG) {
        HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
        interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
        builder.addInterceptor(interceptor);
    }

    return new Retrofit.Builder().baseUrl(ENDPOINT)
            .client(builder.build())
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .build();
}

发起请求就很简单了:

Api api = retrofit.create(Api.class);
api.login(request)
        .subscribeOn(Schedulers.io())               //在IO线程进行网络请求
        .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求结果
        .subscribe(new Observer<LoginResponse>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(LoginResponse value) {}

            @Override
            public void onError(Throwable e) {
                Toast.makeText(mContext, "登录失败", Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onComplete() {
                Toast.makeText(mContext, "登录成功", Toast.LENGTH_SHORT).show();
            }
        });

concatMap
这里也简单说一下concatMap吧, 它和flatMap的作用几乎一模一样, 只是它的结果是严格按照上游发送的顺序来发送的, 来看个代码吧:

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).concatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("I am value " + integer);
            }
            return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, s);
        }
    });

只是将之前的flatMap改为了concatMap, 其余原封不动, 运行结果如下:
技术分享

可以看到, 结果仍然是有序的.

ZIP
Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
            Log.d(TAG, "emitter 2");
            emitter.onNext(2);
            Log.d(TAG, "emitter 3");
            emitter.onNext(3);
            Log.d(TAG, "emitter 4");
            emitter.onNext(4);
            Log.d(TAG, "emit complete1");
            emitter.onComplete();
        }
    });

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d(TAG, "emitter A");
            emitter.onNext("A");
            Log.d(TAG, "emitter B");
            emitter.onNext("B");
            Log.d(TAG, "emitter C");
            emitter.onNext("C");
            Log.d(TAG, "emitter complete2");
            emitter.onComplete();
        }
    });

    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
        @Override
        public String apply(Integer integer, String s) throws Exception {
            return integer + s;
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(String value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void one rror(Throwable e) {
            Log.d(TAG, "onError");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });

我们分别创建了observable, 一个发送1,2,3,4,Complete, 另一个发送A,B,C,Complete, 接着用Zip把发出的事件组合, 来看看运行结果吧:
技术分享
观察发现observable1发送事件后,observable2才发送
这是因为我们两个observable都是运行在同一个线程里, 同一个线程里执行代码肯定有先后顺序呀.

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "emit 1");
            emitter.onNext(1);
            Thread.sleep(1000);

            Log.d(TAG, "emit 2");
            emitter.onNext(2);
            Thread.sleep(1000);

            Log.d(TAG, "emit 3");
            emitter.onNext(3);
            Thread.sleep(1000);

            Log.d(TAG, "emit 4");
            emitter.onNext(4);
            Thread.sleep(1000);

            Log.d(TAG, "emit complete1");
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io());

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d(TAG, "emit A");
            emitter.onNext("A");
            Thread.sleep(1000);

            Log.d(TAG, "emit B");
            emitter.onNext("B");
            Thread.sleep(1000);

            Log.d(TAG, "emit C");
            emitter.onNext("C");
            Thread.sleep(1000);

            Log.d(TAG, "emit complete2");
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io());

    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
        @Override
        public String apply(Integer integer, String s) throws Exception {
            return integer + s;
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(String value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void one rror(Throwable e) {
            Log.d(TAG, "onError");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });

好了, 这次我们让事件都在IO线程里发送事件, 再来看看运行结果:
技术分享

第一个observable明明发送了四个数据+一个Complete, 之前明明还有的, 为啥到这里没了呢?
这是因为我们之前说了, zip发送的事件数量跟observable中发送事件最少的那一个的事件数量是有关的, 在这个例子里我们observable2只发送了三个事件然后就发送了Complete, 这个时候尽管observable1还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢?

from

在RxJava的from操作符到2.0已经被拆分成了3个,fromArray, fromIterable, fromFuture接收一个集合作为输入,然后每次输出一个元素给subscriber。

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.i(TAG, "number:" + integer);
    }
});

技术分享

注意:如果from()里面执行了耗时操作,即使使用了subscribeOn(Schedulers.io()),仍然是在主线程执行,可能会造成界面卡顿甚至崩溃,所以耗时操作还是使用Observable.create(…);

filter
条件过滤,去除不符合某些条件的事件。举个栗子:

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
       .filter(new Predicate<Integer>() {
           @Override
           public boolean test(Integer integer) throws Exception {
               // 偶数返回true,则表示剔除奇数,留下偶数
               return integer % 2 == 0;

           }
       }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.i(TAG, "number:" + integer);
    }
});

技术分享

take
最多保留的事件数。

 Observable.just("1", "2", "6", "3", "4", "5").take(2).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {
                Log.d(TAG,value);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

技术分享
可以发现我们发送了6个String,最后只打印了前两个,这就是take过滤掉的结果

doOnNext
如果你想在处理下一个事件之前做某些事,就可以调用该方法

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}).filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        // 偶数返回true,则表示剔除奇数
        return integer % 2 == 0;
    }
})// 最多保留三个,也就是最后剩三个偶数
        .take(3).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        // 在输出偶数之前输出它的hashCode
        Log.i(TAG, "hahcode = " + integer.hashCode() + "");
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer value) {
        Log.i(TAG, "number = " + value);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

技术分享

debounce
debounce也是用于事件的过滤,可以指定过滤事件的时间间隔

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        int i = 0;
        int[] times = new int[]{100, 1000};
        while (true) {
            i++;
            if (i >= 100)
                break;
            e.onNext(i);
            try {
                // 注意!!!!
                // 当i为奇数时,休眠1000ms,然后才发送i+1,这时i不会被过滤掉
                // 当i为偶数时,只休眠100ms,便发送i+1,这时i会被过滤掉
                Thread.sleep(times[i % 2]);
            } catch (InterruptedException error) {
                error.printStackTrace();
            }
        }
        e.onComplete();
    }
})// 间隔400ms以内的事件将被丢弃
        .debounce(400, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() {


            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.toString());
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "complete");
            }

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG, "integer = " + integer);
            }
        });

技术分享

compose
与 flatMap 类似,都是进行变换,返回Observable对象,激活并发送事件。
1.compose 是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用 compose 来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap 中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。
2.compose 是对 Observable 整体的变换,换句话说, flatMap 转换Observable里的每一个事件,而 compose 转换的是整个Observable数据流。
3.flatMap 每发送一个事件都创建一个 Observable,所以效率较低。而 compose 操作符只在主干数据流上执行操作。
4.建议使用 compose 代替 flatMap。

First
只发送符合条件的第一个事件。可以与contact操作符,做网络缓存。
例子:依次检查Disk与Network,如果Disk存在缓存,则不做网络请求,否则进行网络请求。

// 从缓存获取
        Observable<BookList> fromDisk = Observable.create(new Observable.OnSubscribe<BookList>() {
            @Override
            public void call(Subscriber<? super BookList> subscriber) {
                BookList list = getFromDisk();
                if (list != null) {
                    subscriber.onNext(list);
                } else {
                    subscriber.onCompleted();
                }
            }
        });

// 从网络获取
        Observable<BookList> fromNetWork = bookApi.getBookDetailDisscussionList();

        Observable.concat(fromDisk, fromNetWork)
                // 如果缓存不为null,则不再进行网络请求。反之
                .first()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<BookList>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(BookList discussionList) {

                    }
                });

Single
Single与Observable类似,相当于是他的精简版。订阅者回调的不是OnNext/OnError/onCompleted,而是回调OnSuccess/OnError。

Single.create(new SingleOnSubscribe<Object>() {
    @Override
    public void subscribe(SingleEmitter<Object> e) throws Exception {
        e.onSuccess("hello world");
    }
}).subscribe(new SingleObserver<Object>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onSuccess(Object value) {
        Log.i(TAG, value.toString());
    }

    @Override
    public void onError(Throwable e) {

    }
});

技术分享

可以配合debounce,避免SearchEditText频繁请求。

final Subject subject = PublishSubject.create();

subject.debounce(400, TimeUnit.MILLISECONDS)
        .subscribe(new Observer() {


            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {
                // request
            }
        });

edittext.addTextChangedListener(new TextWatcher() {

    @Override
    public void beforeTextChanged(CharSequence s, int start, int count, int after) { }

    @Override
    public void onTextChanged(CharSequence s, int start, int before, int count) {
        subject.onNext(s.toString());
    }

    @Override
    public void afterTextChanged(Editable s) { }
});

RxJava的一些使用场景

场景1:
取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的

final Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        if (memoryCache != null) {
            emitter.onNext(memoryCache);
        } else {
            emitter.onComplete();
        }
    }
});

final Observable<String> disk  = Observable.create(new ObservableOnSubscribe<String>() {
    String cachePref = getSharedPreferences("rxdeni",MODE_PRIVATE).getString("cache",null);
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        if (cachePref != null) {
            emitter.onNext(cachePref);
        } else {
            emitter.onComplete();
        }
    }
});

Observable<String> network = Observable.just("network");

//主要就是靠concat operator来实现
Observable.concat(memory, disk, network).firstElement()

        .subscribeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("--------------subscribe: " + s);
            }
        });

场景2:界面需要等到多个接口并发取完数据,再更新

Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("haha");
        }
    }).subscribeOn(Schedulers.newThread());

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("hehe");
        }
    }).subscribeOn(Schedulers.newThread());


    Observable.merge(observable1, observable2)
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String value) {
                    Log.d(TAG,value);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });

场景3:界面按钮需要防止连续点击的情况

RxView.clicks(button)
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(new Observer<Object>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {
                Log.i(TAG, "do clicked!");
            }
        });

场景4:响应式的界面
比如勾选了某个checkbox,自动更新对应的preference

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
        .subscribe(checked.asAction());

场景5:复杂的数据变换

Observable.just("1", "2", "6", "3", "4", "5")
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s);
            }
        }).filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer.intValue()%2 == 0;
    }
}).distinct().take(2).reduce(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer integer, Integer integer2) throws Exception {
        return integer.intValue() + integer2.intValue();
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
       Log.d(TAG,integer.toString());
    }
});
<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    一篇博客让你了解RxJava