Semaphore:
private static final Semaphore sem = new Semaphore(1);
Producer:
public void msg(String msg) {
try {
JSONObject o = new JSONObject(msg);
try {
sem.acquire();
list.add(new El(o.getString("$schemaRef"), o.getJSONObject("header"), o.getJSONObject("message")));
} finally {
sem.release();
}
} catch (Exception ignored) {
}
}
Er wird ca. 1000mal pro Minute aufgerufen, also eigentlich kaum Gefahr einer Verklemmung. Dennoch möchte ichs ja richtig machen.
Consumer:
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
ArrayList<El> list2 = new ArrayList<El>();
try {
sem.acquire();
labels[0].setText(list.size() + " list1");
list2 = list.stream().filter(e -> {
int l = e.schema.length();
return l > 0 && e.schema.charAt(l - 1) - '0' > 1;
}).collect(Collectors.toCollection(ArrayList::new));
} finally {
sem.release();
}
labels[1].setText(list2.size() + " list2");
for (El e : list2) {
System.out.println(e.message.toString(4));
System.out.println();
}
Thread.sleep(10000);
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}).start();
Meine Frage wäre, ob man (da „übersichtlicher“, imo) gefahrlos auch schreiben könnte im Producer:
public void msg(String msg) {
try {
JSONObject o = new JSONObject(msg);
sem.acquire();
list.add(new El(o.getString("$schemaRef"), o.getJSONObject("header"), o.getJSONObject("message")));
} catch (Exception ignored) {
} finally {
sem.release();
}
}
new JSONObject(msg);
kann einen Fehler hervorrufen, „release“ würde dann gecalled, obwohl zuvor nicht „acquire“ gecalled wurde.
Im Java Doc Kommentar zu „release“ steht folgendes:
…
There is no requirement that a thread that releases a permit musthave acquired that permit by calling acquire.Correct usage of a semaphore is established by programming conventionin the application.
Wenn der interne Zähler >1 werden könnte, dann könnte es ja zu Verklemmungen kommen…
Hier ist ein minimales Beispiel für die Reproduktion:
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
public class SemClazz {
private static final Semaphore sem = new Semaphore(1);
private static final List<Integer> queue = new LinkedList<Integer>();
// consume
private static void consume() {
try {
sem.acquire();
System.out.println(queue.size());
} catch (InterruptedException ie) {
System.out.println(ie.getMessage());
} finally {
sem.release();
}
}
// produce
private static void produce() {
try {
if (Math.random() < 0.1) {
throw new InterruptedException("Something went wrong...");
}
sem.acquire();
queue.add((int) (Math.random() * 1000.0));
} catch (InterruptedException ie) {
System.out.println(ie.getMessage());
} finally {
sem.release();
}
}
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 1000; i++) {
produce();
Thread.sleep(1);
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 1100; i++) {
consume();
Thread.sleep(1);
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}).start();
}
}
Die Ausgabe ist jetzt zum Beispiel 910 anstatt 1000…
Ist das Verhalten des Semaphore nicht falsch bzw. in dem Java-Doc-Kommentar nicht ausreichend beschrieben?
Ich freue mich über jede Antwort.
Sorry, das Problem trat auf, weil add
(im catch-Block) nicht mehr gecalled wurde…
Neu:
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
public class SemClazz {
private static final Semaphore sem = new Semaphore(1);
private static final List<Integer> queue = new LinkedList<Integer>();
// consume
private static void consume() {
try {
sem.acquire();
System.out.println(queue.size());
} catch (InterruptedException ie) {
System.out.println(ie.getMessage());
} finally {
sem.release();
}
}
// produce
private static void produce() {
try {
if (Math.random() < 0.1) {
// throw new InterruptedException("Something went wrong...");
}
sem.acquire();
queue.add((int) (Math.random() * 1000.0));
} catch (InterruptedException ie) {
System.out.println(ie.getMessage());
queue.add((int) (Math.random() * 1000.0));
} finally {
sem.release();
}
}
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
produce();
}
}).start();
}
new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 1100; i++) {
consume();
Thread.sleep(1);
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}).start();
}
}
versus
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
public class SemClazz {
private static final Semaphore sem = new Semaphore(1);
private static final List<Integer> queue = new LinkedList<Integer>();
// consume
private static void consume() {
try {
sem.acquire();
System.out.println(queue.size());
} catch (InterruptedException ie) {
System.out.println(ie.getMessage());
} finally {
sem.release();
}
}
// produce
private static void produce() {
try {
if (Math.random() < 0.1) {
throw new InterruptedException("Something went wrong...");
}
sem.acquire();
queue.add((int) (Math.random() * 1000.0));
} catch (InterruptedException ie) {
System.out.println(ie.getMessage());
queue.add((int) (Math.random() * 1000.0));
} finally {
sem.release();
}
}
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
produce();
}
}).start();
}
new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 1100; i++) {
consume();
Thread.sleep(1);
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}).start();
}
}
Also 1000 vs. zum Beispiel 997. Jetzt tritt das Problem auf, weil die Semaphore dem Producer mehrmals den Eintritt gestattet, und dadurch Produkte abhandenkommen.