Java 8 Stream: diferencia entre limit () y skip()


Hablando de Streams, cuando ejecute este fragmento de código

public class Main {
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

Obtengo esta salida

A1B1C1
A2B2C2
A3B3C3

Porque limitar mi flujo a los tres primeros componentes obliga a acciones A, B and C to be executed only three times.

Tratando de realizar un cálculo análogo en los últimos tres elementos mediante el método skip(), muestra un comportamiento diferente: este

public class Main {
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .skip(6)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

Produce esto

A1
A2
A3
A4
A5
A6
A7B7C7
A8B8C8
A9B9C9

¿Por qué, en este caso, las acciones A1 to A6 are being executed? Debe tener algo que ver con el hecho de que limit es una operación intermedia con estado de cortocircuito, mientras que skip no lo es, pero no entiendo las implicaciones prácticas de esta propiedad. ¿Es solo que "cada acción antes de skip se ejecuta mientras que no todos antes de limit lo es"?

Author: Tagir Valeev, 2015-09-05

4 answers

Lo que tienes aquí son dos tuberías de flujo.

Estas tuberías de flujo consisten cada una en una fuente, varias operaciones intermedias y una operación terminal.

Pero las operaciones intermedias son perezosas. Esto significa que nada sucede a menos que una operación posterior requiera un elemento. Cuando lo hace, entonces la operación intermedia hace todo lo que necesita para producir el elemento requerido, y luego de nuevo espera hasta que se solicite otro elemento, y así sucesivamente.

El terminal las operaciones suelen ser "ansiosas". Es decir, piden todos los elementos en la secuencia que se necesitan para que completen.

Así que realmente deberías pensar en el pipeline como el forEach pidiendo al stream detrás de él el siguiente elemento, y ese stream pide al stream detrás de él, y así sucesivamente, todo el camino hasta la fuente.

Con eso en mente, veamos lo que tenemos con tu primer pipeline: {[23]]}

Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));

Entonces, el forEach está pidiendo el primer ítem. Eso significa que la" B " peek necesita una item, y pide al flujo de salida limit para ello, lo que significa que limit tendrá que pedir la "A" peek, que va a la fuente. Un elemento se da, y va todo el camino hasta el forEach, y se obtiene su primera línea:

A1B1C1

El forEach pide otro ítem, luego otro. Y cada vez, la solicitud se propaga por la corriente y se realiza. Pero cuando forEach pide el cuarto elemento, cuando la solicitud llega al limit, sabe que ya ha dado todos los elementos que se le permite dar.

Por lo tanto, no se trata de pedir a la mirada "A" por otro elemento. Indica inmediatamente que sus artículos se agotan, y por lo tanto, no se realizan más acciones y forEach termina.

¿Qué sucede en la segunda tubería?

    Stream.of(1,2,3,4,5,6,7,8,9)
    .peek(x->System.out.print("\nA"+x))
    .skip(6)
    .peek(x->System.out.print("B"+x))
    .forEach(x->System.out.print("C"+x));

Nuevamente, forEach está pidiendo el primer ítem. Esto se propaga de nuevo. Pero cuando llega al skip, sabe que tiene que pedir 6 elementos de su río arriba antes de que pueda pasar uno río abajo. Así que hace una petición aguas arriba de la " A " peek, lo consume sin pasarlo río abajo, hace otra petición, y así sucesivamente. Así que el " A " peek recibe 6 solicitudes para un artículo y produce 6 impresiones, pero estos artículos no se transmiten.

A1
A2
A3
A4
A5
A6

En la 7a solicitud hecha por skip, el artículo se pasa a la " B " peek y de ella a la forEach, por lo que la impresión completa se hace:

A7B7C7

Entonces es como antes. El skip ahora, cada vez que recibe una solicitud, pedirá un elemento aguas arriba y lo pasará aguas abajo, ya que "sabe" que tiene ya ha hecho su trabajo de saltarse. Así que el resto de las huellas están pasando por toda la tubería, hasta que la fuente se agota.

 78
Author: RealSkeptic,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-09-05 17:31:48

La notación fluida de la canalización transmitida es lo que está causando esta confusión. Piénsalo de esta manera:

limit(3)

Todas las operaciones canalizadas se evalúan perezosamente, excepto forEach(), que es una operación terminal , que desencadena "ejecución de la canalización".

Cuando se ejecuta la canalización, las definiciones de flujo intermedio no harán ninguna suposición sobre lo que sucede "antes" o "después". Todo lo que están haciendo es tome un flujo de entrada y transformarlo en un flujo de salida:

Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9);
Stream<Integer> s2 = s1.peek(x->System.out.print("\nA"+x));
Stream<Integer> s3 = s2.limit(3);
Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x));

s4.forEach(x->System.out.print("C"+x));
  • s1 contiene 9 Integer valores diferentes.
  • s2 observa todos los valores que lo pasan y los imprime.
  • s3 pasa los primeros 3 valores a s4 y aborta la canalización después del tercer valor. s3 no produce más valores. Esto no significa que no haya más valores en la canalización. s2 todavía produciría (e imprimiría) más valores, pero nadie solicita esos valores y así se detiene la ejecución.
  • s4 nuevamente observa todos los valores que lo pasan y los imprime.
  • forEach consume e imprime lo que s4 le pasa.

Piénsalo de esta manera. Toda la corriente es completamente perezosa. Solo la operación de terminal activa extrae nuevos valores de la canalización. Después de que ha sacado 3 valores de s4 <- s3 <- s2 <- s1, s3 ya no producirá nuevos valores y ya no extraerá ningún valor de s2 <- s1. Mientras que s1 -> s2 todavía ser capaz de producir 4-9, esos valores simplemente nunca se extraen de la tubería, y por lo tanto nunca se imprimen por s2.

skip(6)

Con skip(), sucede lo mismo:

Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9);
Stream<Integer> s2 = s1.peek(x->System.out.print("\nA"+x));
Stream<Integer> s3 = s2.skip(6);
Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x));

s4.forEach(x->System.out.print("C"+x));
  • s1 contiene 9 Integer valores diferentes.
  • s2 observa todos los valores que lo pasan y los imprime.
  • s3 consume los primeros 6 valores, "salteándolos" , lo que significa que los primeros 6 valores no se pasan a s4, solo los valores posteriores ser.
  • s4 nuevamente observa todos los valores que lo pasan y los imprime.
  • forEach consume e imprime lo que s4 le pasa.

Lo importante aquí es que s2 no es consciente de que la canalización restante omita ningún valor. s2 observa todos los valores independientemente de lo que suceda después.

Otro ejemplo:

Considere esta tubería, que se enumeran en esta entrada del blog

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .distinct()
         .limit(10)
         .forEach(System.out::println);

Cuando se ejecuta el anterior, el programa nunca se detendrá. ¿Por qué? Porque:

IntStream i1 = IntStream.iterate(0, i -> ( i + 1 ) % 2);
IntStream i2 = i1.distinct();
IntStream i3 = i2.limit(10);

i3.forEach(System.out::println);

Que significa:

  • i1 genera una cantidad infinita de alternancia de valores: 0, 1, 0, 1, 0, 1, ...
  • i2 consume todos los valores que se han encontrado antes, pasando solo valores"nuevos", es decir, hay un total de 2 valores que salen de i2.
  • i3 pasa 10 valores, luego se detiene.

Este algoritmo nunca se detendrá, porque i3 espera a que i2 produzca 8 valores más después de 0 y 1, pero esos valores nunca aparecen, mientras que i1 nunca deja de alimentar valores a i2.

No importa que en algún momento del proceso se hayan producido más de 10 valores. Todo lo que importa es que i3 nunca ha visto esos 10 valores.

Para responder a su pregunta:

Es solo que " cada acción antes de skip se ejecuta mientras que no todos antes de limit es"?

No. Todas las operaciones anteriores a skip() o limit() se ejecutan. En ambas ejecuciones, obtienes A1 - A3. Pero limit() puede cortocircuitar la tubería, abortando el consumo de valor una vez que se ha producido el evento de interés (se alcanza el límite).

 10
Author: Lukas Eder,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-09-06 10:00:22

Es una completa blasfemia ver las operaciones de steam individualmente porque no es así como se evalúa una transmisión.

Hablando de limit (3) , es una operación de cortocircuito, lo que tiene sentido porque pensando en ello, cualquier operación es antes de y después de el limit, tener un límite en una secuencia detendría la iteración después de obtener n elementos hasta la operación límite, pero esto no significa que solo n elementos de secuencia serían procesar. Tome esta operación de flujo diferente para un ejemplo

public class App 
{
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .filter(x -> x%2==0)
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

Produciría

A1
A2B2C2
A3
A4B4C4
A5
A6B6C6

Que parecen correctas, porque limit está esperando que 3 elementos de flujo pasen a través de la cadena de operación, aunque se procesen 6 elementos de flujo.

 8
Author: Amm Sokun,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-09-05 15:54:06

Todos los flujos se basan en spliterators, que tienen básicamente dos operaciones: advance (mover hacia adelante un elemento, similar al iterador) y split (dividirse en posición arbitraria, que es adecuado para el procesamiento en paralelo). Puede dejar de tomar elementos de entrada en cualquier momento que desee (lo que se hace mediante limit), pero no puede saltar a la posición arbitraria (no hay tal operación en la interfaz Spliterator). Por lo tanto skip operación necesita realmente leer los primeros elementos de la fuente solo ignorarlos. Tenga en cuenta que en algunos casos se puede realizar salto real:

List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9);

list.stream().skip(3)... // will read 1,2,3, but ignore them
list.subList(3, list.size()).stream()... // will actually jump over the first three elements
 4
Author: Tagir Valeev,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-09-05 15:07:12