Divida Rx Observable en múltiples flujos y procese individualmente
Aquí hay una imagen de lo que estoy tratando de lograr.
{a-b-c-a bb bbb {a
Dividido en
A a-----a-- - - - - - a stream> una secuencia
----b-- - - - - bbb - - - > > b stream
------c---------- --> c stream
Entonces, ser capaz de
a.subscribe()
b.subscribe()
c.subscribe()
Hasta ahora, todo lo que he encontrado ha dividido la secuencia usando un groupBy(), pero luego colapsó todo en una sola secuencia y los procesó todos en la misma función. Lo que quiero hacer es procesar cada flujo derivado de una manera diferente.
La forma en que lo estoy haciendo ahora es haciendo un montón de filtros. ¿Hay una mejor manera de hacer esto?
3 answers
No tienes que colapsar Observables
desde groupBy
. En su lugar, puede suscribirse a ellos.
Algo como esto:
String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};
Action1<String> a = s -> System.out.print("-a-");
Action1<String> b = s -> System.out.print("-b-");
Action1<String> c = s -> System.out.print("-c-");
Observable
.from(inputs)
.groupBy(s -> s)
.subscribe((g) -> {
if ("a".equals(g.getKey())) {
g.subscribe(a);
}
if ("b".equals(g.getKey())) {
g.subscribe(b);
}
if ("c".equals(g.getKey())) {
g.subscribe(c);
}
});
Si las sentencias se ven un poco feas, pero al menos puedes manejar cada secuencia por separado. Tal vez haya una manera de evitarlos.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-03-04 16:58:14
Fácil como un pastel, solo use filter
Un ejemplo en scala
import rx.lang.scala.Observable
val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")
aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))
bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))
cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))
Solo necesita asegurarse de que la fuente observable esté caliente. La forma más fácil es share
.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-03-05 09:08:15
Su pregunta está etiquetada con rx-java y rxjs, así que pensé que publicaría una respuesta para rxjs.
Tuve el mismo problema y usé la partición para resolver el problema. Esto es básicamente lo que hice:
import { from } from 'rxjs';
import { partition } from 'rxjs/operators';
const source = from([{category: 1, value: 10}, {category: 2, value: 20}]);
const categories = source.pipe(partition(v => v.category));
for (c of categories) {
c.pipe(
// etc
).subscribe(val => {
// etc
});
}
NOTA: No probé este ejemplo. Si encuentra algún problema, edite la publicación. ¡Gracias!
Parafraseé este ejemplo de la documentación oficial: https://www.learnrxjs.io/operators/transformation/partition.html
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-10-02 15:09:08