Java Stream: approfondimento avanzato

J

In questo articolo di approfondimento andremo a capire più in dettaglio l’uso degli Stream in Java. Al fine di poter comprendere quanto verrà analizzato in seguito è importante conoscere alcuni concetti base come l’uso di Optional, lambda e method reference.

Per chi avesse ancora dei dubbi o intenda fare un piccolo ripasso ecco alcuni articoli trattati in questo blog:

 

Stream reference

Una delle cose più importanti da ricordare è che gli Stream non sono riusabili perchè sono pensati per poter eseguire una serie finita di operazioni sui loro elementi ma non per eseguirne lo store.

E’ possibile quindi istanziare uno Stream e usare la sua referenza con con tutte le intermediate operation, anche concatenandole, ma l’esecuzione di una terminal operation lo rende inacessibile. Cerchiamo di chiarire bene il concetto con un esempio molto semplice.

Stream<String> stream = Stream.of("bianco", "rosso", "giallo", "blu", "verde");
boolean anyMatchE = stream.sorted().anyMatch(col -> col.contains("e"));
stream.filter(col -> col.contains("e"));

In questo caso abbiamo utilizzato uno Stream contenente una sequenza di colori ( riga 1 ) e abbiamo eseguito una serie di operazioni sui suoi elementi. In particolare possiamo notare che la riga 2 esegue, portando a termine, un ordinamento e un match sugli elementi che contengono la lettera “e”. Eseguendo l’ultima istruzione ( riga 3 ) ci accorgeremo che verrà rilanciata una IllegalStateException con il seguente messaggio:

java.lang.IllegalStateException: stream has already been operated upon or closed

Il motivo è molto semplice: il metodo anyMatch() è una terminal operation e quindi lo Stream viene chiuso precludendo ogni successivo riutilizzo. L’esempio corretto per eseguire le istruzioni dovrebbe quindi essere:

List<String> colori = Arrays.asList("bianco", "rosso", "giallo", "blu", "verde");
boolean anyMatchE = colori.stream().sorted().anyMatch(col -> col.contains("e"));
colori.stream().filter(col -> col.contains("e"));

 

Multithreading

Fino a questo momento abbiamo sempre utilizzato delle operazioni eseguite in serie. Le Stream API ci semplificano la vita anche per quanto riguarda il multithreading con il metodo parallelStream() eseguendo le operazioni sugli elementi in parallelo. Nel seguente esempio il metodo executeOperation() viene eseguito il parallelo per ogni elemento dello Stream.

colori.parallelStream().forEach(colore -> executeOperation(colore));

Nel caso in cui la sorgente non sia una Collection o un array è possibile eseguire le operazioni in parallelo con il metodo parallel().

IntStream parallelStream = IntStream.range(1, 100).parallel();

Le operazioni in parallelo vengono eseguite semplicemente usando il framework ForkJoin come veniva implementato prima di Java 8 ma rendendo ora tutto più semplice e in stile funzionale. Una limitazione di tutto ciò è che ( almeno per ora ) non è possibile assegnare uno specifico thread poll ma viene utilizzato quello di ForkJoinPool.commonPool() condiviso dall’intera applicazione. Un aiuto ci viene dato da varie librerie sviluppate dagli utenti che permettono di usare dei custom parallel collectors. Una di queste che ho potuto provare e che mi sento di consigliare la potete trovare a questo indirizzo.

Ricordiamo infine che un parallel stream può essere riconvertito in sequenziale in ogni momento con il metodo sequential().

serialStream = parallelStream.sequential();

Il metodo isParallel() ci indicherà sempre la modalità attuale dello Stream.

 

Ordine di esecuzione

Per chi non lo avesse già intuito le operazioni possono avere un peso diverso a seconda dell’ordine in cui vengono eseguite. Esiste una regola che generalmente va rispettata per ottimizzare le prestazioni ponendo nel giusto ordine le operazioni ed è la seguente:

Le intermediate operation che riducono la grandezza di uno Stream ( es: filter(), distinct()) vanno messe sempre prima delle operazioni che prendono in considerazione tutti gli elementi dello Stream ( es: map() ).

 

Lazy invocation

Un concetto molto importante da tenere sempre in considerazione è che tutte le intermediate operation sono lazy, vengono eseguite solo se sono necessarie per l’escuzione della terminal operation. Per capire bene il concetto utilizziamo il seguente esempio:

List<String> coloriList = Arrays.asList("bianco", "rosso", "giallo", "blu", "verde");

Stream<String> coloriStream = coloriList.stream().filter(colore -> {
    System.out.println("Colore: " + colore);
    return colore.startsWith("g");
});

In base a quanto detto prima in questo caso non esiste una terminal operation ma solo una intermediate ( filter() ) che non verrà mai eseguita.

Aggiungiamo ora un’altra intermediate operation e una terminal operation e vediamo cosa succede.

coloriList.stream().filter(colore -> {
    System.out.println("Colore: " + colore);
    return colore.startsWith("g");
}).map(colore -> {
    System.out.println(String.format("Numero lettere del colore %s: %d", colore, colore.length()));
    return colore.length();
}).findFirst();

Tenendo presente che le operazioni vengono eseguite in sequenza, in questo caso abbiamo la terminal operation findFirst() che chiude la chiamata quindi tutte le intermediate operation filter() e map() vengono eseguite. Poniamo particolare attenzione al fatto che anche se lo Stream è composto da 5 elementi il metodo filter() verrà chiamato solo 3 volte cioè fino a soddisfare la terminal operation ( trova il primo ). Ecco l’output della console all’esecuzione del codice:

Colore: bianco
Colore: rosso
Colore: giallo
Numero lettere del colore giallo: 6

 

Reduction

Con reduction si intende eseguire delle terminal operation che aggregano gli elementi dello Stream restituendo un type o un primive. Le Stream API ci mettono a disposizione metodi come min(), max(), count(), ecc… per eseguire delle reduction semplici ma abbiamo anche la possibilità di avere altri metodi per creare delle custom reduction come reduce() che si basa su un accumulator e collect() che si basa invece su un Collector.

Reduce

Per utilizzare questo metodo è fondamentale conoscere il ruolo di tutti i parametri che possono essere utilizzati:

  • identity: rappresenta il valore iniziale dell’accumulatore.
  • accumulator: specifica la funzione che viene eseguita per aggregare gli elementi dello Stream. In questo caso ricordiamo che devono essere prese in seria considerazione le performance perchè la funzione viene eseguita per ogni elemento dello Stream creando ogni volta un nuovo valore di risultato dove solo l’ultimo è quello che viene considerato.
  • combiner: concettualmente simile a quanto detto prima si tratta però di una aggregatore di accumulatori, utilizzato nelle operazioni in parallelo per aggregare i risultati dei vari thread.

Accumulator:

OptionalInt result = IntStream.range(1, 6).reduce((a, b) -> a * b);

result = 1 * 2 * 3 * 4 * 5 = 120

Identity + Accumulator:

int num = IntStream.range(1, 6).reduce(10, (a, b) -> a * b);

num = 10 * 1 * 2 * 3 * 4 * 5 = 1200

Identity + Accumulator + Combiner

int num = Arrays.asList(1, 2, 3, 4, 5).parallelStream().reduce(10, (a, b) -> a * b, (a, b) -> a + b);

num = 10 * 1 + 10 * 2 + 10 * 3 + 10 * 4 + 10 * 5 = 150

Attenzione: l’esempio riportato qui sopra è quello corretto perchè abbiamo utilizzato parallelStream(). Se utilizziamo invece il metodo stream()il risultato è 1200 perchè il combiner non viene utilizzato e il tutto si riduce a quanto descritto nell’esempio Identity + Accumulator cioè:

num = 10 * 1 * 2 * 3 * 4 * 5 = 1200

Collect

L’altra terminal operation con la quale possiamo eseguire delle reduction è reppresentata dal metodo collect() che utilizza un Collector per definirne il meccanismo. Nel precedente articolo abbiamo già visto che esistono dei collectors predefiniti che coprono le operazioni più comuni ( toList(), toSet(), joining(), grouping(), ecc.. ). Oltre a questi che sono i metodi di maggior utilizzo esistono però altri usi che  possono essere davvero interessanti.

Utilizziamo l’esempio delle marche e modelli dell’articolo precedente, Ecco quali usi potremo farne attraverso il collect dello Stream:

List<Marca> marche = new ArrayList<>();
marche.add(new Marca("Fiat", Arrays.asList("Panda", "Punto", "500")));
marche.add(new Marca("Renault", Arrays.asList("Clio", "Twingo")));
marche.add(new Marca("Ford", Arrays.asList("Focus", "C-Max")));

Somma dei modelli presenti

int summ = marche.stream().collect(Collectors.summingInt(marca -> marca.getModelli().size()))
Risultato: 7

Media dei modelli per marca

double average = marche.stream().collect(Collectors.averagingInt(marca -> marca.getModelli().size()));
Risultato: 2.3333333333333335

Informazioni statistiche

IntSummaryStatistics stat = marche.stream().collect(Collectors.summarizingInt(marca -> marca.getModelli().size()));
Risultato: IntSummaryStatistics{count=3, sum=7, min=2, average=2,333333, max=3}

In questo caso il metodo summarizingInt() restituisce una classe IntSummaryStatistics che contiene i metodi di accesso di tutte le proprietà di cui abbiamo visualizzato il toString().

Abbiamo visto nell’articolo precedente come è possibile ottenere una Map con il metodo groupingBy(), un altro modo può essere quello di ottenerla attraverso un Predicate con il metodo partitioningBy(). Possiamo così ottenere una Map con una chiave booleana e come valore una lista di marche divise tra quelle che hanno più o meno di 2 modelli

Map<Boolean, List<Marca>> mapPartioned = marche.stream().collect(Collectors.partitioningBy(marca -> marca.getModelli().size() > 2))

Un altro utilizzo può essere quello che ci viene proposto dal javadoc del metodo collectingAndThen() e cioè quella di eseguire un collect, ad esempio, su una List e restituire una UnmodifiableList

List<String> unmodifiableList = marche.stream().flatMap(marca -> marca.getModelli().stream()).collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));

 

L’ultimo esempio che possiamo presentare è quello del caso in cui tutti questi collector non riescano a soddisfare particolari esigenze e quindi c’è il bisogno di crearne uno custom. Cercando in rete esistono molti howto su come crearne quindi non mi cimenterò a spiegare tutti i passaggi anche perchè il modo è piuttosto semplice perchè si tratta di crearlo utilizzando il metodo of() di Collector.

Collector.of(supplier, accumulator, combiner, finisher, characteristics)

Se comunque qualcuno dovesse incontrare qualche difficoltà è sempre possibile lasciare un commento all’articolo per avere un aiuto.

Oltre Java 8

La politica dei rilasci di Java ormai è cambiata e le nuove versioni hanno una cadenza di 6 mesi. Come è ovvio aspettarsi, soprattutto con nuove features, molti saranno i cambiamenti e gli Stream non fanno eccezione. In questo momento non ho la possibilità di avere tutte le varie versioni installate e siccome si tratta solo di piccoli miglioramenti vi lascio i link ad alcuni articoli in cui vengono analizzati.

Java 9: Stream improvements.

Java 10: New Collectors method to accumulate unmodifiable collections

Java 11: Immutable/Unmodifiable Stream API Collectors

 

Conclusioni

Eccoci giunti alla fine di questi 2 articoli che riassumono più o meno  tutto quello che a partire da Java 8 è stata l’indroduzione degli Stream. Non entro nel merito delle questioni che animano le discussioni di chi usa questo linguaggio e chi no e cioè se l’indroduzione degli Stream sia stata fatta bene, male, in modo completo, superficiale, ecc… Quello che penso è che nonostante tutto la necessità di introdurre novità come Stream, Optional, lambda era molto sentita e auspicata.

Mi auguro comunque che si continui a cavalcare quest’onda di cambiamenti e che vengano migliorati anche altri aspetti di Java.

A proposito di me

Gianluca Fattarsi

Perito informatico, esperienza come programmatore nell'ambito lavorativo dal lontano 2002. La sua formazione in Java spazia dal Front-End al Back-End con esperienze in gestione database e realizzazione di App Android in Kotlin. Hobby preferiti: Linux, lettura e World of Warcraft.

I nostri Partner

Gli articoli più letti

Articoli recenti

Commenti recenti