Spark - слишком много открытых файлов в случайном порядке

Использование Спарк 1.1

У меня есть 2 набора данных. Один очень большой, а другой был уменьшен (с использованием фильтрации 1:100) до гораздо меньшего масштаба. Мне нужно уменьшить большой набор данных до того же масштаба, объединив только те элементы из меньшего списка с соответствующими им аналогами в большем списке (эти списки содержат элементы, которые имеют поле взаимного соединения).

Я делаю это, используя следующий код:

  • Часть «if (joinKeys! = null)» является соответствующей частью
  • Меньший список — «joinKeys», больший список — «keyedEvents».

    private static JavaRDD<ObjectNode> createOutputType(JavaRDD jsonsList, final String type, String outputPath,JavaPairRDD<String,String> joinKeys) {
    
    outputPath = outputPath + "/" + type;
    
    JavaRDD events = jsonsList.filter(new TypeFilter(type));
    
    
    // This is in case we need to narrow the list to match some other list of ids... Recommendation List, for example... :)
    if(joinKeys != null) {
        JavaPairRDD<String,ObjectNode> keyedEvents = events.mapToPair(new KeyAdder("requestId"));
    
        JavaRDD < ObjectNode > joinedEvents = joinKeys.join(keyedEvents).values().map(new PairToSecond());
    
        events = joinedEvents;
    }
    
    
    JavaPairRDD<String,Iterable<ObjectNode>> groupedEvents = events.mapToPair(new KeyAdder("sliceKey")).groupByKey();
    // Add convert jsons to strings and add "\n" at the end of each
    JavaPairRDD<String, String> groupedStrings = groupedEvents.mapToPair(new JsonsToStrings());
    groupedStrings.saveAsHadoopFile(outputPath, String.class, String.class, KeyBasedMultipleTextOutputFormat.class);
    return events;
    }
    

Дело в том, что при выполнении этого задания я всегда получаю одну и ту же ошибку:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2757 in stage 13.0 failed 4 times, most recent failure: Lost task 2757.3 in stage 13.0 (TID 47681, hadoop-w-175.c.taboola-qa-01.internal): java.io.FileNotFoundException: /hadoop/spark/tmp/spark-local-20141201184944-ba09/36/shuffle_6_2757_2762 (Too many open files)
    java.io.FileOutputStream.open(Native Method)
    java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
    org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
    org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

Я уже увеличил свои ulimits, выполнив следующие действия на всех машинах кластера:

echo "* soft nofile 900000" >> /etc/security/limits.conf
echo "root soft nofile 900000" >> /etc/security/limits.conf
echo "* hard nofile 990000" >> /etc/security/limits.conf
echo "root hard nofile 990000" >> /etc/security/limits.conf
echo "session required pam_limits.so" >> /etc/pam.d/common-session
echo "session required pam_limits.so" >> /etc/pam.d/common-session-noninteractive

Но не решает мою проблему...


person Yaniv Donenfeld    schedule 01.12.2014    source источник
comment
После увеличения ограничений файловых дескрипторов вы также перезапустили все демоны искры (и, возможно, запустили новый сеанс на узле, на котором вы запускаете свой драйвер), чтобы новые ограничения были приняты? Также вы запускаете искру автономно или искру на YARN? При работе на YARN перезапуск всех демонов YARN также может быть полезен (по той же причине).   -  person Angus Davis    schedule 02.12.2014
comment
Я использую GCE, поэтому каждый раз развертываю новый кластер. Кроме того, настройка ulimits выполняется на этапе инициализации кластера до запуска задания. Наконец, я использую не YARN, а режим Spark Standalone.   -  person Yaniv Donenfeld    schedule 02.12.2014
comment
Повышение уровня ulimit должно сработать, но вы также можете попробовать изменить spark.shuffle.manager на нового менеджера SORT в соответствии с руководство по настройке здесь.   -  person Nick Chammas    schedule 03.12.2014


Ответы (1)


Фреймворк bdutil работает таким образом, что задание выполняет пользователь «hadoop». Сценарий, развертывающий кластер, создал файл /etc/security/limits.d/hadoop.conf, который переопределял настройки ulimit для пользователя «hadoop», о чем я не знал. Удалив этот файл или, в качестве альтернативы, установив там нужные ulimits, я смог решить проблему.

person Yaniv Donenfeld    schedule 03.12.2014