From 324bf47737d4efa426a571b8782d0779ca264f46 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Wed, 27 Nov 2024 19:46:41 +0100 Subject: [PATCH] Core: Support aggregated basic stats in partition summary --- .../org/apache/iceberg/SnapshotProducer.java | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 45b71d654344..d1e1cc0aeac3 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -67,6 +67,7 @@ import org.apache.iceberg.metrics.Timer.Timed; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -92,6 +93,9 @@ abstract class SnapshotProducer implements SnapshotUpdate { static final int MIN_FILE_GROUP_SIZE = 10_000; static final Set EMPTY_SET = Sets.newHashSet(); + private static final Splitter.MapSplitter PARTITION_SUMMARY_PROP_SPLITTER = + Splitter.on(",").withKeyValueSeparator("="); + /** Default callback used to delete files. */ private final Consumer defaultDelete = new Consumer() { @@ -301,6 +305,7 @@ private Map summary(TableMetadata previous) { Map previousSummary; SnapshotRef previousBranchHead = previous.ref(targetBranch); + if (previousBranchHead != null) { if (previous.snapshot(previousBranchHead.snapshotId()).summary() != null) { previousSummary = previous.snapshot(previousBranchHead.snapshotId()).summary(); @@ -325,7 +330,32 @@ private Map summary(TableMetadata previous) { // copy all summary properties from the implementation builder.putAll(summary); + aggregatedSummary(builder, previousSummary, summary); + + if (Boolean.parseBoolean(summary.get(SnapshotSummary.PARTITION_SUMMARY_PROP))) { + summary.keySet().stream() + .filter(key -> key.startsWith(SnapshotSummary.CHANGED_PARTITION_PREFIX)) + .forEach( + key -> { + Map map = PARTITION_SUMMARY_PROP_SPLITTER.split(summary.get(key)); + builder.put( + key, + SnapshotSummary.MAP_JOINER.join( + aggregatedSummary( + ImmutableMap.builder().putAll(map), + previousSummary, + map) + .build())); + }); + } + builder.putAll(EnvironmentContext.get()); + return builder.buildKeepingLast(); + } + private static ImmutableMap.Builder aggregatedSummary( + ImmutableMap.Builder builder, + Map previousSummary, + Map summary) { updateTotal( builder, previousSummary, @@ -368,9 +398,7 @@ private Map summary(TableMetadata previous) { summary, SnapshotSummary.ADDED_EQ_DELETES_PROP, SnapshotSummary.REMOVED_EQ_DELETES_PROP); - - builder.putAll(EnvironmentContext.get()); - return builder.build(); + return builder; } protected TableMetadata current() {