diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java index 36ca92382a86..59c39ec361b3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java @@ -34,8 +34,6 @@ import java.time.format.ResolverStyle; import java.time.format.SignStyle; import java.time.temporal.ChronoField; -import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Objects; @@ -93,12 +91,9 @@ public PartitionTimeExtractor(@Nullable String pattern, @Nullable String formatt this.formatter = formatter; } - public LocalDateTime extract(LinkedHashMap spec) { - return extract(new ArrayList<>(spec.keySet()), new ArrayList<>(spec.values())); - } - - public LocalDateTime extract(List partitionKeys, List partitionValues) { - LocalDateTime dateTime = null; + @Nullable + public LocalDateTime extract( + List partitionKeys, List partitionValues, boolean ignoreException) { try { String timestampString; if (pattern == null) { @@ -112,22 +107,31 @@ public LocalDateTime extract(List partitionKeys, List partitionValues partitionValues.get(i).toString()); } } - dateTime = toLocalDateTime(timestampString, this.formatter); + return toLocalDateTime(timestampString, this.formatter); } catch (Exception e) { String partitionInfos = IntStream.range(0, partitionKeys.size()) .mapToObj(i -> partitionKeys.get(i) + ":" + partitionValues.get(i)) .collect(Collectors.joining(",")); - LOG.warn( - "Partition {} can't uses '{}' formatter to extract datetime to expire." - + " Please check the partition expiration configuration or" - + " manually delete the partition using the drop-partition command or" - + " use 'update-time' expiration strategy by set {}, the strategy support non-date formatted partition.", - partitionInfos, - this.formatter, - CoreOptions.PARTITION_EXPIRATION_STRATEGY.key()); + String message = + String.format( + "Can't extract datetime from partition '%s' with formatter '%s' and pattern '%s'.", + partitionInfos, this.formatter, this.pattern); + if (ignoreException) { + LOG.warn( + "{}. If you want to configure partition expiration, please:\n" + + " 1. Check the expiration configuration.\n" + + " 2. Manually delete the partition using the drop-partition command if the partition" + + " value is non-date formatted.\n" + + " 3. Use '{}' expiration strategy by set '{}', which supports non-date formatted partition.", + message, + CoreOptions.PartitionExpireStrategy.UPDATE_TIME, + CoreOptions.PARTITION_EXPIRATION_STRATEGY.key()); + return null; + } else { + throw new RuntimeException(message, e); + } } - return dateTime; } private static LocalDateTime toLocalDateTime( diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java index 5efe8b910e08..69f23e55fb3b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java @@ -64,7 +64,8 @@ private PartitionValuesTimePredicate(LocalDateTime expireDateTime) { @Override public boolean test(BinaryRow partition) { Object[] array = convertPartition(partition); - LocalDateTime partTime = timeExtractor.extract(partitionKeys, Arrays.asList(array)); + LocalDateTime partTime = + timeExtractor.extract(partitionKeys, Arrays.asList(array), true); return partTime != null && expireDateTime.isAfter(partTime); } diff --git a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java index ca9b4869b44e..ee63935602f6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java @@ -18,6 +18,8 @@ package org.apache.paimon.partition; +import org.apache.paimon.testutils.assertj.PaimonAssertions; + import org.junit.jupiter.api.Test; import java.time.LocalDateTime; @@ -25,6 +27,7 @@ import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link PartitionTimeExtractor}. */ public class PartitionTimeExtractorTest { @@ -35,23 +38,29 @@ public void testDefault() { assertThat( extractor.extract( Collections.emptyList(), - Collections.singletonList("2023-01-01 20:08:08"))) + Collections.singletonList("2023-01-01 20:08:08"), + true)) .isEqualTo(LocalDateTime.parse("2023-01-01T20:08:08")); assertThat( extractor.extract( Collections.emptyList(), - Collections.singletonList("2023-1-1 20:08:08"))) + Collections.singletonList("2023-1-1 20:08:08"), + true)) .isEqualTo(LocalDateTime.parse("2023-01-01T20:08:08")); assertThat( extractor.extract( - Collections.emptyList(), Collections.singletonList("2023-01-01"))) + Collections.emptyList(), + Collections.singletonList("2023-01-01"), + true)) .isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00")); assertThat( extractor.extract( - Collections.emptyList(), Collections.singletonList("2023-1-1"))) + Collections.emptyList(), + Collections.singletonList("2023-1-1"), + true)) .isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00")); } @@ -62,20 +71,24 @@ public void testPattern() { assertThat( extractor.extract( Arrays.asList("year", "month", "day"), - Arrays.asList("2023", "01", "01"))) + Arrays.asList("2023", "01", "01"), + true)) .isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00")); extractor = new PartitionTimeExtractor("$year-$month-$day $hour:00:00", null); assertThat( extractor.extract( Arrays.asList("year", "month", "day", "hour"), - Arrays.asList("2023", "01", "01", "01"))) + Arrays.asList("2023", "01", "01", "01"), + true)) .isEqualTo(LocalDateTime.parse("2023-01-01T01:00:00")); extractor = new PartitionTimeExtractor("$dt", null); assertThat( extractor.extract( - Arrays.asList("other", "dt"), Arrays.asList("dummy", "2023-01-01"))) + Arrays.asList("other", "dt"), + Arrays.asList("dummy", "2023-01-01"), + true)) .isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00")); } @@ -84,7 +97,30 @@ public void testFormatter() { PartitionTimeExtractor extractor = new PartitionTimeExtractor(null, "yyyyMMdd"); assertThat( extractor.extract( - Collections.emptyList(), Collections.singletonList("20230101"))) + Collections.emptyList(), + Collections.singletonList("20230101"), + true)) .isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00")); } + + @Test + public void testExtractNonDateFormattedPartition() { + PartitionTimeExtractor extractor = new PartitionTimeExtractor("$ds", "yyyyMMdd"); + assertThat( + extractor.extract( + Collections.singletonList("ds"), + Collections.singletonList("unknown"), + true)) + .isNull(); + assertThatThrownBy( + () -> + extractor.extract( + Collections.singletonList("ds"), + Collections.singletonList("unknown"), + false)) + .satisfies( + PaimonAssertions.anyCauseMatches( + RuntimeException.class, + "Can't extract datetime from partition 'ds:unknown' with formatter 'yyyyMMdd' and pattern '$ds'.")); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java index ed168387ba7e..0989ea8657d7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -122,9 +123,14 @@ public List donePartitions(boolean endInput, long currentTimeMillis) { String partition = entry.getKey(); long lastUpdateTime = entry.getValue(); + LinkedHashMap partitionSpec = + extractPartitionSpecFromPath(new Path(partition)); long partitionStartTime = timeExtractor - .extract(extractPartitionSpecFromPath(new Path(partition))) + .extract( + new ArrayList<>(partitionSpec.keySet()), + new ArrayList<>(partitionSpec.values()), + false) .atZone(ZoneId.systemDefault()) .toInstant() .toEpochMilli();