API, Core, Spark: Pass FileIO on Spark's read path#15448
nastra wants to merge 2 commits into apache:main
Conversation
force-pushed from 000673a to dc8a4f7
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTServerCatalogAdapter.java
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SerializableFileIOWithSize
```
This makes sure that the `FileIO` instance is only closed on the driver and not on executor nodes; it is similar to `SerializableTableWithSize`.
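The driver-only-close behavior described above can be sketched with the standard transient-marker trick: a transient field set in the constructor survives only in the JVM that built the wrapper (the driver), while Java deserialization on an executor resets it to null, so `close()` becomes a no-op there. This is an illustrative sketch only; the class and field names are assumptions, not the actual PR code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch (invented names): close the wrapped FileIO only on the driver.
class SerializableFileIOWithSizeSketch implements Serializable, AutoCloseable {
  // Set in the constructor, so non-null only where the object was built (the
  // driver). Transient fields come back as null after deserialization.
  private final transient String serializationMarker;
  private boolean underlyingIoClosed = false;

  SerializableFileIOWithSizeSketch() {
    this.serializationMarker = "constructed-here";
  }

  @Override
  public void close() {
    if (serializationMarker != null) {
      underlyingIoClosed = true; // a real impl would call fileIO.close() here
    }
  }

  boolean underlyingIoClosed() {
    return underlyingIoClosed;
  }

  // Round-trips the wrapper through Java serialization, mimicking what
  // happens when Spark ships it to an executor.
  static SerializableFileIOWithSizeSketch roundTrip(SerializableFileIOWithSizeSketch in)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(in);
    }
    try (ObjectInputStream oin =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (SerializableFileIOWithSizeSketch) oin.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    SerializableFileIOWithSizeSketch driverCopy = new SerializableFileIOWithSizeSketch();
    SerializableFileIOWithSizeSketch executorCopy = roundTrip(driverCopy);

    executorCopy.close(); // no-op: the marker was lost during deserialization
    driverCopy.close();   // closes: the marker is still set on the driver

    System.out.println("executor closed IO: " + executorCopy.underlyingIoClosed());
    System.out.println("driver closed IO: " + driverCopy.underlyingIoClosed());
  }
}
```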
force-pushed from 45a44aa to 122ccc8
core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
```java
@SuppressWarnings("unchecked")
private <T extends RESTResponse> T maybeAddStorageCredential(T response) {
  if (response instanceof PlanTableScanResponse resp
      && PlanStatus.COMPLETED == resp.planStatus()) {
```
I'm looking into when we can return a FileIO from the scan, and I was surprised to see that storage credentials are returned for any CompletedPlanningResult rather than only for CompletedPlanningWithIDResult. The result with an ID is used for completed responses from the plan endpoint, while the generic result is returned from both the plan and fetch endpoints.
Why can we return storage credentials when fetching tasks? Wouldn't it make more sense to return credentials once per planning operation? That would simplify knowing when we have credentials.
That wouldn't solve the problem of needing to wait until after planFiles is called to get the FileIO, but it would at least simplify the protocol so we don't need to try to create a new FileIO after each call to fetch more tasks. (@danielcweeks, any thoughts on this?)
I guess if we want to change this, we would at the very least need to update the spec that we added in #14563.
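The proposal in this thread (return credentials exactly once per planning operation, tied to the response that carries a plan ID) could be illustrated as follows. The type names here are invented for the sketch and are not Iceberg's actual REST model.

```java
// Hypothetical sketch: attach storage credentials only to the completed-plan
// response that carries a plan ID, never to generic task-fetch responses.
class CredentialPolicy {
  interface PlanningResponse {}

  // Completed response from the plan endpoint; carries the plan ID.
  record CompletedPlanWithId(String planId) implements PlanningResponse {}

  // Generic response returned when fetching additional tasks.
  record FetchedTasks(int taskCount) implements PlanningResponse {}

  // Under the proposal, clients know credentials arrive exactly once,
  // with the completed-plan response, so no new FileIO has to be created
  // after each fetch call.
  static boolean attachStorageCredentials(PlanningResponse response) {
    return response instanceof CompletedPlanWithId;
  }

  public static void main(String[] args) {
    System.out.println(attachStorageCredentials(new CompletedPlanWithId("plan-1")));
    System.out.println(attachStorageCredentials(new FetchedTasks(4)));
  }
}
```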
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
```java
SparkScan(
    SparkSession spark,
    Table table,
    Supplier<FileIO> fileIO,
```
This is a supplier because it will be a broadcast?
This is a supplier because `planFiles()` will be called later in the read path, so the actual `FileIO` with the right credentials will only be available later.
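The deferral described here can be shown with a toy example (all names invented): the scan is constructed before remote planning has run, so it holds a `Supplier<FileIO>` that is only resolved at read time, after planning has produced the credential-configured `FileIO`.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Toy illustration of capturing a Supplier<FileIO> at construction time and
// resolving it only after "planning" has run.
class LazyFileIoDemo {
  interface FileIO {
    String describe();
  }

  // Holds whatever FileIO planning produced; empty before planFiles() runs.
  private final AtomicReference<FileIO> plannedIo = new AtomicReference<>();

  Supplier<FileIO> fileIoSupplier() {
    return plannedIo::get; // deferred: reads the reference only when asked
  }

  void planFiles() {
    // Stand-in for remote planning configuring credentials on the FileIO.
    plannedIo.set(() -> "FileIO with plan-scoped credentials");
  }

  public static void main(String[] args) {
    LazyFileIoDemo scan = new LazyFileIoDemo();
    Supplier<FileIO> io = scan.fileIoSupplier(); // captured at construction
    scan.planFiles();                            // planning happens later
    System.out.println(io.get().describe());     // resolves after planning
  }
}
```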
force-pushed from 122ccc8 to 4a69352
force-pushed from 4a69352 to f9e1bde
force-pushed from c13cd0a to b5367d7
force-pushed from b5367d7 to 3e3ec04
The downside of this approach is that the changes are much bigger than in #15368, and we also need `SerializableFileIOWithSize`.
When accessing/reading data files, the codebase uses the Table's `FileIO` instance through `table.io()` on Spark's read path. With remote scan planning, the `FileIO` instance is configured with a plan ID + custom storage credentials inside `RESTTableScan`, but that instance is never propagated to the place(s) that actually perform the read, leading to errors.

This PR passes the `FileIO` obtained during remote/distributed scan planning alongside the `Table` instance on Spark's read path. This is an alternative to #15368 and requires `SerializableFileIOWithSize`, which makes sure that the `FileIO` instance is only closed on the driver and not on executor nodes (similar to `SerializableTableWithSize`).