Сериализация и настраиваемый класс Spark RDD

Я пишу собственную реализацию Spark RDD на Scala и отлаживаю свою реализацию с помощью оболочки Spark. Моя цель на данный момент - получить:

customRDD.count

чтобы добиться успеха без исключения. Прямо сейчас я получаю следующее:

15/03/06 23:02:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/03/06 23:02:32 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)

...

Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
    at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
    ... 45 more

Мое внимание привлекает «не удалось сериализовать задачу 0». У меня нет четкого мысленного представления о том, что происходит, я делаю customRDD.count, и очень неясно, что нельзя сериализовать.

Мой собственный RDD состоит из:

  • пользовательский класс RDD
  • пользовательский класс раздела
  • custom (scala) класс итератора

Моя сессия оболочки Spark выглядит так:

import custom.rdd.stuff
import org.apache.spark.SparkContext

val conf = sc.getConf
conf.set(custom, parameters)
sc.stop
sc2 = new SparkContext(conf)
val mapOfThings: Map[String, String] = ...
myRdd = customRDD(sc2, mapOfStuff)
myRdd.count

... (exception output) ...

Что я хотел бы знать:

  • Что должно быть «сериализуемым» в целях создания пользовательского класса RDD?
  • Что значит быть «сериализуемым» для Spark? Это похоже на Java "Serializable"?
  • Все ли данные, возвращаемые итератором моего RDD (возвращенные методом compute), также должны быть сериализуемыми?

Большое спасибо за любые разъяснения по этому вопросу.


person llovett    schedule 06.03.2015    source источник


Ответы (3)


Код, выполняемый в контексте Spark, должен существовать в пределах той же границы процесса рабочего узла, на котором поручено выполнение задачи. Это означает, что необходимо позаботиться о том, чтобы любые объекты или значения, указанные в ваших настройках RDD, были сериализуемыми. Если объекты не сериализуемы, вам необходимо убедиться, что они имеют правильную область видимости, чтобы каждый раздел имел новый экземпляр этого объекта.

По сути, вы не можете совместно использовать несериализуемый экземпляр объекта, объявленного в вашем драйвере Spark, и ожидать, что его состояние будет реплицировано на другие узлы в вашем кластере.

Это пример, в котором не удается сериализовать несериализуемый объект:

NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");

rdd.map(s -> notSerializable.doSomething(s)).collect();

Приведенный ниже пример будет работать нормально, поскольку он находится в контексте лямбда-выражения, его можно правильно распределить по нескольким разделам без необходимости сериализации состояния экземпляра несериализуемого объекта. Это также относится к несериализуемым транзитивным зависимостям, указанным как часть вашей настройки RDD (если таковая имеется).

rdd.forEachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable();

  // ...Now process iter
});

Подробнее см. Здесь: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html.

person Kenny Bastani    schedule 07.03.2015

В дополнение к объяснению Кенни, я бы посоветовал вам включить отладку сериализации, чтобы увидеть, что вызывает проблему. Часто это невозможно понять, просто взглянув на код.

-Dsun.io.serialization.extendedDebugInfo=true
person Daniel Darabos    schedule 07.03.2015
comment
Спасибо тебе за это. У меня была та же проблема, что и у OP. Обычно процедура Spark «ImproveException» распечатывает проблемный класс, но в случае OP и в моем случае она не работает. Добавление этой опции в $ SPARK_CONF / java-opts дало мне информацию, необходимую для продвижения вперед. - person Necro; 07.04.2015
comment
могу я спросить, что такое ОП? - person tribbloid; 13.04.2015
comment
OP = Вступительный плакат, в данном случае @llovett. - person Daniel Darabos; 13.04.2015
comment
На самом деле, если быть точным, это оригинальный пост / оригинальный постер (источник: en.wikipedia.org/wiki/ Internet_forum # сообщение) - person Eran Medan; 12.05.2015
comment
Спасибо: очень-очень полезный флаг для добавления! - person xtof54; 16.09.2015

Проблема в том, что вы передаете SparkContex (Boilerplate) в свой метод customRdd (customRDD (sc2, mapOfStuff)). Убедитесь, что ваш класс также сериализует, что делает SparkContext.

person Dinesh Thalke    schedule 06.05.2016