首页 > 代码库 > java框架---->RxJava的使用(一)
java框架---->RxJava的使用(一)
RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式编程可以将事件传递给注册了的observer。今天我们就来学习一下rxJava,并分析一下它源码感受一下它的观察者模式。
RxJava的简单使用
一、mavan的pom.xml中增加rxjava的依赖
这里我们用的是rxjava1.3.0,目前最新的已经更新到2了。
<dependency> <groupId>io.reactivex</groupId> <artifactId>rxjava</artifactId> <version>1.3.0</version></dependency>
二、测试的用类
import rx.Observable;import rx.Subscriber;/** * @author huhx */public class RxJavaTest { public static void main(String[] args) { Observable<String> observable = Observable.just("One", "Two", "Three"); observable.subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void one rror(Throwable item) { System.out.println("onError"); } @Override public void onNext(String item) { System.out.println("Item is " + item); } }); }}
三、打印的结果如下:
Item is OneItem is TwoItem is ThreeonCompleted
RxJava源码的简单分析
根据上述的代码,我们简单分析一下程序的流程。
一、首先Observable.just("One", "Two", "Three")代码:
just是一个工厂方法,用心构建一个Observable对象。
public static <T> Observable<T> just(T t1, T t2, T t3) { return from((T[])new Object[] { t1, t2, t3 });}
from方法的代码如下:
public static <T> Observable<T> from(T[] array) { int n = array.length; if (n == 0) { return empty(); } else if (n == 1) { return just(array[0]); } return unsafeCreate(new OnSubscribeFromArray<T>(array));}
二、我们重点是看unsafeCreate方法:
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f));}
对于RxJavaHooks.onCreate()方法,第一次会先执行RxJavaHooks静态块的代码。
static { init(); // 初始化了很多RxJavaHooks跟事件有着的变量}
onCreate方法中,这个没有怎么看懂后续再会。
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) { Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate; if (f != null) { return f.call(onSubscribe); } return onSubscribe;}
三、对于observable.subscribe()代码
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this);}
Observable的subscribe方法代码如下:
1 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { 2 if (subscriber == null) { 3 throw new IllegalArgumentException("subscriber can not be null"); 4 } 5 if (observable.onSubscribe == null) { 6 throw new IllegalStateException("onSubscribe function can not be null."); 7 } 8 9 subscriber.onStart();10 11 if (!(subscriber instanceof SafeSubscriber)) {12 subscriber = new SafeSubscriber<T>(subscriber);13 }14 15 try {16 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);17 return RxJavaHooks.onObservableReturn(subscriber);18 } catch (Throwable e) {19 Exceptions.throwIfFatal(e);20 if (subscriber.isUnsubscribed()) {21 RxJavaHooks.onError(RxJavaHooks.onObservableError(e));22 } else {23 try {24 subscriber.onError(RxJavaHooks.onObservableError(e));25 } catch (Throwable e2) {26 Exceptions.throwIfFatal(e2);27 RuntimeException r = new one rrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to one rror.", e2);28 RxJavaHooks.onObservableError(r);29 throw r; // NOPMD30 }31 }32 return Subscriptions.unsubscribed();33 }34 }
四、整个的正常流程会走到16行的代码,这个是我们重点的分析地方。如果是异常情况,则只会执行onError方法
对于这个例子中,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)得到的结果OnSubscribeFromArray这个类。调用它的call方法。
public void call(Subscriber<? super T> child) { child.setProducer(new FromArrayProducer<T>(child, array));}
setProducer方法执行的代码如下:
1 public void setProducer(Producer p) { 2 long toRequest; 3 boolean passToSubscriber = false; 4 synchronized (this) { 5 toRequest = requested; 6 producer = p; 7 if (subscriber != null) { 8 // middle operator ... we pass through unless a request has been made 9 if (toRequest == NOT_SET) {10 // we pass through to the next producer as nothing has been requested11 passToSubscriber = true;12 }13 }14 }15 // do after releasing lock16 if (passToSubscriber) {17 subscriber.setProducer(producer);18 } else {19 // we execute the request with whatever has been requested (or Long.MAX_VALUE)20 if (toRequest == NOT_SET) {21 producer.request(Long.MAX_VALUE);22 } else {23 producer.request(toRequest);24 }25 }26 }
这里会走到21行的代码,跟进去FromArrayProducer的request方法:
public void request(long n) { if (n < 0) { throw new IllegalArgumentException("n >= 0 required but it was " + n); } if (n == Long.MAX_VALUE) { if (BackpressureUtils.getAndAddRequest(this, n) == 0) { fastPath(); } } else if (n != 0) { if (BackpressureUtils.getAndAddRequest(this, n) == 0) { slowPath(n); } }}
对于fastPath()的代码如下:
void fastPath() { final Subscriber<? super T> child = this.child; for (T t : array) { if (child.isUnsubscribed()) { return; } child.onNext(t); } if (child.isUnsubscribed()) { return; } child.onCompleted();}
执行onNext()方法,其中t就是Observable.just()方法的参数。child就是observable.subscribe的定义的参数。
友情链接
java框架---->RxJava的使用(一)