ich habe das erste mal Java 8 in der Hand und wollte nun die Streams nutzen. mein Anwendungsfall sieht nun so aus:
Ich habe eine Liste von IDs, die prinzipiell beliebig lang sein kann. Nun möchte ich alle Datensätze aus der DB holen, die in der Liste auftauchen:
[SQL]SELECT * FROM FOO WHERE ID IN (1,2,3,4);[/SQL]
Hier könnte ich das Problem bekommen, dass die Liste zu lang werden könnte und mir die Datenbank einen Fehler wirft. Erster Gedanke wäre, für jede einzelne ID eine Abfrage. Könnte dann aber doch etwas langsam werden. Also ein Mittelding: In einer Batchgröße von z.B. 50 abfragen.
Mein erster Versuch scheitert doch glatt daran, dass ich ‚i‘ als final deklarieren müsste und damit nicht verändern dürfte
List<Long> allElements = new LinkedList<Long> ();
List<Long> temp = new LinkedList<Long>();
keys.stream().forEach(x->{
temp.add(x);
if(i++ % 20 == 0){
//SELECT * FROM FOO WHERE ID IN #{temp} => Add to allElements
temp.clear();
}
});```
Ich habe auch irgendwie das Gefühl noch nicht die volle Power der Streams zu nutzen.
Ja, das wäre ja auch mein Standardweg gewesen:```List batch = new LinkedList();
for (Long item : keys) {
batch.add(item);
if (batch.size() == 20) {
fillMap(mapName, map, sess, batch);
batch.clear();
}
}
//Wenn noch was vorhanden ist,dann noch den Rest holen
if (!batch.isEmpty()) {
fillMap(mapName, map, sess, batch);
}```
Ich dachte nur, man kann das auch mit Streams elegant lösen und wollte es nun auch mal versuchen. Aber alles was ich bisher gefunden habe, ist zu kompliziert und ich habe das Gefühl, dass meine foreach Schleife schöner zu lesen ist, als alles, was ich bei Streams gefunden habe. Aber vielleicht ist das auch nur das Unbekannte Neue :).
PS: Achso… und meine Foreach Schleife kann ich nicht so ohne weiteres parallel abarbeiten. Ein paar DB Abfragen könnten ja schon parallel laufen.
den doppelten Code zu vermeiden (//Wenn noch was vorhanden ist) sehe ich bei solchen Konstrukten als Hauptaufgabe,
ich bemühe teils extra Iterator-Klassen die an deren Ende noch ein null oder sonstiges besonders Objekt nachschieben,
so dass in der Schleife darauf reagiert werden kann
hat Java 8 hier zufällig auch was Gutes fertig im Gürtel?
Aus Java 8 sind aus dem ursprünglichen Vorschlag ein paar Sachen rausgeflogen, insb. Streams.zip und ein paar Dinge aus anderen Sprachen werden auch noch vermisst.
Ich vermisse z.B. eine partition-Funktion die ich aus Clojure kenne. Ich habe mich mal versucht eine Nachzubauen.
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Partitioning {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(3, 4, 7, 9, 10, 22, 43);
partition(numbers, 3).forEach(System.out::println);
partition(numbers, 3).map(l -> l.stream().reduce((a, b) -> a + b)).forEach(System.out::println);
partition(numbers, 4).map(Partitioning::query).forEach(System.out::println);
partition(numbers, 3).map(Partitioning::square).flatMap(x -> x.stream()).forEach(System.out::println);
}
public static String query(List<Integer> numbers) {
return "Query: " + numbers.toString();
}
public static List<Integer> square(List<Integer> numbers) {
return numbers.stream().map(x -> x * x).collect(Collectors.toList());
}
public static <T> Stream<List<T>> partition(List<T> numbers, int size) {
return numbers.stream().map(new Function<T, Pair<T>>() {
int index = 0;
@Override
public Pair<T> apply(T n) {
return new Pair<T>(index++, n);
}
}).collect(Collectors.groupingBy(p -> p.index / size)).values().stream()
.map(t -> t.stream().map(x -> x.value).collect(Collectors.toList()));
}
private static class Pair<T> {
private final int index;
private final T value;
public Pair(int index, T value) {
super();
this.index = index;
this.value = value;
}
}
}```
Die partition-Funktion versieht jedes Element mit einem Index. Danach Gruppiert es die Elemente anhand des Index auf Listen mit einer maximalen Größe, sodass man eine Map erhält (Key: index/maximale Größe, Value: Liste der Elemente). Danach noch ein wenig umformen und man erhält einen Stream<List<T>>.
Jede Liste in diesem Stream überschreitet, dann eine maximale Batchgröße nicht.
So kann dann jedes Element im Stream per map einzeln verarbeitet werden.
Ich habe mal ein Paar Beispiele wie man dann mit dieser partition-Funktion und dessen Ergebnis weiterarbeiten kann.
Beispiel 1: Gibt die partitionierten Listen aus.
[3, 4, 7]
[9, 10, 22]
[43]
Beispiel 2: Summiert die einzelnen Listen auf und gibt die Ergebnisse aus
Optional[14]
Optional[41]
Optional[43]
Beispiel 3: Ruft die QueryFunktion mit partitionierten Listen auf.
Query: [3, 4, 7, 9]
Query: [10, 22, 43]
Beispiel 4: Ruft die Funktion square auf, die ihrerseits wieder eine Liste zurückliefert.
Daraufhin wird mit Flatmap aus dem Stream von List von Integer ein einzelner Stream von Integer, der dann per foreach ausgegeben wird.
Mit unregistreds Lösung reduziert sich der Code soweit:
//Batches erstellen von 50 Keys
List<List<Long>> l2 = udb.partition(new LinkedList<Long>(keys), 50).collect(Collectors.toList());
//pro CPU Kern wird ein Thread gestartet und jede Batch aus der DB geholt
l2.parallelStream().forEach(batch-> fillMap(mapName, map, sess, batch));
Ich habe über das ganze noch mal reflektiert und bin zu der Erkenntnis gekommen, das das ganze nicht ganz so optimal ist.
Eine partitions-Funktion halte ich vom Ansatz her immer noch richtig, allerdings bin ich mit dieser Implementierung nicht zufrieden, da das alles viel zu aufwendig ist.
Also eine Version ohne Streams, Lambdas, ausser vielleicht der Rückgabetyp?
List<List<T>> result = new ArrayList<List<T>>();
List<T> current = new ArrayList<T>();
for (T t : numbers) {
current.add(t);
if (current.size() >= size) {
result.add(current);
current = new ArrayList<T>();
}
}
if (!current.isEmpty()) {
result.add(current);
}
return result.stream();
}```
Wesentlich lesbarer, vernünftiger, bis auf den Punkt das ich erstmals die vorletzten 3 Zeilen vergessen habe :mad:
Vorallem muss ich die Streams nicht mehrmals durchlaufen, nicht unzählige Instanzen von Pair erstellen etc.pp.
Aber irgendwie kann ich mich dennoch nicht damit anfreunden, dass dies mit der Stream-Api nur so umständlich sein kann.
Also nochmal recherchiert was möglich ist und dann bin ich auf diese Lösung gestossen:
```public static <T> Stream<List<T>> partition(Stream<T> numbers, int size) {
Spliterator<T> x = numbers.spliterator();
List<List<T>> acc = new ArrayList<List<T>>();
partition(x, size, acc);
return acc.stream();
}
private static <T> void partition(Spliterator<T> split, int size, List<List<T>> acc) {
if (split.estimateSize() > size) {
Spliterator<T> r = split.trySplit();
partition(r, size, acc);
partition(split, size, acc);
} else {
final List<T> list = new ArrayList<>();
split.forEachRemaining(list::add);
acc.add(list);
}
}```
Spliterator! Was ist das und wie funktioniert er?
Auf einem Stream kann man einen Spliterator erstellen.
Ruft man auf diesem nun trySplit() auf erhält man einen weiteren Spliterator.
Nun hat man zwei Spliteratoren denen man der Funktion forEachRemaining einen Consumer übergeben kann.
(Vergleichbar dem forEach auf Stream, aber mit dem Unterschied, dass nur auf dem eigenen Split iteriert wird).
Also ein Divide and Conquer gepullt. Divide => solange splitten bis die estimateSize kleiner der Obergrenze ist.
Conquer => eine Liste aus dem Teil-Stream bauen.
Und zum Schluss das ganze mittels Accumulator zusammenklauben.
Funktioniert soweit ich sehen kann ganz gut. Für den Conquer Teil, hätte ich gerne den Collector von Collectors.toList verwendet. Klappt aber nicht so ganz.
Soweit so gut fürs erste.
Ich bin ja der Meinung, dass man bei so billigen Queries durchaus jeden einzelnen Record anfordern kann, wenn man parallel in einem anderen Thread dessen Werte in ein Bean stopft und dises dann per Listener an die GUI weitergibt.
Die ganzen superduper AJax-SinglePage Webanwendungen machen das doch auch so.
Kommt immer auf das Query drauf an. Wenn du direkt auf die Entität gehst, dann mag das sein, aber wenn es ein View ist, können schon ein paar ms mehr ins Land gehen.
Allerdings gehe ich direkt auf die Entität aber ich muss davon ausgehen, dass im schlimmsten Falle ein paar 100.000 Einträge gelesen werden (Extremfall … Normalfall wäre 0). Also du siehst, ich betreibe keine Optimierung für Standardfälle, sondern will eigentlich den Extremfall so gut es geht abfedern. Allerdings muss ich zugeben, dass ich gar nicht weiß, wie groß der Overhead pro Query ist … das ist nur eine Gefühlssache. Was meint ihr dazu? Lieber ID nach ID, oder die IDs in größere Happen zusammenfassen?
Kurz zur Erläuterung: Es geht um eine InMemory Queue, welche eine Tabelle in der DB als Queuestore nutzt. Sollte die Anwendung neu gestartet werden, dann sollen die Einträge geladen werden, falls welche drin stehen. Normalerweise sollte sie leer sein, aber man kennt das ja … es gibt auch Zeiten wo Normalerweise nicht ist und die Queue verstopft. Dann sollte der Anwendungsneustart nicht unnötig lange dauern.
Warum? Wenn ich nicht verstehen muss, wie die partition Methode funktioniert und sie einfach nur nutzen muss, sieht das alles doch ziemlich kurz und leserlich aus. Was spricht dagegen?
Rein auf Grund von Gefühlen so komplizierte Konstruktionen ist keine gute Idee. IMHO.
Mach das allereinfachste was geht und damit ein paar Performance-Tests.
Könnte, könnte könnte. Wenn du nicht weißt, wie viele es durchschnittlich sind - dann hat das alles keinen Sinn. Natürlich ist der Overhead groß (vor allem wenn die DB remote ist, die Netzwerklatenz erschlägt alle deine Optimierungen), aber wer weiß? Veilleicht sind es ja nur 750 IDs im Normalfall, dann machst du eben 750 Queries. Why not? Weil es langsam sein „könnte“? Die Frage ist, wen das stört.
na ja, where ID IN (1,2,3,4) ist natürlich am besten. Das „könnte“ hier stört doch ganz gewaltig.
Spasseshalber könntest du auch MIN und MAX deiner Liste bestimmen, per SQL alle holen mit MIN<=ID<=MAX und dann beim Client filtern: möglich ist vieles…aber ob du das brauchst?