Espere una operación asincrónica en onNext de RxJS Observable


Tengo una secuencia RxJS consumiéndose de la manera normal...

Sin embargo, en el controlador observable 'onNext', algunas de las operaciones se completarán de forma sincrónica, pero otras requieren devoluciones de llamada asíncronas, que deben esperarse antes de procesar el siguiente elemento en la secuencia de entrada.

...un poco confundido cómo hacer esto. Alguna idea? ¡Gracias!

someObservable.subscribe(
    function onNext(item)
    {
        if (item == 'do-something-async-and-wait-for-completion')
        {
            setTimeout(
                function()
                {
                    console.log('okay, we can continue');
                }
                , 5000
            );
        }
        else
        {
            // do something synchronously and keep on going immediately
            console.log('ready to go!!!');
        }
    },
    function onError(error)
    {
        console.log('error');
    },
    function onComplete()
    {
        console.log('complete');
    }
);
Author: Brandon, 2014-02-19

3 answers

Cada operación que desee realizar se puede modelar como observable. Incluso la operación síncrona se puede modelar de esta manera. Luego puede usar map para convertir su secuencia en una secuencia de secuencias, luego use concatAll para aplanar la secuencia.

someObservable
    .map(function (item) {
        if (item === "do-something-async") {
            // create an Observable that will do the async action when it is subscribed
            // return Rx.Observable.timer(5000);

            // or maybe an ajax call?  Use `defer` so that the call does not
            // start until concatAll() actually subscribes.
            return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); });
        }
        else {
            // do something synchronous but model it as an async operation (using Observable.return)
            // Use defer so that the sync operation is not carried out until
            // concatAll() reaches this item.
            return Rx.Observable.defer(function () {
                return Rx.Observable.return(someSyncAction(item));
            });
        }
    })
    .concatAll() // consume each inner observable in sequence
    .subscribe(function (result) {
    }, function (error) {
        console.log("error", error);
    }, function () {
        console.log("complete");
    });

Para responder a algunos de sus comments...at en algún punto es necesario forzar algunas expectativas en el flujo de funciones. En la mayoría de los lenguajes, cuando se trata de funciones que son posiblemente asincrónicas, las firmas de función son asincrónicas y la naturaleza async vs sync real de la función está oculta como un detalle de implementación de la función. Esto es cierto si está utilizando promesas de JavaScript, observables de Rx, Tareas de c#, Futuros de c++, etc. Las funciones terminan devolviendo una promesa / observable/task/future / etc y si la función es realmente sincrónica, entonces el objeto que devuelve ya está completado.

Dicho esto, dado que esto es JavaScript, usted puede hacer trampa:

var makeObservable = function (func) {
    return Rx.Observable.defer(function () {
        // execute the function and then examine the returned value.
        // if the returned value is *not* an Rx.Observable, then
        // wrap it using Observable.return
        var result = func();
        return result instanceof Rx.Observable ? result: Rx.Observable.return(result);
    });
}

someObservable
    .map(makeObservable)
    .concatAll()
    .subscribe(function (result) {
    }, function (error) {
        console.log("error", error);
    }, function () {
        console.log("complete");
    });
 23
Author: Brandon,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-02-19 13:49:35

En primer lugar, mueva sus operaciones asincrónicas fuera de subscribe, no está hecho para operaciones asincrónicas.

Lo que usted puede utilizar es mergeMap (alias flatMap) o concatMap. Estoy mencionando ambos, porque concatMap es en realidad mergeMap con el parámetro concurrent establecido en 1. Esto es útil, ya que a veces desea limitar el número de consultas simultáneas, pero aún así ejecutar un par concurrente.

source.concatMap(item => {
  if (item == 'do-something-async-and-wait-for-completion') {
    return Rx.Observable.timer(5000)
      .mapTo(item)
      .do(e => console.log('okay, we can continue'));
    } else {
      // do something synchronously and keep on going immediately
      return Rx.Observable.of(item)
        .do(e => console.log('ready to go!!!'));
    }
}).subscribe();

También te mostraré cómo puedes limitar la tarifa de tus llamadas. Palabra de asesoramiento: Solo el límite de velocidad en el punto donde realmente lo necesita, como cuando se llama a una API externa que solo permite un cierto número de solicitudes por segundo o minutos. De lo contrario, es mejor limitar el número de operaciones simultáneas y dejar que el sistema se mueva a la velocidad máxima.

Comenzamos con el siguiente fragmento:

const concurrent;
const delay;
source.mergeMap(item =>
  selector(item, delay)
, concurrent)

A continuación, necesitamos elegir valores para concurrent, delay y aplicar selector. concurrent y delay están estrechamente relacionados. Por ejemplo, si queremos ejecutar 10 elementos por segundo, podemos usar concurrent = 10 y delay = 1000 (milisegundo), pero también concurrent = 5 y delay = 500 o concurrent = 4 y delay = 400. El número de elementos por segundo siempre será concurrent / (delay / 1000).

Ahora vamos a implementar selector. Tenemos un par de opciones. Podemos establecer un tiempo de ejecución mínimo para selector, podemos agregar un retardo constante, podemos emitir los resultados tan pronto como estén disponibles, podemos emitir el resultado solo después de que el retardo mínimo haya pasado, etc. Incluso es posible añadir un tiempo de espera mediante el uso de la timeout operadores. Comodidad.

Establecer tiempo mínimo, enviar resultado temprano:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .merge(Rx.Observable.timer(delay).ignoreElements())
}

Establecer tiempo mínimo, enviar resultado tarde:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .zip(Rx.Observable.timer(delay), (item, _))
}

Agregar tiempo, enviar resultado temprano:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .concat(Rx.Observable.timer(delay).ignoreElements())
}

Añadir tiempo, enviar resultado tarde:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .delay(delay)
}
 4
Author: Dorus,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-07-28 17:01:55

Otro ejemplo sencillo para realizar operaciones asincrónicas manuales.

Tenga en cuenta que no es una buena práctica reactiva ! Si solo desea esperar 1000 ms, utilice Rx.Observable.temporizador o operador de retardo.

someObservable.flatMap(response => {
  return Rx.Observable.create(observer => {
    setTimeout(() => {
      observer.next('the returned value')
      observer.complete()
    }, 1000)
  })
}).subscribe()

Ahora, reemplace setTimeout por su función async, como Image.onload o FileReader.onload ...

 0
Author: TeChn4K,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-06-12 10:35:37