GridGain: MapReduce с обработкой данных на локальном узле?

Я пытаюсь выполнить некоторые численные вычисления на большом распределенном наборе данных. Алгоритмы хорошо соответствуют модели MapReduce с дополнительным свойством, заключающимся в том, что выходные данные шага карты имеют небольшой размер по сравнению с входными данными. Данные могут считаться доступными только для чтения и статически распределяться по узлам (за исключением повторной балансировки при отработке отказа). Обратите внимание, что это несколько противоречит стандартным примерам подсчета слов, где входные данные отправляются узлам, выполняющим шаг сопоставления.

Это означает, что шаг карты должен выполняться параллельно на всех узлах, обрабатывая локальные данные каждого узла, в то время как допустимо, чтобы выходные данные шага карты отправлялись на один узел для шага сокращения.

Как лучше всего реализовать это с помощью GridGain?

Кажется, в более ранних версиях GridGain был метод reduce(..) для интерфейсов GridCache/GridCacheProjection, но его больше нет. Есть ли замена? Я думаю о механизме, который берет закрытие карты и выполняет его, распределенное по каждому элементу данных ровно один раз, избегая при этом копирования каких-либо входных данных по сети.

(Несколько ручной) подход, который я придумал до сих пор, заключается в следующем:

public class GridBroadcastCountDemo {

    public static void main(String[] args) throws GridException {
        try (Grid grid = GridGain.start(CONFIG_FILE)) {

            GridFuture<Collection<Integer>> future = grid.forRemotes().compute().broadcast(new GridCallable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    GridCache<Integer, float[]> cache = grid.cache(CACHE_NAME);
                    int count = 0;
                    for (float[] array : cache.primaryValues()) {
                        count += array.length;
                    }
                    return count;
                }
            });

            int totalCount = 0;
            for (int count : future.get()) {
                totalCount += count;
            }
            // expect size of input data
            System.out.println(totalCount);
        }
    }
}

Однако нет гарантии, что при таком подходе каждое значение будет обработано ровно один раз. Например. когда происходит повторная балансировка во время выполнения GridCallable, часть данных может обрабатываться ноль или несколько раз.


person Qwert Zuiopü    schedule 19.02.2015    source источник


Ответы (1)


GridGain с открытым исходным кодом (который теперь называется Apache Ignite) имеет API ComputeTask, в котором есть как map(), так и () методы. Если вы ищете метод reduce(), то ComputeTask — это, безусловно, правильный API для вас.

На данный момент ваша реализация в порядке. Apache Ignite добавляет функцию, при которой узел не будет считаться основным, пока миграция не будет полностью завершена. Это должно скоро произойти.

person Dmitriy    schedule 20.02.2015
comment
Приятно знать, что будет функция, которая гарантирует, что ключи не будут считаться первичными на нескольких узлах во время перебалансировки. Однако ошибочная обработка некоторых ключей несколько раз не представляет большой проблемы (если не считать некоторых накладных расходов), потому что дубликаты легко обнаруживаются и отбрасываются на этапе сокращения. Противоположный случай был бы действительно проблематичным, если бы какие-либо ключи не обрабатывались из-за происходящей перебалансировки. Возможно ли, что это происходит, или GridGain предоставляет какие-либо гарантии того, что в любой момент существует хотя бы одна копия каждого первичного ключа? - person Qwert Zuiopü; 20.02.2015
comment
@QwertZuiopü Противоположный случай невозможен. GridGain гарантирует наличие хотя бы одной копии ключа. - person Dmitriy; 20.02.2015
comment
@QwertZuiopü Кстати, вы можете отслеживать функцию отложенного назначения первичного ключа до завершения предварительной загрузки здесь: issues.apache.org/jira/browse/IGNITE-324 - person Dmitriy; 20.02.2015
comment
На самом деле, GridComputeTask.map и GridComputeTask.reduce не совсем обеспечивают абстракцию уменьшения карты высокого уровня, которую я искал (как описано в оригинальной статье Google]). Конечно, это можно легко реализовать поверх ComputeTask API. Тем не менее, было бы неплохо, если бы GridGain предоставлял его «из коробки»; оптимально гарантируя, что каждый первичный ключ обрабатывается ровно один раз. - person Qwert Zuiopü; 26.02.2015
comment
@QwertZuiopü Pure MapReduce никогда не создавался для повышения производительности и является большим излишеством для вычислений в памяти. Мне еще предстоит увидеть вариант использования, чувствительный к производительности, который действительно требует его для обработки в реальном времени. Подробнее см. здесь: apacheignite.readme.io/v1.0/docs/ вычислительные задачи - person Dmitriy; 27.02.2015
comment
@QwertZuiopü Было бы неплохо узнать о вашем варианте использования и понять, почему вам необходим чистый MapReduce (вместо fork-join). - person Dmitriy; 27.02.2015
comment
Я согласен с тем, что классический MapReduce во многих случаях (и в нашем конкретном случае использования) влечет за собой ненужные накладные расходы. Таким образом, мой запрос сводится к гарантии того, что каждый первичный ключ обрабатывается ComputeTask ровно один раз (и локально, на узле, где он хранится). Кажется, это действительно было бы решено с помощью функции отложенного назначения первичного ключа, на которую вы указали. - person Qwert Zuiopü; 27.02.2015