Cómo parar y reanudar Observable.intervalo de emisión de garrapatas
Esto emitirá un tick cada 5 segundos.
Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
.subscribe(tick -> Log.d(TAG, "tick = "+tick));
Para detenerlo puedes usar
Schedulers.shutdown();
Pero entonces todos los programadores se detienen y no es posible reanudar el tictac más tarde. ¿Cómo puedo parar y reanudar la emisión "con gracia"`?
5 answers
Aquí hay una posible solución:
class TickHandler {
private AtomicLong lastTick = new AtomicLong(0L);
private Subscription subscription;
void resume() {
System.out.println("resumed");
subscription = Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
.map(tick -> lastTick.getAndIncrement())
.subscribe(tick -> System.out.println("tick = " + tick));
}
void stop() {
if (subscription != null && !subscription.isUnsubscribed()) {
System.out.println("stopped");
subscription.unsubscribe();
}
}
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-02-15 21:23:06
val switch = new java.util.concurrent.atomic.AtomicBoolean(true)
val tick = new java.util.concurrent.atomic.AtomicLong(0L)
val suspendableObservable =
Observable.
interval(5 seconds).
takeWhile(_ => switch.get()).
repeat.
map(_ => tick.incrementAndGet())
Puede establecer switch
a false
a suspender el tictac y true
para reanudarlo.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-05-18 02:25:47
Hace algún tiempo, también estaba buscando soluciones de "temporizador" RX, pero ninguna de ellas cumplió con mis expectativas. Así que ahí puedes encontrar mi propia solución:
AtomicLong elapsedTime = new AtomicLong();
AtomicBoolean resumed = new AtomicBoolean();
AtomicBoolean stopped = new AtomicBoolean();
public Flowable<Long> startTimer() { //Create and starts timper
resumed.set(true);
stopped.set(false);
return Flowable.interval(1, TimeUnit.SECONDS)
.takeWhile(tick -> !stopped.get())
.filter(tick -> resumed.get())
.map(tick -> elapsedTime.addAndGet(1000));
}
public void pauseTimer() {
resumed.set(false);
}
public void resumeTimer() {
resumed.set(true);
}
public void stopTimer() {
stopped.set(true);
}
public void addToTimer(int seconds) {
elapsedTime.addAndGet(seconds * 1000);
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-07-24 06:06:48
Aquí hay otra manera de hacer esto, creo.
Cuando compruebe el código fuente, encontrará interval () usando class Onsubscribetimerperiodically . El código clave a continuación.
@Override
public void call(final Subscriber<? super Long> child) {
final Worker worker = scheduler.createWorker();
child.add(worker);
worker.schedulePeriodically(new Action0() {
long counter;
@Override
public void call() {
try {
child.onNext(counter++);
} catch (Throwable e) {
try {
worker.unsubscribe();
} finally {
Exceptions.throwOrReport(e, child);
}
}
}
}, initialDelay, period, unit);
}
Así que, verás, si quieres canalizar el bucle, ¿qué hay de lanzar una nueva excepción en onNext(). Código de ejemplo a continuación.
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.i("abc", "onNext");
if (aLong == 5) throw new NullPointerException();
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.i("abc", "onError");
}
}, new Action0() {
@Override
public void call() {
Log.i("abc", "onCompleted");
}
});
Entonces verás esto:
08-08 11:10:46.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:47.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:48.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:49.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:50.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.018 28146-28181/net.bingyan.test I/abc: onError
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-08-08 03:18:53
Lo sentimos, esto está en RxJS en lugar de RxJava, pero el concepto será el mismo. Adapté esto de learn-rxjs.io y aquí está en codepen.
La idea es que comiences con dos flujos de eventos de clic, startClick$
y stopClick$
. Cada clic que ocurre en la secuencia stopClick$
se asigna a un observable vacío, y los clics en startClick$
se asignan a la secuencia interval$
. Las dos corrientes resultantes obtienen merge
-d juntas en un observable-de-observables. En otras palabras, un nuevo observable de uno de los dos tipos será emitido desde merge
cada vez que haya un clic. El observable resultante pasará por switchMap
, que comienza a escuchar este nuevo observable y deja de escuchar lo que estaba escuchando antes. Switchmap también comenzará a combinar los valores de este nuevo observable en su flujo existente.
Después del cambio, scan
solo ve el valor de "incremento" emitido por interval$
, y no ve ningún valor cuando se hace clic en "detener".
Y hasta que se produzca el primer clic, startWith
comenzará a emitir valores desde $interval
, solo para poner las cosas en marcha:
const start = 0;
const increment = 1;
const delay = 1000;
const stopButton = document.getElementById('stop');
const startButton = document.getElementById('start');
const startClick$ = Rx.Observable.fromEvent(startButton, 'click');
const stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
const interval$ = Rx.Observable.interval(delay).mapTo(increment);
const setCounter = newValue => document.getElementById("counter").innerHTML = newValue;
setCounter(start);
const timer$ = Rx.Observable
// a "stop" click will emit an empty observable,
// and a "start" click will emit the interval$ observable.
// These two streams are merged into one observable.
.merge(stopClick$.mapTo(Rx.Observable.empty()),
startClick$.mapTo(interval$))
// until the first click occurs, merge will emit nothing, so
// use the interval$ to start the counter in the meantime
.startWith(interval$)
// whenever a new observable starts, stop listening to the previous
// one and start emitting values from the new one
.switchMap(val => val)
// add the increment emitted by the interval$ stream to the accumulator
.scan((acc, curr) => curr + acc, start)
// start the observable and send results to the DIV
.subscribe((x) => setCounter(x));
Y aquí está el HTML
<html>
<body>
<div id="counter"></div>
<button id="start">
Start
</button>
<button id="stop">
Stop
</button>
</body>
</html>
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-06-25 04:26:19