Можно ли Stream последовательно обрабатывать часть пайплайна, а потом как параллельный?

У меня есть следующий код, который не работает так, как я задумал (пропускается случайная строка вместо первой):

Files.lines(path)
     .skip(1)
     .parallel()
     .forEach( System.out::println )

У меня такое чувство, что я неправильно понял поведение Streams. Вопрос в следующем: могу ли я сначала рассматривать поток как последовательный (и использовать «промежуточные операции с отслеживанием состояния»), а затем передавать его в параллельный forEach?


person Aleksandr Dubinsky    schedule 17.12.2013    source источник
comment
Вы выполняете линейную операцию, почему вы используете .parallel()?   -  person Esko    schedule 17.12.2013
comment
@Esko Очевидно, что println заменяет операцию с интенсивным использованием процессора, которую я выполняю в зависимости от записей в файле.   -  person Aleksandr Dubinsky    schedule 17.12.2013
comment
sheitt [смущенно] моя среда выполнения не выполняет их параллельно...!   -  person The Coordinator    schedule 20.12.2013
comment
Оказывается, мои тесты jUnit не выполняются параллельно (не знаю, почему ??), но не-тесты выполняются. Совсем меня запутал! В любом случае, поведение пропуска кажется очень непредсказуемым, иногда пропускается самый последний элемент. И даже parallel() шаткий ... иногда потоки создаются только в том случае, если количество итераций велико и количество элементов в списке велико. Для медленных заданий (которые требуют много вычислений) потоки не создавались. Для 1000+ коротких заданий (миллисекунд выполнения) он выполнялся параллельно. Тестируйте и смотрите.   -  person The Coordinator    schedule 21.12.2013
comment
Глядя на Javadoc, я думаю, что вам нужно forEachOrdered. И тогда скип будет работать нормально и поток будет параллельным. Однако вам все равно придется терпеть очень непредсказуемую и неконтролируемую формулу генерации потоков. Лучше свернуть самостоятельно. Если, как я пытался и работает, у вас много краткосрочных задач, то, похоже, это работает.   -  person The Coordinator    schedule 21.12.2013
comment
@SaintHill Влияет ли forEachOrdered на сам forEach? Я думаю, что это так, и отключает параллелизм. mail.openjdk.java.net/pipermail/lambda-dev/2013-June/010126.html   -  person Aleksandr Dubinsky    schedule 22.12.2013
comment
Мой тест показывает, что он породил один поток. Но тогда порожденный поток сделал всю (всю) работу. Это было немного бессмысленно...   -  person The Coordinator    schedule 22.12.2013


Ответы (4)


Весь конвейер либо параллельный, либо последовательный.

Попробуйте использовать forEachOrdered вместо forEach. В моем тесте он пропускает первую строку, если используется forEachOrderedforEach он пропускает последнюю строку).

forEach игнорирует порядок столкновений, и кажется, что он также может выполнять другие операции, чтобы игнорировать его.

person x22    schedule 19.12.2013
comment
У вас есть источник этой информации? Может быть, с более подробной информацией? - person Lii; 07.04.2014

Это не баг, а фича. Вызов parallel() делает весь поток параллельным. Если не будет сделан последующий вызов sequential(), который вернет весь поток в последовательный режим.

Javaodoc говорит:

Возвращает эквивалентный параллельный поток.

person JB Nizet    schedule 17.12.2013
comment
Как я могу добиться желаемого поведения, не выгружая все в промежуточную коллекцию? - person Aleksandr Dubinsky; 17.12.2013
comment
Я думаю, вы могли бы использовать StreamSupport.stream(Files.lines(path).skip(1).spliterator(), true). Хотя не проверял. - person JB Nizet; 17.12.2013
comment
Это так неэстетично? Никому не приходило в голову, что было бы неплохо выполнить часть операций над потоком последовательно из соображений семантики и производительности? - person Aleksandr Dubinsky; 18.12.2013
comment
Также очень удивительно то, что более поздние вызовы в цепочке резко изменяют семантику предыдущих. - person Aleksandr Dubinsky; 18.12.2013
comment
@AleksandrDubinsky: На самом деле переключение конвейера с параллельного на последовательный и обратно раньше было возможно, но было изменено относительно поздно при проектировании потока. Я коснусь этого в этом ответе с некоторыми комментариями самого Брайана Гетца. Возможно, вы могли бы найти сообщение в списке рассылки с подробным описанием изменения, если вам все еще интересно узнать, почему. - person Jeffrey Bosboom; 10.07.2014
comment
@JeffreyBosboom Спасибо, что подняли этот вопрос! - person Aleksandr Dubinsky; 11.07.2014

Нет, вы не можете этого сделать. Однако ваш код должен вероятно работать так, как задумано, из Stream.skip javadocs

В то время как skip() обычно является дешевой операцией в последовательных потоковых конвейерах, она может быть довольно затратной в упорядоченных параллельных конвейерах, особенно для больших значений n, поскольку skip(n) ограничивается пропуском не только любых n элементов, но и первых n элементов. элементы в порядке встречи. Использование источника неупорядоченного потока (например, generate(Supplier)) или удаление ограничения упорядочения с помощью BaseStream.unordered() может привести к значительному ускорению skip() в параллельных конвейерах, если позволяет семантика вашей ситуации. Если требуется согласованность с порядком встреч, и вы испытываете низкую производительность или использование памяти с помощью skip() в параллельных конвейерах, переключение на последовательное выполнение с помощью BaseStream.sequential() может улучшить производительность.

Работает ваш код или нет, зависит от характера потока, возвращаемого Files.lines(..), это зависит от того, является ли этот поток Ordered. Эти характеристики задаются используемым Spliterator, если поток упорядоченный, то он всегда будет пропускать первый элемент. если поток неупорядочен, то он пропустит один элемент.

http://download.java.net/jdk8/docs/api/java/util/Spliterator.html

person aepurniet    schedule 17.12.2013
comment
Я не могу найти способ проверить, упорядочен ли поток (неужели никак?), но в исходном коде Files.lines я вижу Spliterator.ORDERED. Кроме того, зачем кому-то вызывать skip в неупорядоченном потоке? Почему он просто не выдает исключение? - person Aleksandr Dubinsky; 17.12.2013
comment
вы все равно можете пропустить первый элемент, даже если элементы не имеют естественного порядка. - person aepurniet; 18.12.2013
comment
с какой стати мне пропускать случайный элемент? - person Aleksandr Dubinsky; 18.12.2013
comment
я не могу придумать хороший вариант использования, но это лучше, чем исключение. - person aepurniet; 18.12.2013
comment
Генерация исключения, когда операция, которая ожидает последовательный поток (или упорядоченный параллельный? — хотя я не уверен, что это имеет какое-либо значение), применяется к параллельному потоку, является подходящим поведением в соответствии с философией Java. - person Aleksandr Dubinsky; 18.12.2013
comment
в том-то и дело, что skip() не ожидает упорядоченного потока, он работает со всеми типами потоков. то же самое касается всех других потоковых функций. разработчики хотели создать класс функций, которые работают с последовательными и параллельными потоками, но чье поведение (для оптимизированного выполнения) может меняться в зависимости от характера (характеристик) потока. вы можете создавать определенные комбинации вызовов, которые приводят к некоторому не интуитивному поведению, но если вы действительно хотите знать причины, по которым это было реализовано таким образом (прямо из уст лошадей), я предлагаю вам прочитать почтовые архивы lambda dev. - person aepurniet; 18.12.2013
comment
Я не могу сказать, что все промежуточные операции с состоянием не имеют смысла в контексте параллельного потока, но это, безусловно, имеет место для skip() (и, например, для моего map(byOverlappingPairs), о котором я говорил в другом вопросе). Аннотации должны помечать такие операции и помечать их, когда они используются в параллельном потоке (что может привести к непреднамеренному, недетерминированному поведению). Альтернативная точка зрения, которую я слышал, что все операции должны быть без гражданства, несостоятельна. Мне не нужно проектировать все для параллелизма. - person Aleksandr Dubinsky; 18.12.2013

Похоже, что skip(n) не пропускает первые n элементов в параллельном потоке.

Решение: удалите первые [n] строк, используя метод BufferedReader readLine().

Затем получите поток, который продолжится с того места, где вы остановились с читателем:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.stream.IntStream;

public class TestStreams {

    public static void main(String[] args) throws Exception{
         unordered();
    }

    public static void unordered() throws IOException, InterruptedException {

        StringBuilder sb = new StringBuilder();
        IntStream.range(0, 1000).forEach(n -> sb.append(n).append("\n"));

        try (BufferedReader br = new BufferedReader(new StringReader(sb.toString()))) {
            if (br.readLine() != null) {
                br.lines()
                        .parallel()
                        .forEach(it -> System.out.println(Thread.currentThread() + " : " + it));
            }
        }
    }  
}
person The Coordinator    schedule 20.12.2013
comment
это, кажется, прямо противоречит javadocs для метода skip(long). похоже на ошибку. download.java.net/jdk8 /docs/api/java/util/stream/ - person aepurniet; 21.12.2013