Ключ сопоставления, пара значений на основе сходства их значений в Spark

Я изучаю Spark в течение нескольких недель, в настоящее время я пытаюсь сгруппировать несколько элементов или людей на основе их связи, используя Spark и Hadoop в Scala. Например, я хочу посмотреть, как связаны футболисты на основе истории их клуба. Мои "игроки" rdd будут:

(John, FC Sion)
(Mike, FC Sion)
(Bobby, PSV Eindhoven)
(Hans, FC Sion)

Я хочу иметь rdd следующим образом:

(John, <Mike, Hans>)
(Mike, <John, Hans>)
(Bobby, <>)
(Hans, <Mike, John>)

Я планирую использовать карту для достижения этой цели.

val splitClubs = players.map(player=> (player._1, parseTeammates(player._2, players)))

Где parseTeammates — это функция, которая находит игроков, которые также играют за тот же клуб (player._2)

// RDD is not a type, how can I insert rdd into a function?
def parseTeammates(club: String, rdd: RDD) : List[String] = {
    // will generate a list of players that contains same "club" value
    val playerList = rdd.filter(_._1 == club)
    return playerList.values;
}

Я получаю сообщение об ошибке компиляции, несоответствие типов, так как ожидается, что функция вернет List[String], но вместо этого playerList.values ​​возвращает org.apache.spark.rdd.RDD[List[String]]. Может ли кто-нибудь помочь мне получить значение RDD в его простой форме (в моем случае List[String])?

Кроме того, я думаю, что есть более элегантный способ решить эту проблему, чем создавать отдельный RDD, а затем находить определенный ключ в новом RDD, а затем возвращать значение в виде списка.


person bonchenko    schedule 18.10.2015    source источник
comment
Алгоритмов обнаружения сообществ несколько. И обсуждение их здесь слишком длинное и совершенно выходит за рамки SO. На чем вы пытаетесь сосредоточиться? Какой алгоритм вы пытаетесь реализовать?   -  person eliasah    schedule 18.10.2015
comment
Я пытаюсь реализовать это с помощью карты и уменьшить с помощью Spark и Hadoop на языке Scala.   -  person bonchenko    schedule 18.10.2015
comment
Каков алгоритм? Сокращение карты - это не алгоритм, это парадигма программирования, полученная из функционального программирования.   -  person eliasah    schedule 18.10.2015
comment
Можете ли вы добавить псевдокод того, что вы пытаетесь реализовать? Ваше описание низкого качества и звучит так, как будто вы просите нас сделать за вас домашнее задание.   -  person eliasah    schedule 18.10.2015
comment
Я изменил свой код, теперь он включает код, который я пытаюсь реализовать   -  person bonchenko    schedule 18.10.2015
comment
players является СДР?   -  person eliasah    schedule 18.10.2015
comment
да. см. последнее предложение в первом абзаце. пожалуйста, внимательно прочитайте вопрос, прежде чем задать   -  person bonchenko    schedule 18.10.2015
comment
Давайте продолжим обсуждение в чате.   -  person eliasah    schedule 18.10.2015


Ответы (2)


Я думаю, что ваш parseTeammates подход немного отличается от мира RDD. Когда дело доходит до работы с RDD и потенциально очень, ДЕЙСТВИТЕЛЬНО большим объемом данных, вы не хотите использовать такой вложенный цикл. Вместо этого попробуйте реорганизовать свои данные.

Код ниже даст вам то, что вы хотите

players.map{case(player, club) => (club, List(player))}
   .reduceByKey(_++_)
   .flatMap{case(_, list) =>list.zipWithIndex.map{case(player, index) => (player, list.take(index) ++ list.drop(index+1))}}

Обратите внимание, что я сначала организую данные в соответствии с клубом, за который они играли, а затем объединяю игроков, чтобы получить результат в нужном вам формате.

Надеюсь, это поможет.

person Glennie Helles Sindholt    schedule 19.10.2015

Другой взгляд на решение @Glennie (кто ИМО прав в том, что ваш первоначальный подход не подходит).

TL;DR;

players.map { case (player, team) => (team, mutable.HashSet[String](player)) }
  .reduceByKey(_++=_)
  .flatMap {
      case (team, players) => {
        for (player <- players)
          yield (player, players - player)
      }
  }

Основная идея та же самая (создайте список товарищей по команде, выбранных командами, и flatMap этот результат). Но я предлагаю использовать другие строительные блоки для того же результата. Будет ли это победа, зависит от вкуса и характеристик производительности вашего набора данных.

Другой взгляд на reduceByKey

Сокращение по ключу здесь включает объединение набора (игроков) с одним или несколькими игроками. Если мы возьмем исходный код:

players.map{case(player, club) => (club, List(player))}
   .reduceByKey(_++_)

Внутри мы закончим вызовом чего-то вроде (начиная со scala 1.4):

def add: (List[String], List[String]) => List[String] = _++_;

players.map { case (player, team) => (team, List(player)) }
       .combineByKey(
           // The first time we see a new team on each partition
           (list: List[String]) => list, 
           // invoked each time we fusion a player in its team's list
           // (e.g. map side combine) 
           add, 
           // invoked each time we fusion each team's partial lists
           // (e.g. reduce side combine)
           add)

Вывод здесь заключается в том, что операция add (_++_) вызывается много раз. Так что его лучше оптимизировать.
В этом отношении мы знаем, что List работает плохо, потому что каждая мутация влечет за собой полное копирование существующего списка в другой. Обратите внимание: «плохо» на самом деле может быть неуместным. Если у вас миллионы команд и всего 20 игроков в каждой, то производительность ++ может быть незначительной по сравнению с другими вычислениями искры, задействованными в сокращении.

(На мой взгляд, хуже: если List станет действительно большим, видя, что некоторые операции, связанные с его сериализацией, реализованы рекурсивно, мы можем столкнуться с переполнением стека. Мне нужно это проверить).

Таким образом, мы могли бы извлечь выгоду из переключения на изменяемую коллекцию, например:

players.map { case (player, team) => (team, mutable.ArrayBuffer[String](player)) }
  .reduceByKey(_++=_)
  1. Теперь у нас есть изменяемая коллекция, для которой оптимизирована конкатенация.
  2. Мы используем ++= вместо ++, чтобы каждый раз нам даже не приходилось выделять новую коллекцию при объединении двух существующих.
  3. Если мы хорошо знаем или набираем данные, мы можем предварительно изменить размер буфера, чтобы иметь предсказуемое распределение памяти и избежать, насколько это возможно, изменения размера буфера. Или переключите реализацию, соответственно.

Другой взгляд на flatMap

Преимущества использования изменяемой коллекции

Первоначальная реализация снова использует обширные операции со списками, такие как take и drop, в сочетании с zip с индексом.

Использование изменяемой коллекции помогает нам лучше с точки зрения удобочитаемости здесь, поскольку мы можем заменить 3 копии неизменяемого списка (take, drop, ++):

list.take(index) ++ list.drop(index+1)

Только с одним (- выполняет клон)

list - list(index)

Альтернативный синтаксис

Мы также можем предоставить совершенно другую реализацию, избегая архивирования с индексом для использования вложений:

  .flatMap {
      case (team, players) => {
        for (player <- players)
          yield (player, players - player)
      }
    }

Обратите внимание, что шаг players - player включает в себя поиск игрока в списке. Используя ArrayBuffer, это операция O(n). Таким образом, мы можем, опять же, в зависимости от набора данных, предпочесть использование mutable.HashSet в качестве изменяемой коллекции вместо буфера массива, если мы пойдем по этому пути.

(Я собирался добавить при условии, что у нас нет дубликатов в именах игроков, но это не имеет значения, потому что если у вас в команде два "Джона", то нет смысла иметь две строки в вашем RDD для двух Джонов это не имеет большего значения, чем один.)

person GPI    schedule 21.10.2015