Divida Rx Observable en múltiples flujos y procese individualmente


Aquí hay una imagen de lo que estoy tratando de lograr.

{a-b-c-a bb bbb {a

Dividido en

A a-----a-- - - - - - a stream> una secuencia

----b-- - - - - bbb - - - > > b stream

------c---------- --> c stream

Entonces, ser capaz de

a.subscribe()
b.subscribe()
c.subscribe()

Hasta ahora, todo lo que he encontrado ha dividido la secuencia usando un groupBy(), pero luego colapsó todo en una sola secuencia y los procesó todos en la misma función. Lo que quiero hacer es procesar cada flujo derivado de una manera diferente.

La forma en que lo estoy haciendo ahora es haciendo un montón de filtros. ¿Hay una mejor manera de hacer esto?

Author: Brandon Bil, 2015-03-04

3 answers

No tienes que colapsar Observables desde groupBy. En su lugar, puede suscribirse a ellos.

Algo como esto:

    String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};

    Action1<String> a = s -> System.out.print("-a-");

    Action1<String> b = s -> System.out.print("-b-");

    Action1<String> c = s -> System.out.print("-c-");

    Observable
            .from(inputs)
            .groupBy(s -> s)
            .subscribe((g) -> {
                if ("a".equals(g.getKey())) {
                    g.subscribe(a);
                }

                if ("b".equals(g.getKey())) {
                    g.subscribe(b);
                }

                if ("c".equals(g.getKey())) {
                    g.subscribe(c);
                }
            });

Si las sentencias se ven un poco feas, pero al menos puedes manejar cada secuencia por separado. Tal vez haya una manera de evitarlos.

 9
Author: ihuk,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-03-04 16:58:14

Fácil como un pastel, solo use filter

Un ejemplo en scala

import rx.lang.scala.Observable

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")

aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))

bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))

cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))

Solo necesita asegurarse de que la fuente observable esté caliente. La forma más fácil es share.

 35
Author: Tomáš Dvořák,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-03-05 09:08:15

Su pregunta está etiquetada con rx-java y rxjs, así que pensé que publicaría una respuesta para rxjs.

Tuve el mismo problema y usé la partición para resolver el problema. Esto es básicamente lo que hice:

import { from } from 'rxjs';
import { partition } from 'rxjs/operators';

const source = from([{category: 1, value: 10}, {category: 2, value: 20}]);

const categories = source.pipe(partition(v => v.category));
for (c of categories) {
    c.pipe(
        // etc
    ).subscribe(val => {
        // etc
    });
}

NOTA: No probé este ejemplo. Si encuentra algún problema, edite la publicación. ¡Gracias!

Parafraseé este ejemplo de la documentación oficial: https://www.learnrxjs.io/operators/transformation/partition.html

 0
Author: James Watkins,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-10-02 15:09:08