Nyheter i Java 8 - del 5 - parallella strömmar
2014-09-09
Detta är den femte delen i miniserien om nyheterna i Java 8. De tidigare delarna har bland annat handlat om de nya sakerna som finns kan finnas i interface samt om strömmar och lambdauttryck och vad man kan göra med dem.
Den här delen skall handla om hur man kan parallellisera exekveringen av vissa typer av program genom att använda parallella strömmar.
Parallella strömmar
Naiv metod för att bestämma om ett tal är ett primtal eller inte
Som ett exempel på hur man kan parallellisera exekveringen av vissa operationer som utförs på strömmar skall vi kolla på hur man kan hitta alla primtal mellan 1 och 10 miljoner. Vi börjar med att skriva en liten, mycket naiv, metod för att bestämma om ett tal är ett primtal eller inte. Notera som sagt att den är mycket naiv. Om någon skulle anklaga mig för att vara upphovsmannen till den skulle jag förneka detta kategoriskt ;) Men den duger bra för att exemplifiera konceptet med parallell exekvering av strömmar.
// Most inefficient algorithm.
private static boolean isPrime(int n) {
if (n == 2) {
return true;
} else if (n <= 1 || n % 2 == 0) {
return false;
}
for (int possibleDivisor = 3; possibleDivisor <= Math.ceil(Math.sqrt(n)); possibleDivisor += 2) {
if (n % possibleDivisor == 0) {
return false;
}
}
return true;
}
Beräkna primtal sekventiellt
Vi kan nu använda den statiska metoden rangeClosed()
i klassen IntStream
(som vi ju pratade om i del 3) för att generera en ström som innehåller en uppsättning med heltal. Vi är intresserade av heltalen mellan 1 och 10 miljoner, så för att generera en ström som innehåller dessa tal kan vi göra så här:
IntStream.rangeClosed(1, 10000000)
För att extrahera ut de av talen som är primtal kan vi sen applicera metoden filter()
på strömmern och använda en metodreferens till vår naiva primtalkollarmetod (som finns i klassen MainClass):
IntStream.rangeClosed(1, 10000000).filter(MainClass::isPrime)
Vill vi sedan samla upp primtalen för vidare bearbetning kan vi använda collect()
. Vi passar även på att lägga en utskrft på hur lång tid exekveringen tar, så vi har något att jämföra med senare. Med dessa ändringar ser vårt lilla program ut så här:
long startTime = System.currentTimeMillis();
List<Integer> primes = IntStream.rangeClosed(1, 10000000).filter(MainClass::isPrime).collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
primes.stream().forEach(System.out::println);
long endTime = System.currentTimeMillis();
System.out.println("Execution took: " + (endTime - startTime) + "ms");
I exemplet så har "den första" varianten av collect()
använts. Detta eftersom det är den enda som finns i klassen IntStream
.
Om man kör ovanstående program på min, väldigt långsamma, "bussdator" ser det ut så här:
lars@larsLilla:~/text/blogg/140903$ java -classpath . MainClass
Execution took: 53197ms
lars@larsLilla:~/text/blogg/140903$
Beräkna primtal parallellt
Hur gör vi nu om vi vill parallellisera exekveringen av primtalssökningen? Ja, först kan vi konstatera att metoden som kollar om ett tal är ett primtal eller inte är tillståndslös. Det är ju bra, då har den ju inga konstiga sidoeffekter som vi behöver ta hänsyn till. Dock modifierar ju metoden filter()
strömmen. Det är ju hela poängen med den metoden. Så hur hanterar vi detta om vi nu vill parallellisera exekveringen?
Tittar man på API-dokumentationen av IntStream
så ser man att den har en metod parallel()
som det står "Returns an equivalent stream that is parallel.". Lysande! Vi provar:
long startTime = System.currentTimeMillis();
List<Integer> primes = IntStream.rangeClosed(1, 10000000).parallel().filter(MainClass::isPrime).collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
long endTime = System.currentTimeMillis();
System.out.println("Execution took: " + (endTime - startTime) + "ms");
...
Execution took: 44177ms
Det var allt vi behövde göra för att parallellisera exekveringen av primtalssökningen. Inte en så stor ändring, eller hur?
Nu var i och för sig minskningen av exekveringstid inte gigantisk. Den blev ungefär 17% kortare, men detta kan nog till viss del bero på att den lilla, klena dator jag använder när jag sitter på bussen och kodar och skriver (som jag gör nu) bara har två kärnor. Kör man exempelprogrammet på den dator jag använder på jobbet blir tiderna istället 3,7s respektive 1,2s. I detta fall går alltså det parallella programmet tre gånger så snabbt som det sekventiella. Inte dåligt!
Man kan också konstatera att jag skulle behöva en ny bussdator.
Andra parallella strömmar
Så vad finns det då för stöd i API:t för att parallellisera exekveringen av operationer på strömmar? Vi har ju redan sett metoden parallel()
i IntStream
. Egentligen är det så att parallel()
finns i interfacet BaseStream
som förutom IntStream
också implementeras av DoubleStream
och LongStream
. Men om det inte är en numerisk ström man vill använda då? Det visar sig att klassen Collection
förutom att ha en metod stream()
också har en metod som heter parallelStream()
. Super! Istället för att använda stream()
och få en sekventiell ström kan vi använda parallelStream()
och få en parallell ström. Vi behöver inte skapa några egna trådar eller trådpoler, bara anropa parallelStream()
.
Varför inte alltid använda parallella strömmar? - sidoeffekter
Det låter ju nästan alltför bra att vi bara genom att anropa BaseStream.parallel()
eller Collection.parallelStream()
kan parallellisera alla program som använder strömmar. Finns det inga hakar då? Såklart det gör. Som med all programmering där parallellism är inblandat så får man hålla tungan rätt i mun. Har man lambdauttryck som uppdaterar något gemensamt tillstånd så är det ingen bra ide att använda parallella strömmar. Man får fundera igenom om operationerna man vi ha utförda verkligen går att exekvera parallellt innan man använder BaseStream.parallel()
eller Collection.parallelStream()
.
Som ett exempel på när det inte blir ett korrekt resultat om parallella strömmar används skall vi skriva ett litet program som givet en lista med heltal ökar varje heltal i listan med en ökande offset (1, 2, ...).
Vi skapar en metod testAddToStream(IntStream stream)
som tar en IntStream
in som parameter. Den använder sedan map
för att addera allt större tal till elementen i strömmen. Vilket tal som skall adderas hålls i en instansvariabel. Detta är ju jättekorkat egentligen eftersom det hade fungerat lika bra med en lokal variabel. Dessutom hade det blivit ett korrekt resultat även för parallella strömmar om så hade skett, men vi gör så bara för att visa ett exempel på hur det kan gå snett med parallella strömmar. Metoden ser i alla fall ut så här:
public class MutableTest
{
private int offset;
...
private Set<Integer> testAddToStream(IntStream stream) {
offset = 0;
return stream.map(i -> i + offset++).collect(HashSet::new, HashSet::add, HashSet::addAll);
}
}
Fyller vi sedan på klassen med en metod som anropar testAddToStream()
med en sekventiell ström och en metod som anropar den med en parallell ström och skriver ut hur många unika heltal som returneras ser klassen ut så här:
public class MutableTest
{
private int offset;
public static void main(String[] args) {
MutableTest instanceOne = new MutableTest();
instanceOne.testSequential();
MutableTest instanceTwo = new MutableTest();
instanceTwo.testParallel();
}
private void testSequential() {
Set<Integer> uniqueNumbers = testAddToStream(IntStream.rangeClosed(1, 100));
System.out.println("Number of unique numbers using sequential stream: " + uniqueNumbers.size());
}
private void testParallel() {
Set<Integer> uniqueNumbers = testAddToStream(IntStream.rangeClosed(1, 100).parallel());
System.out.println("Number of unique numbers using parallel stream: " + uniqueNumbers.size());
}
private Set<Integer> testAddToStream(IntStream stream) {
offset = 0;
return stream.map(i -> i + offset++).collect(HashSet::new, HashSet::add, HashSet::addAll);
}
}
Eftersom vi skapar en ström med 100 heltal förväntar vi oss att utskrifterna skall indikera att testSequential()
och testParallel()
rapporterar att 100 heltal returnerats från testAddToStream()
. Blir det så då?
Nej, inte direkt. Kör man ovanstående klass fås, på min maskin, följande resultat:
ars@larsLilla:~/text/blogg/140903$ java -classpath . MutableTest
Number of unique numbers using sequential stream: 100
Number of unique numbers using parallel stream: 74
lars@larsLilla:~/text/blogg/140903$
Resultatet från olika körningar ger lite olika resultat av testet med den parallella strömmen. 74 och 75 verkar vara de vanligaste resultaten. Inte en enda gång har det blivit 100 i alla fall. Orsaken är givetvis att flera trådar använder samma värde av offset
, vilket gör att duplikat uppträder ni resultatströmmen. När denna görs om till ett Set
elimineras resultaten och antalet unika heltal i minskar. Inga konstigheter, men ett exempel på hur man inte skall använda parallella strömmar.
Varför inte alltid använda parallella strömmar? - gemensam trådpool
Googlar man lite på parallellism i java 8 så hittar man snabbt flera texter som varnar lite för användandet av parallella strömmar. Se till exempel [3] och [4] nedan. Orsaken till varför de råder en att tänka över användandet av parallella strömmar är att de är implementerade med hjälp av JVM:ens fork-join-pool (fjp). Var är det som är farligt med det då? Jo, tydligen delas JVM:ens fjp mellan alla trådar i hela JVM:en. Till saken hör också att fjp:n är implementerad på så sätt att om en en trådarna i poolen hänger på ett blockerande anrop kommer poolen inte att fyllas på med en nystartad tråd, utan antalet tillgängliga trådar i poolen kommer att minska med en. Sammantaget innebär detta att om flera delar av en applikation använder fjp:n, t.ex. genom att använda parallella strömmar, kommer de att kunna störa varandra om pooltrådarna används till blockerande anrop. Om man använder parallella strömmar har man har alltså inte någon kontroll på hur många trådar, om ens någon, som kommer att exekvera ens lambdauttryck. Allt hänger på hur resten av applikatonen använder JVM:ens fjp.
Skriver man en singeltrådad applikation, eller en multitrådad applikation där endast en tråd använder fjp:n, är detta givetvis inte något problem. Det kan dock vara bra att veta om hur kopplingen mellan parallella strömmar och fjp:n ser ut så man kan undvika att använda dem då sådana här krockar skulle kunna inträffa.
Avslutning del 5
Den här gången har vi gått igenom hur parallella strömmar kan användas. Och när de inte bör användas. Jag tycker att de är ett alldeles utmärkt verktyg för vissa sorters problem. I vissa Project Euler-uppgifter hade de snabbat upp mina lösningar avsevärt (det vill säga om jag skrivit dem i Java och inte exekverat dem på min bussdator). Det är nästan så jag vill skriva om en del av mina lösningar till att använda parallella strömmar för att se hur mycket snabbare jag kan få dem att exekvera.
I nästa del i serien skall vi kolla lite på hur man kan parallellisera vissa operationer på arrayer med metoder som är nytillagda i Java 8.
Tidigare delar i serien
Del 1: InterfaceDel 2: Streams och lambda expressions
Del 3: Metoder i Stream, metod- och konstruktorreferenser
Del 4: Metoder klassen Collectors
Referenser
[1] Subramaniam V, Functional Programming in Java, första utgåvan, The Pragmatic Programmers, 2014.[2] The Java Tutorial - Parallelism
[3] Think twice before using Java 8 parallel streams
[4] Java Parallel Streams Are Bad for Your Health!
Leave a reply