Другой взгляд на решение @Glennie (кто ИМО прав в том, что ваш первоначальный подход не подходит).
TL;DR;
players.map { case (player, team) => (team, mutable.HashSet[String](player)) }
.reduceByKey(_++=_)
.flatMap {
case (team, players) => {
for (player <- players)
yield (player, players - player)
}
}
Основная идея та же самая (создайте список товарищей по команде, выбранных командами, и flatMap
этот результат). Но я предлагаю использовать другие строительные блоки для того же результата. Будет ли это победа, зависит от вкуса и характеристик производительности вашего набора данных.
Другой взгляд на reduceByKey
Сокращение по ключу здесь включает объединение набора (игроков) с одним или несколькими игроками. Если мы возьмем исходный код:
players.map{case(player, club) => (club, List(player))}
.reduceByKey(_++_)
Внутри мы закончим вызовом чего-то вроде (начиная со scala 1.4):
def add: (List[String], List[String]) => List[String] = _++_;
players.map { case (player, team) => (team, List(player)) }
.combineByKey(
// The first time we see a new team on each partition
(list: List[String]) => list,
// invoked each time we fusion a player in its team's list
// (e.g. map side combine)
add,
// invoked each time we fusion each team's partial lists
// (e.g. reduce side combine)
add)
Вывод здесь заключается в том, что операция add
(_++_
) вызывается много раз. Так что его лучше оптимизировать.
В этом отношении мы знаем, что List
работает плохо, потому что каждая мутация влечет за собой полное копирование существующего списка в другой. Обратите внимание: «плохо» на самом деле может быть неуместным. Если у вас миллионы команд и всего 20 игроков в каждой, то производительность ++
может быть незначительной по сравнению с другими вычислениями искры, задействованными в сокращении.
(На мой взгляд, хуже: если List
станет действительно большим, видя, что некоторые операции, связанные с его сериализацией, реализованы рекурсивно, мы можем столкнуться с переполнением стека. Мне нужно это проверить).
Таким образом, мы могли бы извлечь выгоду из переключения на изменяемую коллекцию, например:
players.map { case (player, team) => (team, mutable.ArrayBuffer[String](player)) }
.reduceByKey(_++=_)
- Теперь у нас есть изменяемая коллекция, для которой оптимизирована конкатенация.
- Мы используем
++=
вместо ++
, чтобы каждый раз нам даже не приходилось выделять новую коллекцию при объединении двух существующих.
- Если мы хорошо знаем или набираем данные, мы можем предварительно изменить размер буфера, чтобы иметь предсказуемое распределение памяти и избежать, насколько это возможно, изменения размера буфера. Или переключите реализацию, соответственно.
Другой взгляд на flatMap
Преимущества использования изменяемой коллекции
Первоначальная реализация снова использует обширные операции со списками, такие как take
и drop
, в сочетании с zip с индексом.
Использование изменяемой коллекции помогает нам лучше с точки зрения удобочитаемости здесь, поскольку мы можем заменить 3 копии неизменяемого списка (take
, drop
, ++
):
list.take(index) ++ list.drop(index+1)
Только с одним (-
выполняет клон)
list - list(index)
Альтернативный синтаксис
Мы также можем предоставить совершенно другую реализацию, избегая архивирования с индексом для использования вложений:
.flatMap {
case (team, players) => {
for (player <- players)
yield (player, players - player)
}
}
Обратите внимание, что шаг players - player
включает в себя поиск игрока в списке. Используя ArrayBuffer
, это операция O(n). Таким образом, мы можем, опять же, в зависимости от набора данных, предпочесть использование mutable.HashSet
в качестве изменяемой коллекции вместо буфера массива, если мы пойдем по этому пути.
(Я собирался добавить при условии, что у нас нет дубликатов в именах игроков, но это не имеет значения, потому что если у вас в команде два "Джона", то нет смысла иметь две строки в вашем RDD для двух Джонов это не имеет большего значения, чем один.)
person
GPI
schedule
21.10.2015
players
является СДР? - person eliasah   schedule 18.10.2015