Почему stream parallel() не использует все доступные потоки?

Я попытался запустить 100 задач Sleep параллельно, используя Java8(1.8.0_172) stream.parallel(), представленный внутри пользовательского пула ForkJoinPool с более чем 100 доступными потоками. Каждая задача будет засыпать на 1 секунду. Я ожидал, что вся работа завершится через ~ 1 с, учитывая, что 100 снов можно было выполнить параллельно. Однако я наблюдаю время выполнения 7 с.

    @Test
    public void testParallelStream() throws Exception {
        final int REQUESTS = 100;
        ForkJoinPool forkJoinPool = null;
        try {
            // new ForkJoinPool(256): same results for all tried values of REQUESTS
            forkJoinPool = new ForkJoinPool(REQUESTS);
            forkJoinPool.submit(() -> {

                IntStream stream = IntStream.range(0, REQUESTS);
                final List<String> result = stream.parallel().mapToObj(i -> {
                    try {
                        System.out.println("request " + i);
                        Thread.sleep(1000);
                        return Integer.toString(i);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }).collect(Collectors.toList());
                // assertThat(result).hasSize(REQUESTS);
            }).join();
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }

С выводом, указывающим, что ~ 16 элементов потока выполняются до паузы в 1 с, затем еще ~ 16 и так далее. Таким образом, несмотря на то, что пул forkjoinpool был создан со 100 потоками, кажется, что используется только ~ 16.

Этот шаблон возникает, как только я использую более 23 потоков:

1-23 threads: ~1s
24-35 threads: ~2s
36-48 threads: ~3s
...
System.out.println(Runtime.getRuntime().availableProcessors());
// Output: 4

person tkruse    schedule 21.01.2020    source источник
comment
Каков ваш Runtime.getRuntime().availableProcessors() результат?   -  person daniu    schedule 21.01.2020
comment
availableProcessors() == 4, я добавил в описание   -  person tkruse    schedule 21.01.2020
comment
Сколько времени занимает последовательное выполнение?   -  person Ravindra Ranwala    schedule 21.01.2020
comment
последовательный занимает 100 секунд, как и ожидалось.   -  person tkruse    schedule 21.01.2020
comment
Этот вопрос может наблюдать то же самое (нет ответа) stackoverflow.com/questions/49068119   -  person tkruse    schedule 21.01.2020
comment
Этот трюк, позволяющий параллельному потоку использовать разные пулы потоков, является недокументированным побочным эффектом реализации и не предназначен для такой работы. Таким образом, реализация не заботилась о возможности другого параллелизма.   -  person Holger    schedule 21.01.2020


Ответы (2)


Поскольку использование пула Fork/Join в реализации Stream является деталью реализации, трюк, заставляющий его использовать другой пул Fork/Join, также недокументирован и, похоже, работает случайно, т. е. есть жестко запрограммированная константа определение фактического параллелизма в зависимости от параллелизма пула по умолчанию. Поэтому изначально использование другого пула не предусматривалось.

Однако было признано, что использование другого пула с неподходящим целевым параллелизмом является ошибкой, даже если этот прием не задокументирован, см. JDK-8190974.

Она была исправлена ​​в Java 10 и перенесена на Java 8, обновление 222.

Таким образом, простым миром решений является обновление версии Java.

Вы также можете изменить параллелизм пула по умолчанию, например.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");

перед выполнением любого действия Fork/Join.

Но это может иметь непреднамеренные последствия для других параллельных операций.

person Holger    schedule 21.01.2020

Как вы написали, вы позволяете потоку решать параллелизм выполнения.

Здесь у вас есть эффект, что ArrayList.parallelStream пытается перехитрить вас, равномерно распределяя данные, не принимая во внимание количество доступных потоков. Это хорошо для операций, связанных с ЦП, где бесполезно иметь больше потоков, чем ядер ЦП, но не предназначено для процессов, которым нужно ждать ввода-вывода.

Почему бы не принудительно передать все ваши элементы последовательно в ForkJoinPool, чтобы он был вынужден использовать все доступные потоки?

        IntStream stream = IntStream.range(0, REQUESTS);
        List<ForkJoinTask<String>> results
                = stream.mapToObj(i -> forkJoinPool.submit(() -> {

            try {
                System.out.println("request " + i);
                Thread.sleep(1000);
                return Integer.toString(i);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        })).collect(Collectors.toList());
        results.forEach(ForkJoinTask::join);

На моей машине это занимает менее двух секунд.

person GreyFairer    schedule 21.01.2020
comment
Суть в том, чтобы понять, как работают параллельные потоки и на что обращать внимание при написании кода. Так что ваше решение не является неправильным, но вопрос был конкретно о том, что решение с использованием параллельных потоков не работает. - person tkruse; 22.01.2020