Flink DataSet API: GroupBy работает некорректно?

В моей программе Flink Java я использую GroupBy-Operator следующим образом:

dataSet.groupBy(new KeySelector<myObject, Tuple2<Tuple2<Integer, Integer>, Integer>>() {
    private static final long serialVersionUID = 5L;
    Tuple2<Tuple2<Integer, Integer>, Integer> groupingKey = new Tuple2<Tuple2<Integer, Integer>, Integer>();

        public Tuple2<Tuple2<Integer, Integer>, Integer> getKey(myObject s) {
            groupingKey.setField(s.getPosition(), 0);
            groupingKey.setField(s.getBand(), 1);
            return groupingKey;
        }
    })
    .reduceGroup(reduceFunction);

getPosition() возвращает Tuple2<Integer, Integer>, а getBand() возвращает int.

Я хочу сгруппировать свой набор данных по обоим значениям. Если у меня есть 6 позиций и 4 диапазона, я бы хотел получить 24 отдельные группы и использовать groupReduce-функцию для каждой группы независимо. Но в настоящее время мои результирующие группы, кажется, содержат различные значения для полосы и позиции. Я вот так проверил в функции groupReduce:

if (this.band == null) {
    this.band = myObject.getBand();
}
if (this.band != myObject.getBand()) {
    System.out.println("The band should be " + this.band + " but is: " + myObject.getBand());

Кроме того, в моем итоговом файле есть значения, указывающие на проблему с группировкой. Возможно ли, что в моем случае группировка не работает? Или это могло быть просто следствием еще одной потенциальной ошибки в моем коде?


person Robin Ellerkmann    schedule 13.01.2016    source источник
comment
band является переменной-членом вашего GroupReduceFunction. reduce метод одного и того же GroupReduceFunction объекта может вызываться несколько раз для разных групп (но только один раз для каждой группы). Вы сбросили this.band на ноль (кстати, почему ноль, это должно быть int, нет?) В конце метода reduce? В качестве альтернативы вы можете сделать band локальной переменной в reduce().   -  person Fabian Hueske    schedule 13.01.2016
comment
@Fabian: Спасибо, что указали мне правильное направление. Моя переменная this.band действительно была переменной-членом моего пользовательского класса GroupReduceFunction, но должна была быть в методе reduce().   -  person Robin Ellerkmann    schedule 13.01.2016
comment
Решило ли это вашу проблему или у вас все еще есть проблемы с оператором groupReduce?   -  person Fabian Hueske    schedule 13.01.2016
comment
Извините, я не видел вашего комментария. Теперь все работает, я сделал бэнд локальной переменной.   -  person Robin Ellerkmann    schedule 14.01.2016


Ответы (1)


Я думаю, ваш чек в GroupReduceFunction работает некорректно. GroupReduceFunction.reduce() можно вызывать несколько раз для разных групп. this.band является переменной-членом вашего GroupReduceFunction, и я предполагаю, что вы не сбрасываете this.band в конце метода reduce().

Следовательно, this.band будет null только при первом вызове reduce(). В начале второго вызова this.band будет инициализирован и не будет установлен на полосу текущей группы. Следовательно, следующая проверка не удастся.

person Fabian Hueske    schedule 14.01.2016