Skip to content

Commit

Permalink
Core, Spark: Refactor RewriteFileGroup planner to core
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Nov 11, 2024
1 parent 166edc7 commit 5fdcb3d
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 159 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Checks the files in the table, and using the {@link FileRewriter} plans the groups for
* compaction.
*/
public class RewriteFileGroupPlanner {
private static final Logger LOG = LoggerFactory.getLogger(RewriteFileGroupPlanner.class);

private final FileRewriter<FileScanTask, DataFile> rewriter;
private final RewriteJobOrder rewriteJobOrder;

public RewriteFileGroupPlanner(
FileRewriter<FileScanTask, DataFile> rewriter, RewriteJobOrder rewriteJobOrder) {
this.rewriter = rewriter;
this.rewriteJobOrder = rewriteJobOrder;
}

public RewritePlanResult plan(Table table, Expression filter, long startingSnapshotId) {
StructLikeMap<List<List<FileScanTask>>> plan =
planFileGroups(table, filter, startingSnapshotId);
RewriteExecutionContext ctx = new RewriteExecutionContext();
Stream<RewriteFileGroup> groups =
plan.entrySet().stream()
.filter(e -> !e.getValue().isEmpty())
.flatMap(
e -> {
StructLike partition = e.getKey();
List<List<FileScanTask>> scanGroups = e.getValue();
return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
})
.sorted(RewriteFileGroup.comparator(rewriteJobOrder));
Map<StructLike, Integer> groupsInPartition = plan.transformValues(List::size);
int totalGroupCount = groupsInPartition.values().stream().reduce(Integer::sum).orElse(0);
return new RewritePlanResult(groups, totalGroupCount, groupsInPartition);
}

private StructLikeMap<List<List<FileScanTask>>> planFileGroups(
Table table, Expression filter, long startingSnapshotId) {
CloseableIterable<FileScanTask> fileScanTasks =
table
.newScan()
.useSnapshot(startingSnapshotId)
.filter(filter)
.ignoreResiduals()
.planFiles();

try {
Types.StructType partitionType = table.spec().partitionType();
StructLikeMap<List<FileScanTask>> filesByPartition =
groupByPartition(table, partitionType, fileScanTasks);
return filesByPartition.transformValues(
tasks -> ImmutableList.copyOf(rewriter.planFileGroups(tasks)));
} finally {
try {
fileScanTasks.close();
} catch (IOException io) {
LOG.error("Cannot properly close file iterable while planning for rewrite", io);
}
}
}

private StructLikeMap<List<FileScanTask>> groupByPartition(
Table table, Types.StructType partitionType, Iterable<FileScanTask> tasks) {
StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
StructLike emptyStruct = GenericRecord.create(partitionType);

for (FileScanTask task : tasks) {
// If a task uses an incompatible partition spec the data inside could contain values
// which belong to multiple partitions in the current spec. Treating all such files as
// un-partitioned and grouping them together helps to minimize new files made.
StructLike taskPartition =
task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;

filesByPartition.computeIfAbsent(taskPartition, unused -> Lists.newArrayList()).add(task);
}

return filesByPartition;
}

private RewriteFileGroup newRewriteGroup(
RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
RewriteDataFiles.FileGroupInfo info =
ImmutableRewriteDataFiles.FileGroupInfo.builder()
.globalIndex(ctx.currentGlobalIndex())
.partitionIndex(ctx.currentPartitionIndex(partition))
.partition(partition)
.build();
return new RewriteFileGroup(info, Lists.newArrayList(tasks));
}

public static class RewritePlanResult {
private final Stream<RewriteFileGroup> groups;
private final int totalGroupCount;
private final Map<StructLike, Integer> groupsInPartition;

private RewritePlanResult(
Stream<RewriteFileGroup> groups,
int totalGroupCount,
Map<StructLike, Integer> groupsInPartition) {
this.groups = groups;
this.totalGroupCount = totalGroupCount;
this.groupsInPartition = groupsInPartition;
}

public Stream<RewriteFileGroup> groups() {
return groups;
}

public int groupsInPartition(StructLike partition) {
return groupsInPartition.get(partition);
}

public int totalGroupCount() {
return totalGroupCount;
}
}

private static class RewriteExecutionContext {
private final Map<StructLike, Integer> partitionIndexMap;
private final AtomicInteger groupIndex;

private RewriteExecutionContext() {
this.partitionIndexMap = Maps.newConcurrentMap();
this.groupIndex = new AtomicInteger(1);
}

private int currentGlobalIndex() {
return groupIndex.getAndIncrement();
}

private int currentPartitionIndex(StructLike partition) {
return partitionIndexMap.merge(partition, 1, Integer::sum);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ protected long inputSize(List<T> group) {
* of output files. The final split size is adjusted to be at least as big as the target file size
* but less than the max write file size.
*/
protected long splitSize(long inputSize) {
public long splitSize(long inputSize) {
long estimatedSplitSize = (inputSize / numOutputFiles(inputSize)) + SPLIT_OVERHEAD;
if (estimatedSplitSize < targetFileSize) {
return targetFileSize;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TestBase;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class TestRewriteFileGroupPlanner {
private static final DataFile FILE_1 = newDataFile("data_bucket=0", 10);
private static final DataFile FILE_2 = newDataFile("data_bucket=0", 10);
private static final DataFile FILE_3 = newDataFile("data_bucket=0", 10);
private static final DataFile FILE_4 = newDataFile("data_bucket=1", 11);
private static final DataFile FILE_5 = newDataFile("data_bucket=1", 11);
private static final DataFile FILE_6 = newDataFile("data_bucket=2", 50);

private static final Map<RewriteJobOrder, List<StructLike>> EXPECTED =
ImmutableMap.of(
RewriteJobOrder.FILES_DESC,
ImmutableList.of(FILE_1.partition(), FILE_4.partition(), FILE_6.partition()),
RewriteJobOrder.FILES_ASC,
ImmutableList.of(FILE_6.partition(), FILE_4.partition(), FILE_1.partition()),
RewriteJobOrder.BYTES_DESC,
ImmutableList.of(FILE_6.partition(), FILE_1.partition(), FILE_4.partition()),
RewriteJobOrder.BYTES_ASC,
ImmutableList.of(FILE_4.partition(), FILE_1.partition(), FILE_6.partition()));

@TempDir private File tableDir = null;
private TestTables.TestTable table = null;

@BeforeEach
public void setupTable() throws Exception {
this.table = TestTables.create(tableDir, "test", TestBase.SCHEMA, TestBase.SPEC, 3);
}

@AfterEach
public void cleanupTables() {
TestTables.clearTables();
}

@ParameterizedTest
@EnumSource(
value = RewriteJobOrder.class,
names = {"FILES_DESC", "FILES_ASC", "BYTES_DESC", "BYTES_ASC"})
void testGroups(RewriteJobOrder order) {
table
.newAppend()
.appendFile(FILE_1)
.appendFile(FILE_2)
.appendFile(FILE_3)
.appendFile(FILE_4)
.appendFile(FILE_5)
.appendFile(FILE_6)
.commit();
RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(new DummyRewriter(false), order);
RewriteFileGroupPlanner.RewritePlanResult result =
planner.plan(table, Expressions.alwaysTrue(), table.currentSnapshot().snapshotId());
List<RewriteFileGroup> groups = result.groups().collect(Collectors.toList());
assertThat(groups.stream().map(group -> group.info().partition()).collect(Collectors.toList()))
.isEqualTo(EXPECTED.get(order));
assertThat(result.totalGroupCount()).isEqualTo(3);
EXPECTED.get(order).forEach(s -> assertThat(result.groupsInPartition(s)).isEqualTo(1));
}

@Test
void testContext() {
table
.newAppend()
.appendFile(FILE_1)
.appendFile(FILE_2)
.appendFile(FILE_3)
.appendFile(FILE_4)
.appendFile(FILE_5)
.appendFile(FILE_6)
.commit();
RewriteFileGroupPlanner planner =
new RewriteFileGroupPlanner(new DummyRewriter(true), RewriteJobOrder.FILES_DESC);
RewriteFileGroupPlanner.RewritePlanResult result =
planner.plan(table, Expressions.alwaysTrue(), table.currentSnapshot().snapshotId());
assertThat(result.totalGroupCount()).isEqualTo(6);
assertThat(result.groupsInPartition(FILE_1.partition())).isEqualTo(3);
assertThat(result.groupsInPartition(FILE_4.partition())).isEqualTo(2);
assertThat(result.groupsInPartition(FILE_6.partition())).isEqualTo(1);
}

private static class DummyRewriter implements FileRewriter<FileScanTask, DataFile> {
private final boolean split;

private DummyRewriter(boolean split) {
this.split = split;
}

@Override
public Set<String> validOptions() {
return Set.of();
}

@Override
public void init(Map<String, String> options) {}

@Override
public Iterable<List<FileScanTask>> planFileGroups(Iterable<FileScanTask> tasks) {
List<FileScanTask> taskList = Lists.newArrayList(tasks);
return split
? taskList.stream().map(ImmutableList::of).collect(Collectors.toList())
: ImmutableList.of(taskList);
}

@Override
public Set<DataFile> rewrite(List<FileScanTask> group) {
return Set.of();
}
}

private static DataFile newDataFile(String partitionPath, long fileSize) {
return DataFiles.builder(TestBase.SPEC)
.withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
.withFileSizeInBytes(fileSize)
.withPartitionPath(partitionPath)
.withRecordCount(1)
.build();
}
}
Loading

0 comments on commit 5fdcb3d

Please sign in to comment.