Java 8: Beispiele zur Stream-API

von Hubert Schmid vom 2013-12-08

Die Einführung der sogenannten Stream-API ist die größte Änderung der Standardbibliothek in Java 8. Mit ihr sollen Zugriffe auf Daten vereinfacht werden. Doch lohnt sich dafür tatsächlich eine so komplexe API?

C# besitzt eine ähnliche Funktionalität, die 2007 erfolgreich mit der Language Integrated Query (LINQ) vermarktet wurde. Letztere verbindet die Bibliothekserweiterung mit Abfragesprachen für Datenbanken. Die Stream-API von Java 8 bietet diese Möglichkeit bisher nicht. Der Ansatz ist dennoch für die Motivation geeignet.

Mit Datenbankabfragesprachen wie SQL lassen sich Daten sehr einfach und effizient aufbereiten. Immer häufiger wird eine vergleichbare Funktionalität auch außerhalb von Datenbanksystemen benötigt. Doch ohne geeignete Unterstützung für Entwickler entstehen hohe Aufwände in der Realisierung und Wartung bei gleichzeitig ineffizienter Auslastung der Hardware-Ressourcen.

Die Stream-API tritt an, das Problem für Java-Entwickler zu lösen. Einen Eindruck davon werde ich im Folgenden anhand von fünf Beispielen vermitteln, wobei ich jeweils die SQL-Abfrage der Java-Methode gegenüberstelle. Ziel ist das Bewusstsein für dieses Thema zu schärfen.

Datenmodell

Alle Beispiele orientieren sich am Datenmodell einer Q&A-Plattform wie Stack Overflow. Im Mittelpunkt stehen Fragen, die durch den Typ Question modelliert werden. Das folgende Listing enthält ihre Definition, wobei jedoch nur die Eigenschaften enthalten sind, die für die Beispiele benötigt werden. Insbesondere fehlt der Inhalt der Frage sowie eine Referenz auf den Fragesteller.

interface Question { long getQuestionId(); Site getSite(); Set<String> getTags(); int getScore(); int getViewCount(); boolean isAnswered(); }

Das Datenbankschema sieht für die Abbildung zwei Tabellen vor: Die Tabelle question enthält die Spalten question_id, site, score, views und is_answered. Die Tabelle tag bildet in einer 1:n-Relation die Liste der zugeordneten TAGs mit den beiden Spalten question_id und name ab.

Beantwortete Fragen pro Site

Im ersten Beispiel geht es darum, wie viele Fragen pro Site beantwortet wurden. Die zugehörige SQL-Abfrage ist selbsterklärend und im folgenden Listing zu sehen.

SELECT q.site, COUNT(*) FROM question q WHERE q.isAnswered GROUP BY q.site

Die Implementierung in Java 8 mit der Stream-API ist erstaunlich ähnlich. Das folgende Listing nutzt abgesehen von den Typen Question und Site ausschließlich die Java-Standardbibliothek, und im Vergleich zu Java 7 lässt sich zumindest ein gewisser Vorteil erahnen.

Map<Site, Long> getAnsweredCountBySite(List<Question> questions) { return questions.stream() .filter(Question::isAnswered) .collect(groupingBy(Question::getSite, counting())); }

Kumulierte Bewertung beantworteter Fragen pro Site

Das zweite Beispiel unterscheidet sich nur minimal. Anstatt der Anzahl der Fragen werden dieses Mal die Gesamtbewertungen bestimmt. Zunächst wieder die SQL-Abfrage:

SELECT q.site, SUM(q.score) FROM question q WHERE q.isAnswered GROUP BY q.site

Auch diese Aufgabe lässt sich mit der Stream-API sehr gut lösen. Im Vergleich zum vorherigen Beispiel muss lediglich counting() durch summingLong(Question::getScore()) ersetzt werden.

Map<Site, Long> getAnsweredScoreBySite(List<Question> questions) { return questions.stream() .filter(Question::isAnswered) .collect(groupingBy(Question::getSite, summingLong(Question::getScore))); }

Kennzahlen beantworteter Fragen pro Site

Das dritte Beispiel ist eine Kombination aus den beiden vorherigen Beispielen. Zusätzlich zur Anzahl der Fragen und ihrer kumulierten Bewertung wird auch noch die Anzahl der Seitenaufrufe bestimmt. Statt einem Feld werden also Aggregate über gleich drei Felder bestimmt. Für SQL macht das jedoch keinen Unterschied.

SELECT q.site, COUNT(*), SUM(q.score), SUM(q.viewCount) FROM question q WHERE q.isAnswered GROUP BY q.site

Anders sieht es bei der Stream-API aus. Die Standardbibliothek unterstützt von sich aus Aggregationen nämlich nur über einzelne Felder. Doch dieses Beispiel ist einfach genug, um sich mit einer eigenen Collector-Implementierung zu behelfen. Die Methode summingLongs habe ich analog zu den bereits existierenden Methoden benannt, und die Implementierung zeige ich unten.

Map<Site, long[]> getAnsweredMetricsBySite(List<Question> questions) { return questions.stream() .filter(Question::isAnswered) .collect( groupingBy( Question::getSite, summingLongs( q -> 1L, Question::getScore, Question::getViewCount))); }

Beantwortete Fragen pro TAG

Die bisherigen Beispiele beschränkten sich stets auf eine Tabelle beziehungsweise ein Objekt. Das ändert sich nun mit der Abfrage um herauszufinden, welche Tags wie oft in beantworteten Fragen verwendet wurden. Die SQL-Abfrage verwendet dafür einen sogenannten Inner-Join.

SELECT t.name, COUNT(*) FROM question q JOIN tag t ON q.question_id=t.question_id WHERE q.is_answered GROUP BY t.name

Die Entsprechung der Stream-API zum Inner-Join ist die Methode flatMap. Dabei wird jedes Ausgangselement durch einen Stream neuer Elemente ersetzt. In diesem Fall sind die Ausgangselemente die beantworteten Fragen, die durch ihre jeweiligen Tags ersetzt werden. Diese müssen anschließend nur noch gruppiert und gezählt werden.

Map<String, Long> getAnsweredCountByTag(List<Question> questions) { return questions.stream() .filter(Question::isAnswered) .flatMap(q -> q.getTags().stream()) .collect(groupingBy(t -> t, counting())); }

Kennzahlen beantworteter Fragen pro TAG

Eine Besonderheit beim Inner-Join des vorherigen Beispiels war, dass im Anschluss nur noch Felder der zweiten Tabelle verwendet wurden. Solche Fälle sind eigentlich eher die Ausnahme. Viel häufiger werden Informationen aus mehreren Tabellen aufbereitet. Im letzten Beispiel passiert das, indem nicht nur die TAGs gezählt werden, sondern auch der Score und die Anzahl der Seitenaufrufe kumuliert werden.

SELECT t.name, COUNT(*), SUM(q.score), SUM(q.viewCount) FROM question q JOIN tag t ON q.question_id=t.question_id WHERE q.is_answered GROUP BY t.name

Die zugehörigen Anpassungen der SQL-Abfrage sind sehr einfach. Weit weniger einfach ist es mit der Stream-API, denn nun müssen beim Inner-Join zwei Objekte in einem Typ zusammengefasst werden. Eine mögliche Lösung dafür ist in folgendem Listing zu sehen. Zum Einsatz kommt dabei eine anonyme Klasse. Besonders daran ist, dass der konkrete Typ der anonymen Klasse aufgrund der Typinferenz erhalten bleibt, so dass später direkt auf die Felder zugegriffen werden kann.

Map<String, long[]> getAnsweredMetricsByTag(List<Question> questions) { return questions.stream() .filter(Question::isAnswered) .flatMap(q -> q.getTags().stream() .map(t -> new Object() { Question question = q; String tag = t; })) .collect( groupingBy( o -> o.tag, summingLongs( o -> 1L, o -> o.question.getScore(), o -> o.question.getViewCount()))); }

summingLongs

Abschließend möchte ich noch kurz auf die Methode summingLongs eingehen, die an zwei Stellen zum Einsatz kam. Insbesondere kann man daran erkennen, dass sich die Stream-API an manchen Stellen sehr einfach erweitern lässt.

@SafeVarargs public static <T> Collector<T, long[], long[]> summingLongs(ToLongFunction<T>... functions) { return Collector.of( () -> new long[functions.length], (result, value) -> { for (int i = 0; i < functions.length; ++i) { result[i] += functions[i].applyAsLong(value); } }, (lhs, rhs) -> { long[] result = new long[functions.length]; for (int i = 0; i < functions.length; ++i) { result[i] = lhs[i] + rhs[i]; } return result; }); }

Die Methode besteht praktisch nur aus einer return-Anweisung, die eine Instanz der Schnittstelle java.util.stream.Collector erzeugt. Auffällig ist jedoch, wie die Schnittstelle realisiert wird. Obwohl das Verhalten spezifisch ist, wird keine neue Klasse angelegt. Stattdessen werden die Funktionen einer generischen Klasse übergeben, die lediglich den Rahmen bereitstellt. Ein weiterer Anwendungsfall für Lambda-Ausdrücke.