Я соединяю 2 набора данных, используя метод Apache Spark ML LSH ApproSimilarityJoin, но наблюдаю странное поведение.
После (внутреннего) соединения набор данных немного искажен, однако каждый раз, когда выполнение одной или нескольких задач занимает слишком много времени.
Как вы можете видеть, медиана составляет 6 мс на задачу (я запускаю ее на меньшем исходном наборе данных для тестирования), но 1 задача занимает 10 минут. Он почти не использует циклы процессора, он фактически объединяет данные, но очень, очень медленный. Следующая самая медленная задача выполняется за 14 с, имеет в 4 раза больше записей и фактически выливается на диск.
Само соединение представляет собой внутреннее соединение между двумя наборами данных по pos и hashValue (minhash) в соответствии со спецификацией minhash и udf для вычисления расстояния жаккарда между парами соответствия.
Взорвите хеш-таблицы:
modelDataset.select(
struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))
Функция расстояния Жаккара:
override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
val xSet = x.toSparse.indices.toSet
val ySet = y.toSparse.indices.toSet
val intersectionSize = xSet.intersect(ySet).size.toDouble
val unionSize = xSet.size + ySet.size - intersectionSize
assert(unionSize > 0, "The union of two input sets must have at least 1 elements")
1 - intersectionSize / unionSize
}
Объединение обработанных наборов данных:
// Do a hash join on where the exploded hash values are equal.
val joinedDataset = explodedA.join(explodedB, explodeCols)
.drop(explodeCols: _*).distinct()
// Add a new column to store the distance of the two rows.
val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)
val joinedDatasetWithDist = joinedDataset.select(col("*"),
distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol)
)
// Filter the joined datasets where the distance are smaller than the threshold.
joinedDatasetWithDist.filter(col(distCol) < threshold)
Я пробовал комбинации кэширования, перераспределения и даже включения spark.speculation
, все безрезультатно.
Данные состоят из текста адреса черепицы, который необходимо сопоставить: 53536, Evansville, WI
=> 53, 35, 36, ev, va, an, ns, vi, il, ll, le, wi
будет небольшое расстояние с записями, где есть опечатка в городе или почтовом индексе.
Что дает довольно точные результаты, но может быть причиной перекоса соединения.
Мой вопрос:
- Чем может быть вызвано это несоответствие? (Одна задача занимает очень много времени, хотя в ней меньше записей)
- Как я могу предотвратить этот перекос в minhash без потери точности?
- Есть ли лучший способ сделать это в масштабе? (Я не могу Яро-Винклера/Левенштейна сравнивать миллионы записей со всеми записями в наборе данных о местоположении)
levenstein
(и тому подобное), чтобы получить действительно близкие. Третий проход содержал гораздо меньше данных и работал с LSH. - person Tom Lous   schedule 05.12.2019