Datei parallel auslesen

Hallo,

ich arbeite momentan daran große Dateien zu analysieren, da dieser Vorgang aber recht viel Zeit beanprucht habe ich mir überlegt, dass man durch Parallelisieren das Ganze sicherlich beschleunigen könnte. Problem dabei ist aber, dass die Dateien oft zu groß sind um sie vollständig in den Speicher zu laden. Daher müsste man mit mehreren RandomAccessFile gleichzeitig die selbe Datei auslesen.
Ich bin mir unsicher ob das überhaupt möglich/sinnvoll ist, da es je nach dem wie es gehandhabt wird, vorkommen könnte, dass die Auslesevorgänge ich die ganze Zeit gegenseitig blocken, so dass am Ende die Performance im schlechtesten Fall sogar abnimmt, dem rein sequenziellen Ablauf gegenüber.
Ich habe schon versucht über die Google-Suche etwas über das Thema herauszufinden, leider erfolglos, vielleicht habe ich aber auch nur die falschen Suchbegriffe genutzt.

Falls es nicht möglich/sinnvoll ist, hatte ich noch die Idee, dass man mehrere temporärer Kopien der Datei anfertigen könnte. Aufgrund der möglichen Größe der Dateien bin ich dem aber eher abgeneigt.

Also: Was haltet ihr von den Ansätzen, wisst ihr wie es sich mit dem parallelen Auslesen verhält ?

MfG,

~Oneric

mal abgesehen vom eigentlichen thema wäre vielleicht interessant zu wissen, was für dich “große dateien”
sind beziehungsweise was genau du versuchst…

Große Dateien: 3-10 GB
Da ich nur 6GB RAM habe passen 10GB definitiv nicht in meinen Arbeitsspeicher und 3GB würden zwar theoretisch gehen, aber da noch andere Programme laufen würde ich das gerne vermeiden.

Das Ganze bezieht sich darauf: http://forum.byte-welt.net/java-forum-erste-hilfe-vom-java-welt-kompetenz-zentrum/allgemeine-themen/15224-analysieren-langer-strings-zeichenketten.html auch wenn ich nicht denke dass es direkt etwas mit dem parallelen Auslesen zu tun hat.

MfG,
~Oneric

Also. Ich habe grundsätzlich nicht viel tiefergehende Ahnung davon, aber ein paar Sachen:

Viele Random Acces Files auf einen File ergibt in meinen Augen keinen Sinn. Deine Festplatte
kann ja keine mehrere lesevorgänge gleichzeitig erledigen. Also du kannst den Lese vorgang an
sich nicht parallelisieren oder irgendwie beschleunigen, soweit ich das verstehe. funktioniert nicht.
von daher, wenn du an verschiedene stellen musst wohl eher oftmals mit seek arbeiten.
Wobei man doch auch mit normalen streams die stelle wechseln kann, oder? wieso überhaupt
raf? (wie gesagt, ich kenne mich da nicht aus)

und wenn du so ne datei stück für stück auswertest, und nur das was du letzten endes brauchst
(also was die analyse halt so hervorbringen soll) speicherst, dann sollte doch auch dein ram nicht
überlaufen?..

[quote=mymaksimus]von daher, wenn du an verschiedene stellen musst wohl eher oftmals mit seek arbeiten.
Wobei man doch auch mit normalen streams die stelle wechseln kann, oder?[/quote]
RandomAccesFile habe ich weil man dort die Stelle wechseln kann und bei dem verwendeten Dateiformat steht der Header am Ende der Datei. Bei ‘normalen’ InputStreams geht das “Hin- und Herspringen” innerhalb der Datei soweit ich weiß nicht.

und wenn du so ne datei stück für stück auswertest, und nur das was du letzten endes brauchst
(also was die analyse halt so hervorbringen soll) speicherst, dann sollte doch auch dein ram nicht
überlaufen?..
Ich speichere natürlich nur die Ergebnisse, aber wenn ich die Datei nur sequenziell auslesen kann benötigt diese Analyse halt der großen Datenmenge entsprechend viel Zeit. Deshalb meine Idee mit dem Parallelisieren.
Am Analyse-Algorithmus ist, denke ich, ohne Parallelisierung auch nicht mehr viel zu holen, da alle mir bekannten schnelleren Varianten, die die Rohdaten erst aufbereiten (z.B. Suffixbaum), auch zu viel Speicherplatz benötigen.

MfG,
~Oneric

Wie gesagt, du kannst das Daten Lesen nicht parallelisieren, weil deine Festplatte nur einmal pro… zugriff
irgendetwas lesen kann. verstehst du was ich meine?

du kannst über den channel auch nen normalen fileinputstream verschieben: fos.getChannel().position(long);

[quote=mymaksimus]Daten Lesen nicht parallelisieren, weil deine Festplatte nur einmal pro… zugriff
irgendetwas lesen kann[/quote]
Ja, das kann ich verstehen. Aber da wohl auch ständig irgendwelche Hintergrund-Programme im Betriebssystem ständig Dateioperation durchführen, hatte ich darauf gehofft, dass es vielleicht doch irgendeinen (kleinen) Vorteil bringen könnte.

Hmm… Das wusste ich bisher nicht. Aber mit RandomAccessFile komme ich eigentlich ganz gut zurecht. Und da es speziell für das „Hin- und Herspringen“ ausgelegt ist, ist beim RandomAccessFile wahrscheinlich auch die Performance etwas besser, oder irre ich mich da ?

MfG,
~Oneric

FileChannel geht schon in die richtige Richtung. Je nachdem, welches Zugriffsmuster dort verwendet wird, könnte eine https://docs.oracle.com/javase/8/docs/api/java/nio/MappedByteBuffer.html was geeignetes sein.

Allgmein muss man in bezug auf Parallelisierung aber berücksichtigen, dass zufällig verteilte Zugriffe eher schlecht sind. Die Datentransferrate einer Festplatte ist am höchsten, wenn linear am Stück gelesen wird. Durch mehrere Threads wird diese Linearität eher gestört, und der Lesekopft hüpft munter in der Gegend rum, und die Geschwindigkeit bricht ein. (Bei SSDs dürfte das schon deutlich weniger schlimm sein, aber auch die sind bei “random access” deutlich langsamer als bei linearem Lesen).

Am besten wäre es vernutlich, wenn man die Datei “Chunk-Weise” verarbeiten könnte - also immer 2GB am Stück lesen, verarbeiten, und dann weitermachen mit den nächsten 2GB. (Wenn man, als “Mittelweg”, zwar immer 2GB lesen kann, aber dann innerhalb dieser 2GB rumspringen muss (ggf. weil man DIE dann Parallel verarbeiten will) könnte das durch o.g. MemoryMappedFile evtl. gelöst werden).

Aber auch ich muss sagen, dass das alles eher “theoretisch” ist: Wirklich ausprobiert und die Alternativen verglichen habe ich bisher nicht.
(Der Verkäufer meines PCs war etwas irritiert, und fragte: “Wozu brauchen sie denn 32GB RAM?” - tja :D)

Du solltest erstmal rausfinden, wo der Performanceflaschenhals liegt. Geht für das Auslesen die meiste Zeit drauf oder für die Analyse? Wenn es das Auslesen ist bringt paralleles Auslesen nichts.

Ggf. kann es auch sinnvoll sein, die Lesezugriffe manuell zu synchronisieren. Dadurch ist der Zugriff dann nicht mehr gar so zufällig und “parallel”, sondern sinnvoll serialisiert - auf einer höheren Abstraktionsebene als das NCQ des Festplattencontrollers es machen kann.

was allles wie gelesen und wo warum gesprungen wird ist unbekannt, ob dir selber bekannt auch erst noch die Frage

InputStreams haben zumindest die skip-Methode, dürften in hintere Teile der Datei kommen können,
wie performant alles, vor allem das normale Lesen, RandomAccesFile oder InputStream verwenden, ist das auch Thema?
na, solltest du ja im Grunde gut testen können wenn du dabei bist
java - Best way to get an InputStream for the second half of a file? - Stack Overflow


allgemein aber einmal mehr an das sequentielle Lesen, welches du schon selber im ersten Posting erwähnst, erinnert,

falls keine engen Speichergrenzen (einer von vielen Usern gleichzeitig aktiv), dann dürfte es, egal um welche Verarbeitung es geht, kaum Sprünge/ Parallelität geben,
lies doch einfach wenigstens immer 1 bis 100 MB-Blöcke, egal von wo in der Datei
→ die 10 GB-Datei sind 100 oder 10.000 Lese-Vorgänge, könnte in eigener Methode stehen, ganz egal mit welcher IO-Klasse und welchen sonsten Stand der Verarbeitung des Programm
immer komplett neu Lesen beginnen, mit skip oder RandomAccess dahin, 1 bis 100 MB Speicher reservieren und befüllen usw.,

das kann man auch performanter machen, wenn 80% der Datei mit einer Schleife zu durchlaufen dann sicher empfehlenswert,
aber bedeutend langsamer sollten die einzelnen großen Lesevorgänge nicht sein, wenige Minuten für 10 GB?
ich rechne seit Jahren immer mit Fausregel ‚ein Programm kann 50-100 MB in einer Sekunde einlesen‘, freilich selten wirklich konkret in Java bestätigt :wink:

welchen Art Zeitprobleme bestehen? geht es um jede Sekunde, dann natürlich immer noch was zu verbessern,
aber die Daten werden doch wohl nicht GB-weise pro Minute neu erscheinen?


was jedenfalls nicht relevant sein sollte, ist das erwähnte „Hin- und Herspringen“ beim Lesen aus Datei,
tausendfach einzelne Zeilen von verschiedenen Stellen wären in der Tat ein Problem, und dann die Idee von parallelen Lesen in welcher Form auch immer,
aber das sollte nicht vorkommen, lieber große Blöcke einlesen und nur im Arbeitsspeicher die Kleinarbeit

die Datei muss nicht komplett in den Speicher passen, sofern nur an 2-3 Stellen je ein aktueller MB-Block und ein zweitere im Übergang reichen,

idealerweise sowieso mindestens ein Hauptbereich mit Schleife zu lesen und nur wenige dazu komplizierter gecacht

schlecht wäre freilich, falls über die gesamte Datei hinweg überall etwas zu lesen, so dass selbst tausend 1MB-Ausschnitte nicht zur Abdeckung reichen,
aber das ist ja kaum vorstellbar ungünstig angeordnet, und selbst dann kann man noch auf 100KB usw. runter gehen

problematisch sind Lesevorgänge/ Sprünge auf Ebene von einzelnen Zeilen für große Datenmengen,
ab KB oder wenigstens 10 KB (~Buffergröße BufferedReader) erscheint üblich großes wieder überschaubar,
10 GB werden zu ‚nur‘ 1 Mio. Lese-Vorgänge,

Normalerweise bin ich der Auffassung : man muss nicht immer alle Daten gleichzeitig im Speicher haben, vor allem dann nicht wenn man nicht die Möglichkeit hat mit allen echt-parallel zu arbeiten. Man kann halt sowas nicht beliebig parallelisieren. Irgendwo ist der Overhead zu groß und man verliert wieder Performance.

Gehen wir doch mal von dem Beispiel aus die Datenmenge würde in deinen virtuell endlos großen Hauptarbeitsspeicher passen, wie viel davon kannst du dann wirklich echt-parallel gleichzeitig überhaupt verarbeiten ? Selbst wenn man ein großes Server-Board mit 2 XEON-CPUs und insgesamt was um die 24 Rechenwerke hat so ist auch da irgendwann das Limit erreicht an dem der Overhead für das ständige umkopieren zwischen den Threads so groß wird das der Anteil dafür den Anteil der eigentlichen Verarbeitung übersteigt und somit die Performance wieder abnimmt. Und das dürfte dann auch schon bei ein paar 100MB passieren, nicht erst bei 10GB. Zusätzlich willst du die Ergebnisse ja auch noch wieder wegspeichern und brauchst dafür auch noch ein paar Resourcen (gut, dank DMA fast nichts, aber es läuft irgendwie doch alles über die CPU).

Versuch also erstmal rauszubekommen wie viele Daten du ohne Performanceverlust überhaupt gleichzeitig verarbeiten kannst. Das ist dann deine Blockgröße.
Um deine Platten zu schonen und sowohl den Lese- als auch Schreibvorgang schön effektiv sequentiell zu halten liest du halt zum Start initial 16 Blöcke ein und fängst an die zu verarbeiten. Wenn du 8 Blöcke fertig hast schreibst du diese vom RAM wieder auf die Platte raus und lädst die nächsten 8 Blöcke nach, so dass du im RAM immer einen Puffer von ca. 8 Blöcken hast.

Ob jetzt die Blockgröße auf Grund mega-komplizierter Formeln nur ein paar MB sind, oder dank einfacher Arithmetik in einem Takt gleich ein paar 100MB durchgeschaufelt werden können musst du ausprobieren.

Vorher würde ich mir über Dinge wie “Meine Daten passen alle nicht gleichzeitig in den RAM” gar keine gedanken machen.

Schönes vergleichsbeispiel : Was meint ihr warum auch Supercomputer an bestimmten Aufgabe ewig brauchen : nicht etwa viel der Speicher zu klein oder zu langsam wäre, aber weil halt irgendwo die Performancegrenze erreicht ist was wirklich echt-parallel gleichzeitig berechnet werden kann.

Also nur mal flux die Theorie. Du könntest 100mb lesen, gleichzeitig bearbeiten 4 Threads jeweils 25mb mit rechenzeit-aufwändigen Operationen, und danach schreiben 100mb zurück, und das ganze beginnt von Neuem.

So ist nicht die CPU und auch nicht die Festplatte der Flaschenhals.

Ein fünfter Thread könnte auch das lesen schreiben übernehmen parallel, aber dann müsste “herkömmlich” synchronisiert werden.

Vielleicht kann ich nachher ein Beispiel/KSKB posten. Grüße.

die Festplatte ist IMMER der Flaschenhals. Einzig mehrere Threads mit entsprechender Ausbalancierung zwischen Rechnen und Laden/Speichern ist sinnvoll

Das ist richtig, wenn aber nicht „in einem Rutsch“ gelesen/geschrieben wird, dann verlangsamt sich die Festplatte um den Faktor 1000.

Also die Festplatte lässt sich beschleunigen, s. d. sie vielleicht schneller ist, als nicht parallelisierte Operationen, weshalb sich dann Parallelisierung der Operationen lohnt.

Irgendwie komisch ausgedrückt, Impulsantwort.

Leider hatte ich keine langen Rechnungen einfallen, aber trotzdem ein KSKB/SSCCE geschrieben:

 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package javaapplication1;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.LinkedList;

/**
 * @author CB
 */
public class ParaDatei {

    public static class Buf {

        LinkedList<byte[]> inList = new LinkedList<byte[]>();
        LinkedList<byte[]> outList = new LinkedList<byte[]>();
        FileInputStream fis = null;
        FileOutputStream fos = null;

        public Buf() throws FileNotFoundException {
            fis = new FileInputStream("beispiel4in.txt");
            fos = new FileOutputStream("beispiel4out.txt");
        }

        public synchronized byte[] read() throws IOException {
            if (inList.isEmpty()) {
                for (int i = 0; i < 4; i++) {
                    byte[] ba = new byte[100 * 1024 * 1024 / 4]; // 25mb
                    fis.read(ba);
                    inList.add(ba);
                }
            }
            return inList.remove();
        }

        public synchronized void write(byte[] ba) throws IOException {
            outList.add(ba);
            if (outList.size() >= 4) {
                while (!outList.isEmpty()) {
                    fos.write(outList.remove());
                }
                fos.flush();
            }
        }
    }

    public static void main(String[] args) throws FileNotFoundException, InterruptedException {
        final Buf buf = new Buf();
        for (int i = 0; i < 4; i++) {
            new Thread() {
                @Override
                public void run() {
                    try {
                        byte[] ba = buf.read();
                        for (int j = 0; j < ba.length - 1; j++) {
                            if (ba[j + 1] != 0) {
                                ba[j] %= ba[j + 1];
                            }
                            ba[j] ^= ba[j + 1];
                            ba[j] -= ba[j + 1];
                            ba[j] = (byte) ~ba[j];
                        }
                        buf.write(ba);
                    } catch (IOException ioe) {
                    }
                }
            }.start();
            Thread.sleep(40L);
        }
    }
}

Also man sieht, es sind immer vier Threads aktiv, aber es gibt ein paar Einschränkungen; „beispiel4in.txt“ sollte genau 100 MiB und/oder durch 4*2 teilbar sein; damit die Reihenfolge nicht durcheinander kommt, müsste man eigentlich einen Index mitschleppen.

Threads und Ausnahmen auch nicht so leicht; eigentlich muss auch nicht synchronisiert werden, an einer Stelle schreiben + lesen, gar kein Problem; for (int i = 0; i < 4; hab ich mir jetzt auch ausgedacht, ein langes byte-Arr stückeln ist kein Spaß.

hth :DaumenHoch:

So, war etwas Fummelei, aber jetzt sollte auch die Reihenfolge gewährleistet sein:

 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package javaapplication1;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * @author CB
 */
public class ParaDatei {

    public static class Buf {

        byte[][] inList = new byte[4][20 * 1024 * 1024 / 4]; // 5x 4x 5mb
        byte[][] outList = new byte[4][20 * 1024 * 1024 / 4]; // 5x 4x 5mb
        FileInputStream fis = null;
        FileOutputStream fos = null;
        int inCnt = 0;
        int outCnt = 0;

        public Buf() throws FileNotFoundException, IOException {
            fis = new FileInputStream("beispiel4in.txt");
            fos = new FileOutputStream("beispiel4out.txt");
            for (int i = 0; i < inList.length; i++) {
                fis.read(inList**);
            }
            inCnt = (int) Math.pow(2, inList.length) - 1;
        }

        public synchronized byte[] read(int index) throws IOException {
            System.out.println("read " + Integer.toBinaryString(index));
            while ((inCnt & index) == 0) {
                try {
                    wait();
                } catch (InterruptedException ie) {
                }
            }
            inCnt = inCnt & ~index; // toggle status
            if (inCnt == 0) {
                for (int i = 0; i < inList.length; i++) {
                    fis.read(inList**);
                }
                inCnt = (int) Math.pow(2, inList.length) - 1;
                notifyAll();
            }
            return inList[(int) (Math.log10(index) / Math.log10(2))];
        }

        public synchronized void write(byte[] ba, int index) throws IOException {
            System.out.println("write " + Integer.toBinaryString(index));
            while ((outCnt & index) > 0) {
                try {
                    wait();
                } catch (InterruptedException ie) {
                }
            }
            outCnt = outCnt | index; // toggle status
            outList[(int) (Math.log10(index) / Math.log10(2))] = ba;
            if (outCnt == (int) Math.pow(2, outList.length) - 1) {
                for (int i = 0; i < outList.length; i++) {
                    fos.write(outList**);
                }
                fos.flush(); // flush !!!!
                outCnt = 0;
                notifyAll();
            }
        }
    }

    public static void main(String[] args) throws FileNotFoundException, InterruptedException, IOException {
        final Buf buf = new Buf();
        for (int j = 0; j < 5; j++) {
            for (int i = 0; i < 4; i++) {
                final int fi = 1 << i;
                new Thread() {
                    @Override
                    public void run() {
                        try {
                            byte[] ba = buf.read(fi);
                            for (int j = 0; j < ba.length - 1; j++) {
                                if (ba[j + 1] != 0) {
                                    ba[j] %= ba[j + 1];
                                }
                                ba[j] ^= ba[j + 1];
                                ba[j] -= ba[j + 1];
                                ba[j] = (byte) ~ba[j];
                            }
                            buf.write(ba, fi);
                        } catch (IOException ioe) {
                        }
                    }
                }.start();
            }
        }
    }
}

Zu bearbeitente Daten: 100mb,
Anzahl der Threads: 54 Threads,
Daten pro Thread: 100 / (5
4) == 5mb,

Es können/könnten immer 5*4 mb parallel verarbeitet werden, z. B. bei 4 Kernen.

Funktioniert bei mir und friert nicht ein.

Was kann/könnte denn verbessert werden? Grüße.

so nen code mag man ja gar nicht lesen… ehrlich…

Danke erstmal für die vielen Antworten ! :slight_smile:

Momentan gehört bei mir der Auslese und Analysevorgang zusammen, da ich ja die Datei nicht (komplett) in den Speicher laden kann/will.

Hmmmm… Ich könnte natürlich immer einen ‚großen‘ Block (~400MB vl.) einlesen, den dann unterteilen und die Teilstücke an Threads zur Analyse geben. Anschließend / Während ich auf die Threads warte, könnte ich dann noch die Randstellen analysieren. Damit sich das lohnt müsste die Blockgröße aber groß genug sein.

Was mir noch aufgefallen ist: theoretisch wäre es möglich, dass die durch die Analyse gesammelten Daten auch zu groß für den Heap-Speicher werden, auch wenn dieser Fall seeehr unwahrscheinlich ist. Für den Fall, dass ich diesen Fall trotzdem behandeln wollen würde: Wenn man der Speicher droht vollzulaufen könnte man auch die Analyse-Daten in eine Datei schreiben. Wenn diese Datei auf einer anderen Festplatte liegt sollte dass das (gleichzeitig stattfindende) Auslesen doch nicht beeinträchtigen, oder ?

MfG,
~Oneric

Ich muss zugeben, nicht alles gelesen und nachvollzogen zu haben, aber ein Tipp, den ich zwischen den Zeilen gelesen zu haben glaube, und der auch für deinen letzten Absatz relevant sein könnte, ist Pipelining: Während man noch mit einer Sache beschäftigt ist, schon die nächste zu machen. Gaaanz High-Level und sinngemäß:

Auf einem 5-Kerner mit 2GB RAM soll eine 4GB-Datei verarbeitet werden, und 2GB Daten werden dabei generiert.


Man könnte dann 500MB lesen
    Die mit 4 Threads verarbeiten, und dabei 250MB generieren
    Gleichzeitig (!) mit dem 5. Thread schon die nächsten 500 MB lesen
        Die neuen 500MB mit 4 Threads verarbeiten, und dabei 250MB generieren
        Gleichzeitig (!) mit dem 5. Thread die ersten 250MB rausschreiben und schon die nächsten 500 MB lesen

Also, grob dafür sorgen, dass aus


          Process                     Process
 Read ->  Process -> Write -> Read -> Process -> Write 
          Process                     Process
          Process                     Process

eher sowas wird


          Process                     
 Read ->  Process -> Write 
          Process                     
                     Process 
           Read   -> Process -> Write 
                     Process

und damit die Gesamtzeit verringert wird.

Ob das funktioniert, hängt natürlich von den Daten und deren Abhängigkeiten ab. Und ob es was bringt, hängt von 1000 weiteren Faktoren ab. Aber so also grober Gedanke…

Ich bin mal nicht so. Hier ist das fertige Programm:

Einfach auf die Datei doppelklicken, fertig. Das Menü, perfekt.

Ich behaupte mal, rückübersetzen unmöglich!!! Wer findet Fehler?

[QUOTE=Oneric]
Hmmmm… Ich könnte natürlich immer einen ‘großen’ Block (~400MB vl.) einlesen, den dann unterteilen und die Teilstücke an Threads zur Analyse geben. Anschließend / Während ich auf die Threads warte, könnte ich dann noch die Randstellen analysieren. Damit sich das lohnt müsste die Blockgröße aber groß genug sein.

Was mir noch aufgefallen ist: theoretisch wäre es möglich, dass die durch die Analyse gesammelten Daten auch zu groß für den Heap-Speicher werden, auch wenn dieser Fall seeehr unwahrscheinlich ist. Für den Fall, dass ich diesen Fall trotzdem behandeln wollen würde: Wenn man der Speicher droht vollzulaufen könnte man auch die Analyse-Daten in eine Datei schreiben. Wenn diese Datei auf einer anderen Festplatte liegt sollte dass das (gleichzeitig stattfindende) Auslesen doch nicht beeinträchtigen, oder ?

MfG,
~Oneric[/QUOTE]

Nein, das geht gleichzeitig, wie mein Programm zeigt…