сложное соединение в искре: элементы rdd имеют много пар ключ-значение

Я новичок в искре и пытаюсь найти способ интегрировать информацию из одного rdd в другой, но их структуры не поддаются стандартной функции соединения.

У меня на rdd такого формата:

[{a:a1, b:b1, c:[1,2,3,4], d:d1},
 {a:a2, b:b2, c:[5,6,7,8], d:d2}]

и еще один такого формата:

[{1:x1},{2,x2},{3,x3},{4,x4},{5,x5},{6,x6},{7,x7},{8,x8}]

Я хочу сопоставить значения во втором rdd с их ключами в первом rdd (которые находятся в значении списка в ключе c). Я знаю, как манипулировать ими, когда они там, поэтому меня не слишком беспокоит окончательный результат, но, может быть, я хотел бы увидеть что-то вроде этого:

[{a:a1, b:b1, c:[1,2,3,4],c0: [x1,x2,x3,x4], d:d1},
 {a:a2, b:b2, c:[5,6,7,8],c0: [x5,x6,x7,x8], d:d2}]

или это:

[{a:a1, b:b1, c:[(1,x1),(2,x2),(3,x3),(4,x4)], d:d1},
 {a:a2, b:b2, c:[(5,x5),(6,x6),(7,x7),(8,x8)], d:d2}]

или что-нибудь еще, что может сопоставить ключи во втором rdd со значениями в первом. Я думал превратить второй rdd в словарь, с которым я знаю, как работать, но я просто думаю, что мои данные слишком велики для этого.

Большое спасибо, я очень ценю это.


person cnrk    schedule 15.05.2015    source источник
comment
Я думаю, что вы мало что можете сделать... вы можете использовать декартову схему, но это будет очень неэффективно...   -  person mgaido    schedule 15.05.2015


Ответы (2)


join после flatMap или cartesian делает слишком много тасовок.

Одно из возможных решений — использовать cartesian после groupBy с HashPartitioner.

(Извините, это код scala)

val rdd0: RDD[(String, String, Seq[Int], String)]
val rdd1: RDD[(Int, String)]

val partitioner = new HashPartitioner(rdd0.partitions.size)

// here is the point!
val grouped = rdd1.groupBy(partitioner.getPartition(_))

val result = rdd0.cartesian(grouped).map { case (left, (_, right)) =>
    val map = right.toMap
    (left._1, left._2, left._4) -> left._3.flatMap(v => map.get(v).map(v -> _))
}.groupByKey().map { case (key, value) =>
    (key._1, key._2, value.flatten.toSeq, key._3)
}
person emeth    schedule 07.08.2015

Я предполагаю, что rdd1 — это ввод, содержащий {a:a1, b:b1, c:[1,2,3,4], d:d1}, а rdd2 содержит кортежи [(1, x1), (2, x2), (3, x3), (4, x4), (5, x5), (6, x6), (7, x7), (8, x8)]. Я также предполагаю, что все значения в поле "c" в rdd1 можно найти в rdd2. Если нет, вам нужно изменить часть кода ниже.

Иногда мне приходится решать подобные задачи. Если rdd2 достаточно мало, я обычно выполняю соединение на стороне карты, где я сначала транслирую объект, а затем выполняю простой поиск.

def augment_rdd1(line, lookup):
    c0 = []
    for key in line['c']:
        c0.append(lookup.value[key])
    return c0

lookup = sc.broadcast(dict(rdd2.collect()))
output = rdd1.map(lambda line: (line, augment_rdd1(line, lookup)))

Если rdd2 слишком велико для трансляции, я обычно использую flatMap для сопоставления каждой строки rdd1 с таким количеством строк, сколько есть элементов в поле «c», например. {a:a1, b:b1, c:[1,2,3,4], d:d1} будет отображаться на

  • (1, {a:a1, b:b1, c:[1,2,3,4], d:d1})
  • (2, {a:a1, b:b1, c:[1,2,3,4], d:d1})
  • (3, {a:a1, b:b1, c:[1,2,3,4], d:d1})
  • (4, {a:a1, b:b1, c:[1,2,3,4], d:d1})

FlatMap это

flat_rdd1 = rdd1.flatMap(lambda line: [(key, line) for key in line['c'])])

Затем я бы присоединился к rdd2, чтобы получить RDD, который имеет:

  • ({a:a1, b:b1, c:[1,2,3,4], d:d1}, x1)
  • ({a:a1, b:b1, c:[1,2,3,4], d:d1}, x2)
  • ({a:a1, b:b1, c:[1,2,3,4], d:d1}, x3)
  • ({a:a1, b:b1, c:[1,2,3,4], d:d1}, x4)

Присоединение следующее:

rdd2_tuple = rdd2.map(lambda line: line.items())
joined_rdd = flat_rdd1.join(rdd2_tuple).map(lambda x: x[1])

Наконец, все, что вам нужно сделать, это groupByKey, чтобы получить ({a:a1, b:b1, c:[1,2,3,4], d:d1}, [x1, x2, x3, x4]):

result = joined_rdd.groupByKey()
person mrm    schedule 05.10.2015
comment
Это круто. Хотел бы я знать, как добавить его в закладки, помимо написания комментария. - person Manuel G; 10.03.2016