首页 > 代码库 > RxJava 错误处理

RxJava 错误处理

在Observable发射数据时,有时发送onError通知,导致Observer不能接收数据。可是,我们很希望对Observable发射的onError通知做出响应或者从错误中恢复。此时我们该如何处理呢?

解决办法就是使用Error handling相关的操作符来集中统一地处理错误。RxJava中错误处理的操作符为 Catch和 Retry。

Catch

??Catch操作符能够拦截原始Observable的onError通知,不让Observable因为产生错误而终止。相当于Java中try/catch操作,不能因为抛异常而导致程序崩溃。
??
技术分享

RxJava将Catch实现为三个不同的操作符:

onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止。
one rrorResumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列。
onExceptionResumeNex:让Observable在遇到错误时继续发射后面的数据项。

onErrorReturn

技术分享

onErrorReturn方法 返回一个镜像原有Observable行为的新Observable
会忽略前者的onError调用,不会将错误传递给观察者,而是发射一个特殊的项并调用观察者的onCompleted方法。

API

Javadoc: one rrorReturn(Func1))

示例代码

/*
 * ①.onErrorReturn:
 * 返回一个原有Observable行为的新Observable镜像,
 * 后者会忽略前者的onError调用,不会将错误传递给观察者,
 * 作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法
 */
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for(int i = 0;i < 10; i++){
            if(i>3){
                //会忽略onError调用,不会将错误传递给观察者
                subscriber.onError(new Throwable("i太大了"));
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).onErrorReturn(new Func1<Throwable, Integer>() {
    //作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法。
    @Override
    public Integer call(Throwable throwable) {
        return 10;
    }
}).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "①onErrorReturn(Func1)->onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "①onErrorReturn(Func1)->onError:"+e.getMessage());
    }
    @Override
    public void onNext(Integer integer) {
        Log.v(TAG, "①onErrorReturn(Func1)->onNext:"+integer);
    }
});

输出结果如下:

①onErrorReturn(Func1)->onNext:0 
①onErrorReturn(Func1)->onNext:1 
①onErrorReturn(Func1)->onNext:2 
①onErrorReturn(Func1)->onNext:3 
①onErrorReturn(Func1)->onNext:10 
①onErrorReturn(Func1)->onCompleted

在手动创建Observale时,当Observable发送了第四个数据后,Observable发送了onError通知,然后又发送了1个数据。而在onErrorReturn方法处理中,其参数函数中,创建并返回了一个特殊项( 10).

从Log打印可以看出,观察者并没有执行onError方法,意味着Observale并没有接收到onError通知,而是接收到了一个特殊项后,调用了onCompleted方法,结束了此次订阅。而这个特殊项,正是在onErrorReturn中参数函数中,创建的特殊项。

onErrorResumeNext

技术分享

onErrorResumeNext方法与onErrorReturn()方法类似,都是拦截原Observable的onError通知,不同的是拦截后的处理方式,onErrorReturn创建并返回一个特殊项,而onErrorResumeNext创建并返回一个新的Observabl,观察者会订阅它,并接收其发射的数据。

API

Javadoc: one rrorResumeNext(Func1))
Javadoc: one rrorResumeNext(Observable))

示例代码

/*
 * ②.onErrorResumeNext(Observable):
 * 当原Observable发射onError消息时,会忽略onError消息,不会传递给观察者;
 * 然后它会开始另一个备用的Observable,继续发射数据
 */
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for(int i = 0;i < 10; i++){
            if(i>3){
                //会忽略onError调用,不会将错误传递给观察者
                subscriber.onError(new Throwable("i太大了"));
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).onErrorResumeNext(Observable.create(new Observable.OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for(int i = 10;i < 13; i++){
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
})).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "②onErrorResumeNext(Observable)->onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "②onErrorResumeNext(Observable)->onError:"+e.getMessage());
    }
    @Override
    public void onNext(Integer integer) {
        Log.v(TAG, "②onErrorResumeNext(Observable)->onNext:"+integer);
    }
});

/*
 * ③.onErrorResumeNext(Func1):
 * 和onErrorResumeNext(Observable)相似,但他能截取到原Observable的onError消息
 */
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for(int i = 0;i < 10; i++){
            if(i>3){
                //会忽略onError调用,不会将错误传递给观察者
                subscriber.onError(new Throwable("i太大了"));
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
    @Override
    public Observable<? extends Integer> call(Throwable throwable) {
        //throwable就是原Observable发射的onError消息中的Throwable对象
        Log.e(TAG, "③onErrorResumeNext(Func1)->throwable:"+throwable.getMessage());
        //如果原Observable发射了onError消息,将会开启下面的Observable
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i = 100;i < 103; i++){
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        });
    }
}).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "③onErrorResumeNext(Func1)->onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "③onErrorResumeNext(Func1)->onError:"+e.getMessage());
    }
    @Override
    public void onNext(Integer integer) {
        Log.v(TAG, "onErrorResumeNext(Func1)->onNext:"+integer);
    }
});

输出结果如下:

②onErrorResumeNext(Observable)->onNext:0 
②onErrorResumeNext(Observable)->onNext:1 
②onErrorResumeNext(Observable)->onNext:2 
②onErrorResumeNext(Observable)->onNext:3 
②onErrorResumeNext(Observable)->onNext:10 
②onErrorResumeNext(Observable)->onNext:11 
②onErrorResumeNext(Observable)->onNext:12 
②onErrorResumeNext(Observable)->onCompleted

③onErrorResumeNext(Func1)->onNext:0 
③onErrorResumeNext(Func1)->onNext:1 
③onErrorResumeNext(Func1)->onNext:2 
③onErrorResumeNext(Func1)->onNext:3 
③onErrorResumeNext(Func1)->throwable:i太大了 
③onErrorResumeNext(Func1)->onNext:100 
③onErrorResumeNext(Func1)->onNext:101 
③onErrorResumeNext(Func1)->onNext:102 
③onErrorResumeNext(Func1)->onCompleted

在手动创建Observale时,当Observable发送了第四个数据后,Observable发送了onError通知,然后又发送了3个数据。在onErrorResumeNext方法中的参数函数中,创建了一个新的Observable。

从Log打印可以看出,观察者并没有执行onError方法,意味着Observale并没有接收到onError通知,而是接收到了新建的创建了一个新的Observable发射的出具。在新Observable发射完数据后,调用了onCompleted方法,结束了此次订阅。

onExceptionResumeNext

技术分享

onExceptionResumeNext方法与onErrorResumeNext方法类似创建并返回一个拥有类似原Observable的新Observable,,也使用这个备用的Observable。不同的是,如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。

API

Javadoc: onExceptionResumeNext(Observable))

示例代码

/*
 * ④.onExceptionResumeNext:
 *    和onErrorResumeNext类似,可以说是onErrorResumeNext的特例,
 *    区别是如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。
 */
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for(int i = 0;i < 10; i++){
            if(i>3){
                //如果不是Exception,错误会传递给观察者,不会开启备用Observable
                //subscriber.onError(new Throwable("i太大了"));
                //如果Exception,不会将错误传递给观察者,并会开启备用Observable
                subscriber.onError(new Exception("i太大了哦哦哦"));
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).onExceptionResumeNext(Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for(int i = 10;i < 13; i++){
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
})).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "④onExceptionResumeNext(Observable)->onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "④onExceptionResumeNext(Observable)->onError:"+e.getClass().getSimpleName()+":"+e.getMessage());
    }
    @Override
    public void onNext(Integer integer) {
        Log.v(TAG, "④onExceptionResumeNext(Observable)->onNext:"+integer);
    }
});

输出结果如下:

④onExceptionResumeNext(Observable)->onNext:0 
④onExceptionResumeNext(Observable)->onNext:1 
④onExceptionResumeNext(Observable)->onNext:2 
④onExceptionResumeNext(Observable)->onNext:3 
④onExceptionResumeNext(Observable)->onNext:10 
④onExceptionResumeNext(Observable)->onNext:11 
④onExceptionResumeNext(Observable)->onNext:12 
④onExceptionResumeNext(Observable)->onCompleted

Retry

顾名思义,retry的意思就是试着重来,当原始Observable发射onError通知时,retry操作符不会让onError通知传递给观察者,它会重新订阅这个Observable一次或者多次(意味着重新从头发射数据),所以可能造成数据项重复发送的情况。

如果重新订阅了指定的次数还是发射了onError通知,将不再尝试重新订阅,它会把最新的一个onError通知传递给观察者。

技术分享

RxJava中将Retry操作符的实现为retry和retryWhen两种。

retry操作符默认在trampoline调度器上执行。

  • Javadoc: retry():无论收到多少次onError通知,都会继续订阅并重发原始Observable,直到onCompleted。

  • Javadoc: retry(long):接受count参数的retry会最多重新订阅count次,如果次数超过了就不会尝试再次订阅,它会把最新的一个onError通知传递给他的观察者。

  • Javadoc: retry(Func2): 这个版本的retry接受一个谓词函数作为参数,这个函数的两个参数是:重试次数和导致发射onError通知的Throwable。这个函数返回一个布尔值,如果返回true,retry应该再次订阅和镜像原始的Observable,如果返回false,retry会将最新的一个onError通知传递给它的观察者。

API

Javadoc: retry()
Javadoc: retry(long)
Javadoc: retry(Func2)

示例代码

/**
 * ①. retry()
 *     无限次尝试重新订阅
 */
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for(int i = 0; i<3; i++){
            if(i==1){
                Log.v(TAG, "①retry()->onError");
                subscriber.onError(new RuntimeException("always fails"));
            }else{
                subscriber.onNext(i);
            }
        }
    }
}).retry()    //无限次尝试重新订阅
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "①retry()->onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "①retry()->onError"+e.getMessage());
    }
    @Override
    public void onNext(Integer i) {
        Log.v(TAG, "①retry()->onNext"+i);
    }
});

/**
 * ②. retry(count)
 *     最多2次尝试重新订阅
 */
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for(int i = 0; i<3; i++){
            if(i==1){
                Log.v(TAG, "②retry(count)->onError");
                subscriber.onError(new RuntimeException("always fails"));
            }else{
                subscriber.onNext(i);
            }
        }
    }
}).retry(2)    //最多尝试2次重新订阅
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.v(TAG, "②retry(count)->onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.v(TAG, "②retry(count)->onError"+e.getMessage());
            }
            @Override
            public void onNext(Integer i) {
                Log.v(TAG, "②retry(count)->onNext"+i);
            }
        });

/**
 * ③. retry(Func2)
 */
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for(int i = 0; i<3; i++){
            if(i==1){
                Log.v(TAG, "③retry(Func2)->onError");
                subscriber.onError(new RuntimeException("always fails"));
            }else{
                subscriber.onNext(i);
            }
        }
    }
}).retry(new Func2<Integer, Throwable, Boolean>() {
    @Override
    public Boolean call(Integer integer, Throwable throwable) {
        Log.v(TAG, "③发生错误了:"+throwable.getMessage()+",第"+integer+"次重新订阅");
        if(integer>2){
            return false;//不再重新订阅
        }
        //此处也可以通过判断throwable来控制不同的错误不同处理
        return true;
    }
}).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "③retry(Func2)->onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "③retry(Func2)->onError"+e.getMessage());
    }
    @Override
    public void onNext(Integer i) {
        Log.v(TAG, "③retry(Func2)->onNext"+i);
    }
});

输出结果如下:

①retry()->onNext0 
①retry()->onError 
①retry()->onNext0 
①retry()->onError 
…无限次

②retry(count)->onNext0 
②retry(count)->onError 
②retry(count)->onNext0 
②retry(count)->onError 
②retry(count)->onNext0 
②retry(count)->onError 
②retry(count)->onErroralways fails

③retry(Func2)->onNext0 
③retry(Func2)->onError 
③发生错误了:always fails,第1次重新订阅 
③retry(Func2)->onNext0 
③retry(Func2)->onError 
③发生错误了:always fails,第2次重新订阅 
③retry(Func2)->onNext0 
③retry(Func2)->onError 
③发生错误了:always fails,第3次重新订阅 
③retry(Func2)->onErroralways fails

retryWhen

技术分享

retryWhen和retry类似,区别是,retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。
??
retryWhen()默认在trampoline调度器上执行,可以通过参数指定其它的调度器。

API

Javadoc: retryWhen(Func1)
Javadoc: retryWhen(Func1,Scheduler)

示例1代码

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        if (isError) {
            subscriber.onError(new Throwable("do one rror"));
            isError = false;
        }

        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do one rror");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
        });

输出结果如下:

OperateActivity: do onNext 
Student{id=’1’name=’A’, age=23} 
OperateActivity: do onNext 
Student{id=’2’name=’B’, age=33} 
OperateActivity: do onNext 
Student{id=’3’name=’C’, age=24} 
OperateActivity: do onNext 
Student{id=’1’name=’A’, age=23} 
OperateActivity: do onNext 
Student{id=’2’name=’B’, age=33} 
OperateActivity: do onNext 
Student{id=’3’name=’C’, age=24} 
OperateActivity: do onNext 
Student{id=’4’name=’D’, age=24} 
OperateActivity: do onNext 
Student{id=’5’name=’E’, age=33} 
OperateActivity: do onNext 
Student{id=’6’name=’F’, age=23} 

示例2代码

***
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<Throwable> call(Observable<? extends Throwable> observable) {
        return Observable.error(new Throwable(" do retryWhen"));
    }
})
****

输出结果如下:

do one rror

示例1中,在retryWhend(Func1)的参数函数中,创建并返回了一个可发射数据的Observable对象,而在示例2中,其参数函数,创建并返回了一个发射onError通知的Observable。通过Log打印可以出,示例1在拦截了原Observable中的onError通知,并重新订阅了原Observable,但是示例2中,观察者接收了onError通知,意味着原Observable中的onError通知未被拦截,直接发射出去。示例2中,正体现了retryWhen()和retry()的不同之处。

参考:
http://blog.csdn.net/xmxkf/article/details/51658235

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    RxJava 错误处理