RxJava2 toma observable lanza excepción no entregable


Según entiendo RxJava2 values.take(1) crea otro Observable que contiene solo un elemento del Observable original. Que NO DEBE lanzar una excepción, ya que se filtra por el efecto de take(1) como ha sucedido en segundo lugar.

Como en el siguiente fragmento de código

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

Salida

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

Mis preguntas:

  1. ¿Lo estoy entendiendo correctamente ?
  2. Lo que realmente está sucediendo para causar la excepción.
  3. Cómo resolver esto ¿del consumidor ?
Author: AbdElraouf Sabri, 2017-04-20

2 answers

  1. Sí, pero porque el observable 'ends' no significa que el código que se ejecuta dentro de create(...) se detenga. Para estar completamente seguro en este caso, debe usar o.isDisposed() para ver si el observable ha terminado aguas abajo.
  2. La excepción está ahí porque RxJava 2 tiene la política de NUNCA permitir que una llamada onError se pierda. Se entrega aguas abajo o se lanza como un UndeliverableException global si el observable ya ha terminado. Depende del creador del Observable manejar 'correctamente' el caso donde el observable ha terminado y se produce una Excepción.
  3. El problema es que el productor (Observable) y el consumidor (Subscriber) en desacuerdo sobre cuando el flujo termina. Dado que el productor está sobreviviendo al consumidor en este caso, el problema solo puede solucionarse en el productor.
 37
Author: Kiskae,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-04-20 18:47:25

@Kiskae en el comentario anterior respondió correctamente sobre la razón por la que puede ocurrir tal excepción.

Aquí el enlace al documento oficial sobre este tema: RxJava2-wiki.

A veces no puede cambiar este comportamiento, por lo que hay una manera de manejar estos UndeliverableException. Aquí está un fragmento de código de cómo evitar bloqueos y mal comportamiento:

RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that's likely a bug in the application
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

Este código tomado del enlace anterior.

Nota Importante. Este enfoque establece el controlador de errores global a RxJava, por lo que si puede deshacerse de estas excepciones - sería mejor opción.

 2
Author: Ilia Kurtov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-03-21 08:23:54