diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroupPlanner.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroupPlanner.java
new file mode 100644
index 000000000000..b0824f692c3d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroupPlanner.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks the files in the table, and using the {@link FileRewriter} plans the groups for
+ * compaction.
+ */
+public class RewriteFileGroupPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteFileGroupPlanner.class);
+
+  private final FileRewriter<FileScanTask, DataFile> rewriter;
+  private final RewriteJobOrder rewriteJobOrder;
+
+  public RewriteFileGroupPlanner(
+      FileRewriter<FileScanTask, DataFile> rewriter, RewriteJobOrder rewriteJobOrder) {
+    this.rewriter = rewriter;
+    this.rewriteJobOrder = rewriteJobOrder;
+  }
+
+  public RewritePlanResult plan(Table table, Expression filter, long startingSnapshotId) {
+    StructLikeMap<List<List<FileScanTask>>> plan =
+        planFileGroups(table, filter, startingSnapshotId);
+    RewriteExecutionContext ctx = new RewriteExecutionContext();
+    Stream<RewriteFileGroup> groups =
+        plan.entrySet().stream()
+            .filter(e -> !e.getValue().isEmpty())
+            .flatMap(
+                e -> {
+                  StructLike partition = e.getKey();
+                  List<List<FileScanTask>> scanGroups = e.getValue();
+                  return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+                })
+            .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
+    Map<StructLike, Integer> groupsInPartition = plan.transformValues(List::size);
+    int totalGroupCount = groupsInPartition.values().stream().reduce(Integer::sum).orElse(0);
+    return new RewritePlanResult(groups, totalGroupCount, groupsInPartition);
+  }
+
+  private StructLikeMap<List<List<FileScanTask>>> planFileGroups(
+      Table table, Expression filter, long startingSnapshotId) {
+    CloseableIterable<FileScanTask> fileScanTasks =
+        table
+            .newScan()
+            .useSnapshot(startingSnapshotId)
+            .filter(filter)
+            .ignoreResiduals()
+            .planFiles();
+
+    try {
+      Types.StructType partitionType = table.spec().partitionType();
+      StructLikeMap<List<FileScanTask>> filesByPartition =
+          groupByPartition(table, partitionType, fileScanTasks);
+      return filesByPartition.transformValues(
+          tasks -> ImmutableList.copyOf(rewriter.planFileGroups(tasks)));
+    } finally {
+      try {
+        fileScanTasks.close();
+      } catch (IOException io) {
+        LOG.error("Cannot properly close file iterable while planning for rewrite", io);
+      }
+    }
+  }
+
+  private StructLikeMap<List<FileScanTask>> groupByPartition(
+      Table table, Types.StructType partitionType, Iterable<FileScanTask> tasks) {
+    StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
+    StructLike emptyStruct = GenericRecord.create(partitionType);
+
+    for (FileScanTask task : tasks) {
+      // If a task uses an incompatible partition spec the data inside could contain values
+      // which belong to multiple partitions in the current spec. Treating all such files as
+      // un-partitioned and grouping them together helps to minimize new files made.
+      StructLike taskPartition =
+          task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
+
+      filesByPartition.computeIfAbsent(taskPartition, unused -> Lists.newArrayList()).add(task);
+    }
+
+    return filesByPartition;
+  }
+
+  private RewriteFileGroup newRewriteGroup(
+      RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
+    RewriteDataFiles.FileGroupInfo info =
+        ImmutableRewriteDataFiles.FileGroupInfo.builder()
+            .globalIndex(ctx.currentGlobalIndex())
+            .partitionIndex(ctx.currentPartitionIndex(partition))
+            .partition(partition)
+            .build();
+    return new RewriteFileGroup(info, Lists.newArrayList(tasks));
+  }
+
+  public static class RewritePlanResult {
+    private final Stream<RewriteFileGroup> groups;
+    private final int totalGroupCount;
+    private final Map<StructLike, Integer> groupsInPartition;
+
+    private RewritePlanResult(
+        Stream<RewriteFileGroup> groups,
+        int totalGroupCount,
+        Map<StructLike, Integer> groupsInPartition) {
+      this.groups = groups;
+      this.totalGroupCount = totalGroupCount;
+      this.groupsInPartition = groupsInPartition;
+    }
+
+    public Stream<RewriteFileGroup> groups() {
+      return groups;
+    }
+
+    public int groupsInPartition(StructLike partition) {
+      return groupsInPartition.get(partition);
+    }
+
+    public int totalGroupCount() {
+      return totalGroupCount;
+    }
+  }
+
+  private static class RewriteExecutionContext {
+    private final Map<StructLike, Integer> partitionIndexMap;
+    private final AtomicInteger groupIndex;
+
+    private RewriteExecutionContext() {
+      this.partitionIndexMap = Maps.newConcurrentMap();
+      this.groupIndex = new AtomicInteger(1);
+    }
+
+    private int currentGlobalIndex() {
+      return groupIndex.getAndIncrement();
+    }
+
+    private int currentPartitionIndex(StructLike partition) {
+      return partitionIndexMap.merge(partition, 1, Integer::sum);
+    }
+  }
+}
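[Reviewer note, not part of the patch] A minimal sketch of driving the new planner directly. It uses only the API added above; the `table` and a configured `FileRewriter<FileScanTask, DataFile>` (for example a size-based binpack rewriter) are assumed to be supplied by the caller.

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.RewriteJobOrder;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.FileRewriter;
    import org.apache.iceberg.actions.RewriteFileGroup;
    import org.apache.iceberg.actions.RewriteFileGroupPlanner;
    import org.apache.iceberg.actions.RewriteFileGroupPlanner.RewritePlanResult;
    import org.apache.iceberg.expressions.Expressions;

    class PlannerUsageSketch {
      static List<RewriteFileGroup> planGroups(
          Table table, FileRewriter<FileScanTask, DataFile> rewriter) {
        RewriteFileGroupPlanner planner =
            new RewriteFileGroupPlanner(rewriter, RewriteJobOrder.BYTES_DESC);

        // Plan against the current snapshot with no row filter; the returned
        // stream is already sorted by the requested job order.
        RewritePlanResult result =
            planner.plan(table, Expressions.alwaysTrue(), table.currentSnapshot().snapshotId());

        // groups() exposes a one-shot Stream, so collect or consume it exactly once.
        return result.groups().collect(Collectors.toList());
      }
    }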
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
index cea7003c1a38..5d45392c5487 100644
--- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
@@ -191,7 +191,7 @@ protected long inputSize(List<T> group) {
    * of output files. The final split size is adjusted to be at least as big as the target file size
    * but less than the max write file size.
    */
-  protected long splitSize(long inputSize) {
+  public long splitSize(long inputSize) {
     long estimatedSplitSize = (inputSize / numOutputFiles(inputSize)) + SPLIT_OVERHEAD;
     if (estimatedSplitSize < targetFileSize) {
       return targetFileSize;
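[Reviewer note, not part of the patch] The only change above widens splitSize from protected to public, so code outside the rewriter hierarchy can query the expected split size. A small illustration of the lower clamp shown in the hunk (the Javadoc also describes an upper bound at the max write file size, not visible here), with made-up numbers standing in for the rewriter's configured fields:

    // Hypothetical values; SPLIT_OVERHEAD, targetFileSize, and numOutputFiles(...)
    // stand in for the rewriter's real fields and method.
    long inputSize = 1000L * 1024 * 1024;      // 1000 MB of input
    long numOutputFiles = 2;                   // suppose numOutputFiles(inputSize) == 2
    long splitOverhead = 5L * 1024;            // small constant padding
    long targetFileSize = 512L * 1024 * 1024;  // 512 MB target

    long estimatedSplitSize = (inputSize / numOutputFiles) + splitOverhead; // ~500 MB
    // estimatedSplitSize < targetFileSize, so splitSize() returns the 512 MB target.
    long splitSize = Math.max(estimatedSplitSize, targetFileSize);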
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestRewriteFileGroupPlanner.java b/core/src/test/java/org/apache/iceberg/actions/TestRewriteFileGroupPlanner.java
new file mode 100644
index 000000000000..da4f9d30e043
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/actions/TestRewriteFileGroupPlanner.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+class TestRewriteFileGroupPlanner {
+  private static final DataFile FILE_1 = newDataFile("data_bucket=0", 10);
+  private static final DataFile FILE_2 = newDataFile("data_bucket=0", 10);
+  private static final DataFile FILE_3 = newDataFile("data_bucket=0", 10);
+  private static final DataFile FILE_4 = newDataFile("data_bucket=1", 11);
+  private static final DataFile FILE_5 = newDataFile("data_bucket=1", 11);
+  private static final DataFile FILE_6 = newDataFile("data_bucket=2", 50);
+
+  private static final Map<RewriteJobOrder, List<StructLike>> EXPECTED =
+      ImmutableMap.of(
+          RewriteJobOrder.FILES_DESC,
+          ImmutableList.of(FILE_1.partition(), FILE_4.partition(), FILE_6.partition()),
+          RewriteJobOrder.FILES_ASC,
+          ImmutableList.of(FILE_6.partition(), FILE_4.partition(), FILE_1.partition()),
+          RewriteJobOrder.BYTES_DESC,
+          ImmutableList.of(FILE_6.partition(), FILE_1.partition(), FILE_4.partition()),
+          RewriteJobOrder.BYTES_ASC,
+          ImmutableList.of(FILE_4.partition(), FILE_1.partition(), FILE_6.partition()));
+
+  @TempDir private File tableDir = null;
+  private TestTables.TestTable table = null;
+
+  @BeforeEach
+  public void setupTable() throws Exception {
+    this.table = TestTables.create(tableDir, "test", TestBase.SCHEMA, TestBase.SPEC, 3);
+  }
+
+  @AfterEach
+  public void cleanupTables() {
+    TestTables.clearTables();
+  }
+
+  @ParameterizedTest
+  @EnumSource(
+      value = RewriteJobOrder.class,
+      names = {"FILES_DESC", "FILES_ASC", "BYTES_DESC", "BYTES_ASC"})
+  void testGroups(RewriteJobOrder order) {
+    table
+        .newAppend()
+        .appendFile(FILE_1)
+        .appendFile(FILE_2)
+        .appendFile(FILE_3)
+        .appendFile(FILE_4)
+        .appendFile(FILE_5)
+        .appendFile(FILE_6)
+        .commit();
+    RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(new DummyRewriter(false), order);
+    RewriteFileGroupPlanner.RewritePlanResult result =
+        planner.plan(table, Expressions.alwaysTrue(), table.currentSnapshot().snapshotId());
+    List<RewriteFileGroup> groups = result.groups().collect(Collectors.toList());
+    assertThat(groups.stream().map(group -> group.info().partition()).collect(Collectors.toList()))
+        .isEqualTo(EXPECTED.get(order));
+    assertThat(result.totalGroupCount()).isEqualTo(3);
+    EXPECTED.get(order).forEach(s -> assertThat(result.groupsInPartition(s)).isEqualTo(1));
+  }
+
+  @Test
+  void testContext() {
+    table
+        .newAppend()
+        .appendFile(FILE_1)
+        .appendFile(FILE_2)
+        .appendFile(FILE_3)
+        .appendFile(FILE_4)
+        .appendFile(FILE_5)
+        .appendFile(FILE_6)
+        .commit();
+    RewriteFileGroupPlanner planner =
+        new RewriteFileGroupPlanner(new DummyRewriter(true), RewriteJobOrder.FILES_DESC);
+    RewriteFileGroupPlanner.RewritePlanResult result =
+        planner.plan(table, Expressions.alwaysTrue(), table.currentSnapshot().snapshotId());
+    assertThat(result.totalGroupCount()).isEqualTo(6);
+    assertThat(result.groupsInPartition(FILE_1.partition())).isEqualTo(3);
+    assertThat(result.groupsInPartition(FILE_4.partition())).isEqualTo(2);
+    assertThat(result.groupsInPartition(FILE_6.partition())).isEqualTo(1);
+  }
+
+  private static class DummyRewriter implements FileRewriter<FileScanTask, DataFile> {
+    private final boolean split;
+
+    public DummyRewriter(boolean split) {
+      this.split = split;
+    }
+
+    @Override
+    public Set<String> validOptions() {
+      return Set.of();
+    }
+
+    @Override
+    public void init(Map<String, String> options) {}
+
+    @Override
+    public Iterable<List<FileScanTask>> planFileGroups(Iterable<FileScanTask> tasks) {
+      List<FileScanTask> taskList = Lists.newArrayList(tasks);
+      return split
+          ? taskList.stream().map(ImmutableList::of).collect(Collectors.toList())
+          : ImmutableList.of(taskList);
+    }
+
+    @Override
+    public Set<DataFile> rewrite(List<FileScanTask> group) {
+      return Set.of();
+    }
+  }
+
+  private static DataFile newDataFile(String partitionPath, long fileSize) {
+    return DataFiles.builder(TestBase.SPEC)
+        .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
+        .withFileSizeInBytes(fileSize)
+        .withPartitionPath(partitionPath)
+        .withRecordCount(1)
+        .build();
+  }
+}
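[Reviewer note, not part of the patch] The EXPECTED orderings follow directly from the fixture sizes: data_bucket=0 holds three 10-byte files (30 bytes total), data_bucket=1 holds two 11-byte files (22 bytes), and data_bucket=2 holds one 50-byte file. FILES_DESC therefore sorts bucket 0 (3 files) before bucket 1 (2 files) before bucket 2 (1 file), while BYTES_DESC sorts bucket 2 (50) before bucket 0 (30) before bucket 1 (22); the ASC orders are the reverses.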
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 4e381a7bd362..651abfaa3fdd 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -18,20 +18,16 @@
  */
 package org.apache.iceberg.spark.actions;
 
-import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.RewriteJobOrder;
@@ -44,27 +40,23 @@
 import org.apache.iceberg.actions.RewriteDataFiles;
 import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
 import org.apache.iceberg.actions.RewriteFileGroup;
-import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner.RewritePlanResult;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Queues;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.IntMath;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.StructLikeMap;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
@@ -168,21 +160,17 @@ public RewriteDataFiles.Result execute() {
 
     validateAndInitOptions();
 
-    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
-        planFileGroups(startingSnapshotId);
-    RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
+    RewritePlanResult result = plan(startingSnapshotId);
 
-    if (ctx.totalGroupCount() == 0) {
+    if (result.totalGroupCount() == 0) {
       LOG.info("Nothing found to rewrite in {}", table.name());
       return EMPTY_RESULT;
     }
 
-    Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
-
     Builder resultBuilder =
         partialProgressEnabled
-            ? doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId))
-            : doExecute(ctx, groupStream, commitManager(startingSnapshotId));
+            ? doExecuteWithPartialProgress(result, commitManager(startingSnapshotId))
+            : doExecute(result, commitManager(startingSnapshotId));
 
     if (removeDanglingDeletes) {
       RemoveDanglingDeletesSparkAction action =
@@ -190,67 +178,18 @@ public RewriteDataFiles.Result execute() {
       int removedCount = Iterables.size(action.execute().removedDeleteFiles());
       resultBuilder.removedDeleteFilesCount(removedCount);
     }
-    return resultBuilder.build();
-  }
-
-  StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
-    CloseableIterable<FileScanTask> fileScanTasks =
-        table
-            .newScan()
-            .useSnapshot(startingSnapshotId)
-            .filter(filter)
-            .ignoreResiduals()
-            .planFiles();
-
-    try {
-      StructType partitionType = table.spec().partitionType();
-      StructLikeMap<List<FileScanTask>> filesByPartition =
-          groupByPartition(partitionType, fileScanTasks);
-      return fileGroupsByPartition(filesByPartition);
-    } finally {
-      try {
-        fileScanTasks.close();
-      } catch (IOException io) {
-        LOG.error("Cannot properly close file iterable while planning for rewrite", io);
-      }
-    }
-  }
-
-  private StructLikeMap<List<FileScanTask>> groupByPartition(
-      StructType partitionType, Iterable<FileScanTask> tasks) {
-    StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
-    StructLike emptyStruct = GenericRecord.create(partitionType);
-
-    for (FileScanTask task : tasks) {
-      // If a task uses an incompatible partition spec the data inside could contain values
-      // which belong to multiple partitions in the current spec. Treating all such files as
-      // un-partitioned and grouping them together helps to minimize new files made.
-      StructLike taskPartition =
-          task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
-
-      List<FileScanTask> files = filesByPartition.get(taskPartition);
-      if (files == null) {
-        files = Lists.newArrayList();
-      }
-
-      files.add(task);
-      filesByPartition.put(taskPartition, files);
-    }
-
-    return filesByPartition;
-  }
-
-  private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(
-      StructLikeMap<List<FileScanTask>> filesByPartition) {
-    return filesByPartition.transformValues(this::planFileGroups);
+    return resultBuilder.build();
   }
 
-  private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
-    return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
+  RewritePlanResult plan(long startingSnapshotId) {
+    return new RewriteFileGroupPlanner(rewriter, rewriteJobOrder)
+        .plan(table, filter, startingSnapshotId);
   }
 
   @VisibleForTesting
-  RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
-    String desc = jobDesc(fileGroup, ctx);
+  RewriteFileGroup rewriteFiles(RewritePlanResult planResult, RewriteFileGroup fileGroup) {
+    String desc = jobDesc(fileGroup, planResult);
     Set<DataFile> addedFiles =
         withJobGroupInfo(
             newJobGroupInfo("REWRITE-DATA-FILES", desc),
@@ -276,29 +215,25 @@ RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
   }
 
   private Builder doExecute(
-      RewriteExecutionContext ctx,
-      Stream<RewriteFileGroup> groupStream,
-      RewriteDataFilesCommitManager commitManager) {
+      RewritePlanResult planResult, RewriteDataFilesCommitManager commitManager) {
     ExecutorService rewriteService = rewriteService();
 
     ConcurrentLinkedQueue<RewriteFileGroup> rewrittenGroups = Queues.newConcurrentLinkedQueue();
 
     Tasks.Builder<RewriteFileGroup> rewriteTaskBuilder =
-        Tasks.foreach(groupStream)
+        Tasks.foreach(planResult.groups())
             .executeWith(rewriteService)
            .stopOnFailure()
            .noRetry()
            .onFailure(
-                (fileGroup, exception) -> {
-                  LOG.warn(
-                      "Failure during rewrite process for group {}", fileGroup.info(), exception);
-                });
+                (fileGroup, exception) ->
+                    LOG.warn(
+                        "Failure during rewrite process for group {}",
+                        fileGroup.info(),
+                        exception));
 
     try {
-      rewriteTaskBuilder.run(
-          fileGroup -> {
-            rewrittenGroups.add(rewriteFiles(ctx, fileGroup));
-          });
+      rewriteTaskBuilder.run(fileGroup -> rewrittenGroups.add(rewriteFiles(planResult, fileGroup)));
     } catch (Exception e) {
       // At least one rewrite group failed, clean up all completed rewrites
       LOG.error(
@@ -341,20 +276,19 @@ private Builder doExecute(
   }
 
   private Builder doExecuteWithPartialProgress(
-      RewriteExecutionContext ctx,
-      Stream<RewriteFileGroup> groupStream,
-      RewriteDataFilesCommitManager commitManager) {
+      RewritePlanResult planResult, RewriteDataFilesCommitManager commitManager) {
     ExecutorService rewriteService = rewriteService();
 
     // start commit service
-    int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
+    int groupsPerCommit =
+        IntMath.divide(planResult.totalGroupCount(), maxCommits, RoundingMode.CEILING);
     RewriteDataFilesCommitManager.CommitService commitService =
         commitManager.service(groupsPerCommit);
     commitService.start();
 
     Collection<FileGroupFailureResult> rewriteFailures = new ConcurrentLinkedQueue<>();
     // start rewrite tasks
-    Tasks.foreach(groupStream)
+    Tasks.foreach(planResult.groups())
         .suppressFailureWhenFinished()
         .executeWith(rewriteService)
         .noRetry()
@@ -367,7 +301,7 @@ private Builder doExecuteWithPartialProgress(
                       .dataFilesCount(fileGroup.numFiles())
                      .build());
            })
-        .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
+        .run(fileGroup -> commitService.offer(rewriteFiles(planResult, fileGroup)));
     rewriteService.shutdown();
 
     // stop commit service
@@ -400,32 +334,6 @@ private Builder doExecuteWithPartialProgress(
         .rewriteFailures(rewriteFailures);
   }
 
-  Stream<RewriteFileGroup> toGroupStream(
-      RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
-    return groupsByPartition.entrySet().stream()
-        .filter(e -> !e.getValue().isEmpty())
-        .flatMap(
-            e -> {
-              StructLike partition = e.getKey();
-              List<List<FileScanTask>> scanGroups = e.getValue();
-              return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
-            })
-        .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
-  }
-
-  private RewriteFileGroup newRewriteGroup(
-      RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
-    int globalIndex = ctx.currentGlobalIndex();
-    int partitionIndex = ctx.currentPartitionIndex(partition);
-    FileGroupInfo info =
-        ImmutableRewriteDataFiles.FileGroupInfo.builder()
-            .globalIndex(globalIndex)
-            .partitionIndex(partitionIndex)
-            .partition(partition)
-            .build();
-    return new RewriteFileGroup(info, tasks);
-  }
-
   private Iterable<FileGroupRewriteResult> toRewriteResults(List<RewriteFileGroup> commitResults) {
     return commitResults.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
   }
@@ -488,7 +396,7 @@ void validateAndInitOptions() {
         PARTIAL_PROGRESS_ENABLED);
   }
 
-  private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {
+  private String jobDesc(RewriteFileGroup group, RewritePlanResult planResult) {
     StructLike partition = group.info().partition();
     if (partition.size() > 0) {
       return String.format(
@@ -496,10 +404,10 @@ private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {
           group.rewrittenFiles().size(),
           rewriter.description(),
           group.info().globalIndex(),
-          ctx.totalGroupCount(),
+          planResult.totalGroupCount(),
           partition,
           group.info().partitionIndex(),
-          ctx.groupsInPartition(partition),
+          planResult.groupsInPartition(partition),
           table.name());
     } else {
       return String.format(
@@ -507,39 +415,8 @@ private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {
           group.rewrittenFiles().size(),
           rewriter.description(),
           group.info().globalIndex(),
-          ctx.totalGroupCount(),
+          planResult.totalGroupCount(),
           table.name());
     }
   }
-
-  @VisibleForTesting
-  static class RewriteExecutionContext {
-    private final StructLikeMap<Integer> numGroupsByPartition;
-    private final int totalGroupCount;
-    private final Map<StructLike, Integer> partitionIndexMap;
-    private final AtomicInteger groupIndex;
-
-    RewriteExecutionContext(StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition) {
-      this.numGroupsByPartition = fileGroupsByPartition.transformValues(List::size);
-      this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
-      this.partitionIndexMap = Maps.newConcurrentMap();
-      this.groupIndex = new AtomicInteger(1);
-    }
-
-    public int currentGlobalIndex() {
-      return groupIndex.getAndIncrement();
-    }
-
-    public int currentPartitionIndex(StructLike partition) {
-      return partitionIndexMap.merge(partition, 1, Integer::sum);
-    }
-
-    public int groupsInPartition(StructLike partition) {
-      return numGroupsByPartition.get(partition);
-    }
-
-    public int totalGroupCount() {
-      return totalGroupCount;
-    }
-  }
 }
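[Reviewer note, not part of the patch] The RewriteExecutionContext removed here is superseded by the equivalent private context inside RewriteFileGroupPlanner in core. Its per-partition numbering relies on ConcurrentMap.merge acting as an atomic counter; a self-contained JDK illustration with hypothetical keys:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class MergeCounterSketch {
      public static void main(String[] args) {
        Map<String, Integer> partitionIndex = new ConcurrentHashMap<>();
        // merge() atomically bumps the per-key counter and returns the new value,
        // which is how currentPartitionIndex(partition) hands out 1-based indexes.
        System.out.println(partitionIndex.merge("data_bucket=0", 1, Integer::sum)); // 1
        System.out.println(partitionIndex.merge("data_bucket=0", 1, Integer::sum)); // 2
        System.out.println(partitionIndex.merge("data_bucket=1", 1, Integer::sum)); // 1
      }
    }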
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index b7ab47f865b5..4689e50c347f 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -103,7 +103,6 @@
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.TestBase;
-import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Comparators;
@@ -112,7 +111,6 @@
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.StructLikeMap;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -1795,11 +1793,8 @@ protected List<DataFile> currentDataFiles(Table table) {
   private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
     rewrite.validateAndInitOptions();
 
-    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
-        rewrite.planFileGroups(table.currentSnapshot().snapshotId());
-
-    return rewrite.toGroupStream(
-        new RewriteExecutionContext(fileGroupsByPartition), fileGroupsByPartition);
+    return rewrite.plan(table.currentSnapshot().snapshotId()).groups();
   }
 
   protected List<ThreeColumnRecord> currentData() {
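[Reviewer note, not part of the patch] The public Spark action API is untouched by this refactor; only the internal planning path moves into core. A sketch of the unchanged entry point, assuming a live SparkSession and a loaded Table, and using the rewrite-job-order option as I understand it from RewriteDataFiles.REWRITE_JOB_ORDER:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    class ActionUsageSketch {
      static RewriteDataFiles.Result rewrite(SparkSession spark, Table table) {
        return SparkActions.get(spark)
            .rewriteDataFiles(table)
            // planning now flows through RewriteFileGroupPlanner internally
            .option(RewriteDataFiles.REWRITE_JOB_ORDER, "bytes-desc")
            .execute();
      }
    }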