Skip to content

Commit

Permalink
[core] Refactor exception handling of PartitionTimeExtractor
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Jul 24, 2024
1 parent 10a982e commit 6ead384
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import java.time.format.ResolverStyle;
import java.time.format.SignStyle;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
Expand Down Expand Up @@ -93,12 +91,9 @@ public PartitionTimeExtractor(@Nullable String pattern, @Nullable String formatt
this.formatter = formatter;
}

public LocalDateTime extract(LinkedHashMap<String, String> spec) {
return extract(new ArrayList<>(spec.keySet()), new ArrayList<>(spec.values()));
}

public LocalDateTime extract(List<String> partitionKeys, List<?> partitionValues) {
LocalDateTime dateTime = null;
@Nullable
public LocalDateTime extract(
List<String> partitionKeys, List<?> partitionValues, boolean ignoreException) {
try {
String timestampString;
if (pattern == null) {
Expand All @@ -112,22 +107,31 @@ public LocalDateTime extract(List<String> partitionKeys, List<?> partitionValues
partitionValues.get(i).toString());
}
}
dateTime = toLocalDateTime(timestampString, this.formatter);
return toLocalDateTime(timestampString, this.formatter);
} catch (Exception e) {
String partitionInfos =
IntStream.range(0, partitionKeys.size())
.mapToObj(i -> partitionKeys.get(i) + ":" + partitionValues.get(i))
.collect(Collectors.joining(","));
LOG.warn(
"Partition {} can't uses '{}' formatter to extract datetime to expire."
+ " Please check the partition expiration configuration or"
+ " manually delete the partition using the drop-partition command or"
+ " use 'update-time' expiration strategy by set {}, the strategy support non-date formatted partition.",
partitionInfos,
this.formatter,
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
String message =
String.format(
"Can't extract datetime from partition '%s' with formatter '%s' and pattern '%s'.",
partitionInfos, this.formatter, this.pattern);
if (ignoreException) {
LOG.warn(
"{}. If you want to configure partition expiration, please:\n"
+ " 1. Check the expiration configuration.\n"
+ " 2. Manually delete the partition using the drop-partition command if the partition"
+ " value is non-date formatted.\n"
+ " 3. Use '{}' expiration strategy by set '{}', which supports non-date formatted partition.",
message,
CoreOptions.PartitionExpireStrategy.UPDATE_TIME,
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
return null;
} else {
throw new RuntimeException(message, e);
}
}
return dateTime;
}

private static LocalDateTime toLocalDateTime(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ private PartitionValuesTimePredicate(LocalDateTime expireDateTime) {
@Override
public boolean test(BinaryRow partition) {
Object[] array = convertPartition(partition);
LocalDateTime partTime = timeExtractor.extract(partitionKeys, Arrays.asList(array));
LocalDateTime partTime =
timeExtractor.extract(partitionKeys, Arrays.asList(array), true);
return partTime != null && expireDateTime.isAfter(partTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

package org.apache.paimon.partition;

import org.apache.paimon.testutils.assertj.PaimonAssertions;

import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link PartitionTimeExtractor}. */
public class PartitionTimeExtractorTest {
Expand All @@ -35,23 +38,29 @@ public void testDefault() {
assertThat(
extractor.extract(
Collections.emptyList(),
Collections.singletonList("2023-01-01 20:08:08")))
Collections.singletonList("2023-01-01 20:08:08"),
true))
.isEqualTo(LocalDateTime.parse("2023-01-01T20:08:08"));

assertThat(
extractor.extract(
Collections.emptyList(),
Collections.singletonList("2023-1-1 20:08:08")))
Collections.singletonList("2023-1-1 20:08:08"),
true))
.isEqualTo(LocalDateTime.parse("2023-01-01T20:08:08"));

assertThat(
extractor.extract(
Collections.emptyList(), Collections.singletonList("2023-01-01")))
Collections.emptyList(),
Collections.singletonList("2023-01-01"),
true))
.isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00"));

assertThat(
extractor.extract(
Collections.emptyList(), Collections.singletonList("2023-1-1")))
Collections.emptyList(),
Collections.singletonList("2023-1-1"),
true))
.isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00"));
}

Expand All @@ -62,20 +71,24 @@ public void testPattern() {
assertThat(
extractor.extract(
Arrays.asList("year", "month", "day"),
Arrays.asList("2023", "01", "01")))
Arrays.asList("2023", "01", "01"),
true))
.isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00"));

extractor = new PartitionTimeExtractor("$year-$month-$day $hour:00:00", null);
assertThat(
extractor.extract(
Arrays.asList("year", "month", "day", "hour"),
Arrays.asList("2023", "01", "01", "01")))
Arrays.asList("2023", "01", "01", "01"),
true))
.isEqualTo(LocalDateTime.parse("2023-01-01T01:00:00"));

extractor = new PartitionTimeExtractor("$dt", null);
assertThat(
extractor.extract(
Arrays.asList("other", "dt"), Arrays.asList("dummy", "2023-01-01")))
Arrays.asList("other", "dt"),
Arrays.asList("dummy", "2023-01-01"),
true))
.isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00"));
}

Expand All @@ -84,7 +97,30 @@ public void testFormatter() {
PartitionTimeExtractor extractor = new PartitionTimeExtractor(null, "yyyyMMdd");
assertThat(
extractor.extract(
Collections.emptyList(), Collections.singletonList("20230101")))
Collections.emptyList(),
Collections.singletonList("20230101"),
true))
.isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00"));
}

@Test
public void testExtractNonDateFormattedPartition() {
PartitionTimeExtractor extractor = new PartitionTimeExtractor("$ds", "yyyyMMdd");
assertThat(
extractor.extract(
Collections.singletonList("ds"),
Collections.singletonList("unknown"),
true))
.isNull();
assertThatThrownBy(
() ->
extractor.extract(
Collections.singletonList("ds"),
Collections.singletonList("unknown"),
false))
.satisfies(
PaimonAssertions.anyCauseMatches(
RuntimeException.class,
"Can't extract datetime from partition 'ds:unknown' with formatter 'yyyyMMdd' and pattern '$ds'."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -122,9 +123,14 @@ public List<String> donePartitions(boolean endInput, long currentTimeMillis) {
String partition = entry.getKey();

long lastUpdateTime = entry.getValue();
LinkedHashMap<String, String> partitionSpec =
extractPartitionSpecFromPath(new Path(partition));
long partitionStartTime =
timeExtractor
.extract(extractPartitionSpecFromPath(new Path(partition)))
.extract(
new ArrayList<>(partitionSpec.keySet()),
new ArrayList<>(partitionSpec.values()),
false)
.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli();
Expand Down

0 comments on commit 6ead384

Please sign in to comment.