Странная проблема с производительностью Spark LSH MinHash приблизительноСходствоПрисоединиться

Я соединяю 2 набора данных, используя метод Apache Spark ML LSH ApproSimilarityJoin, но наблюдаю странное поведение.

После (внутреннего) соединения набор данных немного искажен, однако каждый раз, когда выполнение одной или нескольких задач занимает слишком много времени.

спаркуй-1

Как вы можете видеть, медиана составляет 6 мс на задачу (я запускаю ее на меньшем исходном наборе данных для тестирования), но 1 задача занимает 10 минут. Он почти не использует циклы процессора, он фактически объединяет данные, но очень, очень медленный. Следующая самая медленная задача выполняется за 14 с, имеет в 4 раза больше записей и фактически выливается на диск.

Если вы посмотрите sparkuisql

Само соединение представляет собой внутреннее соединение между двумя наборами данных по pos и hashValue (minhash) в соответствии со спецификацией minhash и udf для вычисления расстояния жаккарда между парами соответствия.

Взорвите хеш-таблицы:

modelDataset.select(
      struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))

Функция расстояния Жаккара:

 override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
    val xSet = x.toSparse.indices.toSet
    val ySet = y.toSparse.indices.toSet
    val intersectionSize = xSet.intersect(ySet).size.toDouble
    val unionSize = xSet.size + ySet.size - intersectionSize
    assert(unionSize > 0, "The union of two input sets must have at least 1 elements")
    1 - intersectionSize / unionSize
  }

Объединение обработанных наборов данных:

// Do a hash join on where the exploded hash values are equal.
val joinedDataset = explodedA.join(explodedB, explodeCols)
  .drop(explodeCols: _*).distinct()

// Add a new column to store the distance of the two rows.
val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)
val joinedDatasetWithDist = joinedDataset.select(col("*"),
  distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol)
)

// Filter the joined datasets where the distance are smaller than the threshold.
joinedDatasetWithDist.filter(col(distCol) < threshold)

Я пробовал комбинации кэширования, перераспределения и даже включения spark.speculation, все безрезультатно.

Данные состоят из текста адреса черепицы, который необходимо сопоставить: 53536, Evansville, WI => 53, 35, 36, ev, va, an, ns, vi, il, ll, le, wi будет небольшое расстояние с записями, где есть опечатка в городе или почтовом индексе.

Что дает довольно точные результаты, но может быть причиной перекоса соединения.

Мой вопрос:

  • Чем может быть вызвано это несоответствие? (Одна задача занимает очень много времени, хотя в ней меньше записей)
  • Как я могу предотвратить этот перекос в minhash без потери точности?
  • Есть ли лучший способ сделать это в масштабе? (Я не могу Яро-Винклера/Левенштейна сравнивать миллионы записей со всеми записями в наборе данных о местоположении)

person Tom Lous    schedule 18.07.2018    source источник
comment
вы нашли решение для этого   -  person Aastha    schedule 02.12.2019
comment
Да, но скорее всего не тот, который вам нужен :-) Пару раз обрабатывал датасет. Сначала соединение по умолчанию, где все точно совпадает. Я отфильтровал их для второго прохода, где я использовал простые методы levenstein (и тому подобное), чтобы получить действительно близкие. Третий проход содержал гораздо меньше данных и работал с LSH.   -  person Tom Lous    schedule 05.12.2019


Ответы (1)


Это может быть немного поздно, но я все равно опубликую свой ответ здесь, чтобы помочь другим. Недавно у меня были похожие проблемы с сопоставлением названий компаний с ошибками (Все исполнители мертвы. Кто-то помог мне, предложив использовать NGrams, чтобы уменьшить перекос данных. Это мне очень помогло. Вы также можете попробовать использовать, например. 3 грамма или 4 грамма.

Я не знаю, насколько грязны данные, но вы можете попробовать использовать состояния. Это уже значительно сокращает количество возможных совпадений.

Что действительно помогло мне повысить точность совпадений, так это постобработка связанных компонентов (группы связанных совпадений, созданных MinHashLSH) путем запуска алгоритма распространения меток для каждого компонента. Это также позволяет вам увеличить N (из NGrams), тем самым смягчив проблему искажения данных, установив параметр расстояния Жаккарда в approxSimilarityJoin менее жестко, и постобработку с использованием распространения меток.

Наконец, в настоящее время я изучаю возможность использования скипграмм, чтобы соответствовать этому. Я обнаружил, что в некоторых случаях это работает лучше и несколько уменьшает перекос данных.

person thijsvdp    schedule 07.07.2020
comment
Хороший. На самом деле я уже использовал n-граммы, но распространение меток связанных компонентов может быть золотой идеей. Вы использовали для этого GraphX? К сожалению, я не работаю над этим проектом прямо сейчас, но, возможно, было бы неплохо вернуться к нему в какой-то момент. - person Tom Lous; 07.07.2020
comment
Нет, на самом деле я сейчас все делаю на питоне. Поэтому я использовал библиотеку NetworkX на питоне, чтобы она запускала алгоритм распространения меток локально для каждого из подключенных компонентов в сети. Я думаю, что это также должно работать, просто запустив алгоритм распространения меток по всей сети, но я еще не пробовал. Тогда я бы использовал для этого Graphframes, потому что сейчас я работаю на python. Я опубликую свой опыт с этим, как только я попробовал :) - person thijsvdp; 07.07.2020