Threading, Streaming - Best Practice/ Workflow?

Hallo Leute,

habe folgendes Szenario:

Ich ziehe über verschiedene Sockets Daten (String, Double) in mein laufendes Programm.
Jetzt möchte ich diese Variablen permanent abfragen, ob bestimmte Eigenschaften vorhanden sind. (z.B. die Höhe des Double Wertes) und dann darauf reagieren.
Da ich eine Oberfläche mit Swing öffne und bereits die Socket Streams als Threads laufen lasse, stelle ich mir die Frage, wie man so etwas am besten löst.
Spreche jetzt nicht vom Quellcode, sondern ganz theoretisch…

Wäre hier ein Producer Consumer Pattern zu bevorzugen oder einfach alles über synchronized Methodiken abzuhandeln? Eventuell liege ich auch falsch und Threads sind hier fehl am Platz… eine andere Lösung um auf zeitversetzte Inputs zu warten (und natürlich nichts einzufrieren: while(true)) kenne ich nur nicht…

Auf der Ebene, wie die Frage gestellt ist, kann man natürlich nur etwas unspezifisch antworten.

Es klingt erstmal, als wäre das ein klassisches Producer-Consumer. Man kann reinkommende Daten von einem Thread in die BlockingQueue legen lassen, und ein anderer Thread wartet darauf, aus dieser BlockingQueue Sachen rauszunehmen. D.h. auf der Ebene, wie es bisher beschrieben wurde, sehe ich nichts, was dagegen spricht. (Fragen, wie etwa ob die Queue “unbounded” sein soll etc. sind dann ggf. noch zu klären)

(Dass man die, wenn man sie an Swing weiterreichen will, mit SwingUtilities.invokeLater in den EventDispatchThread packen muss, ist ein Detail, das hier erstmal vermutlich keine Rolle spielt).

Es gibt Leute, die sagen (oder zumindest andeuten) dass man als “normaler” Programmierer synchronized/wait/notify praktisch nicht mehr verwenden sollte. Es ist wirklich sehr low-level, und für die meisten Aufgaben gibt es in den concurrent-Packages schon was passendes. (Und wenn man sich bei einigen dieser Klassen die Implementierungen ansieht, bekommt man auch eine Ahnung, warum es sinnvoll sein kann, da die Finger davon zu lassen :wink: )

Danke schonmal.
Jetzt wird es expliziter:

Ich lese alle paar Sekunden Daten (Double) von einem Socket Stream ein.
Diese werden in eine Collections.synchronizedList geschrieben, welcher maximal 100 Werte enthalten darf. (Ich lösche den ersten [0] Eintrag ab und setze per add am Ende wieder den neuesten Value hinzu)

public final List<Double> data = Collections.synchronizedList(new ArrayList<>());;

Der Stream Thread läuft permanent.
Jetzt will ich aus einem anderen Thread welcher ebenfalls permanent läuft über diese Daten iterieren und mir z.B. den Collections.max Value ziehen.

Da beide Dinge permanent laufen sollen (while irgendeine Bedingung true), frage ich mich hier, was wohl die beste Möglichkeit darstellt.

Soll die ArrayList volatile gesetzt werden? (Andernfalls erhalte ich SynchronisationsExceptions, da während der Iteration geschrieben wird.
Ich darf aber auch nicht “warten” und Werte verlieren, z.B. wenn ich mit dem Streaming auf die Iteration warten würde…

Ist jetzt wieder etwas theoretisch, weil ich den Code aus Copyrightgründen nicht veröffentlichen darf…

Mit Java 9 gibt es die Flow API um damit Reactive Programming im JDK.

Reactive Programming geht schon vor Java 9 mit den entsprechenden Frameworks, mit Java 9 gibt es aber die Flow API die hier vereinheitlichen soll. RxJava zum Beispiel.

Kurz um finde ich, dass dies der Weg ist, wenn man High-Level an die Sache rangehen möchte. Dort spricht man dann von Publish-Subscribe.

Oracle zu dem Thema:
https://community.oracle.com/docs/DOC-1006738

Ein Vortrag über die Reactive API, dessen Beispiel doch eine gewisse Ähnlichkeit zu dem hier beschriebenen Anwendungsfall hat. Ab Minute 18:20

Das würde praktisch nichts verändern. Das würde nur die Referenz selbst volatile machen, aber das hat mit ihrem Inhalt nichts zu tun.

Falls ich das jetzt richtig vertanden habe, ist das im Pseudocode ja sowas

void threadA() {
    while (true) {
        double d = read();
        list.add(d);
        while (list.size() > 100) list.remove(0);
    }
}
void threadB() {
    while (true) {
        double d = Collections.max(list); 
        showInGui(d);
    }
}

richtig?

Da stellen sich schon einige Detailfragen zur Taktung. Ich meine, wenn das so dasteht, wird der zweite Thread ja ggf. bei 100% CPU-Last Rechenzeit verbrennen.

Das Problem mit der ConcurrentModificationException (was du wohl mit “SynchronisationsExceptions” meintest) läßt sich leicht umgehen. Im Zweifelsfall einfach keine ArrayList sondern eine CopyOnWriteArrayList verwenden.

Aber je nachdem, wie die Taktung oder der allgemeine Ablauf sein soll, könnte es sinnvoll sein, da eine andere Form der Synchronization drumzuwickeln. Das könnte ein pragmatisches synchronized(list) sein, das man in die while-Schleifen packt. Aber wenn man z.B. das UI nur updaten will, wenn sich an der Liste auch etwas geändert hat, könnte eine andere Strategie sinnvoller sein. Vielleicht irgendwas mit einem https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html , aber das ist nur ein Gedanke. Mehr Infos könnten da helfen.