RxJS: ¿Cómo actualizaría "manualmente" un Observable?


Creo que debo estar malinterpretando algo fundamental, porque en mi mente este debería ser el caso más básico para un observable, pero para la vida de mi no puedo averiguar cómo hacerlo desde los documentos.

, Básicamente, quiero ser capaz de hacer esto:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

Pero no he podido encontrar un método como push. Estoy usando esto para un controlador de clics, y sé que tienen Observable.fromEvent para eso, pero estoy tratando de usarlo con React y preferiría ser capaz de simplemente actualizar el datastream en una devolución de llamada, en lugar de usar un sistema de manejo de eventos completamente diferente. Así que básicamente quiero esto:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

Lo más cercano que tuve fue usar observer.onNext('foo'), pero eso no parecía funcionar realmente y eso se llama al observador, lo cual no parece correcto. El observador debe ser la cosa que reacciona al flujo de datos, no cambiarlo, ¿verdad?

¿Simplemente no entiendo la relación observador/observable?

Author: Michael, 2015-10-25

2 answers

En RX, Observer y Observable son entidades distintas. Un observador se suscribe a un Observable. Un Observable emite elementos a sus observadores llamando a los métodos de los observadores. Si necesita llamar a los métodos observer fuera del ámbito de Observable.create() puede usar un Sujeto, que es un proxy que actúa como observador y Observable al mismo tiempo.

Puedes hacer así:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}

Puede encontrar más información sobre los temas aquí:

 115
Author: luisgabriel,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-03-05 14:12:16

Creo Observable.create () no toma a un observador como parámetro de devolución de llamada, sino como emisor. Así que si quieres añadir un nuevo valor a tu Observable prueba esto:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

Sí Sujeto lo hace más fácil, proporcionando Observable y Observador en el mismo objeto, pero no es exactamente el mismo, ya que Sujeto le permite suscribir múltiples observadores al mismo observable cuando un observable solo envía datos al último observador suscrito, así que úselo conscientemente. Aquí hay un JsBin si quiero jugar con él.

 23
Author: theFreedomBanana,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-01-09 21:40:33