Создание адаптивного уточнения сетки с помощью ForkJoin и Streams

Я хочу построить адаптивное уточнение сетки в 3D.

Основной принцип заключается в следующем:

У меня есть набор ячеек с уникальными идентификаторами ячеек. Я проверяю каждую ячейку, чтобы увидеть, нужно ли ее уточнять.

  • Если требуется уточнение, создайте 8 новых дочерних ячеек и добавьте их в список ячеек для проверки уточнения.
  • В противном случае это конечный узел, и я добавляю его в свой список конечных узлов.

Я хочу реализовать это с помощью фреймворка ForkJoin и потоков Java 8. Я прочитал эту статью, но не знаю, как применить ее в моем случае.

На данный момент я придумал следующее:

public class ForkJoinAttempt {
    private final double[] cellIds;

    public ForkJoinAttempt(double[] cellIds) {
        this.cellIds = cellIds;
    }

    public void refineGrid() {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        double[] result = pool.invoke(new RefineTask(100));
    }

    private class RefineTask extends RecursiveTask<double[]> {
        final double cellId;

        private RefineTask(double cellId) {
            this.cellId = cellId;
        }

        @Override
        protected double[] compute() {
            return ForkJoinTask.invokeAll(createSubtasks())
                    .stream()
                    .map(ForkJoinTask::join)
                    .reduce(new double[0], new Concat());
        }
    }

    private double[] refineCell(double cellId) {
        double[] result;
        if (checkCell()) {
            result = new double[8];

            for (int i = 0; i < 8; i++) {
                result[i] = Math.random();
            }

        } else {
            result = new double[1];
            result[0] = cellId;
        }

        return result;
    }

    private Collection<RefineTask> createSubtasks() {
        List<RefineTask> dividedTasks = new ArrayList<>();

        for (int i = 0; i < cellIds.length; i++) {
            dividedTasks.add(new RefineTask(cellIds[i]));
        }
        
        return dividedTasks;
    }

    private class Concat implements BinaryOperator<double[]>  {

        @Override
        public double[] apply(double[] a, double[] b) {
            int aLen = a.length;
            int bLen = b.length;

            @SuppressWarnings("unchecked")
            double[] c = (double[]) Array.newInstance(a.getClass().getComponentType(), aLen + bLen);
            System.arraycopy(a, 0, c, 0, aLen);
            System.arraycopy(b, 0, c, aLen, bLen);

            return c;
        }
    }

    public boolean checkCell() {
        return Math.random() < 0.5;
    }
}

... и я застрял здесь.

Пока это мало что дает, потому что я никогда не вызываю функцию refineCell.

У меня также могут быть проблемы с производительностью со всеми теми double[], которые я создаю. И объединение их таким образом может быть не самым эффективным способом сделать это.

Но обо всем по порядку, может ли кто-нибудь помочь мне в реализации форк-соединения в этом случае?

Ожидаемый результат алгоритма — массив идентификаторов конечных ячеек (double[]).

Редактировать 1:

Благодаря комментариям я придумал кое-что, что работает немного лучше.

Некоторые изменения:

  • Я перешел от массивов к спискам. Это не очень хорошо для объема памяти, потому что я не могу использовать примитивы Java. Но это сделало имплантацию проще.
  • Идентификаторы ячеек теперь длинные, а не двойные.
  • Ids are not randomly chosen any more:
    • Root level cells have IDs 1, 2, 3 etc.;
    • Дети 1 имеют идентификаторы 10, 11, 12 и т. д.;
    • Дети 2 лет имеют идентификаторы 20, 21, 22 и т. д.;
    • Вы поняли идею...
  • Я уточняю все ячейки, ID которых меньше 100

Это позволяет мне ради этого примера лучше проверить результаты.

Вот новая реализация:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ForkJoinAttempt {
    private static final int THRESHOLD = 2;
    private List<Long> leafCellIds;

    public void refineGrid(List<Long> cellsToProcess) {
        leafCellIds = ForkJoinPool.commonPool().invoke(new RefineTask(cellsToProcess));
    }

    public List<Long> getLeafCellIds() {
        return leafCellIds;
    }

    private class RefineTask extends RecursiveTask<List<Long>> {

        private final CopyOnWriteArrayList<Long> cellsToProcess = new CopyOnWriteArrayList<>();

        private RefineTask(List<Long> cellsToProcess) {
            this.cellsToProcess.addAll(cellsToProcess);
        }

        @Override
        protected List<Long> compute() {
            if (cellsToProcess.size() > THRESHOLD) {
                System.out.println("Fork/Join");
                return ForkJoinTask.invokeAll(createSubTasks())
                        .stream()
                        .map(ForkJoinTask::join)
                        .reduce(new ArrayList<>(), new Concat());
            } else {
                System.out.println("Direct computation");
                
                List<Long> leafCells = new ArrayList<>();

                for (Long cell : cellsToProcess) {
                    Long result = refineCell(cell);
                    if (result != null) {
                        leafCells.add(result);
                    }
                }

                return leafCells;
            }
        }

        private Collection<RefineTask> createSubTasks() {
            List<RefineTask> dividedTasks = new ArrayList<>();

            for (List<Long> list : split(cellsToProcess)) {
                dividedTasks.add(new RefineTask(list));
            }

            return dividedTasks;
        }

        private Long refineCell(Long cellId) {
            if (checkCell(cellId)) {
                for (int i = 0; i < 8; i++) {
                    Long newCell = cellId * 10 + i;
                    cellsToProcess.add(newCell);
                    System.out.println("Adding child " + newCell + " to cell " + cellId);
                }
                return null;
            } else {
                System.out.println("Leaf node " + cellId);
                return cellId;
            }
        }

        private List<List<Long>> split(List<Long> list)
        {
            int[] index = {0, (list.size() + 1)/2, list.size()};

            List<List<Long>> lists = IntStream.rangeClosed(0, 1)
                    .mapToObj(i -> list.subList(index[i], index[i + 1]))
                    .collect(Collectors.toList());

            return lists;
        }


    }



    private class Concat implements BinaryOperator<List<Long>> {
        @Override
        public List<Long> apply(List<Long> listOne, List<Long> listTwo) {
            return Stream.concat(listOne.stream(), listTwo.stream())
                    .collect(Collectors.toList());
        }
    }

    public boolean checkCell(Long cellId) {
        return cellId < 100;
    }
}

И метод тестирования:

    int initialSize = 4;
    List<Long> cellIds = new ArrayList<>(initialSize);
    for (int i = 0; i < initialSize; i++) {
        cellIds.add(Long.valueOf(i + 1));
    }

    ForkJoinAttempt test = new ForkJoinAttempt();
    test.refineGrid(cellIds);
    List<Long> leafCellIds = test.getLeafCellIds();
    System.out.println("Leaf nodes: " + leafCellIds.size());
    for (Long node : leafCellIds) {
        System.out.println(node);
    }

Вывод подтверждает, что он добавляет 8 дочерних элементов к каждой корневой ячейке. Но дальше не идет.

Я знаю почему, но я не знаю, как это решить: это потому, что, несмотря на то, что метод RefineCell добавляет новые ячейки в список ячеек для обработки. Метод createSubTask больше не вызывается, поэтому он не может знать, что я добавил новые ячейки.

Редактировать 2:

Чтобы сформулировать проблему по-другому, я ищу механизм, в котором Queue идентификаторов ячеек обрабатывается некоторыми RecursiveTask, а другие добавляются к Queue параллельно.


person Ben    schedule 09.01.2018    source источник
comment
Как вы, вероятно, можете видеть из статьи, на которую вы ссылаетесь, пул Fork-Join предназначен для действий типа «разделяй и властвуй». Это означает, что ваше рекурсивное действие должно иметь условие, при котором оно фактически выполняет некоторую работу внутри метода compute. Насколько мне известно, ваша реализация этого не делает, и самое близкое к правильной реализации compute, которое я вижу в вашем коде, - это метод refineCell в ветке, где он присваивает Math.random ячейке. Кроме того, checkCell, вероятно, действительно должен знать что-то о ячейке, иначе ваше описание не имеет большого смысла.   -  person M. Prokhorov    schedule 09.01.2018
comment
Я знаю, что пока это мало что дает, потому что я никогда не вызываю функцию RefineCell. Я просто не понимаю, как мне это назвать. Метод checkCell не учитывает ячейки, он просто случайным образом выбирает в среднем половину ячеек. В реальной жизни у меня есть реальная функция, которая вычисляет координаты ячейки и проверяет, нужно ли ее уточнять. Это предоставляется в качестве примера воспроизводимого примера, который сосредоточен на проблеме, с которой я столкнулся.   -  person Ben    schedule 09.01.2018
comment
Вернитесь к своей статье-примеру еще раз и посмотрите внимательно: каждая задача работает с порогом, что означает количество элементов, которое можно (достаточно быстро) обработать последовательно, поэтому не требуется никаких подзадач. В вашем случае это ветка, введенная, когда checkCell == false. В противном случае вы должны создавать дочерние задачи, а затем объединяться с их результатами, как в вашем текущем compute, но это должно быть перемещено внутрь ветки с checkCell == true. Вы также можете посмотреть в коде JDK реализацию Arrays.parallelSort. Это тоже классика.   -  person M. Prokhorov    schedule 09.01.2018
comment
Вместо .map(ForkJoinTask::join) .reduce(new ArrayList<>(), new Concat()); вы должны использовать .flatMap(task -> task.join().stream()) .collect(Collectors.toList()) и избавиться от класса Concat. Метод split может быть реализован так же просто, как int middle = (list.size() + 1)/2; return Arrays.asList(list.subList(0,middle), list.subList(middle, list.size()))); Что касается порога, этот ответ может быть полезен. Но обратите внимание, что здесь вы просто заново изобретаете параллельные потоки. В настоящее время я не вижу ничего, что не работало бы с ними.   -  person Holger    schedule 10.01.2018
comment
Спасибо за ваш полезный комментарий. Я не хочу заново изобретать параллельные потоки. Так что, если это может быть достигнуто с ними, я был бы счастлив сделать это. Можете ли вы сказать мне, как?   -  person Ben    schedule 10.01.2018


Ответы (1)


Во-первых, давайте начнем с решения на основе Stream.

public class Mesh {
    public static long[] refineGrid(long[] cellsToProcess) {
        return Arrays.stream(cellsToProcess).parallel().flatMap(Mesh::expand).toArray();
    }
    static LongStream expand(long d) {
        return checkCell(d)? LongStream.of(d): generate(d).flatMap(Mesh::expand);
    }
    private static boolean checkCell(long cellId) {
        return cellId > 100;
    }
    private static LongStream generate(long cellId) {
        return LongStream.range(0, 8).map(j -> cellId * 10 + j);
    }
}

Хотя текущая реализация flatMap имеет известные проблемы с параллельной обработкой, которые могут возникнуть, когда сетка слишком несбалансированная, производительность для вашего фактического задача может быть разумной, поэтому это простое решение всегда стоит попробовать, прежде чем начинать реализовывать что-то более сложное.

Если вам действительно нужна пользовательская реализация, например. если нагрузка несбалансированная и реализация Stream не может достаточно хорошо адаптироваться, можно сделать так:

public class MeshTask extends RecursiveTask<long[]> {
    public static long[] refineGrid(long[] cellsToProcess) {
        return new MeshTask(cellsToProcess, 0, cellsToProcess.length).compute();
    }
    private final long[] source;
    private final int from, to;

    private MeshTask(long[] src, int from, int to) {
        source = src;
        this.from = from;
        this.to = to;
    }
    @Override
    protected long[] compute() {
        return compute(source, from, to);
    }
    private static long[] compute(long[] source, int from, int to) {
        long[] result = new long[to - from];
        ArrayDeque<MeshTask> next = new ArrayDeque<>();
        while(getSurplusQueuedTaskCount()<3) {
            int mid = (from+to)>>>1;
            if(mid == from) break;
            MeshTask task = new MeshTask(source, mid, to);
            next.push(task);
            task.fork();
            to = mid;
        }
        int pos = 0;
        for(; from < to; ) {
            long value = source[from++];
            if(checkCell(value)) result[pos++]=value;
            else {
                long[] array = generate(value);
                array = compute(array, 0, array.length);
                result = Arrays.copyOf(result, result.length+array.length-1);
                System.arraycopy(array, 0, result, pos, array.length);
                pos += array.length;
            }
            while(from == to && !next.isEmpty()) {
                MeshTask task = next.pop();
                if(task.tryUnfork()) {
                    to = task.to;
                }
                else {
                    long[] array = task.join();
                    int newLen = pos+to-from+array.length;
                    if(newLen != result.length)
                        result = Arrays.copyOf(result, newLen);
                    System.arraycopy(array, 0, result, pos, array.length);
                    pos += array.length;
                }
            }
        }
        return result;
    }
    static boolean checkCell(long cellId) {
        return cellId > 1000;
    }
    static long[] generate(long cellId) {
        long[] sub = new long[8];
        for(int i = 0; i < sub.length; i++) sub[i] = cellId*10+i;
        return sub;
    }
}

Эта реализация вызывает метод compute корневой задачи напрямую, чтобы включить вызывающий поток в вычисления. Метод compute использует getSurplusQueuedTaskCount()< /a> чтобы решить, следует ли разделяться. Как говорится в документации, идея состоит в том, чтобы всегда иметь небольшой излишек, например. 3. Это гарантирует, что оценка может адаптироваться к несбалансированным рабочим нагрузкам, поскольку бездействующие потоки могут перехватывать работу у другой задачи.

Разделение не выполняется путем создания двух подзадач и ожидания обеих. Вместо этого отделяется только одна задача, представляющая вторую половину незавершенной работы, а рабочая нагрузка текущей задачи адаптируется к первой половине.

Затем оставшаяся рабочая нагрузка обрабатывается локально. После этого извлекается последняя отправленная подзадача и предпринимается попытка разветвить. Если разветвление прошло успешно, диапазон текущей рабочей нагрузки также адаптируется для охвата диапазона последующей задачи, и локальная итерация продолжается.

Таким образом, любая избыточная задача, которая не была украдена другим потоком, обрабатывается самым простым и легким способом, как если бы она никогда не разветвлялась.

Если задача была подхвачена другим потоком, мы должны дождаться ее завершения сейчас и объединить массив результатов.

Обратите внимание, что при ожидании подзадачи через join() базовая реализация также проверяет, возможны ли разветвление и локальная оценка, чтобы все рабочие потоки были заняты. Тем не менее, настройка нашей переменной цикла и непосредственное накопление результатов в нашем целевом массиве все же лучше, чем вложенный вызов compute, который по-прежнему требует слияния массивов результатов.

Если ячейка не является листом, результирующие узлы обрабатываются рекурсивно по той же логике. Это снова позволяет проводить адаптивную локальную и параллельную оценку, поэтому выполнение будет адаптироваться к несбалансированным рабочим нагрузкам, например. если конкретная ячейка имеет большее поддерево или оценка конкретной ячейки задачи намного дольше, чем другие.

Следует подчеркнуть, что во всех случаях для извлечения выгоды из параллельной обработки требуется значительная рабочая нагрузка. Если, как в примере, в основном используется только копирование данных, выгода может быть намного меньше, отсутствовать или, в худшем случае, параллельная обработка может работать хуже, чем последовательная.

person Holger    schedule 11.01.2018
comment
Вау, это отличный ответ, спасибо! Я пробовал потоковую версию в реальном случае (сфера, очищенная до 9 уровня) с 1 192 192 ячейками. 721 мс. Впечатляющий! - person Ben; 11.01.2018
comment
Еще один вопрос о LongStream.range(0, 8).map(j -> cellId * 10 + j); Предположим, вместо ввода только j мне нужны три целых числа i, j и k? Вы можете сделать это с диапазоном? - person Ben; 11.01.2018
comment
Я нашел ответ здесь: stackoverflow.com/ вопросы/26439163/ - person Ben; 12.01.2018
comment
Для числовых переменных вы можете использовать, например. LongStream.range(0, 8).flatMap(i -> LongStream.range(0, 8).flatMap(j -> LongStream.range(0, 8).map(k -> cellId*1000+i*100+j*10+k))) - person Holger; 12.01.2018