ExecutorCompletionService? Зачем он нужен, если у нас есть invokeAll?

Если мы используем ExecutorCompletionService, мы можем отправить ряд задач как Callables и получить результат, взаимодействуя с CompletionService как queue.

Но есть также invokeAll из ExecutorService, который принимает Collection задач, и мы получаем список Future для получения результатов.

Насколько я могу судить, нет никакой пользы от использования одного или другого (за исключением того, что мы избегаем цикла for, используя invokeAll, который нам бы пришлось submit выполнять задачи CompletionService), и, по сути, это одна и та же идея с небольшая разница.

Итак, почему существует 2 разных способа отправить серию задач? Правильно ли я, что по производительности они эквивалентны? Есть ли случай, когда один подходит больше, чем другой? Я не могу вспомнить ни одного.


person Cratylus    schedule 08.08.2012    source источник


Ответы (4)


Используя ExecutorCompletionService.poll/take, вы получаете Futures по мере их завершения в порядке завершения (более или менее). Используя ExecutorService.invokeAll, у вас нет этой силы; вы либо блокируете, пока все не будут завершены, либо указываете тайм-аут, по истечении которого незавершенные аннулируются.


static class SleepingCallable implements Callable<String> {

  final String name;
  final long period;

  SleepingCallable(final String name, final long period) {
    this.name = name;
    this.period = period;
  }

  public String call() {
    try {
      Thread.sleep(period);
    } catch (InterruptedException ex) { }
    return name;
  }
}

Ниже я продемонстрирую, как работает invokeAll:

final ExecutorService pool = Executors.newFixedThreadPool(2);
final List<? extends Callable<String>> callables = Arrays.asList(
    new SleepingCallable("quick", 500),
    new SleepingCallable("slow", 5000));
try {
  for (final Future<String> future : pool.invokeAll(callables)) {
    System.out.println(future.get());
  }
} catch (ExecutionException | InterruptedException ex) { }
pool.shutdown();

Это дает следующий результат:

C:\dev\scrap>java CompletionExample
... after 5 s ...
quick
slow

Используя CompletionService, мы видим другой результат:

final ExecutorService pool = Executors.newFixedThreadPool(2);
final CompletionService<String> service = new ExecutorCompletionService<String>(pool);
final List<? extends Callable<String>> callables = Arrays.asList(
    new SleepingCallable("slow", 5000),
    new SleepingCallable("quick", 500));
for (final Callable<String> callable : callables) {
  service.submit(callable);
}
pool.shutdown();
try {
  while (!pool.isTerminated()) {
    final Future<String> future = service.take();
    System.out.println(future.get());
  }
} catch (ExecutionException | InterruptedException ex) { }

Это дает следующий результат:

C:\dev\scrap>java CompletionExample
... after 500 ms ...
quick
... after 5 s ...
slow

Обратите внимание, что время указано относительно запуска программы, а не предыдущего сообщения.


Полный код можно найти здесь.

person obataku    schedule 08.08.2012
comment
Итак, вы говорите, что в List<Future>, возвращенном из invokeAll, если начать перебирать результаты, я мог бы заблокировать первый, пока он не закончится, а в ExecutioncCompletion я бы заблокировал, пока не будет доступен какой-либо один результат? Я понял вашу точку зрения? - person Cratylus; 09.08.2012
comment
+1 Да уж, @ user384706. Под ExecutorCompletionService находится BlockingQueue<Future<V>>, поэтому вы можете дождаться завершения первого задания, а не всех остальных. - person Gray; 09.08.2012
comment
@ user384706 ну, использование формы без тайм-аута возвращает Futures после того, как все они завершились, блокируя на неопределенный срок. - person obataku; 09.08.2012
comment
@Gray: Но в invokeAll я тоже не жду, пока все завершится - person Cratylus; 09.08.2012
comment
@ user384706, когда вы вызываете invokeAll, он ожидает, пока все они завершатся, прежде чем закончить. Когда ExecutorCompletionService.poll/take возвращается, вы знаете, что полученный get() не будет блокироваться. - person Gray; 09.08.2012
comment
@ user384706, то, возможно, вы не хотите invokeAll, если вы не согласны с отменой незавершенных задач с использованием тайм-аута. - person obataku; 09.08.2012
comment
@Gray на самом деле я был неправ ... как оказалось, ExecutorService.invokeAll ждет завершения. - person obataku; 09.08.2012
comment
@Gray: Хорошо, но вместо этого он будет заблокирован на take - person Cratylus; 09.08.2012
comment
Я добавил ответ с подробностями на @ user384706. Да он блокирует на take(). - person Gray; 09.08.2012
comment
Кстати @veer. take() никогда не возвращает null. - person Gray; 09.08.2012
comment
@ Серый, да, я знаю, но по какой-то странной причине я почувствовал себя обязанным поместить задание в условие цикла: p - person obataku; 09.08.2012
comment
Хех. Я никогда не помещал задание в условие цикла. Думаю, это любимая мозоль. Хороший ответ. :-) - person Gray; 09.08.2012
comment
@veer: Я не понимаю, почему в вашем первом примере (invokeAll) он сначала печатает slow. Я считаю, что я читал, что Future возвращаются в том же порядке, что Iterator из переданных Collection в invokeAll вернет их. Я ожидал увидеть quick первым, поскольку quick является первым в переданном List из invokeAll - person Cratylus; 09.08.2012
comment
@ user384706 хороший улов, я изменил порядок отправки без обновления вывода. Это было сделано для демонстрации того, что он ожидает завершения обоих, независимо от порядка, в котором вы их разместили. С тех пор я обновил вывод. Не стесняйтесь компилировать и запускать полный код из пасты. - person obataku; 09.08.2012
comment
@ user384706, поскольку вы здесь новее, не забудьте отметить ответ принятым, который больше всего помог в решении проблемы. - person obataku; 15.08.2012
comment
@veer: Хороший момент. Спасибо за ВСЕ прекрасные ответы Грея и esaj. - person Cratylus; 15.08.2012
comment
Что вы имеете в виду под порядком выполнения (более или менее)? - person Mr_and_Mrs_D; 24.01.2014
comment
@Mr_and_Mrs_D порядок, в котором задачи завершаются, т.е. если B завершается до A, то порядок будет B, A независимо от порядка, в котором они отправляются - person obataku; 25.01.2014
comment
@oldrinb: спасибо - часть более или менее смутила меня - так что они гарантированно вернутся в том порядке, в котором они закончили. Возможно, вы захотите отредактировать это, чтобы сделать его более понятным, и дайте мне знать, чтобы удалить комментарии :) - person Mr_and_Mrs_D; 25.01.2014

Итак, почему существует 2 разных способа отправить серию задач? Правильно ли я, что по производительности они эквивалентны? Есть ли случай, когда один подходит больше, чем другой? Я не могу вспомнить ни одного.

Используя ExecutorCompletionService, вы можете немедленно получать уведомления, когда каждая ваша работа завершена. Для сравнения, ExecutorService.invokeAll(...) ожидает завершения всех ваших заданий перед возвратом коллекции Futures. Это означает, что (например), если все задания, кроме одного, выполняются за 10 минут, а одно задание занимает 30 минут, вы не получите результатов в течение 30 минут.

// this waits until _all_ of the jobs complete
List<Future<Object>> futures = threadPool.invokeAll(...);

Вместо этого, когда вы используете ExecutorCompletionService, вы сможете получать задания, как только каждое из них завершится, что позволяет вам (например) отправлять их для обработки в другой пул потоков, немедленно регистрировать результаты и т. Д.

ExecutorService threadPool = Executors.newFixedThreadPool(2);
ExecutorCompletionService<Result> compService
      = new ExecutorCompletionService<Result>(threadPool);
for (MyJob job : jobs) {
    compService.submit(job);
}
// shutdown the pool but the jobs submitted continue to run
threadPool.shutdown();
while (true) {
    Future<Result> future;
    // if pool has terminated (all jobs finished after shutdown) then poll() else take()
    if (threadPool.isTerminated()) {
        future = compService.poll();
        if (future == null) {
            break;
        }
    } else {
        // the take() blocks until any of the jobs complete
        // this joins with the jobs in the order they _finish_
        future = compService.take();
    }
    // this get() won't block
    Result result = future.get();
    // you can then put the result in some other thread pool or something
    // to immediately start processing it
    someOtherThreadPool.submit(new SomeNewJob(result));
}
person Gray    schedule 08.08.2012
comment
while(!threadPool.isTerminated()) Разве это не занятое формальное ожидание? - person Sergio Bilello; 05.04.2016
comment
Он всего take() блоков, поэтому не крутится. Я ответил на ваш вопрос @Sergio? - person Gray; 06.04.2016
comment
Да, спасибо! Я копался, как ограничить очередь на блокировку, которая есть внутри Executors.newFixedThreadPool. В частности, я использую ListenableFuture - person Sergio Bilello; 06.04.2016
comment
@Gray Я не понял твоего объяснения while(!threadPool.isTerminated()). Зачем это нужно? Какой цели это служит? - person tinkuge; 14.01.2021
comment
isTerminate() истинно, если пул отключен и все задания выполнены. Это то, о чем вы спрашиваете @tinkuge? - person Gray; 14.01.2021
comment
@Gray Редактирование делает его более понятным, спасибо! - person tinkuge; 15.01.2021

На самом деле я никогда не использовал ExecutorCompletionService, но думаю, что случай, когда это могло бы быть более полезным, чем «нормальный» ExecutorService, был бы тогда, когда вы хотите получить Futures завершенных задач в порядке завершения. С invokeAll вы просто получаете список, который может содержать как незавершенные, так и завершенные задачи в любой момент времени.

person esaj    schedule 08.08.2012

Сравнение только с учетом порядка результатов:

Когда мы используем CompletionService всякий раз, когда отправленное задание завершено, результат будет помещен в очередь (Порядок завершения). Тогда порядок отправленных заданий и возвращаемых результатов больше не будет одинаковым. Поэтому, если вас беспокоит порядок выполнения задач, используйте CompletionService

Где As invokeAll возвращает список Futures, представляющих задачи, в том же последовательном порядке, что и итератор для данного списка задач, каждая из которых завершена.

person Dungeon Hunter    schedule 14.01.2013