RxJava2 toma observable lanza excepción no entregable
Según entiendo RxJava2 values.take(1)
crea otro Observable que contiene solo un elemento del Observable original. Que NO DEBE lanzar una excepción, ya que se filtra por el efecto de take(1)
como ha sucedido en segundo lugar.
Como en el siguiente fragmento de código
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onError(new Exception("Oops"));
});
values.take(1)
.subscribe(
System.out::println,
e -> System.out.println("Error: " + e.getMessage()),
() -> System.out.println("Completed")
);
Salida
1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Mis preguntas:
- ¿Lo estoy entendiendo correctamente ?
- Lo que realmente está sucediendo para causar la excepción.
- Cómo resolver esto ¿del consumidor ?
2 answers
- Sí, pero porque el observable 'ends' no significa que el código que se ejecuta dentro de
create(...)
se detenga. Para estar completamente seguro en este caso, debe usaro.isDisposed()
para ver si el observable ha terminado aguas abajo. - La excepción está ahí porque RxJava 2 tiene la política de NUNCA permitir que una llamada
onError
se pierda. Se entrega aguas abajo o se lanza como unUndeliverableException
global si el observable ya ha terminado. Depende del creador del Observable manejar 'correctamente' el caso donde el observable ha terminado y se produce una Excepción. - El problema es que el productor (
Observable
) y el consumidor (Subscriber
) en desacuerdo sobre cuando el flujo termina. Dado que el productor está sobreviviendo al consumidor en este caso, el problema solo puede solucionarse en el productor.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-04-20 18:47:25
@Kiskae en el comentario anterior respondió correctamente sobre la razón por la que puede ocurrir tal excepción.
Aquí el enlace al documento oficial sobre este tema: RxJava2-wiki.
A veces no puede cambiar este comportamiento, por lo que hay una manera de manejar estos UndeliverableException
. Aquí está un fragmento de código de cómo evitar bloqueos y mal comportamiento:
RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
e = e.getCause();
}
if ((e instanceof IOException) || (e instanceof SocketException)) {
// fine, irrelevant network problem or API that throws on cancellation
return;
}
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
// that's likely a bug in the application
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
if (e instanceof IllegalStateException) {
// that's a bug in RxJava or in a custom operator
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
Log.warning("Undeliverable exception received, not sure what to do", e);
});
Este código tomado del enlace anterior.
Nota Importante. Este enfoque establece el controlador de errores global a RxJava, por lo que si puede deshacerse de estas excepciones - sería mejor opción.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-03-21 08:23:54