¿Cuál es la mejor manera de escribir en un archivo en un subproceso paralelo en Java?


Tengo un programa que realiza muchos cálculos y los reporta a un archivo con frecuencia. Sé que las operaciones de escritura frecuentes pueden ralentizar mucho un programa, así que para evitarlo me gustaría tener un segundo hilo dedicado a las operaciones de escritura.

Ahora mismo lo estoy haciendo con esta clase que escribí (el impaciente puede saltar al final de la pregunta):

public class ParallelWriter implements Runnable {

    private File file;
    private BlockingQueue<Item> q;
    private int indentation;

    public ParallelWriter( File f ){
        file = f;
        q = new LinkedBlockingQueue<Item>();
        indentation = 0;
    }

    public ParallelWriter append( CharSequence str ){
        try {
            CharSeqItem item = new CharSeqItem();
            item.content = str;
            item.type = ItemType.CHARSEQ;
            q.put(item);
            return this;
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public ParallelWriter newLine(){
        try {
            Item item = new Item();
            item.type = ItemType.NEWLINE;
            q.put(item);
            return this;
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public void setIndent(int indentation) {
        try{
            IndentCommand item = new IndentCommand();
            item.type = ItemType.INDENT;
            item.indent = indentation;
            q.put(item);
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public void end(){
        try {
            Item item = new Item();
            item.type = ItemType.POISON;
            q.put(item);
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public void run() {

        BufferedWriter out = null;
        Item item = null;

        try{
            out = new BufferedWriter( new FileWriter( file ) );
            while( (item = q.take()).type != ItemType.POISON ){
                switch( item.type ){
                    case NEWLINE:
                        out.newLine();
                        for( int i = 0; i < indentation; i++ )
                            out.append("   ");
                        break;
                    case INDENT:
                        indentation = ((IndentCommand)item).indent;
                        break;
                    case CHARSEQ:
                        out.append( ((CharSeqItem)item).content );
                }
            }
        } catch (InterruptedException ex){
            throw new RuntimeException( ex );
        } catch  (IOException ex) {
            throw new RuntimeException( ex );
        } finally {
            if( out != null ) try {
                out.close();
            } catch (IOException ex) {
                throw new RuntimeException( ex );
            }
        }
    }

    private enum ItemType {
        CHARSEQ, NEWLINE, INDENT, POISON;
    }
    private static class Item {
        ItemType type;
    }
    private static class CharSeqItem extends Item {
        CharSequence content;
    }
    private static class IndentCommand extends Item {
        int indent;
    }
}

Y luego lo uso haciendo:

ParallelWriter w = new ParallelWriter( myFile );
new Thread(w).start();

/// Lots of
w.append(" things ").newLine();
w.setIndent(2);
w.newLine().append(" more things ");

/// and finally
w.end();

Si bien esto funciona perfectamente bien, me pregunto: hay un mejor manera de lograr esto?

Author: Roman C, 2011-06-01

4 answers

Su enfoque básico se ve bien. Yo estructuraría el código de la siguiente manera:

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public interface FileWriter {
    FileWriter append(CharSequence seq);

    FileWriter indent(int indent);

    void close();
}

class AsyncFileWriter implements FileWriter, Runnable {
    private final File file;
    private final Writer out;
    private final BlockingQueue<Item> queue = new LinkedBlockingQueue<Item>();
    private volatile boolean started = false;
    private volatile boolean stopped = false;

    public AsyncFileWriter(File file) throws IOException {
        this.file = file;
        this.out = new BufferedWriter(new java.io.FileWriter(file));
    }

    public FileWriter append(CharSequence seq) {
        if (!started) {
            throw new IllegalStateException("open() call expected before append()");
        }
        try {
            queue.put(new CharSeqItem(seq));
        } catch (InterruptedException ignored) {
        }
        return this;
    }

    public FileWriter indent(int indent) {
        if (!started) {
            throw new IllegalStateException("open() call expected before append()");
        }
        try {
            queue.put(new IndentItem(indent));
        } catch (InterruptedException ignored) {
        }
        return this;
    }

    public void open() {
        this.started = true;
        new Thread(this).start();
    }

    public void run() {
        while (!stopped) {
            try {
                Item item = queue.poll(100, TimeUnit.MICROSECONDS);
                if (item != null) {
                    try {
                        item.write(out);
                    } catch (IOException logme) {
                    }
                }
            } catch (InterruptedException e) {
            }
        }
        try {
            out.close();
        } catch (IOException ignore) {
        }
    }

    public void close() {
        this.stopped = true;
    }

    private static interface Item {
        void write(Writer out) throws IOException;
    }

    private static class CharSeqItem implements Item {
        private final CharSequence sequence;

        public CharSeqItem(CharSequence sequence) {
            this.sequence = sequence;
        }

        public void write(Writer out) throws IOException {
            out.append(sequence);
        }
    }

    private static class IndentItem implements Item {
        private final int indent;

        public IndentItem(int indent) {
            this.indent = indent;
        }

        public void write(Writer out) throws IOException {
            for (int i = 0; i < indent; i++) {
                out.append(" ");
            }
        }
    }
}

Si no desea escribir en un hilo separado (tal vez en una prueba?), puede tener una implementación de FileWriter que llama a append en el Writer en el hilo de llamada.

 14
Author: Binil Thomas,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2011-06-01 19:46:13

Usar un LinkedBlockingQueue es una buena idea. No estoy seguro de que me guste algo del estilo del código... pero el principio parece razonable.

Quizás añadiría una capacidad al LinkedBlockingQueue igual a un cierto % de tu memoria total.. decir 10.000 artículos.. de esta manera, si su escritura va demasiado lenta, sus hilos de trabajo no seguirán agregando más trabajo hasta que el montón se vuele.

 6
Author: bwawok,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2011-06-02 01:39:01

Una buena manera de intercambiar datos con un solo hilo de consumo es usar un intercambiador.

Podría usar un StringBuilder o ByteBuffer como el búfer para intercambiar con el hilo de fondo. La latencia incurrida puede ser de alrededor de 1 micro-segundo, no implica la creación de ningún objeto y que es menor utilizando un BlockingQueue.

Del ejemplo que creo que vale la pena repetir aquí.

class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   DataBuffer initialEmptyBuffer = ... a made-up type
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
 }
 3
Author: Peter Lawrey,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2011-06-01 19:48:24

Sé que las operaciones de escritura frecuentes puede ralentizar un programa mucho

Probablemente no tanto como crees, siempre y cuando uses buffering.

 1
Author: user207421,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2011-06-02 01:29:07