Mehrere DeamonThread mit wiederkehrenden Aufgaben, parallel reduction

In meiner Anwendung (Bachelorarbeit), habe ich eine sich sehr oft in einer Schleife wiederholende datenunabhängige Aufgabe.

Die Aufgabe ist es den Index des Maximalwertes in einem sehr großen Array zu erhalten.

Weshalb ich den Ansatz der Parallelen Reduktion benutze.
Das Array wird dabei in kleinere Arrays unterteilt auf denen eine vorgegebene Anzahl von Threads arbeitet (Anzahl an CPU-Kernen).

Eine Implementierung mit einfachen Threads ist mir gelungen und war auch nicht sehr schwer.

Jedoch ist der mögliche Performanzgewinn des parallelen Ansatzes nun nicht so hoch, da die Threads jede Iteration neu initiiert werden, da es mir nicht gelungen ist dies mit Deamon-Threads zu realisieren.

Zur Realisierung mit Deamon-Threads habe ich mir eine eigentlich kleine und einfache abstrakte Deamon Thread Klasse geschrieben, welche wie folgt aussieht.

Abstrakte DeamonThread Klasse

package util.threads;

/**
 * DeamonThread Class for re-occuring Tasks
 */
public abstract class ADeamonThread extends Thread
{
	/**
	 * flag indicating to start the update method in the endless while loop
	 * of the run method
	 */
	private boolean run = false;
	
	/**
	 * flag indicating if this thread should be stopped
	 */
	private boolean stop = false;
	
	/**
	 * Creates a new Deamon thread who needs to be started manually
	 * 
	 * @param name String
	 */
	public ADeamonThread(String name)
	{
		super(name);
		this.setDaemon(true);
	}
	
	@Override
	public void run()
	{
		while(!this.stop)
		{
			try
            {
	            Thread.sleep(10);
            }
            catch (InterruptedException e)
            {
	            System.out.println("Error " + e.getMessage());
            }
            
			if(this.run)
			{
				this.update();
				this.run = false;
			}
		}
		this.run = false;
	}

	/**
	 * The Method which is executed by the deamon if {@link #activate()} is called
	 */
	protected abstract void update();
	
	/**
	 * sets the running flag to true!,
	 * should be called if arguments are needed!
	 */
	public void activate()
	{
		this.run = true;
	}

	/**
	 * Stops this thread safely
	 */
	public void stopSafe()
	{
		this.stop = true;
	}
	
	/**
	 * Returns true if this deamon thread is currently performing its update method.
	 * {@link #update()}
	 * 
	 * @return true if active
	 */
	public boolean isActive()
	{
		return this.run;
	}

	/**
	 * blocks the calling thread until the current call of the
	 * update method of this deamon thread has terminated.
	 * {@link #update()}
	 */
	public void waitTillFinished()
	{
		while(this.run) { }
	}
}

Implementierende DeamonThread Maxima Klasse

package Cluster.CPU.Threads;

import java.util.List;

import util.threads.ADeamonThread;

import Cluster.ACluster;
import Cluster.Cluster;


/**
 * Threaded Cluster Max calculation
 */
public class ClusterMaxDeamonThread extends ADeamonThread
{
	/**
	 * cluster list of this thread
	 */
	private List<ACluster> clusters = null;

	/**
	 * The Maximum Cluster of this Thread
	 */
	private Cluster clusterMax = null;
	
	public ClusterMaxDeamonThread(int index)
	{
	    super("CPU ClusterMax Thread: " + index);
	    this.start();
	}

	@Override
	protected void update()
	{
		this.clusterMax = new Cluster(null, null, -1.0f);
		Cluster cluster = null;

		for(ACluster aCluster : this.clusters)
			if(aCluster instanceof Cluster)
			{
				cluster = (Cluster)aCluster;
				if(cluster.getValue() > this.clusterMax.getValue())
					this.clusterMax = cluster;
			}
	}

	/**
	 * Activates this DeamonThread to find the current maximum cluster
	 * of the given clusters
	 * 
	 * @param clusters List<ACluster>
	 */
	public void activate(List<ACluster> clusters)
	{
		this.clusters = clusters;
		super.activate();
	}

	/**
	 * Returns a Cluster who's value is -1.0f if no cluster has been found!
	 * @return Cluster
	 */
	public Cluster getClusterMax()
	{
		return this.clusterMax;
	}
}

Das Problem ist, dass sich das Programm aufhängt, wenn ich mittels der active methode mehrere der DeamonThreads sozusagen aufwecke ihre Arbeit zu tuen, und dann im Hauptthread auf die Abarbeitung der Threads warte. Relevanter Code ab Zeile 18.

Aufrufende Klasse

Cluster[] clusterMaxs = new Cluster[ClusterProcessCPU.CPU_COUNT];

//get tile size each thread works on
int tileSize = Util.getCeil(this.clusters.size(), this.clusterMaxThreads.length);

//tileSize of 1 is checked by if before!!!
int startIdx = 0;
int endIdx = 0;
//activate all threads
for(int t = 0; t < this.clusterMaxThreads.length; t++)
{
	startIdx = Math.min(t * tileSize, this.clusters.size());
	endIdx = Math.min(startIdx + tileSize, this.clusters.size());

	/*
	 * Activate each Thread with its sub list
	 */
	this.clusterMaxThreads[t].activate(this.clusters.subList(startIdx, endIdx));	
}

//wait for them to finish and get results
for(int t = 0; t < this.clusterMaxThreads.length; t++)
{
	this.clusterMaxThreads[t].waitTillFinished();
	clusterMaxs[t] = this.clusterMaxThreads[t].getClusterMax();
}

/*
 * init clusterMax with first cluster and check last array
 */
Cluster clusterMax = clusterMaxs[0];
for(int i = 1; i < clusterMaxs.length; i++)
	if(clusterMaxs**.getValue() > clusterMax.getValue())
		clusterMax = clusterMaxs**;

return this.clusters.indexOf(clusterMax);

Ich sehe den Fehler nicht.
Alle Deamon Threads arbeiten auf ihrem eigenen sub Array und der haupt Thread wartet bis all deamon threads mit ihrer arbeit fertig sind.

Wenn ich in meiner abstrakten Deamon Thread Klasse so ändere dass diese einfache Punkte in ihrer while schleife ausgibt, dann funktioniert mein Programm manchmal, eine Erhöhung des Sleep Timeout in der update methode auch.

public void waitTillFinished()
{
	while(this.run) { System.out.print("."); }
}

Wäre für Hilfe sehr dankbar.

Ohne den Code jetzt auf die Schnelle komplett nachvollzogen zu haben: Hast du schon in Erwägung gezogen, einen Executor zu verwenden? Es gibt da verschiedene Implementierungen, z.B. einen, der einen “Thread Pool” verwaltet, in dem er die Threads für eine Weile am Leben hält, oder auch neue Threads erstellt, wenn sie benötigt werden - das ganze ist sehr flexibel, nimmt einem u.U. SEHR viel Arbeit ab, und verhindert solche Fehler wie im letzten Codestück (das ist “busy waiting”, und wird üblicherweise als no-go angesehen…)

Danke, habe das nun mit einem Executor gelöst, welchen ich gleich für die anderen parallelen Methoden benutzte und einer CyclicBarrier, was sehr gut funktioniert und auch etwas schneller ist.

Aber die Runnable’s müssen ja trotzdem in jeder Iteration neuinitiiert werden, was ich noch versuche zu lösen.

Bis dahin habe ich nun ein anderes Problem mit einem ReentrantLock!

Ich erstelle wieder eine Anzahl an Runnable/Threads, welche auf einer gemeinsamen Datenstruktur arbeiten sollen.

Der Hauptthread erstellt ein ReentrantLock und übergibt dieses, an die einzelnen Runnables/Threads.

Zu Testzwecken locke ich sofort von Begin der Run Methode. D.h die Threads laufen nun serial ab!

Solange ich meine gemeinsame Datenstruktur, eine Liste welche dem Thread per Konstruktor übergeben wird nicht benutze, ist auch alles ok.

Globale gemeinsame Thread instanzvariablen die per Konstruktor übergeben werden

private List<ACluster> clusters = null;
private List<ACluster> clustersNew = null;
private ReentrantLock lock = null;
private int index = 0;

Run Methode der Threads

@Override
public void run()
{
	this.lock.lock();
	System.out.print("Thread: " + this.index + " locked");
	try
	{
		//this.clustersNew.add(this.clusters.get(index));
	}
	finally
	{
		this.lock.unlock();
		System.out.println(" Thread: " + this.index + " unlocked");

	}
}

Ausgabe:

Thread: 0 locked Thread: 0 unlocked
Thread: 1 locked Thread: 1 unlocked
Thread: 2 locked Thread: 2 unlocked
Thread: 3 locked Thread: 3 unlocked
Thread: 0 locked Thread: 0 unlocked
Thread: 1 locked Thread: 1 unlocked
Thread: 2 locked Thread: 2 unlocked
Thread: 3 locked Thread: 3 unlocked

Adde ich in die Liste ein Objekt aus einer anderen globalen Liste gibt es einen Deadlock!
anscheinend kommen mehrere threads hinter die lock Anweisung!

Add Run Methode der Threads


@Override
public void run()
{
	this.lock.lock();
	System.out.print("Thread: " + this.index + " locked");
	try
	{
		this.clustersNew.add(this.clusters.get(index));
	}
	finally
	{
		this.lock.unlock();
		System.out.println(" Thread: " + this.index + " unlocked");

	}
}

Ausgabe:

Thread: 0 lockedThread: 1 locked Thread: 0 unlocked
Thread: 1 unlocked
Thread: 2 locked Thread: 2 unlocked
Thread: 3 locked Thread: 3 unlocked

Mehr als diesen try, finally block finde ich zu Locks auch nicht im Internet, das scheint überall so einfach implementiert zu sein.

Im Finally Block wird die CyclicBarrier klasse aufgerufen sieht so aus, kann leider nicht mehr editieren.

**Und der Fehler passiert doch immer, egal ob ich im try block nichts mache. **

finally
{
	this.lock.unlock();
	System.out.println(" Thread: " + this.index + " unlocked");
	this.barrier.await();
}

du darfst nicht nach der Ausgabe gehen, die Ausgaben mit System.out sind nicht Threadsave bzw. sie können auch zeitverzögert kommen

vielen danke, hab deswegen den Fehler an der falschen Stelle gesucht,

for(int i = 0; i < this.tileSize; i++)
{
	idx = this.index * this.tileSize + i;
	if(idx >= this.clusters.size())
		break;

	cluster = (Cluster)this.clusters.get(idx); // <-- da war ein i

Tja wenn ich die daten partitioniere muss ich auch mit “idx” in die liste schauen und nicht mit “i”.

Oh man, programmiere einfach zu lange am Tag momentan, dabei noch krank zu sein ist nicht hilfreich.
So dumme Fehler.

Mies ist nur, dass es mit einem Thread 30% schneller ist. Der Lock Aufwand in der Loop lohnt wohl nicht.
Vielleicht mit mehr Elementen, oder erst subarrays erstellen und dann zusammenfügen (benötigt list.contains prüfung für jedes element). Ob das schneller sein wird.

so nun habe ich ein neues problem

mein rechenintensives paralleles programm läuft für eine bestimmte eingabegröße perfekt!

meine parallelen threads werden mittels eines ExecutorCervice aufgerufen und
mittels einer CyclicBarrier mit dem aufrufenden Thread synchronisiert.

Jedoch funktioniert dies nicht mehr, sofern ich noch mehr daten bearbeite. Das programm stoppt da
die threads (klassen die runnable implementieren) die cyclicbarrier nicht erreichen.

Ich vermute, dass deren timeout zuerst greift, oder das timeout des aufrufenden thread ?
wie erhöhe ich dieses?

ps. in der run() Methode habe ich eine ausgabe in die vorletzte Zeile geschrieben dass die threads fertig sind, was auch passiert, d.h sie führen ihre arbeit ordnungsgemäß durch
in der letzten zeile wird await() der CyclicBarrier aufgerufen

edit: im taskmanager ist auch keine weitere cpu auslastung des programms zu sehen

Ein bißchen mehr Kontext könnte helfen. Du legst also eine Menge Runnables in einen ExecutorService. Ist es richtig, dass diese Runnables eigentlich nicht aufeinander warten müssen, sondern dass nur der Hauptthread warten muss, bis alle fertig sind? Falls das so ist: Das kann man manuell mit einer CyclicBarrier nachbauen, aber vielleicht ist es einfacher, das mit invokeAll zu lösen, das bildet dieses Verhalten nämlich ziemlich direkt ab. Die Callables können dabei Instanzen anonymer Klassen sein, die nur an das Runnable weiterreichen und ‘Void’ zurückgeben, grob aus dem Kopf…

List<Callable<Void>> callables = new ArrayList<Callable<Void>>();
for (int i=0; i<n; i++)
{
    final Runnable runnable = create(); // Aber das nicht direkt in den Executor packen, sondern....
    Callable<Void> callable = new Callable<Void>()
    {
        public Void call()
        {
            runnable.run();
            return null;
        }
    };
    callables.add(callable);
}

// Und jetzt alle auf einmal:
executorService.invokeAll(callables);

// Hier kommt er erst an, wenn alle fertig sind...
System.out.println("Alle fertig");

Bin gerade nicht 100% sicher, ob du sowas meintest…

hmm das mit invokeAll hatte ich mir eben auch kurz angeschaut, aber wollte erstmal abwarten bzgl der cyclicBarrier.

Diese Runnables müssten theoretich nicht aufeinander warten, jeder berechnet sein Resultat.
Nur der Hauptthread wartet auf die Runnables und fügt das resultat zusammen!

Folgend mein Hauptthread, die abstrakte Runnable Klasse mit der CyclicBarrier, sowie einer Helper-Klasse, welche alle Runnables sammelt und denen die gemeinsame CyclicBarrier übergibt.

Es funktioniert ja alles solange die liste(daten) nicht zu groß wird und die threads zulange arbeiten müssen , da es mit einer kleineren liste funktioniert.

Hauptthread

	Logger.logln("start threads");
	//create all threads to create the cluster objects
	for(int t = 0; t < this.cyclicRunner.getRunnerCount(); t++)
		this.cyclicRunner.addRunner(new UpdateDocSimClustersThreadGPU(list, 
				tileSize,
				t), t);
	this.cyclicRunner.startWait();
	Logger.logln("threads terminated"); //not reached if the list is too large!

	//... collect results of all threads

Hier meine abstrakte Runnable Klasse, wovon die benötigten threads erben.

package util.threads;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

import util.Logger;

/**
 * An abstract Cyclic Runnable Class with a CyclicBarrier
 */
public abstract class CyclicRunnable implements Runnable
{
	private CyclicBarrier barrier;

	@Override
	/**
	 * Should not be overriden by implementing classes, 
	 * call {@link #myRun()}
	 */
	public void run()
	{
		this.myRun();
		try
        {
			if(this.barrier == null)
				Logger.loglnError("Cyclic Runnable barrier is not set.");
			else
				this.barrier.await();
        }
        	catch (Exception e)
        	{
	        	Logger.log(e);
        	}
	}
	
	/**
	 * Run Method
	 */
	protected abstract void myRun();
	
	/**
	 * Sets the current barrier
	 * 
	 * @param barrier CyclicBarrier
	 */
	public void setCyclicBarrier(CyclicBarrier barrier)
	{
		this.barrier = barrier;
	}
}

Meine Thread-Sammel Helper Klasse

package util.threads;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * A class which holds threads which can be synchronized with a calling thread using a CyclicBarrier
 */
public class CyclicRunner 
{
	/**
	 * Thread Barrier
	 */
	private CyclicBarrier barrier = null;
	/**
	 * Thread Runners / Tasks
	 */
	private CyclicRunnable[] runners = null;

	/**
	 * Thread Executor
	 */
	private ExecutorService executor = null;
	
	/**
	 * number of projected runners
	 */
	private int runnerCount = 0;
	
	/**
	 * Creates a new Cyclic Runner whis a barrier who's size is one greater then
	 * the given runnerCount, so that the calling main can wait for all
	 * threads of this CyclicRunner to finish
	 * 
	 * @param runnerCount int
	 */
	public CyclicRunner(int runnerCount) 
	{
		this.runnerCount = runnerCount;
		this.runners = new CyclicRunnable[runnerCount];
		this.barrier = new CyclicBarrier(runnerCount + 1);
		this.executor = Executors.newFixedThreadPool(runnerCount);
	}
	
	/**
	 * Starts all of the given runners
	 */
	public void start() 
	{
		if(this.runners == null)
			throw new IllegalArgumentException("Runners cannot be null.");
		else if(this.runners.length == 0)
			throw new IllegalArgumentException("Runners size cannot be 0.");
		else if(this.runners.length != (this.barrier.getParties() - 1))
			throw new IllegalArgumentException("Runners size is not: " + (this.barrier.getParties() - 1));

		//set barrier
		for(int i = 0; i < this.runners.length; i++)
			this.runners**.setCyclicBarrier(this.barrier);
		//execute 
		for(int i = 0; i < this.runners.length; i++)
			this.executor.submit(this.runners**);

	}
	
	/**
	 * Causes the current thread to wait until all threads have reached the barrier
	 * unless the thread is interrupted or it has never been started
	 * 
	 * 
	 * Resets the barrier flag.
	 * 
	 */
	public void await() 
	{
		try 
		{
			/*
			 * checks if this runner has already been used before, necessary for 
			 * async recursive scenarios!
			 */
			if(this.barrier != null)
			{
				this.barrier.await();
				this.barrier.reset();
			}
		} 
		catch (Exception e) 
		{
			throw new Error(e);
		}
	}

	/**
	 * Executes all threads
	 */
	public void startWait()
	{
		this.start();
		this.await();
	}
	
	/**
	 * Adds the given runner into our runner array at the given index
	 * 
	 * @param runner CyclicRunnable
	 * @param index int
	 */
	public void addRunner(CyclicRunnable runner, int index)
	{
		this.runners[index] = runner;
	}

	/**
	 * Returns all runners
	 * 
	 * @return CyclicRunnable[]
	 */
	public CyclicRunnable[] getRunners()
	{
		return this.runners;
	}
	
	/**
	 * Gets the number of projected runners
	 * 
	 * @return int
	 */
	public int getRunnerCount()
	{
		return this.runnerCount;
	}
	
	/**
	 * Shuts the executor of this runner down
	 */
	public void shutDown()
	{
		this.executor.shutdown();
	}
}

oh man irgendwie erreichen nur 4 der 8 threads die vorletzte zeile, aah (war zuhause immer auf 4 kernen und hier sinds 8, nicht gesehen), mal nachschauen

ah index out of bounds exception in dem thread

BTW, da war noch was: Man kann sich mit
Callable callable = Executors.callable(someRunnable);
auch die anonymen instanzen sparen.