java.util.Random parallelisieren

Hi, ich möchte Random parallelisieren, weil ich eine Partition mit Zufallszahlen beschreiben möchte. Dabei hab ich festgestellt, dass das Beziehen der Zufallszahlen am längsten dauert (ein Thread ist immer voll ausgelastet), und nur mit 20 MiB/s geschrieben werden kann. Daher dachte ich, es wäre klug, wenn mehrere Threads die Zufallszahlen gleichzeitig beziehen würden. Ich brauche jedoch die gleiche Sequenz der Zufallszahlen, die geschrieben werden sollen - und dann später gelesen werden sollen. Jetzt ist die Anwendung aber bei einer CPU-Last von 100% und noch langsamer als zuvor:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class USBTest {

	public static class MyIter implements Iterable<Integer> {
		private static final int n = 10_000;
		private Deque<Integer> deque = new ArrayDeque<>(n);
		private Random r = new Random(1);
		ExecutorService es = null;

		public MyIter() {
			es = Executors.newFixedThreadPool(4);
			Runnable r = new Runnable() {
				@Override
				public void run() {
					while (!Thread.interrupted()) {
						add();
					}
				}
			};
			for (int i = 0; i < 4; i++) {
				es.execute(r);
			}
		}

		public void stop() {
			if (es != null) {
				es.shutdown();
			}
		}

		private synchronized void add() {
			if (deque.size() < n) {
				deque.add(r.nextInt(256));
				notifyAll();
			}
		}

		private synchronized int get() {
			if (deque.isEmpty()) {
				try {
					wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			return deque.pollFirst();
		}

		@Override
		public Iterator<Integer> iterator() {
			return new Iterator<Integer>() {
				@Override
				public boolean hasNext() {
					return true;
				}

				@Override
				public Integer next() {
					return get();
				}
			};
		}
	}

	private static float sec(long t1, long t2) {
		return (float) ((t2 - t1) / 1000.0);
	}

	private static float kib(long size) {
		return (float) (size / 1024.0);
	}

	private static float mib(long size) {
		return (float) (size / 1024.0 / 1024.0);
	}

	public static void test(String dir) throws FileNotFoundException {
		MyIter mi = new MyIter();
		Iterator<Integer> r = mi.iterator();
		long size = 0;
		try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(dir + "\\testfile1.txt"))) {
			long t1 = System.currentTimeMillis();
			for (long i = 0; true; i++) {
				bos.write(r.next());
				size++;
				if (size % 10_000_000 == 0) {
					long t2 = System.currentTimeMillis();
					System.out.printf("I write %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
				}
			}
		} catch (IOException e) {
		}
		System.out.printf("The capacity is: %s byte, %s kilobyte, %s megabyte, %s KiB, %s MiB%n", size, (float) (size / 1000.0), (float) (size / 1000.0 / 1000.0), kib(size), mib(size));

		mi.stop();
		mi = new MyIter();
		r = mi.iterator();
		size = 0;
		try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(dir + "\\testfile1.txt"))) {
			long t1 = System.currentTimeMillis();
			int b;
			while ((b = bis.read()) != -1) {
				size++;
				if (b != r.next()) {
					System.out.println("Failure!");
					return;
				}
				if (size % 10_000_000 == 0) {
					long t2 = System.currentTimeMillis();
					System.out.printf("I read %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
				}
			}
			long t2 = System.currentTimeMillis();
			System.out.printf("I read %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
		} catch (IOException e) {
			e.printStackTrace();
		}
		System.out.println("The usb stick is proper!");
	}

	public static void main(String[] args) throws FileNotFoundException {
		test("E:");
	}

}

Habt ihr Ideen?

So ist es schon etwas schneller (aber nur minimal), aber immer noch saulangsam:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedDeque;

public class USBTest {

	public static class MyIter implements Iterable<Integer> {
		private static final int n = 10_000;
		private ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque<>();
		private Random r = new Random();
		private Thread t = null;
		private long seed = 1;

		public MyIter(int i) {
			seed = i;
			t = new Thread(new Runnable() {
				@Override
				public void run() {
					while (!Thread.interrupted()) {
						add();
					}
				}
			});
			t.start();
		}

		public void stop() {
			if (t != null) {
				t.interrupt();
			}
		}

		private void add() {
			if (deque.size() < n) {
				r.setSeed(seed);
				deque.add(r.nextInt(256));
				seed += 4;
			}
		}

		private int get() {
			while (deque.isEmpty()) {
				try {
					Thread.sleep(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			return deque.pollFirst();
		}

		@Override
		public Iterator<Integer> iterator() {
			return new Iterator<Integer>() {
				@Override
				public boolean hasNext() {
					return true;
				}

				@Override
				public Integer next() {
					return get();
				}
			};
		}
	}

	private static float sec(long t1, long t2) {
		return (float) ((t2 - t1) / 1000.0);
	}

	private static float kib(long size) {
		return (float) (size / 1024.0);
	}

	private static float mib(long size) {
		return (float) (size / 1024.0 / 1024.0);
	}

	public static void test(String dir) throws FileNotFoundException {
		MyIter[] mia = { new MyIter(1), new MyIter(2), new MyIter(3), new MyIter(4) };
		Iterator<?>[] ra = { mia[0].iterator(), mia[1].iterator(), mia[2].iterator(), mia[3].iterator() };
		long size = 0;
		try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(dir + "\\testfile1.txt"))) {
			long t1 = System.currentTimeMillis();
			for (; true;) {
				for (Iterator<?> iterator : ra) {
					bos.write((int) iterator.next());
					size++;
					if (size % 10_000_000 == 0) {
						long t2 = System.currentTimeMillis();
						System.out.printf("I write %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
					}
				}
			}
		} catch (IOException e) {
		}
		System.out.printf("The capacity is: %s byte, %s kilobyte, %s megabyte, %s KiB, %s MiB%n", size, (float) (size / 1000.0), (float) (size / 1000.0 / 1000.0), kib(size), mib(size));

		for (MyIter myIter : mia) {
			myIter.stop();
		}
		mia = new MyIter[] { new MyIter(1), new MyIter(2), new MyIter(3), new MyIter(4) };
		ra = new Iterator<?>[] { mia[0].iterator(), mia[1].iterator(), mia[2].iterator(), mia[3].iterator() };
		size = 0;
		try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(dir + "\\testfile1.txt"))) {
			long t1 = System.currentTimeMillis();
			int i = 0;
			@SuppressWarnings("unchecked")
			Iterator<Integer> r = (Iterator<Integer>) ra[i];
			int b;
			while ((b = bis.read()) != -1) {
				size++;
				if (b != r.next()) {
					System.out.println("Failure!");
					return;
				}
				i = (i + 1) % 4;
				r = (Iterator<Integer>) ra[i];
				if (size % 10_000_000 == 0) {
					long t2 = System.currentTimeMillis();
					System.out.printf("I read %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
				}
			}
			long t2 = System.currentTimeMillis();
			System.out.printf("I read %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
		} catch (IOException e) {
			e.printStackTrace();
		}
		System.out.println("The usb stick is proper!");
	}

	public static void main(String[] args) throws FileNotFoundException {
		test("E:");
	}

}

Jeder Thread/ Produzent bekommt hier ein eigenes Random, auf dem er operieren kann…

r.setSeed(seed);

Hmm… solte das nicht hoechstens einmal gemacht werden?
Das initialisiert den random generator und ist langsam.

Gegenvorschlag?

Ich möchte 4 Threads haben, deren einzige Aufgabe es ist, Zufallszahlen zu sammeln. Die Zufallszahlen sollen aber trotzdem „in geordneter Reihenfolge“ bereitgestellt werden, denn, nachdem alles geschrieben wurde, möchte ich prüfen, ob alles in der richtigen Reihenfolge zurückgelesen wird.

Ich möchte die tatsächliche Kapazität überprüfen, und, ob der usb flash drive controller schummelt (also z.B. mehr Speicherplatz angibt, als tatsächlich möglich).

Aber ein einfaches „Geht nicht, aus der source kann nur sequentiell gelesen werden“, würde mir natürlich auch genügen. Aber bin mir fast sicher, dass es doch geht, denn das ist doch ein Software-PRNG…

Du muesstest ja die ganzen Daten erstmal irgendwo halten, um spaeter nach dem USB Stick beschreiben vergleichen zu koennen. Das halten kann auch in einer Datei auf der HDD sein.

Ansonsten solltest du IMO die Initialisierung nur einmal pro Thread machen anstatt in der add Methode, das sollte schon um einiges schneller laufen.

Ja, aber das sync management und/oder die threadsicheren Datenstrukturen sind aus langsam. Eigentlich ist alles langsam… Ich bin jetzt am überlegen, den supplier in C zu schreiben… aber das sollte irgendwie planvoll und strukturiert sein…

Du kannst mehrere PRNGs (so viele, wie die CPU Kerne hat) initialisieren und dann die Zufallszahlen generieren. Die pufferst du dann in bspw. 1KB-Blöcken und bringst sie dann in dem Thread, der die Daten schreiben soll in einer vordefinierten Reihenfolge zusammen. Damit wird auch die Gesamtsequenz deterministisch.

Edit: es macht auch wenig Sinn, die Daten nur byteweise zu generieren. Wenn du ein ganzes int verwendest, hast du 4x soviele Daten.

1 „Gefällt mir“

Wozu überhaupt Zufallszahlen? Es geht ja nur darum zu beschreiben und zu lesen.
Würde es dir, wenn du nicht einfach hochzählen willst, reichen einfach ne Liste der Zahlen zu erzeugen und die dann einmal zu mischen und fertig?

Vorallem generierst du die Zahlen ja 2 mal?

Danke!! Ich versuche das mal.

Ich verstehe das nicht… Jetzt ist es ca. 6-mal so schnell wie die vorherigen Versuche und die CPU-Last ist bei 100% - jedoch ist es immer noch 4-mal langsamer, als wenn ich es nicht mit Threads umgesetzt hätte (wobei dabei die CPU-Last dann nur bei 25% wäre, denn es würde ja nur ein Kern genutzt):

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Random;

public class USBTest {

	public static class MyIter implements Iterable<Integer> {
		public static final int n = 100_000;
		private ArrayDeque<Integer> deque = new ArrayDeque<>();
		private long offset = 0;
		private Random r = new Random();
		private volatile boolean is_alive = true;

		public MyIter(int nr) {
			this.offset = nr * n;
			add();
		}

		public void stop() {
			is_alive = false;
		}

		private void add() {
			new Thread(new Runnable() {
				@Override
				public void run() {
					while (is_alive) {
						if (deque.isEmpty()) {
							r.setSeed(offset);
							offset += n;
							for (int i = 0; i < n; i++) {
								synchronized (deque) {
									deque.add(r.nextInt(256));
								}
							}
						} else {
							try {
								Thread.sleep(1);
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
						}
					}
				}
			}).start();
		}

		private int get() {
			while (deque.isEmpty()) {
				try {
					Thread.sleep(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			synchronized (deque) {
				return deque.pollFirst();
			}
		}

		@Override
		public Iterator<Integer> iterator() {
			return new Iterator<Integer>() {
				@Override
				public boolean hasNext() {
					return true;
				}

				@Override
				public Integer next() {
					return get();
				}
			};
		}
	}

	private static float sec(long t1, long t2) {
		return (float) ((t2 - t1) / 1000.0);
	}

	private static float kib(long size) {
		return (float) (size / 1024.0);
	}

	private static float mib(long size) {
		return (float) (size / 1024.0 / 1024.0);
	}

	public static void test(String dir) throws FileNotFoundException {
		MyIter[] mia = new MyIter[4];
		Iterator<?>[] ra = new Iterator<?>[4];
		for (int i = 0; i < 4; i++) {
			mia[i] = new MyIter(i);
			ra[i] = mia[i].iterator();
		}
		long size = 0;
		try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(dir + "\\testfile1.txt"))) {
			long t1 = System.currentTimeMillis();
			for (; true;) {
				for (Iterator<?> iterator : ra) {
					@SuppressWarnings("unchecked")
					Iterator<Integer> ii = (Iterator<Integer>) iterator;
					for (int i = 0; i < MyIter.n; i++) {
						bos.write(ii.next());
						size++;
						if (size % 10_000_000 == 0) {
							long t2 = System.currentTimeMillis();
							System.out.printf("I write %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
						}
					}
				}
			}
		} catch (IOException igno) {
		}
		System.out.printf("The capacity is: %s byte, %s kilobyte, %s megabyte, %s KiB, %s MiB%n", size, (float) (size / 1000.0), (float) (size / 1000.0 / 1000.0), kib(size), mib(size));

		for (MyIter myIter : mia) {
			myIter.stop();
		}
		for (int i = 0; i < 4; i++) {
			mia[i] = new MyIter(i);
			ra[i] = mia[i].iterator();
		}
		size = 0;
		try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(dir + "\\testfile1.txt"))) {
			long t1 = System.currentTimeMillis();
			int i = 0;
			@SuppressWarnings("unchecked")
			Iterator<Integer> ii = (Iterator<Integer>) ra[i];
			int b;
			while ((b = bis.read()) != -1) {
				size++;
				if (b != ii.next()) {
					System.out.println("Failure!");
					return;
				}
				if (size % 10_000_000 == 0) {
					long t2 = System.currentTimeMillis();
					System.out.printf("I read %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
				}
				if (size % MyIter.n == 0) {
					i = (i + 1) % 4;
					ii = (Iterator<Integer>) ra[i];
				}
			}
			long t2 = System.currentTimeMillis();
			System.out.printf("I read %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
			System.out.println("The usb flash drive controller is proper!");
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) throws FileNotFoundException {
		test("E:");
	}

}

Das heißt, das (die Mikrooperationen) zu koordinieren verbraucht mehr Rechenkapazität, als wenn es einer alleine machen würde. Oder: Viele Köche verderben den Brei. :frowning:

Ach so, Frohe Ostern!

Mehr als 26 MiB/s schreiben und lesen (das sind immerhin 6 MiB/s mehr) bekomme ich da nicht raus (die SSD Partition sollte 400 schaffen). Das liegt aber nicht mehr an der Bereitstellung von Random, sondern an Java selber, also dem Buffered Writer und Reader…

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;

public class USBTest {

	public static class MyIter {
		public static final int n = 10_000_000;
		private int[] array = new int[n];
		private long offset = 0;
		private Random r = new Random();
		private volatile boolean is_alive = true;
		private volatile boolean is_empty = true;
		private Thread t = null;

		public MyIter(int nr) {
			this.offset = nr * n;
			t = new Thread(new Runnable() {
				@Override
				public void run() {
					while (is_alive) {
						add();
					}
				}
			});
			t.start();
		}

		public void stop() {
			is_alive = false;
			t.interrupt();
		}

		private synchronized void add() {
			while (is_alive && !is_empty) {
				try {
					wait();
				} catch (InterruptedException igno) {
				}
			}
			r.setSeed(offset);
			offset += 4 * n;
			for (int i = 0; i < n; i++) {
				array[i] = r.nextInt(256);
			}
			is_empty = false;
			notifyAll();
		}

		public synchronized int[] get() {
			while (is_alive && is_empty) {
				notifyAll();
				try {
					wait();
				} catch (InterruptedException igno) {
				}
			}
			int[] a = new int[n];
			System.arraycopy(array, 0, a, 0, n);
			is_empty = true;
			notifyAll();
			return a;
		}
	}

	private static float sec(long t1, long t2) {
		return (float) ((t2 - t1) / 1000.0);
	}

	private static float kib(long size) {
		return (float) (size / 1024.0);
	}

	private static float mib(long size) {
		return (float) (size / 1024.0 / 1024.0);
	}

	public static void test(String dir) throws FileNotFoundException {
		MyIter[] mia = new MyIter[4];
		for (int i = 0; i < 4; i++) {
			mia[i] = new MyIter(i);
		}
		long size = 0;
		try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(dir + "\\testfile1.txt"), MyIter.n)) {
			long t1 = System.currentTimeMillis();
			for (; true;) {
				for (MyIter myIter : mia) {
					int[] a = myIter.get();
					for (int i = 0; i < MyIter.n; i++) {
						bos.write(a[i]);
						size++;
						if (size % 10_000_000 == 0) {
							long t2 = System.currentTimeMillis();
							System.out.printf("I write %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
						}
					}
				}
			}
		} catch (IOException igno) {
		}
		System.out.printf("The capacity is: %s byte, %s kilobyte, %s megabyte, %s KiB, %s MiB%n", size, (float) (size / 1000.0), (float) (size / 1000.0 / 1000.0), kib(size), mib(size));

		for (MyIter myIter : mia) {
			myIter.stop();
		}
		for (int i = 0; i < 4; i++) {
			mia[i] = new MyIter(i);
		}
		size = 0;
		try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(dir + "\\testfile1.txt"), MyIter.n)) {
			long t1 = System.currentTimeMillis();
			int i = 0;
			int[] a = mia[i].get();
			int j = 0;
			int b;
			while ((b = bis.read()) != -1) {
				size++;
				if (b != a[j++]) {
					System.out.println("Failure!");
					return;
				}
				if (size % 10_000_000 == 0) {
					long t2 = System.currentTimeMillis();
					System.out.printf("I read %s MiB in %s MiB/s%n", mib(size), mib(size) / sec(t1, t2));
				}
				if (size % MyIter.n == 0) {
					i = (i + 1) % 4;
					a = mia[i].get();
					j = 0;
				}
			}
			System.out.printf("The capacity is: %s byte, %s kilobyte, %s megabyte, %s KiB, %s MiB%n", size, (float) (size / 1000.0), (float) (size / 1000.0 / 1000.0), kib(size), mib(size));
			System.out.println("The usb flash drive controller is proper!");
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			for (MyIter myIter : mia) {
				myIter.stop();
			}
		}
	}

	public static void main(String[] args) throws FileNotFoundException {
		test("E:");
	}

}

Nette Knobelei. Wie ich es oben beschrieben habe, habe ich mal eine Testimplementierung „hingehackt“. Ich erreiche dabei bei mir lokal eine Schreibrate bei 5GB Daten von > 330MB/s. Limit ist offenbar die SSD - wenn ich die generierten Daten nicht wegschreibe, dann generiere ich knapp 870MB/s deterministische Zufallszahlen.

Vermutlich verlierst du viel Zeit, weil dein Code jedes Byte einzeln behandelt.

Guck dir mal NIO (= non blocking IO) an. Mit Channels und ByteBuffers kannst du mit einem Befehl mehrere MB an Daten herumschieben - damit sparst du Milliarden von Methodenaufrufen ein.

Danke euch, ich werd vielleicht noch einen Versuch machen. :slight_smile:

Melde dich, wenn du aufgibst. Dann poste ich meinen Code hier für die Nachwelt. Im Gegensatz zu dem Schwurbelcode oben ist der halbwegs objektorientiert.

Her damit :smirk:

Und so schlecht ist mein Code gar nicht… vielleicht nicht 100%ig oo, aber das schadet ja nicht…

Main.java

import java.io.*;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Main {
    private final int numberOfWorkers;
    private final int kilobytesToGenerate;

    public Main(int numberOfWorkers, int kilobytesToGenerate) {
        this.numberOfWorkers = numberOfWorkers;
        this.kilobytesToGenerate = kilobytesToGenerate;
    }

    public static void main(String[] args) {
        new Main(6, 5242880).run();
    }

    private void run() {
        List<Producer> producers = new ArrayList<>(numberOfWorkers);
        List<BlockingQueue<Blob>> queues = new ArrayList<>(numberOfWorkers);
        for (int i = 0; i < numberOfWorkers; i++) {
            BlockingQueue<Blob> queue = new ArrayBlockingQueue<>(2);
            producers.add(new Producer(i, queue, 1024));
            queues.add(queue);
        }
        try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream("data.dat"))) {
            Merger merger = new Merger(queues, blob -> {
                try {
                    outputStream.write(blob.getData());
                } catch (IOException ignored) {
                }
            });

            System.out.println("Start.");
            Instant start = Instant.now();
            producers.forEach(Producer::start);
            merger.consume(kilobytesToGenerate / numberOfWorkers);
            producers.forEach(Producer::stop);
            Instant end = Instant.now();
            System.out.println("Finish in " + Duration.between(start, end));
        } catch (IOException ignored) {
        }
    }
}

Merger.java

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

class Merger {
    private final List<BlockingQueue<Blob>> queues;
    private final Consumer<Blob> consumer;

    Merger(List<BlockingQueue<Blob>> queues, Consumer<Blob> blobConsumer) {
        this.queues = queues;
        this.consumer = blobConsumer;
    }

    void consume(int numberOfRounds) {
        for (int i = 0; i < numberOfRounds; i++) {
            for (BlockingQueue<Blob> queue : queues) {
                try {
                    consumer.accept(queue.take());
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

Producer.java

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class Producer {
    private final Random random;
    private final BlockingQueue<Blob> queue;
    private final int blobSize;
    private final AtomicBoolean running;

    Producer(long seed, BlockingQueue<Blob> queue, int blobSize) {
        this.random = new Random(seed);
        this.queue = queue;
        this.blobSize = blobSize;
        this.running = new AtomicBoolean(false);
    }

    void start() {
        if (running.compareAndSet(false, true)) {
            new Thread(() -> {
                while (running.get()) {
                    byte[] buffer = new byte[blobSize];
                    random.nextBytes(buffer);
                    Blob blob = new Blob(buffer);
                    try {
                        while (!queue.offer(blob, 1, TimeUnit.SECONDS) && running.get());
                    } catch (InterruptedException ignored) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }).start();
        }
    }

    void stop() {
        running.set(false);
    }
}

Blob.java

class Blob {
    private final byte[] data;

    Blob(byte[] data) {
        this.data = data;
    }

    byte[] getData() {
        return data;
    }
}

Die Anzahl der Worker in der Main.java sollte man auf die Anzahl der CPU-Kerne einstellen (in meinem Fall 6) und der Consumer, der dem Merger übergeben wird, kann bei Bedarf auch angepasst werden.

Für maximale Performance kann man sicherlich mit der Buffer-Size etwas rumspielen.

Eine Nutzung von Channels bringt in diesem Fall keinen Performancegewinn - der Code lässt sich schnell so refactorn, dass man statt des Blob einfach einen ByteBuffer nutzt - das ist aber wie gesagt nicht schneller, mit dem BufferedOutputStream holt man mehr Performance raus. Um das mit einem ByteBuffer zu erzielen, müsste man die generierten Chunks vergrößern (ich schätze mal auf 4 MB oder so).

Wenn der Merger die Daten nicht schnell genug wegschafft (hier durch zu langsame Schreibgeschwindigkeit), wird die CPU nicht mehr zu 100% ausgelastet, weil die Producer blockiert werden.

Oki, die einzelnen write()-Aufrufe an BufferedOutputStream schienen der bottleneck gewesen zu sein.

Wenn ich meinen Code umschreiben würde, so dass byte[]s geschrieben und gelesen (und auch produziert) würden, wäre der bestimmt genauso schnell; auch, wenn ich nur eine 4-Kerne-CPU hab.

Vielen Dank euch. :slight_smile:

Edit: @cmrudolph Du schreibst eine vordefinierte Menge an kilobyte’s, ist mir aufgefallen; um die Kapazität zu testen, sollten aber so viel byte wie möglich geschrieben werden.

Das abzuändern sollte ja eine eher leichte Übung sein. Ich wollte meine interne SSD nicht künstlich vollmüllen…