首页 > 代码库 > 【Android - 框架】之RxJava的使用
【Android - 框架】之RxJava的使用
RxJava算是最新最常用的,也是程序员们最喜欢的框架之一了。
RxJava的核心由Observable(被观察者,事件源)和Subscriber(观察者)构成,Observable负责发出一系列事件,Subscriber处理这些事件。
一个Observble可以发出零个或多个事件,直到结束或出错。每发出一个事件,就会调用与之关联的所有观察者Subscriber的onNext()方法;如果中途出错,则会回调这个观察者的onError()方法;事件发布给所有观察者之后,会回调最后一个观察者的onCompleted()方法。
RxJava很像设计模式中的观察者模式,但有一点不同,就是当一个被观察者没有任何与之关联的观察者时,这个被观察者不会发出任何事件。
在Android中使用RxJava,需要先导入RxJava和RxAndroid的依赖:
compile ‘io.reactivex:rxjava:1.2.2‘
compile ‘io.reactivex:rxandroid:1.2.1‘
DEMO 1:
Observable<String> observabele = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava"); subscriber.onCompleted(); } }); Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { System.out.println("-------------------->>>>Completed"); } @Override public void one rror(Throwable e) { } @Override public void onNext(String s) { System.out.println("------------onNext---------->>>>>>>" + s); } }; observabele.subscribe(observer);
通过Observable的create()方法创建一个自定义的Observable被观察者,在参数传入的OnSubscribe内部类中通过调用子方法的Subscriber对象参数的onNext()、onError()、onCompleted()三个方法,操作与这个被观察者绑定的所有观察者。
通过new Observer()方法来创建一个观察者,需要实现onNext()、onError()、onCompleted()三个方法。
通过Observeable对象的subscribe()方法绑定一个观察者,此时这个观察者就可以接收到被观察者发送的消息了。
DEMO2:
Observable<String> observable = Observable.just("Hello RxJava", "My", "Name"); Action1<String> next = new Action1<String>() { @Override public void call(String s) { // onNext()中执行的代码 } }; Action1<Throwable> error = new Action1<Throwable>() { @Override public void call(Throwable throwable) { // one rror()中执行的代码 } }; Action0 completed = new Action0() { @Override public void call() { // onCompleted()中执行的代码 } }; observable.subscribe(next, error, completed);
Subscriber中的三个子方法可以拆分成两个Action1和一个Action0。onNext()和onError()两个方法对应的Action1,onCompleted()方法对应的是Action0。
Observable对象的subscribe()方法可以传入三个Action对象,即表示被观察者已经绑定了一个观察者,这个观察者是由这三个Action组成的。subscribe()方法有多个重载,可以只有一个onNext()方法的Action1,可以有一个onNext()方法的Action1和一个onError()方法的Action1,也可以像上面代码一样有三个Action。
Observable对象的just()方法可以有任意个参数,表示将这些对象逐个通过onNext()方法发送出去。
DEMO3:
String[] array = {"Hello RxJava", "My", "Name"}; Observable<String> observable = Observable.from(array); Action1<String> next = new Action1<String>() { @Override public void call(String s) { // onNext()中执行的代码 System.out.println("----------------------->>>>>>>>>>" + s); } }; Action1<Throwable> error = new Action1<Throwable>() { @Override public void call(Throwable throwable) { // one rror()中执行的代码 } }; Action0 completed = new Action0() { @Override public void call() { // onCompleted()中执行的代码 } }; observable.subscribe(next, error, completed);
Observable.from()方法和Observable.just()方法的效果是一样的,都是将一组数据逐个发送出去,区别是from()方法是将一个数组中的数据逐个发送,而just()方法是将各个参数中的数据进行逐个发送。
DEMO4:
Observable<String> observable = Observable .just("Hello RxJava", "aaaa", "bbbbb", "aaaaaacccc") .map(new Func1<String, String>() { @Override public String call(String s) { return "string=========" + s; } }); Action1<String> next = new Action1<String>() { @Override public void call(String s) { // onNext()中执行的代码 System.out.println("----------------------->>>>>>>>>>" + s); } }; Action1<Throwable> error = new Action1<Throwable>() { @Override public void call(Throwable throwable) { // one rror()中执行的代码 } }; Action0 completed = new Action0() { @Override public void call() { // onCompleted()中执行的代码 } }; observable.subscribe(next, error, completed);
map方法是将just中的数据进行进一步的处理,例如,上面的代码中就是在每个字符串前面加了另一端字符串。
DEMO5:
Observable.just("https://api.github.com/users/basil2style") .map(new Func1<String, String>() { @Override public String call(String s) { StringBuffer result = null; try { URL url = new URL(s); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("GET"); connection.setDoInput(true); connection.connect(); int responseCode = connection.getResponseCode(); if (responseCode == 200) { result = new StringBuffer(); BufferedInputStream bis = new BufferedInputStream(connection.getInputStream()); byte[] b = new byte[1024]; int len = -1; while ((len = bis.read(b)) != -1) { result.append(new String(b, 0, len)); } bis.close(); } } catch (MalformedURLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } System.out.println("------------------->>>>>" + result.toString()); return result.toString(); } }) .map(new Func1<String, InfoData>() { @Override public InfoData call(String s) { InfoData infoData = new InfoData(); try { JSONObject object = new JSONObject(s); infoData.setId(object.getInt("id")); infoData.setUrl(object.getString("url")); infoData.setType(object.getString("type")); infoData.setName(object.getString("name")); } catch (JSONException e) { e.printStackTrace(); } return infoData; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<InfoData>() { @Override public void onCompleted() { System.out.println("---------------------->>>>>>Completed"); } @Override public void one rror(Throwable e) { Toast.makeText(MainActivity.this, "获取网络数据失败", Toast.LENGTH_SHORT).show(); } @Override public void onNext(InfoData infoData) { Toast.makeText(MainActivity.this, infoData.getName(), Toast.LENGTH_SHORT).show(); } });
这是用RxJava结合原生的JAVA API完成网络访问的代码,通过map()方法在不同的参数之间进行转换,最终得到InfoData对象并输出数据。
现在,开发人员更喜欢将RxJava和Retrofit结合使用,原因是RxJava可以设置一段代码执行的线程,这样就可以轻松的、解耦的替换Handler和AsyncTask进行异步数据的访问。
有关RxJava和Retrofit结合使用的案例我会写在我的下一个帖子中,敬请期待~~
【Android - 框架】之RxJava的使用