Idiomatisches "fork-join" mit Java Streams

Vielleicht bin ich gerade betriebsblind, aber … : Ich suche eine Möglichkeit, ein „fork-join“ auf Java Streams zu machen.

(Das „fork-join“ steht in Anführungszeichen, weil ich nichts meine, was mit Fork/Join zu tun hat)

Damit meine ich, grob:

  • Man hat einen Stream von Listen (oder Listen-ähnlichen Objekten)
  • Man will den behandeln wie eine Liste von Streams (mit den entsprechenden Elementen)
  • Man will auf jeden dieser Streams eine Funktion anwenden (z.B. irgendeinen Reducer)
  • Man will die Ergebnisse dieser Funktionen als eine Liste bekommen.

Grob angedeutet im Pseudocode:
(Editiert, siehe unten)

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

public class StreamForkJoin
{
    public static void main(String[] args)
    {
        // A stream of lists (all with the same size)
        int listSize = 3;
        List<List<Integer>> lists = Arrays.asList(
            Arrays.asList(1,2,3),
            Arrays.asList(2,3,4),
            Arrays.asList(3,4,5),
            Arrays.asList(4,5,6));
        Stream<List<Integer>> listStream = lists.stream();
        
        // A function that is applied to a stream of elements
        // (namely, a stream of the elements of one index of the lists)
        Function<Stream<Integer>, Integer> reducer = stream -> 
            stream.mapToInt(Number::intValue).sum();
            
        // This is where the magic should happen - pseudocode:
        //List<Integer> actual = listStream
        //    .fork(listSize, index -> reducer)
        //    .join(listSize); 
        
        List<Integer> expected = Arrays.asList(10, 14, 18);
    }
}

Und im Moment sehe ich keine Möglichkeit, die ein vielversprechendes Verhältnis zwischen „Implementierungsaufwand“ und „Sinnhaftigkeit“ hat. Man könnte das natürlich ganz pragmatisch machen: Den Stream<List<T>> echt umwandeln in eine List<Stream<T>>, dann die Streams verarbeiten, und sich am Ende die Liste bauen. Aber das würde dem Grundgedanken von Streams widersprechen, denn man müßte den Stream<List<T>> erst in eine List<List<T>> umwandeln. Ansätze mit verschachtelten Collector-objekten oder händisch zusammengedengelten Pseudo-Streams die man sich aus Spliterator-objekten baut erscheinen mir kompliziert, und würden wohl immer noch nicht erlauben, das ganze idiomatisch auszuführen.

Übersehe ich was?

ja, dafür gibt es meines Erachtens nur Maps mit Collectors.partitioningBy() oder Collectors.groupingBy() sowie dann später Collectors.toList().

Ein KSKB auf Basis des KSKBs, das ich gepostet habe, würde ich schätzen. Die Aussage geht ansonsten nicht hinaus über ~„Hmja, das müßte irgendwie (einfach) gehen“, und das ist genau das, was ich auch erst gedacht habe, und was ich jetzt nicht mehr denke (weswegen ich auch diesen Thread erstellt habe).

Hm, wieso nicht „einfach“ so:

	int listSize = 3;
	List<List<Integer>> lists = Arrays.asList(
			Arrays.asList(1, 2, 3), 
			Arrays.asList(2, 3, 4),
			Arrays.asList(3, 4, 5), 
			Arrays.asList(4, 5, 6));
	Stream<List<Integer>> listStream = lists.stream();

	Function<List<Integer>, Integer> classifier1 = l -> l.stream().mapToInt(Number::intValue).sum();

	//Map<Integer, List<List<Integer>>> map = listStream.collect(Collectors.groupingBy(classifier));

	List<Integer> expected = listStream.map(classifier1).collect(Collectors.toList());

	System.out.println(expected);

Ich sehe nichts besseres als

public <A,B> List<B> convertListStream(Stream<List<A>> lists, Function<List<A>, B>> reducer) {
    return lists.map(Collection::stream)
                     .map(reducer)
                     .collect(Collectors.toList());
}

Aber ob sich das als Utility-Methode lohnt, wage ich zu bezweifeln. Sowas würde vielleicht als Extension-Methode in Kotlin gut aussehen, aber in Java bringt es nicht viel.

Hm. Sorry. Das Beispiel war falsch. :flushed:

Die letzte Liste hatte ich erst eingefügt, nachdem ich gesehen hatte, dass die dummy-Beispiel-Zahlen nicht eindeutig waren:

1 2 3
2 3 4
3 4 5 

und die darauffolgende Änderung in expected war genau die falsche :flushed: Dass man das ganze zeilenweise anwenden kann, ist klar. Aber der Kommentar beim reducer war (r/w)ichtig:

A function that is applied to a stream of elements
(namely, a stream of the elements of one index of the lists)

Ich meine aber, dass es spaltenweise angewendet werden sollte. Deswegen auch „fork-join“.

streamOfLists = 
 (11, 12, 13),
 (21, 22, 23),
 (31, 32, 33),
 (41, 42, 44),
   |   |   |
   |   |   | apply reducer to each column
   v   v   v
  104 108 112

Nur als Beispiel für das Ergebnis. Ein paar Details würde man noch verallgemeinern, wenn man es als building block verwenden wollte. Aber das unschöne hier ist, dass man erst alle Listen aus dem Stream in einer List<List<T>> sammelt, um darauf dann die (spaltenweise) Ansicht als List<T> bzw. Stream<T> aufzubauen.

import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.RandomAccess;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamForkJoinWithListInFork
{
    public static void main(String[] args)
    {
        // A stream of lists (all with the same size)
        int listSize = 3;
        List<List<Integer>> lists = Arrays.asList(
            Arrays.asList(1, 2, 3), 
            Arrays.asList(2, 3, 4),
            Arrays.asList(3, 4, 5), 
            Arrays.asList(4, 5, 6));        
        Stream<List<Integer>> listsStream = lists.stream();
        
        // A function that is applied to a stream of elements
        // (namely, a stream of the elements of one index of the lists)
        Function<Stream<Integer>, Integer> reducer = stream -> 
            stream.mapToInt(Number::intValue).sum();
            
        // This is where the magic should happen - pseudocode:
        List<Integer> actual = join(fork(listSize, listsStream), reducer);
        
        List<Integer> expected = Arrays.asList(10, 14, 18);
        
        System.out.println("expected " + expected);
        System.out.println("actual   " + actual);
    }
    
    private static <T> List<Stream<T>> fork(
        int listSize, Stream<? extends List<? extends T>> listsStream)
    {
        List<Stream<T>> streams = new ArrayList<Stream<T>>();
        List<? extends List<? extends T>> lists = 
            listsStream.collect(Collectors.toList());
        for (int i = 0; i < listSize; i++)
        {
            int c = i;
            class Column extends AbstractList<T> implements RandomAccess
            {
                @Override
                public T get(int index)
                {
                    return lists.get(index).get(c);
                }

                @Override
                public int size()
                {
                    return lists.size();
                }
            }
            Column column = new Column();
            streams.add(column.stream());
        }
        return streams;
    }
    
    private static <T, R> List<R> join(
        List<? extends Stream<T>> streamsList, 
            Function<? super Stream<T>, R> mapper)
    {
        return streamsList.stream().map(mapper).collect(Collectors.toList());
    }
    
}

Nix, was nicht schon einmal gefragt wurde :wink: : https://stackoverflow.com/a/61412869

Kurzum, auf Spaltenebene ist das mit Streams nicht direkt möglich.

Hmja, der hat schon einen 2D-Array, d.h. strukturell das gleiche, wie das, was ich im letzten Beitrag mit der List<List<T>> angedeutet hatte. Hätte ja sein können. Schade, eigentlich.

Wie wäre es damit:

    int listSize = 3;
    List<List<Integer>> lists = Arrays.asList(
            Arrays.asList(11, 12, 13),
            Arrays.asList(21, 22, 23),
            Arrays.asList(31, 32, 33),
            Arrays.asList(41, 42, 43));
   IntStream.range(0, listSize)
            .map(i-> lists.stream().map(l-> l.get(i)).mapToInt(x-> x).sum())
            .forEach(System.out::println);

Auch da ist es so, dass das ganze am Anfang als List<List<T>> vorliegen muss, und nicht als Stream<List<T>> - letzteres wird halt in dem map mehrmals neu erzeugt.

Bin mal kurz kreativ geworden:

Ok ich verstehe jetzt, was du meinst.

Bei einem Stream wird für jedes Element immer die komplette Kette abgearbeitet, bevor das nächste dran ist. Hier wird jedoch aus jedem Element immer wieder ein Teil benötigt. Somit kann das Ganze nur mit einem Speicher außerhalb des Streams funktionieren.

Hier würde sich dann eine Map, wie in dem StackOverflow-Link von CyborgBeta anbieten.