Zwracanie streamów? Współbieżne streamy.

Co zwracać - kolekcje czy streamy? Kiedy wywoływać parallel() na streamie?


Ten wpis jest częścią serii, w której tworzę wpisy na podstawie wybranego tematu z książki Effective Java (3rd edition 2018), której autorem jest Joshua Bloch. Jest to uaktualnione wydanie pod Jave 9 jednej z najlepszych książek o Javie. Nie ograniczam się jednak tylko do książki, więc czasem temat będzie rozbudowany i trafią się informacje z innych źródeł na ten sam temat.

Ten wpis nawiązuje do tematu z Item 47, 48 z rozdziału 7:

Lambdas and Streams


Zwracanie kolekcji zamiast stremów

Pisząc metodę zwracającą sekwencję obiektów, powinniśmy przede wszystkim zwrócić jakiś typ kolekcji, a nie tylko Stream. No, chyba że jesteśmy pewni, że obiekty te będą używane tylko w streamie. Pisząc jakieś publiczne API najlepiej jest udostępnić obie wersje.

Zwracając na przykład tylko Stream sprawiamy, że klient nie będzie mógł w łatwy sposób przeitreować po elementach za pomocą pętli for-each. Niestety Stream nie rozszerza Iterable, chociaż mógłby. Żeby użyć go w pętli, trzeba by zrobić takie paskudztwo:

// Hideous workaround to iterate over a stream
for  (ProcessHandle ph : (Iterable<ProcessHandle>)
    ProcessHandle.allProcesses()::iterator)

Już lepszym obejściem, gdy chcemy użyć elementów streamu w pętli, jest taki adapter:

// Adapter from  Stream<E> to Iterable<E>
public static <E> Iterable<E> iterableOf(Stream<E> stream) {
    return stream::iterator;
}

Który można użyć potem tak:

for (ProcessHandle p : iterableOf(ProcessHandle.allProcesses())) {
    // Process the process
}

Interfejs Collection jest podtypem Iterable i ma metodę stream, więc zwracając kolekcje mamy od razu dostęp do iteracji i streamów. Również zwykłe tablice udostępniają łatwy dostęp do tego dzięki Arrays.asList i Stream.of.

Równoległe wykonywanie operacji na streamach

Od Javy 5 mamy bibliotekę java.util.concurrent która zawiera współbieżne kolekcje i executor framework. W Javie 7 dodano pakiet fork-join, czyli wydajny framwork do parallel decomposition. W Javie 8 dodano streamy, które mogą działać współbieżnie po wywołaniu tylko jednej metody - parallel(). Dzięki tym mechanizmom pisanie wielowątkowych programów staje się coraz łatwiejsze, ale napisanie ich, aby działały poprawnie i szybko jest tak samo trudne, jak to było wcześniej.

Rozważmy taki program:

// Stream-based program to generate the first 20 Mersenne primes
public static void main(String[] args) {
    primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
        .filter(mersenne -> mersenne.isProbablePrime(50))
        .limit(20)
        .forEach(System.out::println);
}

static Stream<BigInteger> primes() {
    return Stream.iterate(TWO, BigInteger::nextProbablePrime);
}

Generuje on pierwsze 20 liczb pierwszych Mersenne’a, nie jest jednak ważne, co dokładnie to robi. Na moim laptopie zajmuje to około 14 sekund. Załóżmy teraz, że naiwnie chciałbym przyspieszyć ten proces, wywołując parallel(), przez co wszystkie operację wywołane zostaną współbieżnie. Czy teraz będzie szybciej? Czy trochę wolniej? Niestety - nie pokaże się nic, a zużycie procesora wskoczy na 100%.

Co się stało? Ano biblioteka streamów nie ma pojęcia jak wykonać tę operację wielowątkowo. Nawet w dobrych warunkach, jeśli źródło streamu to Stream.iterate lub jest obecna operacja limit, to wywołanie parallel() nie przyniesie dobrego rezultatu.

Morał jest więc prosty - nie wywołujmy metody parallel() na streamie bezmyślnie.

Z reguły, najlepszy zysk na wydajności zyskujemy wtedy, gdy robimy streamy na instancjach klas takich jak: ArrayList, HashMap, HashSet, ConcurrentHashMap oraz tablicach i IntStream.range()/LongStream.range().

To, co mają wspólnego, to to, że mogą być dokładnie i dosyć tanim kosztem podzielone na mniejsze części, co ułatwia rozdzielenie pracy pośród wiele wątków. Używany jest do tego spliterator, który jest zwracany przez wywołanie metody spliterator() na Stream lub Iterable.

Również specyfika końcowych operacji wpływa na efektywność współbieżnego przetwarzania. Jeśli więcej pracy jest wykonywane w operacji kończącej niż w tych modyfikujących, to wywołanie parallel() nie wiele zmieni.

Najlepsze operacje końcowe, które najlepiej działają współbieżnie to “redukcje”, gdzie wszystkie elementy są łączone za pomocą jednej z metod redukujących w Stream lub gotowe metody takie jak min, max, count i sum. Równie dobrze sprawdzają się operację takie jak anyMatch, allMatch, i noneMatch. Do tej grupy nie należą jednak collect, która jest nieco bardziej kosztowna.

Używając zwykłego forEach na streamie, który wykonywany jest współbieżnie, kolejność elementów nie zostanie zachowana. Aby dostać na koniec tę samą kolejność trzeba użyć forEachOrdered.

Nawet używając źródła streamu, które można łatwo podzielić na kawałki, mało kosztowne operacje kończące i niekolidujące obiekty funkcyjne, nie uzyskamy lepszej wydajności z wywoływania parallel jeśli w streamie nie będą wykonywane ciężkie operacje, które przewyższą koszt przetwarzania wielowątkowego.

Przykład streamu, gdzie równoległe wykonywanie ma sens i przynosi duży zysk wydajności:

// Prime-counting stream pipeline - benefits from parallelization
static long pi(long n) {
    return LongStream.rangeClosed(2, n)
        .mapToObj(BigInteger::valueOf)
        .filter(i -> i.isProbablePrime(50))
        .count();
}

Na moim laptopie, policzenie tym prostym sposobem ilości liczb pierwszych mniejszych od 100 000 zajmuje 32 sekundy. Dodanie parallel():

// Prime-counting stream pipeline - parallel version
static long pi(long n) {
    return LongStream.rangeClosed(2, n)
        .parallel()
        .mapToObj(BigInteger::valueOf)
        .filter(i -> i.isProbablePrime(50))
        .count();
}

redukuje czas do 9 sekund.


Jeśli uważasz, że to co robię jest przydatne, polub stronę bloga na Facebooku. Wrzucam tam m.in. informacje o nowych wpisach, o promocjach dla programistów i inne.