Cómo ignorar el error y continuar flujo infinito?


Me gustaría saber cómo ignorar excepciones y continuar flujo infinito (en mi caso flujo de ubicaciones)?

Estoy recuperando la posición actual del usuario (usando Android-ReactiveLocation) y luego enviándolos a mi API (usando Retrofit).

En mi caso, cuando se produce una excepción durante la llamada a la red (por ejemplo, tiempo de espera), se invoca el método onError y la secuencia se detiene. ¿Cómo evitarlo?

Actividad:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}

RestService:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}
Author: jww, 2015-03-10

7 answers

mRestService.postLocations(locations) emita un elemento, luego complete. Si se produce un error, entonces emite el error, que completa la secuencia.

Al llamar a este método en un flatMap, el error continúa a su flujo "principal", y luego su flujo se detiene.

Lo que puede hacer es transformar su error en otro elemento (como se describe aquí : https://stackoverflow.com/a/28971140/476690 ), pero no en su corriente principal (como supongo que ya lo ha intentado) sino en el mRestService.postLocations(locations).

De esta manera, esta llamada emite un error, que se transformará en un elemento/otro observable y luego se completará. (sin llamar onError).

En una vista de consumidor, mRestService.postLocations(locations) emitirá un elemento, luego completará, como si todo tuviera éxito.

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
        .buffer(50)
        .flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can throw exception
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();
 7
Author: dwursteisen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 11:47:07

Es posible que desee utilizar uno de los operadores de manejo de errores .

  • onErrorResumeNext( ) - indica a un Observable que emita una secuencia de elementos si encuentra un error
  • onErrorReturn( ) - indica a un Observable que emita un elemento en particular cuando se encuentra con un error
  • onExceptionResumeNext( ) - instruye a un Observable para continuar emitiendo elementos después de que se encuentra con una excepción (pero no otra variedad de lanzable)
  • retry( ) - si una fuente Observable emite un error, vuelva a suscribirse a con la esperanza de que se complete sin error
  • retryWhen( ) - si una fuente Observable emite un error, pase ese error a otro Observable para determinar si desea volver a suscribirse a la fuente

Especialy retry y onExceptionResumeNext parecen prometedores en su caso.

 42
Author: tomrozb,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-03-10 18:15:38

Simplemente pegando la información del enlace de la respuesta de @MikeN en caso de que se pierda:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}

Y usarlo cerca de la fuente observable porque otros operadores pueden ansía cancelar la suscripción antes de eso.

Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe();

Tenga en cuenta, sin embargo, que esto suprime la entrega de error de la fuente. Si algún onNext en la cadena después de lanzar una excepción, es todavía es probable que la fuente sea cancelada.

 6
Author: AllDayAmazing,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-11-04 21:45:29

Si solo desea ignorar el error dentro de la flatMap sin devolver un elemento haga esto:

flatMap(item -> 
    restService.getSomething(item).onErrorResumeNext(Observable.empty())
);
 3
Author: Albert Vila Calvo,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-08-23 06:39:49

Intente llamar al servicio rest en un Observable.aplaza la llamada. De esa manera, por cada llamada tendrás la oportunidad de usar su propio 'onErrorResumeNext' y los errores no harán que tu flujo principal se complete.

reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
  .buffer(50)
  .flatMap(locations ->
    Observable.defer(() -> mRestService.postLocations(locations))
      .onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>)
  )
........

Esa solución es originalmente de este hilo -> RxJava Observable y Suscriptor para saltar excepción?, pero creo que funcionará en su caso también.

 1
Author: meddle,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:26:00

Agregue mi solución para este problema:

privider
    .compose(ignoreErrorsTransformer)
    .subscribe()

private final Observable.Transformer<ResultType, ResultType> ignoreErrorsTransformer =
        new Observable.Transformer<ResultType, ResultType>() {
            @Override
            public Observable<ResultType> call(Observable<ResultType> resultTypeObservable) {
                return resultTypeObservable
                        .materialize()
                        .filter(new Func1<Notification<ResultType>, Boolean>() {
                            @Override
                            public Boolean call(Notification<ResultType> resultTypeNotification) {
                                return !resultTypeNotification.isOnError();
                            }
                        })
                        .dematerialize();

            }
        };
 1
Author: HotIceCream,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-07-05 14:19:10

Una ligera modificación de la solución (@MikeN) para permitir que las corrientes finitas se completen:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
                //this will allow finite streams to complete
                t1.onCompleted();
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}
 0
Author: portenez,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-12-15 04:10:03