首页 > 代码库 > java框架---->RxJava的使用(一)

java框架---->RxJava的使用(一)

  RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式编程可以将事件传递给注册了的observer。今天我们就来学习一下rxJava,并分析一下它源码感受一下它的观察者模式。

RxJava的简单使用

一、mavan的pom.xml中增加rxjava的依赖

这里我们用的是rxjava1.3.0,目前最新的已经更新到2了。

<dependency>    <groupId>io.reactivex</groupId>    <artifactId>rxjava</artifactId>    <version>1.3.0</version></dependency>

 

二、测试的用类

import rx.Observable;import rx.Subscriber;/** * @author huhx */public class RxJavaTest {    public static void main(String[] args) {        Observable<String> observable = Observable.just("One", "Two", "Three");        observable.subscribe(new Subscriber<String>() {            @Override            public void onCompleted() {                System.out.println("onCompleted");            }            @Override            public void one rror(Throwable item) {                System.out.println("onError");            }            @Override            public void onNext(String item) {                System.out.println("Item is " + item);            }        });    }}

 

三、打印的结果如下:

Item is OneItem is TwoItem is ThreeonCompleted

 

RxJava源码的简单分析

根据上述的代码,我们简单分析一下程序的流程。

一、首先Observable.just("One", "Two", "Three")代码:

just是一个工厂方法,用心构建一个Observable对象。

public static <T> Observable<T> just(T t1, T t2, T t3) {    return from((T[])new Object[] { t1, t2, t3 });}

from方法的代码如下:

public static <T> Observable<T> from(T[] array) {    int n = array.length;    if (n == 0) {        return empty();    } else    if (n == 1) {        return just(array[0]);    }    return unsafeCreate(new OnSubscribeFromArray<T>(array));}

 

二、我们重点是看unsafeCreate方法:

public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {    return new Observable<T>(RxJavaHooks.onCreate(f));}

对于RxJavaHooks.onCreate()方法,第一次会先执行RxJavaHooks静态块的代码。

static {    init(); // 初始化了很多RxJavaHooks跟事件有着的变量}

onCreate方法中,这个没有怎么看懂后续再会。

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {    Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;    if (f != null) {        return f.call(onSubscribe);    }    return onSubscribe;}

 

三、对于observable.subscribe()代码

public final Subscription subscribe(Subscriber<? super T> subscriber) {    return Observable.subscribe(subscriber, this);}

Observable的subscribe方法代码如下:

 1 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { 2     if (subscriber == null) { 3         throw new IllegalArgumentException("subscriber can not be null"); 4     } 5     if (observable.onSubscribe == null) { 6         throw new IllegalStateException("onSubscribe function can not be null."); 7     } 8  9     subscriber.onStart();10 11     if (!(subscriber instanceof SafeSubscriber)) {12         subscriber = new SafeSubscriber<T>(subscriber);13     }14 15     try {16         RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);17         return RxJavaHooks.onObservableReturn(subscriber);18     } catch (Throwable e) {19         Exceptions.throwIfFatal(e);20         if (subscriber.isUnsubscribed()) {21             RxJavaHooks.onError(RxJavaHooks.onObservableError(e));22         } else {23             try {24                 subscriber.onError(RxJavaHooks.onObservableError(e));25             } catch (Throwable e2) {26                 Exceptions.throwIfFatal(e2);27                 RuntimeException r = new one rrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to one rror.", e2);28                 RxJavaHooks.onObservableError(r);29                 throw r; // NOPMD30             }31         }32         return Subscriptions.unsubscribed();33     }34 }

 

四、整个的正常流程会走到16行的代码,这个是我们重点的分析地方。如果是异常情况,则只会执行onError方法

对于这个例子中,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)得到的结果OnSubscribeFromArray这个类。调用它的call方法。

public void call(Subscriber<? super T> child) {    child.setProducer(new FromArrayProducer<T>(child, array));}

setProducer方法执行的代码如下:

 1 public void setProducer(Producer p) { 2     long toRequest; 3     boolean passToSubscriber = false; 4     synchronized (this) { 5         toRequest = requested; 6         producer = p; 7         if (subscriber != null) { 8             // middle operator ... we pass through unless a request has been made 9             if (toRequest == NOT_SET) {10                 // we pass through to the next producer as nothing has been requested11                 passToSubscriber = true;12             }13         }14     }15     // do after releasing lock16     if (passToSubscriber) {17         subscriber.setProducer(producer);18     } else {19         // we execute the request with whatever has been requested (or Long.MAX_VALUE)20         if (toRequest == NOT_SET) {21             producer.request(Long.MAX_VALUE);22         } else {23             producer.request(toRequest);24         }25     }26 }

这里会走到21行的代码,跟进去FromArrayProducer的request方法:

public void request(long n) {    if (n < 0) {        throw new IllegalArgumentException("n >= 0 required but it was " + n);    }    if (n == Long.MAX_VALUE) {        if (BackpressureUtils.getAndAddRequest(this, n) == 0) {            fastPath();        }    } else    if (n != 0) {        if (BackpressureUtils.getAndAddRequest(this, n) == 0) {            slowPath(n);        }    }}

对于fastPath()的代码如下:

void fastPath() {    final Subscriber<? super T> child = this.child;    for (T t : array) {        if (child.isUnsubscribed()) {            return;        }        child.onNext(t);    }    if (child.isUnsubscribed()) {        return;    }    child.onCompleted();}

执行onNext()方法,其中t就是Observable.just()方法的参数。child就是observable.subscribe的定义的参数。

 

友情链接

 

java框架---->RxJava的使用(一)