I am seeing very strange requests issued by Databricks when using a custom file format. I need to implement my own FileFormat to read binary files in Spark SQL. I implemented a FileFormat class (mostly copy/paste from AvroFileFormat), registered it via META-INF/services, and use it like this:
val df = spark.read.format("my-format").load("s3://exos-dev/table_data/file.bin")
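For reference, the registration follows the standard `DataSourceRegister` service-loader mechanism; a minimal sketch of what my implementation looks like (the package name is illustrative, and the real class implements the full FileFormat contract copied from AvroFileFormat):

```scala
// src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// contains a single line with the fully-qualified class name, e.g.:
//   com.example.RecAvroFormat

package com.example

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType

class RecAvroFormat extends FileFormat with DataSourceRegister {

  // The short name that spark.read.format("my-format") resolves to
  // via the META-INF/services lookup.
  override def shortName(): String = "my-format"

  // Real implementations omitted here -- largely adapted from AvroFileFormat.
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = ???

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = ???
}
```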
This works both in local Spark and on Databricks. But on Databricks I can see that, before my file-format implementation is even loaded, it tries to find `_delta_log` under the directory s3://exos-dev/table_data/file.bin.
In fact it issues 3 requests, see the error log.
Eventually, after the 3 S3 reads, it loads my RecAvroFormat implementation and uses it. Here is where it starts:
at org.apache.spark.sql.DataFrameReader.preprocessDeltaLoading(DataFrameReader.scala:230)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:278)
The preprocessDeltaLoading method exists only in Databricks Spark. How can I disable it? I need to read 10k dataframes in a loop, so it will issue 30k useless S3 requests, which appears to slow down my code.
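For context, the access pattern is roughly the following (the path list is a placeholder, not my actual code); with three extra `_delta_log` HEAD probes per `load()`, 10k iterations account for the ~30k wasted requests:

```scala
// Illustrative loop: each load() first runs preprocessDeltaLoading, which
// probes file.bin/_delta_log, table_data/_delta_log and /_delta_log on S3
// before my format is resolved -- 3 extra HEAD requests per file.
val paths: Seq[String] = ??? // ~10k paths like "s3://exos-dev/table_data/file.bin"
val frames = paths.map { p =>
  spark.read.format("my-format").load(p)
}
```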
Error log:
20/05/15 07:11:14 INFO S3AFileSystem: Stack trace: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com table_data/file.bin/_delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 98EAF67749BB0068, Extended Request ID: ...
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4921)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4867)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1320)
at com.databricks.s3a.aws.EnforcingDatabricksS3Client$$anonfun$getObjectMetadata$1.apply(EnforcingDatabricksS3Client.scala:220)
at com.databricks.s3a.aws.EnforcingDatabricksS3Client$$anonfun$getObjectMetadata$1.apply(EnforcingDatabricksS3Client.scala:220)
at scala.util.Try$.apply(Try.scala:192)
at com.databricks.s3a.aws.DatabricksS3Client$class.retryRequest(DatabricksS3Client.scala:151)
at com.databricks.s3a.aws.DatabricksS3Client$class.withExponentialBackoff(DatabricksS3Client.scala:125)
at com.databricks.s3a.aws.EnforcingDatabricksS3Client.withExponentialBackoff(EnforcingDatabricksS3Client.scala:28)
at com.databricks.s3a.aws.EnforcingDatabricksS3Client.getObjectMetadata(EnforcingDatabricksS3Client.scala:219)
at com.databricks.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1876)
at com.databricks.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1378)
at com.databricks.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:88)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at com.databricks.sql.transaction.tahoe.DeltaTableUtils$$anonfun$findDeltaTableRoot$1.apply$mcZ$sp(DeltaTable.scala:171)
at com.databricks.sql.transaction.tahoe.DeltaTableUtils$$anonfun$findDeltaTableRoot$1.apply(DeltaTable.scala:171)
at com.databricks.sql.transaction.tahoe.DeltaTableUtils$$anonfun$findDeltaTableRoot$1.apply(DeltaTable.scala:171)
at scala.util.Try$.apply(Try.scala:192)
at com.databricks.sql.transaction.tahoe.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:171)
at org.apache.spark.sql.DataFrameReader.preprocessDeltaLoading(DataFrameReader.scala:230)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:278)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3147709739428370:1)
at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3147709739428370:51)
at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw$$iw.<init>(command-3147709739428370:53)
at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw.<init>(command-3147709739428370:55)
at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw.<init>(command-3147709739428370:57)
at line2bfff6d09387477794603ac98a6c816640.$read$$iw.<init>(command-3147709739428370:59)
at line2bfff6d09387477794603ac98a6c816640.$read.<init>(command-3147709739428370:61)
at line2bfff6d09387477794603ac98a6c816640.$read$.<init>(command-3147709739428370:65)
at line2bfff6d09387477794603ac98a6c816640.$read$.<clinit>(command-3147709739428370)
at line2bfff6d09387477794603ac98a6c816640.$eval$.$print$lzycompute(<notebook>:7)
at line2bfff6d09387477794603ac98a6c816640.$eval$.$print(<notebook>:6)
at line2bfff6d09387477794603ac98a6c816640.$eval.$print(<notebook>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
at scala.util.Try$.apply(Try.scala:192)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
at java.lang.Thread.run(Thread.java:748)
20/05/15 07:11:14 INFO S3AFileSystem: Caught an AmazonServiceException, which means your request made it to Amazon S3, but was rejected with an error response for some reason.
20/05/15 07:11:14 INFO S3AFileSystem: Error Message: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com table_data/_delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 15CAB9301B744BB1, Extended Request ID: W5BfMldhZBseAEeuu6pGrVg7j/BYwA6i0i9ulx8EeQolnesCbDUROysO00K+go2lWIAihBp65HQ=, Cloud Provider: AWS, Instance ID: i-04d201385f2c85deb (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 15CAB9301B744BB1; S3 Extended Request ID: ...
20/05/15 07:11:14 INFO S3AFileSystem: HTTP Status Code: 403
20/05/15 07:11:14 INFO S3AFileSystem: AWS Error Code: 403 Forbidden
20/05/15 07:11:14 INFO S3AFileSystem: Error Type: Client
20/05/15 07:11:14 INFO S3AFileSystem: Request ID: 15CAB9301B744BB1
20/05/15 07:11:14 INFO S3AFileSystem: Class Name: com.amazonaws.services.s3.model.AmazonS3Exception
20/05/15 07:11:14 INFO S3AFileSystem: Stack trace: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com table_data/_delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 15CAB9301B744BB1, Extended Request ID: W5BfMldhZBseAEeuu6pGrVg7j/BYwA6i0i9ulx8EeQolnesCbDUROysO00K+go2lWIAihBp65HQ=, Cloud Provider: AWS, Instance ID: i-04d201385f2c85deb (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 15CAB9301B744BB1; S3 Extended Request ID: W5BfMldhZBseAEeuu6pGrVg7j/BYwA6i0i9ulx8EeQolnesCbDUROysO00K+go2lWIAihBp65HQ=), S3 Extended Request ID: ...
...
20/05/15 07:11:14 INFO S3AFileSystem: Caught an AmazonServiceException, which means your request made it to Amazon S3, but was rejected with an error response for some reason.
20/05/15 07:11:14 INFO S3AFileSystem: Error Message: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com _delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 468EF53831074B5C, ...
20/05/15 07:11:14 INFO S3AFileSystem: HTTP Status Code: 403
20/05/15 07:11:14 INFO S3AFileSystem: AWS Error Code: 403 Forbidden
20/05/15 07:11:14 INFO S3AFileSystem: Error Type: Client
20/05/15 07:11:14 INFO S3AFileSystem: Request ID: 468EF53831074B5C
20/05/15 07:11:14 INFO S3AFileSystem: Class Name: com.amazonaws.services.s3.model.AmazonS3Exception
20/05/15 07:11:14 INFO S3AFileSystem: Stack trace: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com _delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; ...
...
20/05/15 07:11:14 INFO RecAvroFormat: -------- RecAvroFormat works ------------
20/05/15 07:11:14 INFO InMemoryFileIndex: Start listing leaf files and directories. Size of Paths: 1; threshold: 32
Any ideas?