Пример Java использования ExecutorService и PipedReader/PipedWriter (или PipedInputStream/PipedOutputStream) для потребителя-производителя

Я ищу простую реализацию производителя-потребителя на Java и не хочу изобретать велосипед.

Мне не удалось найти пример, в котором используется как новый пакет параллелизма, так и любой из классов Piped.

Есть ли пример использования как PipedInputStream, так и новый пакет параллелизма Java для этого?

Есть ли лучший способ без использования классов Piped для такой задачи?


person Eran Medan    schedule 01.05.2012    source источник
comment
И чего именно вы пытаетесь достичь? Вы задали очень широкий вопрос. Если бы я работал с конвейерными потоками, я был бы не против просто запустить поток.   -  person Marko Topolnik    schedule 02.05.2012
comment
Вы просто хотите создать Runnable для потребителя и производителя и отправить их в ExecutorService?   -  person trutheality    schedule 02.05.2012
comment
Задача просто читается из базы данных и записывается в файл неблокирующим/асинхронным/буферизованным способом, инструменты, упомянутые в вопросах, - это именно то, что я считаю правильными инструментами для работы, если есть более простой/другой Кстати, я буду рад услышать   -  person Eran Medan    schedule 02.05.2012
comment
@trutheality - да, в значительной степени   -  person Eran Medan    schedule 02.05.2012


Ответы (1)


Для вашей задачи может быть достаточно просто использовать один поток и писать в файл, используя BufferedOutputStream, когда вы читаете из базы данных.

Если вы хотите больше контролировать размер буфера и размер фрагментов, записываемых в файл, вы можете сделать что-то вроде этого:

class Producer implements Runnable {

    private final OutputStream out;
    private final SomeDBClass db;

    public Producer( OutputStream out, SomeDBClass db ){
        this.out = out;
        this.db = db;
    }

    public void run(){
        // If you're writing to a text file you might want to wrap
        // out in a Writer instead of using `write` directly.
        while( db has more data ){
            out.write( the data );
        }
        out.flush();
        out.close();
    }
}

class Consumer implements Runnable {

    private final InputStream in;
    private final OutputStream out;
    public static final int CHUNKSIZE=512;

    public Consumer( InputStream in, OutputStream out ){
        this.out = out;
        this.in = in;
    }

    public void run(){
        byte[] chunk = new byte[CHUNKSIZE];

        for( int bytesRead; -1 != (bytesRead = in.read(chunk,0,CHUNKSIZE) );;){
            out.write(chunk, 0, bytesRead);
        }
        out.close();
    }
}

И в вызывающем коде:

FileOutputStream toFile = // Open the stream to a file
SomeDBClass db = // Set up the db connection
PipedInputStream pi = new PipedInputStream(); // Optionally specify a size
PipedOutputStream po = new PipedOutputStream( pi );

ExecutorService exec = Executors.newFixedThreadPool(2);
exec.submit( new Producer( po, db ) );
exec.submit( new Consumer( pi, toFile ) );
exec.shutdown();
  • Также ловите любые исключения, которые могут быть выброшены.

Обратите внимание, что если это все, что вы делаете, нет никаких преимуществ в использовании ExecutorService. Исполнители полезны, когда у вас много задач (слишком много, чтобы запускать их все в потоках одновременно). Здесь у вас есть только два потока, которые должны выполняться одновременно, поэтому прямой вызов Thread#start будет иметь меньше накладных расходов.

person trutheality    schedule 02.05.2012