Thread-Kommunikation BlockingQueue

Hallo,

ich habe ein Objekt A, welches mehrere Threads startet, die untereinander per BlockingQueues kommunizieren.
Nun möchte ich von außen, also von einem anderen Objekt B aus, öffentliche Methoden aus A aufrufen um Daten in die Queues von A zu “schütten”. Die Aufrufe können allerdings auch aus verschiedenen Threads erfolgen. Die Aufrufer müssten so nichts von den Queues wissen.
Allerdings müsste ich vermutlich (?) diese öffentlichen Methoden threadsicher machen oder?
Alternativ könnte ich auf die Methoden verzichten und die einfach die sowieso threadsicheren Queues öffentlich machen - mit dem Nachteil, weniger abstrakt zu sein weil mehr Implementationsdetails öffentlich werden …

Was mein Ihr?

Danke für Eure Unterstützung
Peter

Nein, das musst du nicht, weil die Blockingqueue threadsicher ist. Wenn du diese Methode threadsicher machen würdest, indem du sie z. B. synchronisierst, dann verlierst du ggf. einiges an Performance.

*** Edit ***

Voraussetzung ist natürlich, dass diese Methode nicht viel mehr macht, als an [japi]BlockingQueue#put(E)[/japi] zu delegieren. Falls du noch weiteren Zustand änderst, kannst du das dann aber in einem synchronized-Block (also nicht die ganze Methode) kapseln.

Beispiel:

    queue.put(task);
    synchronized(this) {
        // change some state
    }
}```

Danke für die Anregungen und einen guten Rutsch ins neue Jahr!

Peter

können eigentlich mehrere Threads auf die selbe Blockingqueue warten ? Und wenn ja, werden dann alle diese Threads gestartet und bekommen sie dann alle die Daten aus der Queue ?

Ja, es können mehrere Threads auf die selbe Blockingqueue warten. Die Daten werden aber nur genau einmal ausgegeben. Das lässt sich dann zur Auftragsverteilung nutzen, wenn man mehrere “Arbeiter” hat, die auf Aufträge in der Queue warten.

lieber nur einen Thread warten lassen, an diesem Listener adden zur Informierung mehrerer,
interessant sind die Details wie ‘auch lesen wenn kein Listener da’, aber kann man ja alles regeln und bauen

Hi, der Hinweis, BlockingQueues zu verwenden war ziemlich gut und ich bin recht weit damit gekommen. Jetzt allerdings stehe ich vor dem Problem, dass ich auf eine queue mit Timeout warten muss. D.h. Ich warte auf Daten und wenn die nicht innerhalb von 100mSec da sind muss ich auf das Ausbleiben reagieren.
Nichts einfacher als das, dachte ich mir, und kodierte das wie folgt:

     return commandAnswer.poll(COMMUNICATIONTIMEOUT, TimeUnit.MILLISECONDS);    
}``` Wobei commandIn und commandAnswer zwei BlockingQueues sind. COMMUNICATIONTIMEOUT ist 100

Das Kommado wird wird in die commandIn Queue geschrieben unter Angabe der gewünschten Queue für die Antwort. Eine anderer Thread sendet das Kommando, holt die Antwort und schreibt sie in die angegebene Antwort-Queue.  Geht auch ganz prima mit take statt poll.

Wie ich aber feststellen muss, wartet die Methode Poll nicht. Habe ich grundsätzlich etwas falsch gemacht oder falsch verstanden mit den BlockingQueues? Hat jemand dazu eine Idee oder einen Hinweise?

Danke

Zu der Aussage passt der Codeschnipsel aber überhaupt nicht. An und für sich scheinst du das alles schon richtig verstanden zu haben.

Wahrscheinlich sitzen im Hintergrund ein oder mehrere Worker-Threads, die ein take() auf der commandIn Queue machen und das Berechnungsergebnis dann in die commandAnswer Queue schreiben.
In dem Codeschnipsel gibst du also einen „Arbeitsauftrag“ in die Queue, der innerhalb von 100ms eingereiht sein muss. Weitere 100ms später muss dann eine Antwort (auf irgendeinen Arbeitsauftrag) vorliegen, ansonsten wird null zurückgegeben.

Nutzt du die [japi]SynchronousQueue[/japi]?

Beschreibe deinen Systemaufbau doch noch einmal ein bisschen detaillierter, vielleicht wird das Problem dann klarer.

Hallo cmrudolph,

im Hintergrund sitzt eine Workertask, die Kommandos über die BlockingQueue commandIn entgegennimmt (take) und per serieller Schnittstelle an ein externes Gerät sendet. Falls das externe Geräte eine Antwort sendet, wird diese ebenfalls in eine eine BlockingQueue geschrieben (nämlich die, die beim Kommando angegben ist!). Das Codeschnipsel oben soll ein Kommando senden und maximal 500mSec auf die Antwort warten.
Zur Verdeutlichung hier noch IoCommand:```public class IoCommand {
int adr; // Adresse des Gerätes
String cmdStr; // auszuführendes Kommando
BlockingQueue answerQueue; // Queue für die Antwort

public IoCommand(int adr, String cmd, BlockingQueue<String> ans)
{
    this.adr=adr;
    this.cmdStr=cmd;
    this.answerQueue=ans;
}  

}```welches der Workertask zur Abarbeitung übergeben wird. Darin ist eine Geräteadresse, das Kommando und die Queue für die Antwort gespeichert. Grund für die Angabe der AntwortQueue ist die Idee, dass mehrere Tasks Anforderungen in die commandIn Queue stellen könne, aber nur Antworten auf ihre eigenen Kommandos lesen wollen/sollen.

So hatte ich mir das auch vorgestellt. Was sich mir aber noch immer nicht erschließt ist, weshalb du offer() statt put() verwendest. Hat die Eingangsqueue eine festgelegte Maximallänge?

Prinzipiell sieht das eigentlich alles ganz gut aus. Hast du einen Test, der die Kommunikation simuliert? Und dort dann die Fälle “Eingangsqueue kann nicht befüllt werden”, “Workertask braucht zu lange” und “Timing passt” durchgespielt?

Meine BlockingQueue ist tatsächlich begrenzt. Grund ist zum einen, dass das Ganze auf einem Raspberry läuft und der Speicher dort verglichsweise klein ist. Gleichzeitig ist der zu erwartende Datendurchsatz zu den Geräten eher klein (ca 300 Byte/sek.).
Die von Dir angegeben Fälle habe ich noch nicht geprüft. Zum Test habe ich aber ein reales Gerät hier vor mir liegen, kann die Kommunikation also ohne SImulation testen.
Wie verhält sich die BlockingQueue, wenn ich sie nicht begrenze und mit put arbeite. Wächst der Speicherbedarf dann bei jedem put, bis der GarbageCollecor zuschlägt oder ist die Implementation intelliegenter ?

Mit einem realen Gerät wird es dir schwer fallen, die besagten Fälle nachzustellen - das ist mit einem automatisierten Test schon schwierig genug.

Das Verhalten ist stark abhängig von der Implementierung. Ein Extrembeispiel ist die oben genannte SynchronousQueue, bei der put und take so lange blockieren, bis an beiden Enden der Queue ein Thread ist. put und take werden dann also nur gleichzeitig ausgeführt.
Eine unlimitierte Queue blockiert beim put normalerweise nicht. Daher wird ein offer darauf immer true zurückgeben.

Der Speicherbedarf ist ebenso abhängig von der Implementierung. Eine ArrayBlockingQueue sollte einen konstanten Speicherbedarf haben, weil die Positionen in der Queue im Konstruktor instantiiert werden. Bei der LinkedBlockingQueue wächst der Speicherbedarf mit der Anzahl eingereihter Elemente, weil sie dynamisch erzeugt werden. Dabei fällt dann aber Garbage an.

Ein kurzer Blick in die Implementierung ist äußerst aufschlussreich.

Ich habe mal eine Mini-Testsuite mit den drei Tests programmiert (mit der heißen Nadel gestrickt, ist also ggf. unsauber):

Tests


import java.util.concurrent.BlockingQueue;

public class IoCommand {
    private final int adr;                                  // Adresse des Gerätes
    private final String cmdStr;                            // auszuführendes Kommando
    private final BlockingQueue<String> answerQueue;        // Queue für die Antwort

    public IoCommand(int adr, String cmd, BlockingQueue<String> ans) {
        this.adr = adr;
        this.cmdStr = cmd;
        this.answerQueue = ans;
    }

    public int getAdr() {
        return adr;
    }

    public String getCmdStr() {
        return cmdStr;
    }

    public BlockingQueue<String> getAnswerQueue() {
        return answerQueue;
    }
}```
```package queues;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class CommandExecutor {
    private static final long TIMEOUT = 100L;
    private final BlockingQueue<IoCommand> commandQueue;

    public CommandExecutor(BlockingQueue<IoCommand> commandQueue) {
        this.commandQueue = commandQueue;
    }

    public String executeWithTimeout(int adr, String cmd) {
        BlockingQueue<String> answerQueue = new ArrayBlockingQueue<>(1);
        try {
            if (commandQueue.offer(new IoCommand(adr, cmd, answerQueue), TIMEOUT, TimeUnit.MILLISECONDS))
                return answerQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }
}

import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

@Test
public class CommandExecutorTest {
    public void returnsAnswerIfTimingIsCorrect() {
        final BlockingQueue<IoCommand> commandQueue = new ArrayBlockingQueue<>(1);
        CommandExecutor commandExecutor = new CommandExecutor(commandQueue);
        int anyNumber = 3;
        String anyString = "dummy";

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    final IoCommand command = commandQueue.take();
                    command.getAnswerQueue().put("response");
                } catch (InterruptedException ignored) {
                }
            }
        }).start();

        String result = commandExecutor.executeWithTimeout(anyNumber, anyString);

        Assert.assertEquals(result, "response");
    }

    public void returnsNullIfWorkerIsTooSlow() {
        final BlockingQueue<IoCommand> commandQueue = new ArrayBlockingQueue<>(1);
        CommandExecutor commandExecutor = new CommandExecutor(commandQueue);
        int anyNumber = 3;
        String anyString = "dummy";

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    final IoCommand command = commandQueue.take();
                    Thread.sleep(120);
                    command.getAnswerQueue().put("response");
                } catch (InterruptedException ignored) {
                }
            }
        }).start();

        String result = commandExecutor.executeWithTimeout(anyNumber, anyString);

        Assert.assertNull(result);
    }

    public void returnsNullIfCommandQueueIsFull() throws InterruptedException {
        final BlockingQueue<IoCommand> commandQueue = new ArrayBlockingQueue<>(1);
        CommandExecutor commandExecutor = new CommandExecutor(commandQueue);
        int anyNumber = 3;
        String anyString = "dummy";

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(120);
                    commandQueue.take();
                    final IoCommand command = commandQueue.take();
                    command.getAnswerQueue().put("response");
                } catch (InterruptedException ignored) {
                }
            }
        }).start();

        commandQueue.put(new IoCommand(anyNumber, null, null));

        String result = commandExecutor.executeWithTimeout(anyNumber, anyString);

        Assert.assertNull(result);
    }
}```

*** Edit ***

Btw: ich würde mich freuen, wenn jemand mit TDD Erfahrung sich die Tests mal ansehen könnte und kurz dazu sagt, ob das so in Ordnung wäre.

Danke für die Mühe. Habe einige interessante Anregungen aus Deinem beispiel entnhemen können. Mit TDD habe ich leider keine Erfahrung und kann es aus diesem Gesichtspunkt nicht bewerten.