Можете ли вы перебалансировать несбалансированный Spliterator неизвестного размера?

Я хочу использовать Stream для распараллеливания обработки разнородного набора удаленно хранимых файлов JSON с неизвестным номером (количество файлов заранее неизвестно). Файлы могут сильно различаться по размеру: от 1 записи JSON на файл до 100 000 записей в некоторых других файлах. Запись JSON в данном случае означает автономный объект JSON, представленный одной строкой в ​​файле.

Я действительно хочу использовать Streams для этого, поэтому я реализовал это Spliterator:

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

Проблема, с которой я сталкиваюсь, заключается в том, что, хотя Stream сначала прекрасно распараллеливается, в конечном итоге самый большой файл обрабатывается в одном потоке. Я считаю, что ближайшая причина хорошо задокументирована: сплитератор «несбалансирован».

Более конкретно, похоже, что метод trySplit не вызывается после определенного момента в жизненном цикле Stream.forEach, поэтому дополнительная логика для распределения небольших пакетов в конце trySplit выполняется редко.

Обратите внимание, что все разделители, возвращенные из trySplit, используют один и тот же итератор paths. Я думал, что это действительно умный способ сбалансировать работу между всеми разделителями, но этого недостаточно для достижения полного параллелизма.

Я хотел бы, чтобы параллельная обработка выполнялась сначала по файлам, а затем, когда несколько больших файлов все еще остаются разделенными, я хочу распараллелить фрагменты оставшихся файлов. Это было целью блока else в конце trySplit.

Есть ли легкий/простой/канонический способ обойти эту проблему?


person Alex R    schedule 29.10.2019    source источник
comment
Вам нужен расчет размера. Это может быть полностью фиктивным, если оно примерно отражает соотношение вашего несбалансированного сплита. В противном случае поток не знает, что разбиения несбалансированы, и остановится, как только будет создано определенное количество чанков.   -  person Holger    schedule 29.10.2019
comment
@Holger, можете ли вы уточнить, будет ли остановлено, как только будет создано определенное количество фрагментов, или указать мне на источник JDK для этого? Каково количество кусков, где он останавливается?   -  person Alex R    schedule 29.10.2019
comment
Код не имеет значения, так как он покажет слишком много не относящихся к делу деталей реализации, которые могут измениться в любое время. Существенным моментом является то, что реализация пытается вызывать split достаточно часто, так что каждый рабочий поток (с поправкой на количество ядер ЦП) должен что-то делать. Чтобы компенсировать непредсказуемые различия во времени вычислений, он, вероятно, будет создавать даже больше фрагментов, чем рабочие потоки, чтобы разрешить кражу работы и использовать предполагаемые размеры в качестве эвристики (например, чтобы решить, какой подразделитель следует разделить дальше). См. также stackoverflow.com/a/48174508/2711488.   -  person Holger    schedule 29.10.2019
comment
Я провел несколько экспериментов, чтобы попытаться понять ваш комментарий. Эвристики кажутся довольно примитивными. Похоже, что возврат Long.MAX_VALUE вызывает чрезмерное и ненужное разбиение, в то время как любая оценка, отличная от Long.MAX_VALUE, приводит к остановке дальнейшего разбиения, убивая параллелизм. Возврат набора точных оценок, по-видимому, не приводит к какой-либо разумной оптимизации.   -  person Alex R    schedule 10.11.2019
comment
Я не утверждаю, что стратегия реализации была очень умной, но, по крайней мере, она работает для некоторых сценариев с предполагаемыми размерами (иначе сообщений об ошибках было гораздо больше). Так что, похоже, с вашей стороны были какие-то ошибки во время экспериментов. Например, в коде вашего вопроса вы расширяете AbstractSpliterator, но переопределяете trySplit(), что является плохой комбинацией для всего, кроме Long.MAX_VALUE, поскольку вы не адаптируете оценку размера в trySplit(). После trySplit() оценка размера должна быть уменьшена на количество отщепленных элементов.   -  person Holger    schedule 11.11.2019


Ответы (3)


Ваш trySplit должен выводить фрагменты одинакового размера, независимо от размера базовых файлов. Вы должны рассматривать все файлы как единое целое и каждый раз заполнять разделитель с поддержкой ArrayList одним и тем же количеством объектов JSON. Количество объектов должно быть таким, чтобы обработка одного разделения занимала от 1 до 10 миллисекунд: меньше 1 мс — и вы начинаете приближаться к стоимости передачи пакета рабочему потоку, больше — и начинаете рисковать неравномерной загрузкой ЦП из-за слишком грубые задачи.

Разделитель не обязан сообщать оценку размера, и вы уже делаете это правильно: ваша оценка равна Long.MAX_VALUE, что является специальным значением, означающим «неограниченный». Однако, если у вас есть много файлов с одним объектом JSON, что приводит к пакетам размером 1, это ухудшит вашу производительность двумя способами: накладные расходы на открытие-чтение-закрытие файла могут стать узким местом и, если вам удастся избежать что стоимость передачи потока может быть значительной по сравнению со стоимостью обработки одного элемента, что опять же вызывает узкое место.

Пять лет назад я решал аналогичную проблему, вы можете взглянуть на мое решение.

person Marko Topolnik    schedule 29.10.2019
comment
Да, вы не обязаны сообщать оценку размера, и Long.MAX_VALUE правильно описывает неизвестный размер, но это не помогает, когда реальная реализация Stream работает плохо. Даже использование результата ThreadLocalRandom.current().nextInt(100, 100_000) в качестве предполагаемого размера дает лучшие результаты. - person Holger; 29.10.2019
comment
Он хорошо показал себя в моих случаях использования, когда вычислительные затраты на каждый элемент были значительными. Я легко достиг 98% общего использования ЦП, а пропускная способность масштабировалась почти линейно с параллелизмом. По сути, важно правильно подобрать размер пакета, чтобы его обработка занимала от 1 до 10 миллисекунд. Это намного превышает любые затраты на передачу потоков и не слишком долго, чтобы вызвать проблемы с гранулярностью задач. Я опубликовал результаты тестов ближе к концу этот пост. - person Marko Topolnik; 29.10.2019
comment
Ваше решение отделяет ArraySpliterator, который имеет предполагаемый размер (даже точный размер). Таким образом, реализация Stream увидит размер массива по сравнению с Long.MAX_VALUE, сочтет это несбалансированным и разделит больший разделитель (игнорируя, что Long.MAX_VALUE означает неизвестно), пока он не сможет разделить дальше. Затем, если фрагментов недостаточно, он разделит сплиттеры на основе массива, используя их известные размеры. Да, это работает очень хорошо, но не противоречит моему утверждению о том, что вам нужна оценка размера, независимо от того, насколько она плоха. - person Holger; 29.10.2019
comment
Итак, это похоже на недоразумение, потому что вам не нужна оценка размера на входе. Только на отдельные шпагаты, и вы всегда можете это сделать. - person Marko Topolnik; 29.10.2019
comment
Ну, мой первый комментарий было Вам нужна оценка размера. Это может быть полностью фиктивным, если оно примерно отражает соотношение вашего несбалансированного разделения. Ключевым моментом здесь было то, что код OP создает еще один разделитель, содержащий один элемент, но все еще сообщающий о неизвестном размере. Именно это делает реализацию Stream беспомощной. Подойдет любое оценочное число для нового разветвителя, которое значительно меньше Long.MAX_VALUE. - person Holger; 29.10.2019
comment
Возможно, вы лучше знакомы с интимными подробностями этой реализации, но, по моему опыту, она никогда не пытается разделить расщепления, взятые из неограниченного разделителя. Так что я не знаю, какое значение для него имеет заявленный размер. Для меня более вероятно, что многие файлы с одним объектом являются узким местом в цикле ввода-вывода открытие-чтение-закрытие, а стоимость передачи потоков по сравнению с обработкой одного объекта неблагоприятна, поэтому это также может быть узким местом. - person Marko Topolnik; 29.10.2019
comment
Мы можем резюмировать фактическое поведение как ужасное, поскольку оно явно игнорирует тот факт, что Long.MAX_VALUE был указан как неизвестный размер, что должно подходить для потоков, которые могут быть короткими, но обрабатывает его как буквальную оценку Long.MAX_VALUE (нет никакой разницы, например, с Long.MAX_VALUE-1, который не подходит для всех случаев, когда поток не приближается к этому числу (другими словами, каждый реальный поток жизни). - person Holger; 29.10.2019
comment
С точки зрения оптимизации производительности их интересовал только случай известного размера. Когда я упомянул основной команде Java о жалобе на то, что большинство реальных потоков имеют неизвестный размер и, даже если они известны, не доступны произвольно, они спросили меня, на каком основании я утверждаю, что это так, что их исследования пользователей показывают обратное. - person Marko Topolnik; 29.10.2019
comment
@MarkoTopolnik ваш FixedBatchSpliteratorWrapper не подходит для меня напрямую (потому что метод trySplit будет слишком блокирующим и медленным, потребуется много времени для запуска всех процессоров), но он дает мне представление о том, что можно попробовать. - person Alex R; 30.10.2019
comment
Можете ли вы объяснить, что может быть узким местом? - person Marko Topolnik; 30.10.2019
comment
@MarkoTopolnik, поскольку файлы хранятся в облачной файловой системе, открытие каждого файла происходит с большой задержкой. Таким образом, если открытие файлов происходит в trySplit, происходит большая задержка в ускорении вычислений. Открытие файлов должно быть отложено до метода tryAdvance, который вызывается в рабочих потоках, вместо того, чтобы блокировать вызывающий поток в trySplit. Моя реализация на самом деле очень хорошо обрабатывает этот начальный скачок, потому что она передает отдельный файл каждому потоку (это то, что делает верхняя половина trySplit). Моя проблема в том, что параллелизм отмирает после начального разгона. - person Alex R; 30.10.2019
comment
Если ваши облачные операции с файлами блокируются, у вас есть несоответствие между оптимальным для получения данных и оптимальным для их обработки (при условии, что сама обработка сильно загружает ЦП). Я думаю, вы должны иметь открытие и буферизацию файлов на отдельном этапе перед конвейером Stream, используя эластичный пул потоков, который будет использовать гораздо больше потоков, чем ядер для операций блокировки. Затем вы можете передавать данные в вычислительную часть конвейера. - person Marko Topolnik; 30.10.2019
comment
Что меня больше всего озадачивает, так это то, что любая попытка перенести часть разделения в tryAdvance (путем синхронизации на общих коллекциях) приводит к тупику! - person Alex R; 30.10.2019
comment
Вы выяснили причину этого? У вас есть критические разделы, защищенные более чем одним замком? - person Marko Topolnik; 31.10.2019
comment
В итоге я отказался от этой конкретной ветки кода и переписал ее, чтобы использовать размеры файлов, доступные в моих метаданных. У меня это сработало, но решение было не таким, как я ожидал. - person Alex R; 10.11.2019

После долгих экспериментов я так и не смог добиться дополнительного параллелизма, играя с оценками размера. По сути, любое значение, отличное от Long.MAX_VALUE, приведет к слишком раннему прекращению работы разделителя (и без какого-либо разделения), в то время как, с другой стороны, оценка Long.MAX_VALUE приведет к неустанному вызову trySplit, пока он не вернет null.

Решение, которое я нашел, состоит в том, чтобы внутренне разделить ресурсы между сплитераторами и позволить им перебалансировать между собой.

Рабочий код:

public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;
        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    private final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;
        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    private final Iterator<S3ObjectSummary> incomingFiles;

    private final Function<S3ObjectSummary, InputStream> fileOpener;

    private final Function<InputStream, LINE> lineReader;

    private final Deque<S3ObjectSummary> unopenedFiles;

    private final Deque<InputStreamHandler> openedFiles;

    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;

    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
    }

    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        AwsS3LineInput<LINE> lineInput;
        synchronized(sharedBuffer) {
            lineInput=sharedBuffer.poll();
        }
        if(lineInput != null) {
            action.accept(lineInput);
            return true;
        }
        InputStreamHandler handle = openedFiles.poll();
        if(handle == null) {
            S3ObjectSummary unopenedFile = unopenedFiles.poll();
            if(unopenedFile == null) {
                return false;
            }
            handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
        }
        for(int i=0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if(line != null) {
                synchronized(sharedBuffer) {
                    sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                }
            }
            else {
                return tryAdvance(action);
            }
        }
        openedFiles.addFirst(handle);
        return tryAdvance(action);
    }

    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized(incomingFiles) {
            if (incomingFiles.hasNext()) {
                unopenedFiles.add(incomingFiles.next());
                return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
            } else {
                return null;
            }
        }
    }
}
person Alex R    schedule 10.11.2019

Это не прямой ответ на ваш вопрос. Но я думаю стоит попробовать Stream в библиотеке abacus-common:

void test_58601518() throws Exception {
    final File tempDir = new File("./temp/");

    // Prepare the test files:
    //    if (!(tempDir.exists() && tempDir.isDirectory())) {
    //        tempDir.mkdirs();
    //    }
    //
    //    final Random rand = new Random();
    //    final int fileCount = 1000;
    //
    //    for (int i = 0; i < fileCount; i++) {
    //        List<String> lines = Stream.repeat(TestUtil.fill(Account.class), rand.nextInt(1000) * 100 + 1).map(it -> N.toJSON(it)).toList();
    //        IOUtil.writeLines(new File("./temp/_" + i + ".json"), lines);
    //    }

    N.println("Xmx: " + IOUtil.MAX_MEMORY_IN_MB + " MB");
    N.println("total file size: " + Stream.listFiles(tempDir).mapToLong(IOUtil::sizeOf).sum() / IOUtil.ONE_MB + " MB");

    final AtomicLong counter = new AtomicLong();
    final Consumer<Account> yourAction = it -> {
        counter.incrementAndGet();
        it.toString().replace("a", "bbb");
    };

    long startTime = System.currentTimeMillis();
    Stream.listFiles(tempDir) // the file/data source could be local file system or remote file system.
            .parallel(2) // thread number used to load the file/data and convert the lines to Java objects.
            .flatMap(f -> Stream.lines(f).map(line -> N.fromJSON(Account.class, line))) // only certain lines (less 1024) will be loaded to memory. 
            .parallel(8) // thread number used to execute your action. 
            .forEach(yourAction);

    N.println("Took: " + ((System.currentTimeMillis()) - startTime) + " ms" + " to process " + counter + " lines/objects");

    // IOUtil.deleteAllIfExists(tempDir);
}

В конце концов, загрузка ЦП на моем ноутбуке довольно высока (около 70%), и для обработки 51 899 100 строк/объектов из 1000 файлов с процессором Intel(R) Core(TM) i5-8365U и памятью Xmx256m jvm потребовалось около 70 секунд. . Общий размер файла составляет около: 4524 МБ. если yourAction не является тяжелой операцией, последовательный поток может быть даже быстрее, чем параллельный поток.

К вашему сведению, я разработчик abacus-common.

person user_3380739    schedule 07.03.2021