Замена Apache Spark RDD

Я пытаюсь решить проблему, так что у меня есть такой набор данных:

(1, 3)
(1, 4)
(1, 7)
(1, 2)   <-
(2, 7)   <-
(6, 6)    
(3, 7)   <-
(7, 4)   <-
...

Так как (1 -> 2) и (2 -> 7) хотелось бы заменить набор (2, 7) на (1, 7) аналогично, (3 -> 7) и (7 -> 4) тоже заменить (7,4) на (3, 4)

Следовательно, мой набор данных становится

(1, 3)
(1, 4)
(1, 7)
(1, 2)  
(1, 7)  
(6, 6)    
(3, 7)
(3, 4)
...

Любая идея, как решить или решить эту проблему?

Спасибо


person Adetiloye Philip Kehinde    schedule 25.10.2016    source источник
comment
Может быть ясно с вашим входным значением? это немного сбивает с толку. Ваш ввод RDD[String]? если нет, пожалуйста, поделитесь своим форматом RDD   -  person Balaji Reddy    schedule 25.10.2016
comment
Ваша формулировка проблемы не очень ясна. Вы говорите (2, 7), (1,7) -> (1,7). Но почему не (1, 7), (7,4) -> 1, 4)? Другими словами, у вас есть много потенциальных совпадений. Как определить, что выбрать?   -  person The Archetypal Paul    schedule 25.10.2016
comment
@TheArchetypalPaul вы правы (1,7), (7,4) также будет подразумевать (1, 4) в основном, если значение карты предыдущей строки равно ключу карты текущей строки, тогда мы можно заменить это как (предыдущее значение строки, текущее значение)   -  person Adetiloye Philip Kehinde    schedule 25.10.2016
comment
Ах. Строка за строкой не подходит для Spark. Датафреймы + окна, наверное. На самом деле, даже функции Windows могут этого не делать. Что происходит с (1, 2), (2,3), (3,4)? Когда первые два заменяются на (1,3), это все, или вы снова начинаете сопоставление и делаете (1,3), (3,4) -> (1,4)   -  person The Archetypal Paul    schedule 25.10.2016
comment
@TheArchetypalPaul да, нужно снова начать сопоставление   -  person Adetiloye Philip Kehinde    schedule 25.10.2016
comment
ХОРОШО. поэтому мы сворачиваем цепочки, пока не получим совпадение. Таким образом, это означает, что для любой записи нам может потребоваться просмотреть произвольное количество записей, в том числе из одного раздела в другой. Мне кажется, это не подходит для Spark.   -  person The Archetypal Paul    schedule 25.10.2016
comment
Кстати, это проблема с графиком. На ум приходит транзитивное замыкание. Вероятно, GraphX ​​был бы лучшим вариантом.   -  person maasg    schedule 25.10.2016
comment
BTW2, похоже, вы не принимаете ответы на свои вопросы. Это не круто в SO.   -  person maasg    schedule 25.10.2016
comment
@TheArchetypalPaul Я смог использовать график для решения проблемы, но открыт, чтобы попробовать другой подход.   -  person Adetiloye Philip Kehinde    schedule 25.10.2016
comment
@maasg спасибо за комментарий, я принимаю ответы - к вашему сведению   -  person Adetiloye Philip Kehinde    schedule 25.10.2016
comment
@maasg прав. Пожалуйста, просмотрите свой ответ на вопрос и примите то, что необходимо принять, в противном случае прокомментируйте, почему предоставленное решение не сработало для вас!   -  person eliasah    schedule 26.10.2016


Ответы (1)


Эта задача выглядит как транзитивное замыкание графа, представленного в виде распределенного списка ребер.

Одной из ключевых особенностей Spark по сравнению со старой версией Hadoop MR является то, что Spark поддерживает интерактивные алгоритмы. Чтобы решить подобную задачу обхода графа, мы используем эту возможность в рекурсивной функции:

def closure(rdd:RDD[(Int, Int)]):RDD[(Int,Int)] = {
  val transitiveValues = rdd.map(_.swap).join(rdd).filter{case (_,(x,y)) => x != y}
  if (transitiveValues.isEmpty) {
    rdd
  } else {
    val usedTransitions = transitiveValues.flatMap{case (a,(x,y)) => Seq((x,a),(a,y))}
    val newTransitions = transitiveValues.map{case (a,(x,y)) => (x,y)}
    closure(rdd.subtract(usedTransitions).union(newTransitions)).distinct
  }
}

Это не совсем приводит к ожидаемому выше результату, потому что нет понятия приоритета (неявного порядка), поэтому closure((1, 2),(2, 7)) = (1,7), а не (1, 2), (1, 7), как ожидалось выше. Заказ может быть добавлен за счет дополнительной сложности. Также он не поддерживает циклические графики (с циклами).

Этот алгоритм должен служить только отправной точкой для настройки на конкретные внутренние требования.

person maasg    schedule 26.10.2016
comment
Это чистый ответ! Мне нравится сравнение с подходом Hadoop MR - person eliasah; 26.10.2016