Flink: Allow arbitrary post-commit maintenance tasks via IcebergSink Builder#15566
mxm wants to merge 7 commits into apache:main
Conversation
```java
if (flinkWriteConf.compactMode()) {
  RewriteDataFilesConfig rewriteDataFilesConfig =
      flinkMaintenanceConfig.createRewriteDataFilesConfig();
  maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig));
}
```
What about configs for the other maintenance tasks?
Do we want a generic way to configure those?
Eventually, yes. This will give users the option to use post-commit table maintenance outside of the Java API.
I think this solution conflicts with "add only a single way to do a single thing".
The user can enable compaction by adding `COMPACTION_ENABLE`, but the same could be achieved by configuring compaction through the maintenance settings. Do we really want to do this?
I thought about that as well. We already allow configuring Iceberg via table options, Flink config, or the builder methods; we don't allow just one way. I think this is a bit of a similar case.
Granted, if we treat maintenance tasks as a single configuration option, then we should not allow builder-based and table options at the same time. However, if we think about the maintenance tasks as separate options, then it makes sense to allow both configuration styles at the same time.
It would be sensible to allow only one of each maintenance types, i.e. let that be the unit of configuration. Consequently, when RewriteDataFiles is configured, it should supersede "compaction mode".
What is the typical use case?
The most widely used config pattern looks like this:
```java
public String orcCompressionStrategy() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_STRATEGY.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_STRATEGY)
      .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
      .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
      .parse();
}
```
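As a dependency-free illustration (hypothetical class and keys, not Iceberg's actual `FlinkConfParser`), the precedence encoded by that chain is: write option, then Flink config, then table property, then default:

```java
import java.util.Map;

// Sketch of the option > flinkConfig > tableProperty > defaultValue
// precedence implied by the confParser chain above. Illustrative only.
public class ConfPrecedence {
  static String resolve(
      Map<String, String> writeOptions,
      Map<String, String> flinkConfig,
      Map<String, String> tableProperties,
      String optionKey,
      String tablePropertyKey,
      String defaultValue) {
    if (writeOptions.containsKey(optionKey)) {
      return writeOptions.get(optionKey);
    }
    if (flinkConfig.containsKey(optionKey)) {
      return flinkConfig.get(optionKey);
    }
    if (tableProperties.containsKey(tablePropertyKey)) {
      return tableProperties.get(tablePropertyKey);
    }
    return defaultValue;
  }

  public static void main(String[] args) {
    // Flink config wins over the table property when both are set.
    String strategy = resolve(
        Map.of(),
        Map.of("compression-strategy", "speed"),
        Map.of("write.orc.compression-strategy", "compression"),
        "compression-strategy",
        "write.orc.compression-strategy",
        "zlib");
    System.out.println(strategy); // speed
  }
}
```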
Sometimes we allow the user to override the settings by manually adding options:
```java
@Override
public Builder overwrite(boolean newOverwrite) {
  writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
  return this;
}
```
We might even validate against setting them twice via a different Java method:

```java
public Builder<T> watermarkColumn(String columnName) {
  Preconditions.checkArgument(
      splitAssignerFactory == null,
      "Watermark column and SplitAssigner should not be set in the same source");
  readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
  return this;
}
```
To support the property-based solution, we will likely need:
- ExpireSnapshotsConfig
- DeleteOrphanFilesConfig

This means that setting ExpireSnapshotsConfig and also adding a new ExpireSnapshots task should only affect each other. If we create a generic maintenance method on IcebergSink, we will have a hard time matching those.
I think we need to think a bit more about this.
CC: @Guosmilesmile
Out of curiosity, is the scenario we're trying to solve here to avoid having to maintain a separate maintenance job per table? Or what other scenario is this intended to address?
From my perspective, I have a few (still rough) questions I would like to ask:
- Do we allow multiple ways to configure the same maintenance task, and also allow multiple instances of the same type of task? For example, conflicts between the `compactMode` option and the `maintenance()` method, or adding multiple rewrite-data tasks with different configurations via `maintenance()`.
- Regarding config injection, we can inject configs via the existing `flinkConf` approach, or bypass `flinkConf` and have `maintenance()` generate task configs directly. That could get confusing; can we unify this behind a single entry point?
Fair, point taken.
> Out of curiosity, is the scenario we're trying to solve here to avoid having to maintain a separate maintenance job per table? Or what other scenario is this intended to address?
We're trying to make all table maintenance available post-commit. This is way simpler than having multiple jobs and avoids conflicts with the writer.
I'm going to change the implementation:
- I'll add a similar configuration as for RewriteDataFiles ("compactMode").
- I'll remove the generic `maintenance(..)` builder method.
- I'll change the code to only allow one maintenance task for each type. The builder will have precedence over the property-based configuration.
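A minimal, dependency-free sketch of that precedence rule (illustrative names only; not the actual IcebergSink implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: at most one maintenance task per type, where builder-supplied
// tasks supersede property-based ones for the same type.
public class MaintenancePrecedence {
  static Map<String, String> resolve(
      Map<String, String> fromProperties, Map<String, String> fromBuilder) {
    Map<String, String> resolved = new LinkedHashMap<>(fromProperties);
    resolved.putAll(fromBuilder); // builder wins on a conflicting task type
    return resolved;
  }

  public static void main(String[] args) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("rewrite-data-files", "from-properties");
    props.put("expire-snapshots", "from-properties");

    Map<String, String> builder = new LinkedHashMap<>();
    builder.put("rewrite-data-files", "from-builder");

    Map<String, String> resolved = resolve(props, builder);
    System.out.println(resolved.get("rewrite-data-files")); // from-builder
    System.out.println(resolved.get("expire-snapshots"));   // from-properties
  }
}
```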
```java
// before:
ConfigOptions.key("compaction-enabled").booleanType().defaultValue(false);

// after:
ConfigOptions.key(RewriteDataFilesConfig.PREFIX + "enabled")
    .booleanType()
    .defaultValue(false)
    .withDeprecatedKeys("compaction-enabled");
```
I moved the enabled flag behind the config prefix.
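The fallback behavior of `withDeprecatedKeys` can be sketched without any Flink dependency (hypothetical helper and prefixed key name; Flink's ConfigOption handles this internally):

```java
import java.util.Map;

// Sketch of deprecated-key fallback: prefer the new key, fall back to the
// deprecated one, then to the default. Illustrative, not Flink's code.
public class DeprecatedKeyLookup {
  static boolean readBoolean(
      Map<String, String> conf, String key, String deprecatedKey, boolean defaultValue) {
    String value = conf.get(key);
    if (value == null) {
      value = conf.get(deprecatedKey);
    }
    return value == null ? defaultValue : Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    // The old key keeps working until users migrate to the prefixed key.
    Map<String, String> oldStyle = Map.of("compaction-enabled", "true");
    System.out.println(
        readBoolean(oldStyle, "flink-maintenance.rewrite-data-files.enabled",
            "compaction-enabled", false)); // true

    // The new key takes precedence when both keys are present.
    Map<String, String> both = Map.of(
        "flink-maintenance.rewrite-data-files.enabled", "false",
        "compaction-enabled", "true");
    System.out.println(
        readBoolean(both, "flink-maintenance.rewrite-data-files.enabled",
            "compaction-enabled", true)); // false
  }
}
```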
```java
 * @deprecated See {@code rewriteDatafiles(..)}
 */
@Deprecated
public Builder compaction(boolean enabled) {
  writeOptions.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), Boolean.toString(enabled));
  return this;
}
```
I deprecated this method in favor of rewriteDataFiles. I think it's better to use the actual maintenance task names, to avoid confusion.
+1 on this. I think the best API would let the user choose what kind of "compaction task" they want to enable. I'd even suggest marking this compaction method for deletion in the next release (or at least the release after next).
IcebergSink previously hardcoded RewriteDataFiles as the only post-commit table maintenance task, behind a boolean `compactMode` option. Users could not configure ExpireSnapshots, DeleteOrphanFiles, or any combination of tasks through the builder API.

This adds a `maintenance(MaintenanceTaskBuilder)` method to the IcebergSink builder that can be called multiple times. The `compactMode` config can still be used.
Force-pushed a40bded to 04d190d.
Rebased after conflicts with #15575.
```java
 * @see ExpireSnapshotsConfig for the default config.
 */
public Builder expireSnapshots(boolean enabled) {
  writeOptions.put(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key(), Boolean.toString(enabled));
```
Do we have reasonable defaults?
Defaults are hard. These are the current defaults:
ExpireSnapshotsConfig defaults:
- schedule.commit-count: 10
- schedule.data-file-count: 1,000
- schedule.data-file-size: 100 GB
- schedule.interval-second: 3600 (1 hour)
- max-snapshot-age-seconds: none
- retain-last: none
- delete-batch-size: 1,000
- clean-expired-metadata: false
- planning-worker-pool-size: none (shared pool)
DeleteOrphanFilesConfig defaults:
- schedule.commit-count: 10
- schedule.data-file-count: 1,000
- schedule.data-file-size: 100 GB
- schedule.interval-second: 3600 (1 hour)
- min-age-seconds: 259,200 (3 days)
- delete-batch-size: 1,000
- use-prefix-listing: false
- planning-worker-pool-size: none (shared pool)
- equal-schemes: none
- equal-authorities: none
- prefix-mismatch-mode: ERROR
> Defaults are hard. These are the current defaults:
>
> ExpireSnapshotsConfig defaults:
> - schedule.commit-count: 10
> - schedule.data-file-count: 1,000
> - schedule.data-file-size: 100 GB
> - schedule.interval-second: 3600 (1 hour)
> - max-snapshot-age-seconds: none
> - retain-last: none
> - delete-batch-size: 1,000
> - clean-expired-metadata: false
> - planning-worker-pool-size: none (shared pool)

Can we remove schedule.data-file-count and schedule.data-file-size? I don't think we need those for expire snapshots.

Maybe we should set clean-expired-metadata? I think keeping it only makes sense when we want to keep the old functionality. We are introducing a new one; we should remove unused metadata as soon as possible.

> DeleteOrphanFilesConfig defaults:
> - schedule.commit-count: 10
> - schedule.data-file-count: 1,000
> - schedule.data-file-size: 100 GB
> - schedule.interval-second: 3600 (1 hour)
> - min-age-seconds: 259,200 (3 days)
> - delete-batch-size: 1,000
> - use-prefix-listing: false
> - planning-worker-pool-size: none (shared pool)
> - equal-schemes: none
> - equal-authorities: none
> - prefix-mismatch-mode: ERROR

Maybe we could remove all of the schedule configs other than the interval; the others shouldn't have anything to do with delete orphan files.

Maybe setting use-prefix-listing to true would be better. It should perform better in most cases, and only edge cases need the recursive listing.

What do you think about these, @Guosmilesmile?
Force-pushed e84e0f8 to 51ef32b.
Force-pushed 51ef32b to 07822d0.
```java
// ...
// Configure expire snapshots
flinkConf.put("flink-maintenance.expire-snapshots.retain-last", "5");
flinkConf.put("flink-maintenance.expire-snapshots.max-snapshot-age-seconds", "604800");
```
I see we're offering

```java
flinkConf.put("flink-maintenance.expire-snapshots.enabled", "true");
```

and we are also offering specific expire-snapshots configs like:

```java
flinkConf.put("flink-maintenance.expire-snapshots.retain-last", "5");
flinkConf.put("flink-maintenance.expire-snapshots.max-snapshot-age-seconds", "604800");
```

Does `flink-maintenance.expire-snapshots.retain-last` require `flink-maintenance.expire-snapshots.enabled` to be true? (Same question for the other config, and this comment in general for all the other maintenance tasks.)
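One plausible reading, sketched here as an assumption rather than the actual implementation: sub-options like retain-last are only consulted once the task's enabled flag is true, so setting them without `...enabled=true` is simply inert:

```java
import java.util.Map;
import java.util.Optional;

// Sketch: a task's sub-options are only read when the task is enabled.
// Key names follow the flink-maintenance prefix used in this PR; the
// gating behavior itself is an assumption for illustration.
public class PrefixedTaskConfig {
  static final String PREFIX = "flink-maintenance.expire-snapshots.";

  static Optional<Integer> retainLast(Map<String, String> conf) {
    boolean enabled = Boolean.parseBoolean(conf.getOrDefault(PREFIX + "enabled", "false"));
    if (!enabled) {
      return Optional.empty(); // sub-options ignored while the task is off
    }
    return Optional.ofNullable(conf.get(PREFIX + "retain-last")).map(Integer::parseInt);
  }

  public static void main(String[] args) {
    // retain-last alone does nothing; enabled=true activates it.
    System.out.println(retainLast(Map.of(PREFIX + "retain-last", "5")));
    System.out.println(retainLast(
        Map.of(PREFIX + "enabled", "true", PREFIX + "retain-last", "5")));
  }
}
```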
```java
Map<String, String> schemes = deleteOrphanFilesConfig.equalSchemes();
if (schemes != null) {
  this.equalSchemes(schemes);
}
```
We have default value in
Does this need to be changed to an append instead of an overwrite here?
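The difference between the two behaviors, using the s3n/s3a defaults quoted elsewhere in this thread (a plain-Java sketch with illustrative names, not the actual DeleteOrphanFiles code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of overwrite vs. append semantics for the equal-schemes default map.
public class EqualSchemesMerge {
  static Map<String, String> defaults() {
    Map<String, String> m = new HashMap<>();
    m.put("s3n", "s3"); // defaults from DeleteOrphanFiles
    m.put("s3a", "s3");
    return m;
  }

  // Overwrite: user-provided schemes replace the defaults entirely.
  static Map<String, String> overwrite(Map<String, String> user) {
    return new HashMap<>(user);
  }

  // Append: user-provided schemes are layered on top of the defaults.
  static Map<String, String> append(Map<String, String> user) {
    Map<String, String> merged = defaults();
    merged.putAll(user);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> user = Map.of("gs", "gcs");
    System.out.println(overwrite(user).containsKey("s3n")); // false: s3n mapping lost
    System.out.println(append(user).containsKey("s3n"));    // true: s3n mapping kept
  }
}
```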
| Option | Description | Default |
| --- | --- | --- |
| `flink-maintenance.delete-orphan-files.location` | Location to start recursive listing | Table location |
| `flink-maintenance.delete-orphan-files.use-prefix-listing` | Use prefix listing for file discovery | `false` |
| `flink-maintenance.delete-orphan-files.planning-worker-pool-size` | Worker pool size for planning | Shared pool |
| `flink-maintenance.delete-orphan-files.equal-schemes` | Equivalent schemes (format: `s3n=s3,s3a=s3`) | Not set |
We have a default value:

```java
private Map<String, String> equalSchemes =
    Maps.newHashMap(
        ImmutableMap.of(
            "s3n", "s3",
            "s3a", "s3"));
```
```java
public static final ConfigOption<String> EQUAL_SCHEMES_OPTION =
    ConfigOptions.key(EQUAL_SCHEMES)
        .stringType()
        .noDefaultValue()
```