Как я могу перехватить исключение RejectedExecutionException, созданное при вызове комплететаблефутуре комплетеасинк?

В следующем примере кода я ввожу biconsumer, который спит в течение 100 миллисекунд, как действие завершения набора завершаемого будущего. Я использовал метод whenCompleteAsync, предоставив для использования отдельный executorService. executorService — это ThreadPoolExecutor с размером основного пула 5, максимальным размером 5 и длиной очереди 1.

public class CompleteTest {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        ArrayList<CompletableFuture<String>> list = new ArrayList<>();

        for (int i = 0; i <100; i++) {
            CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
            stringCompletableFuture.whenCompleteAsync((e, a) -> {
                System.out.println("Complete " + e);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {e1.printStackTrace();}
            }, executorService);

            list.add(stringCompletableFuture);
        }

        for (int i = 0; i < list.size(); i++) {
            list.get(i).complete(i + "");
        }
    }
}

Когда я запустил код, несмотря на то, что я завершаю 100 фьючерсов, печатается только 6 выходных данных. Это 5 основных потоков и 1 поток в очереди. Что происходит с остальными? Если другие исполняемые объекты не могут быть отправлены в службу-исполнитель из-за того, что очередь уже заполнена, не должно быть исключений.?

Вывод

Complete 0
Complete 1
Complete 2
Complete 3
Complete 4
Complete 5

person Sudheera    schedule 21.02.2017    source источник


Ответы (1)


Возникает исключение, и CompletableFuture выполняется исключительно, но не любой из тех, которые вы отслеживаете.

Вы создаете и инициализируете ThreadPoolExecutor с помощью конструктора, который использует RejectedExecutionHandler по умолчанию, который просто генерирует исключение. Мы знаем, что RejectedExecutionException выбрасывается, если ExecutorService не может принять задачу. Так где же добавляется задача и где генерируется исключение?

В нынешнем виде вся цепочка происходит внутри whenCompleteAsync. Когда вы вызываете это, вы добавляете зависимого к получателю CompletableFuture, stringCompletableFuture. Когда stringCompletableFuture завершится (в данном случае успешно), он создаст новый CompletableFuture (который он вернет) и попытается запланировать данный BiConsumer на заданный ExecutorService.

Поскольку в очереди ExecutorService нет места, он вызовет RejectedExecutionHandler, который выдаст RejectedExecutionException. Это исключение фиксируется в это время и используется для completeExceptionally CompletableFuture, которые будут возвращены.

Другими словами, в цикле for захватите CompletableFuture, возвращенное whenCompleteAsync, сохраните его и распечатайте его состояние.

ArrayList<CompletableFuture<String>> list = new ArrayList<>();
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>();
for (int i = 0; i <100; i++) {
    CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
    CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> {
        System.out.println("Complete " + e);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e1) {e1.printStackTrace();}
    }, executorService);
    dependents.add(thisWillHaveException);
    list.add(stringCompletableFuture);
}

for (int i = 0; i < list.size(); i++) {
    list.get(i).complete(i + "");
}
Thread.sleep(2000);
dependents.forEach(cf -> {
    cf.whenComplete((r, e) -> {
        if (e != null)
            System.out.println(cf + " " + e.getMessage());
    });
});

Вы заметите, что все они (кроме 6, которые были успешно напечатаны ранее) дополнены исключительно RejectedExecutionException.

...
java.util.concurrent.CompletableFuture@2d8e6db6[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@3f91beef rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@23ab930d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@1a6c5a9e rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@4534b60d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@37bba400 rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
person Sotirios Delimanolis    schedule 21.02.2017