首页 > 代码库 > Android开发之《RXJava的简单实现》

Android开发之《RXJava的简单实现》

import android.util.Log;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

public class RXJavaDemo {
    private static final String TAG = RXJavaDemo.class.getSimpleName();

    private int count = 0;

    public RXJavaDemo() {
    }

    public void call() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    mObservable.subscribe(mSubscriber);
                    mObservable.subscribe(action1);
                    Observable.just("just Object").subscribe(action1);
                }
            }
        }).start();
    }

    private Observable<String> mObservable = Observable.create(
            new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("1");
                    subscriber.onNext("2");
                    subscriber.onNext("3");
                    subscriber.onCompleted();
                }
            });

    private Subscriber<String> mSubscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            Log.v(TAG, "onNext, string : " + s);
            Log.v(TAG, "onNext, count : " + count);
            count++;

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onCompleted() {
            Log.v(TAG, "onCompleted");
        }

        @Override
        public void one rror(Throwable e) {
            Log.v(TAG, "onError, e : " + e.toString());
        }
    };

    private Action1<String> action1 = new Action1() {
        @Override
        public void call(Object o) {
            if (o == null) {
                Log.v(TAG, "Action1, object is null");
                return;
            }

            Log.v(TAG, "Acition1, o : " + ((String) o));
        }
    };
}

  

    compile ‘io.reactivex:rxjava:1.0.9‘
    compile ‘io.reactivex:rxandroid:0.24.0‘
    compile ‘com.squareup.retrofit:retrofit:1.9.0‘

 

RxJava提供四种不同的Subject:PublishSubject、BehaviorSubject、、ReplaySubject.、AsyncSubject

BehaviorSubject, 会首先向他的订阅者发送截至订阅前最新的一个数据对象(或初始值),然后正常发送订阅后的数据流。

ReplaySubject, 会缓存它所订阅的所有数据,向任意一个订阅它的观察者重发。

AsyncSubject, 当Observable完成时只会发布最后一个数据给已经订阅的每一个观察者。

PublishSubject, 没有发送数据,观察者只能等待,没有线程阻塞,没有资源消耗。在调用publishSubject.onNext时,才发送消息。 发送消息结束以后,publishSubject并没有结束,观察者等待消息再一次的发送。如果想关闭publishSubject,publishSubject需调用publishSubject.onCompleted方法关闭。此时,publishSubject再发送消息,观察者不能收到发送的消息。

Android开发之《RXJava的简单实现》