Странные запросы на поиск _delta_log при использовании пользовательского формата FileFormat в блоках данных

Я наблюдаю очень странные запросы, выдаваемые блоками данных при использовании пользовательского формата файла. Мне нужно реализовать собственный FileFormat для чтения двоичных файлов в spark sql. Я реализовал класс FileFormat (в основном это копирование/вставка из AvroFileFormat), зарегистрировал его через META-INF/services и использовал следующим образом:

val df = spark.read.format("my-format").load("s3://exos-dev/table_data/file.bin")

Это работает как в локальной искре, так и в датабриках. Но в databricks я вижу, что до загрузки моей реализации формата файла - он пытается найти _delta_log в каталоге s3://exos-dev/table_data/file.bin На самом деле он выдает 3 запроса, см. Журнал ошибок

В конце концов, после 3 чтений s3 он загружает мою реализацию RecAvroFormat и использует ее. Вот где это начинается:

at org.apache.spark.sql.DataFrameReader.preprocessDeltaLoading(DataFrameReader.scala:230)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:278)

Метод preprocessDeltaLoading существует только в искре блоков данных. Как это отключить? Мне нужно прочитать 10 тысяч кадров данных в цикле, поэтому он выдаст 30 тысяч бесполезных запросов s3, что, похоже, замедляет мой код.

Журнал ошибок:

20/05/15 07:11:14 INFO S3AFileSystem: Stack trace: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com table_data/file.bin/_delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 98EAF67749BB0068, Extended Request ID: ...
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4921)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4867)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1320)
    at com.databricks.s3a.aws.EnforcingDatabricksS3Client$$anonfun$getObjectMetadata$1.apply(EnforcingDatabricksS3Client.scala:220)
    at com.databricks.s3a.aws.EnforcingDatabricksS3Client$$anonfun$getObjectMetadata$1.apply(EnforcingDatabricksS3Client.scala:220)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.s3a.aws.DatabricksS3Client$class.retryRequest(DatabricksS3Client.scala:151)
    at com.databricks.s3a.aws.DatabricksS3Client$class.withExponentialBackoff(DatabricksS3Client.scala:125)
    at com.databricks.s3a.aws.EnforcingDatabricksS3Client.withExponentialBackoff(EnforcingDatabricksS3Client.scala:28)
    at com.databricks.s3a.aws.EnforcingDatabricksS3Client.getObjectMetadata(EnforcingDatabricksS3Client.scala:219)
    at com.databricks.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1876)
    at com.databricks.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1378)
    at com.databricks.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:88)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
    at com.databricks.sql.transaction.tahoe.DeltaTableUtils$$anonfun$findDeltaTableRoot$1.apply$mcZ$sp(DeltaTable.scala:171)
    at com.databricks.sql.transaction.tahoe.DeltaTableUtils$$anonfun$findDeltaTableRoot$1.apply(DeltaTable.scala:171)
    at com.databricks.sql.transaction.tahoe.DeltaTableUtils$$anonfun$findDeltaTableRoot$1.apply(DeltaTable.scala:171)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.sql.transaction.tahoe.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:171)
    at org.apache.spark.sql.DataFrameReader.preprocessDeltaLoading(DataFrameReader.scala:230)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:278)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3147709739428370:1)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3147709739428370:51)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw$$iw.<init>(command-3147709739428370:53)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw.<init>(command-3147709739428370:55)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw.<init>(command-3147709739428370:57)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw.<init>(command-3147709739428370:59)
    at line2bfff6d09387477794603ac98a6c816640.$read.<init>(command-3147709739428370:61)
    at line2bfff6d09387477794603ac98a6c816640.$read$.<init>(command-3147709739428370:65)
    at line2bfff6d09387477794603ac98a6c816640.$read$.<clinit>(command-3147709739428370)
    at line2bfff6d09387477794603ac98a6c816640.$eval$.$print$lzycompute(<notebook>:7)
    at line2bfff6d09387477794603ac98a6c816640.$eval$.$print(<notebook>:6)
    at line2bfff6d09387477794603ac98a6c816640.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)

20/05/15 07:11:14 INFO S3AFileSystem: Caught an AmazonServiceException, which means your request made it to Amazon S3, but was rejected with an error response for some reason.
20/05/15 07:11:14 INFO S3AFileSystem: Error Message: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com table_data/_delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 15CAB9301B744BB1, Extended Request ID: W5BfMldhZBseAEeuu6pGrVg7j/BYwA6i0i9ulx8EeQolnesCbDUROysO00K+go2lWIAihBp65HQ=, Cloud Provider: AWS, Instance ID: i-04d201385f2c85deb (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 15CAB9301B744BB1; S3 Extended Request ID: ...
20/05/15 07:11:14 INFO S3AFileSystem: HTTP Status Code: 403
20/05/15 07:11:14 INFO S3AFileSystem: AWS Error Code: 403 Forbidden
20/05/15 07:11:14 INFO S3AFileSystem: Error Type: Client
20/05/15 07:11:14 INFO S3AFileSystem: Request ID: 15CAB9301B744BB1
20/05/15 07:11:14 INFO S3AFileSystem: Class Name: com.amazonaws.services.s3.model.AmazonS3Exception
20/05/15 07:11:14 INFO S3AFileSystem: Stack trace: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com table_data/_delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 15CAB9301B744BB1, Extended Request ID: W5BfMldhZBseAEeuu6pGrVg7j/BYwA6i0i9ulx8EeQolnesCbDUROysO00K+go2lWIAihBp65HQ=, Cloud Provider: AWS, Instance ID: i-04d201385f2c85deb (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 15CAB9301B744BB1; S3 Extended Request ID: W5BfMldhZBseAEeuu6pGrVg7j/BYwA6i0i9ulx8EeQolnesCbDUROysO00K+go2lWIAihBp65HQ=), S3 Extended Request ID: ...
  ...
20/05/15 07:11:14 INFO S3AFileSystem: Caught an AmazonServiceException, which means your request made it to Amazon S3, but was rejected with an error response for some reason.
20/05/15 07:11:14 INFO S3AFileSystem: Error Message: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com _delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 468EF53831074B5C, ...
20/05/15 07:11:14 INFO S3AFileSystem: HTTP Status Code: 403
20/05/15 07:11:14 INFO S3AFileSystem: AWS Error Code: 403 Forbidden
20/05/15 07:11:14 INFO S3AFileSystem: Error Type: Client
20/05/15 07:11:14 INFO S3AFileSystem: Request ID: 468EF53831074B5C
20/05/15 07:11:14 INFO S3AFileSystem: Class Name: com.amazonaws.services.s3.model.AmazonS3Exception
20/05/15 07:11:14 INFO S3AFileSystem: Stack trace: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com _delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; ...
    ...

20/05/15 07:11:14 INFO RecAvroFormat: -------- RecAvroFormat works ------------

20/05/15 07:11:14 INFO InMemoryFileIndex: Start listing leaf files and directories. Size of Paths: 1; threshold: 32

Любые идеи?


person Vyacheslav Krot    schedule 15.05.2020    source источник


Ответы (1)


Я оставлю результаты моего расследования здесь

Метод DataFrameReader#load всегда вызывает DataFrameReader#preprocessDeltaLoading, для любого формата читалки - будь то кастом или паркет авро. Что делает preprocessDeltaLoading — он ищет корень дельты в пути, используемом методом load.

Таким образом, он всегда отправляет 3 дополнительных запроса. Но это происходит тогда и только тогда, когда методу load передается единственный путь. Если load вызывается с массивом путей - он не пытается искать корень дельты.

Что я сделал - передал дополнительный пустой файл avro. Он создает одну дополнительную задачу и читает этот пустой файл, но, по крайней мере, у меня нет тонн запрещенных ошибок, а также повторных попыток и отложений.

Необходимо попросить блоки данных сделать этот метод preprocessDeltaLoading настраиваемым и отключить его.

person Vyacheslav Krot    schedule 18.05.2020