Как работает алгоритм хеширования в Dataset.repartition

Я делал базовую переработку набора данных. У меня есть данные, как показано ниже в файле test.csv

abc,1
def,2
ghi,3
jkl,4
mno,5

Я читаю в кадре данных, например

val df= spark.read.csv("test.csv")
val repart=df.repartition(5,col("_c1"))
repart.write.csv("/home/partfiles/")

Теперь после записи данных он создал 5 файлов деталей, что правильно. Но в этом процессе только три файла частей имеют правильные данные, как показано ниже.

part00000 -empty
part00001 -jkl,4
part00002 -empty
part00003 -ghi,3
part00004 - abc,1
            def,2
            mno,5

Но поскольку я перераспределил на основе второго столбца, и все данные разные, в идеале должно быть создано 5 разных файлов частей.
Согласно документу API набора данных.

Возвращает новый набор данных, разделенный заданными выражениями разделения на numPartitions. Результирующий набор данных секционируется по хеш-функции.

Затем я погуглил несколько вещей и нашел эту замечательную статью о разбиении (Как работает HashPartitioner?) .
Как упоминалось в этой статье, DataSet использует алгоритм Murmur3Hash. Итак, я написал небольшой код для получения хеш-значения на основе этой статьи SO (Как я могу использовать реализацию Scala MurmurHash: scala.util.MurmurHash3?).

class Murmur3{
  import scala.util.hashing.{ MurmurHash3 => MH3 }
  val values= (1 to 5).map(p=> p.toString)
  val result = values.map(n => (n,MH3.stringHash(n,MH3.stringSeed)))
  def resultVal(): Unit ={
    val dn= result.map( d=> d._1 -> (d._2,d._2 % 5)) //
    dn.foreach(println)
  }
}

Что дает мне это значение. Вывод похож на (число, (имеет значение, хеш-значение% 5))

(1,(-1672130795,0))
(2,(382493853,3))
(3,(1416458177,2))
(4,(1968144336,1))
(5,(2100358791,1))

Теперь из этих данных он должен сгенерировать 4 файла частей. Но как были сгенерированы файлы из 3 частей. Пожалуйста, дайте мне знать, как работает hashpartitioning в случае набора данных.


person whoisthis    schedule 16.02.2018    source источник


Ответы (1)


Ошибка, которую вы сделали, заключается в предположении, что хеширование выполняется в строке Scala. На практике Spark хэширует напрямую небезопасный массив байтов.

Таким образом, выражение эквивалентно

import org.apache.spark.sql.functions.hash

Seq("1", "2", "3", "4", "5").toDF.select(
  when(hash($"value") % 5 > 0, hash($"value") % 5 )
    .otherwise(hash($"value") % 5 + 5)
).show
// +-----------------------------------------------------------------------------------------+
// |CASE WHEN ((hash(value) % 5) > 0) THEN (hash(value) % 5) ELSE ((hash(value) % 5) + 5) END|
// +-----------------------------------------------------------------------------------------+
// |                                                                                        4|
// |                                                                                        4|
// |                                                                                        3|
// |                                                                                        1|
// |                                                                                        4|
// +-----------------------------------------------------------------------------------------+

что дает наблюдаемое распределение.

person Alper t. Turker    schedule 16.02.2018