Как запустить искровую программу на Java параллельно

Итак, у меня есть java-приложение, которое имеет зависимости от искры maven, и при его запуске оно запускает искровой сервер на хосте, на котором он работает. Экземпляр сервера имеет 36 ядер. Я указываю экземпляр SparkSession, где я параллельно указываю количество ядер и другие свойства конфигурации, но когда я вижу статистику с использованием htop, кажется, что используются не все ядра, а только 1.

   SparkSession spark  = SparkSession
                .builder()
                .master("local")
                .appName("my-spark")
                .config("spark.driver.memory","50g")
                .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
                .config("spark.sql.shuffle.partitions", "400")
                .config("spark.eventLog.enabled", "true")
                .config("spark.eventLog.dir", "/dir1/dir2/logs")
                .config("spark.history.fs.logDirectory", "/dir1/dir2/logs")
                .config("spark.executor.cores", "36")

Я также добавил в JavaSparkContext:

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.hadoopConfiguration().set("fs.s3a.access.key", AWS_KEY);
sc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY);
sc.hadoopConfiguration().set("spark.driver.memory","50g");
sc.hadoopConfiguration().set("spark.eventLog.enabled", "true");
sc.hadoopConfiguration().set("spark.eventLog.dir", "/dir1/dir2/logs");
sc.hadoopConfiguration().set("spark.executor.cores", "36");

Моя задача — читать данные из aws s3 в df и записывать данные в другое ведро.

Dataset<Row> df = spark.read().format("csv").option("header", "true").load("s3a://bucket/file.csv.gz");
        //df = df.repartition(200);

        df.withColumn("col_name", df.col("col_name")).sort("col_name", "_id").write().format("iceberg").mode("append").save(location);



Ответы (2)


Файлы .gz «незаметны»: чтобы распаковать их, вы должны начать с байта 0 и читать вперед. В результате spark, hive, MapReduce и т. д. отдают весь файл одному рабочему процессу. Если вы хотите параллельную обработку, используйте другой формат сжатия (например, snappy).

person stevel    schedule 06.10.2018
comment
Но если у вас много таких файлов, вы сможете работать частично параллельно. Или я что-то упускаю? - person thebluephantom; 07.10.2018
comment
каждый файл можно отдать отдельному воркеру, да. Но ваш пример load(s3a://bucket/file.csv.gz) этого не делает, поэтому параллелизм равен 1 - person stevel; 08.10.2018

Вы используете Spark в локальном режиме, spark.executor.cores не вступит в силу, рассмотрите возможность изменения .master("local") на .master("local[*]")

Надеюсь это поможет

person Chitral Verma    schedule 05.10.2018
comment
Спасибо за ваш ответ. Однако не все 36 из них используются. И память я указываю 50 ГБ, так как у меня всего 60 ГБ памяти, а я использую 30 ГБ. Принимает ли spark эту конфигурацию в качестве верхнего предела? - person Atihska; 06.10.2018
comment
Насколько я знаю, когда вы создаете сеанс Spark с помощью конструктора, вы создаете «глобальный» сеанс. Затем вы можете создавать новые сеансы с помощью метода spark.newSession(). Это может понадобиться, если вы одновременно читаете несколько файлов или один и тот же файл повторно для выполнения разных операций. Для каждого прочитанного файла вы можете создать новый сеанс Spark с помощью метода newSession(). Каждый вызов newSession() создает новый поток. - person Nikhil; 14.10.2018
comment
@Atihska, да, эта конфигурация абсолютна. для ядер, можете ли вы проверить vcores на пользовательском интерфейсе nodemanager - person Chitral Verma; 15.10.2018