Skip to content

Commit

Permalink
Added support for additional time comibinations in s3 scan
Browse files Browse the repository at this point in the history
  • Loading branch information
asifsmohammed committed Aug 14, 2023
1 parent a5c4fe2 commit 52f9ea4
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,15 @@ private LocalDateTime instantToLocalDateTime(final Instant instant) {
*/
private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTime,
final LocalDateTime startDateTime,
final LocalDateTime endDateTime){
if (Objects.isNull(startDateTime) || Objects.isNull(endDateTime) || Objects.nonNull(schedulingOptions)) {
final LocalDateTime endDateTime) {
if (Objects.nonNull(schedulingOptions)) {
return true;
} else if (Objects.isNull(startDateTime) && Objects.isNull(endDateTime)) {
return true;
} else if (Objects.isNull(startDateTime)) {
return lastModifiedTime.isBefore(endDateTime);
} else if (Objects.isNull(endDateTime)) {
return lastModifiedTime.isAfter(startDateTime);
}
return lastModifiedTime.isAfter(startDateTime) && lastModifiedTime.isBefore(endDateTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.LocalDateTime;
Expand All @@ -15,7 +17,7 @@
* Class consists the scan related properties.
*/
public class ScanOptions {

private static final Logger LOG = LoggerFactory.getLogger(ScanOptions.class);
private LocalDateTime startDateTime;

private Duration range;
Expand Down Expand Up @@ -102,21 +104,22 @@ public ScanOptions build() {
.filter(Objects::nonNull)
.count();

if (nonNullCount == 0 || nonNullCount == 2) {
setDateTimeToUse(bucketStartDateTime, bucketEndDateTime, bucketRange);
} else if (nonNullCount == 3) {
long originalBucketLevelNonNullCount = Stream.of(
bucketOption.getStartTime(), bucketOption.getEndTime(), bucketOption.getRange())
.filter(Objects::nonNull)
.count();
long originalBucketLevelNonNullCount = Stream.of(
bucketOption.getStartTime(), bucketOption.getEndTime(), bucketOption.getRange())
.filter(Objects::nonNull)
.count();

if (originalBucketLevelNonNullCount == 2) {
setDateTimeToUse(bucketOption.getStartTime(), bucketOption.getEndTime(), bucketOption.getRange());
} else {
if (nonNullCount == 3) {
if (originalBucketLevelNonNullCount == 3) {
scanRangeDateValidationError();
} else if (originalBucketLevelNonNullCount == 2) {
setDateTimeToUse(bucketOption.getStartTime(), bucketOption.getEndTime(), bucketOption.getRange());
} else if (originalBucketLevelNonNullCount == 1) {
// Use start_time, end_time, range from global level options, because we are unable to build it from single option configured at bucket level
setDateTimeToUse(startDateTime, endDateTime, range);
}
} else {
scanRangeDateValidationError();
setDateTimeToUse(bucketStartDateTime, bucketEndDateTime, bucketRange);
}
return new ScanOptions(this);
}
Expand All @@ -132,10 +135,17 @@ private void setDateTimeToUse(LocalDateTime bucketStartDateTime, LocalDateTime b
} else if (Objects.nonNull(bucketEndDateTime) && Objects.nonNull(bucketRange)) {
this.useStartDateTime = bucketEndDateTime.minus(bucketRange);
this.useEndDateTime = bucketEndDateTime;
} else if (Objects.nonNull(bucketStartDateTime)) {
this.useStartDateTime = bucketStartDateTime;
} else if (Objects.nonNull(bucketEndDateTime)) {
this.useEndDateTime = bucketEndDateTime;
} else if (Objects.nonNull(bucketRange)) {
LOG.info("Scan is configured with just range for the bucket with name {}, unable to establish a time period with range alone. " +
"Configure start_time or end_time, else all the objects in the bucket will be included", bucketOption.getName());
}
}

private void scanRangeDateValidationError(){
private void scanRangeDateValidationError() {
String message = "To set a time range for the bucket with name " + bucketOption.getName() +
", specify any two configurations from start_time, end_time and range";
throw new IllegalArgumentException(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
package org.opensearch.dataprepper.plugins.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;

/**
* Class consists the bucket properties.
*/
public class S3ScanBucketOptions {
@JsonProperty("bucket")
@Valid
private S3ScanBucketOption scanBucketOption;

public S3ScanBucketOption getS3ScanBucketOption() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,6 @@ public void s3scan_options_with_valid_global_time_range_build_success(
assertThat(scanOptions.getBucketOption(), instanceOf(S3ScanBucketOption.class));
}

@ParameterizedTest
@MethodSource("invalidTimeRangeOptions")
public void s3scan_options_with_invalid_global_time_range_throws_exception_when_build(
LocalDateTime startDateTime, LocalDateTime endDateTime, Duration range) {
assertThrows(IllegalArgumentException.class, () -> ScanOptions.builder()
.setStartDateTime(startDateTime)
.setEndDateTime(endDateTime)
.setRange(range)
.setBucketOption(new S3ScanBucketOption())
.build());
}

@ParameterizedTest
@MethodSource("validBucketTimeRangeOptions")
public void s3scan_options_with_valid_bucket_time_range_build_success(
Expand Down Expand Up @@ -139,17 +127,16 @@ private static Stream<Arguments> validGlobalTimeRangeOptions() {
LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")),
Arguments.of(null, LocalDateTime.parse("2023-01-24T18:00:00"), Duration.parse("P3D"),
LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")),
Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), null, null, LocalDateTime.parse("2023-01-21T18:00:00"), null),
Arguments.of(null, LocalDateTime.parse("2023-01-21T18:00:00"), null, null, LocalDateTime.parse("2023-01-21T18:00:00")),
Arguments.of(null, null, null, null, null)
);
);
}

private static Stream<Arguments> invalidTimeRangeOptions() {
return Stream.of(
Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-04-21T18:00:00"),
Duration.parse("P90DT3H4M")),
Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), null, null),
Arguments.of(null, LocalDateTime.parse("2023-04-21T18:00:00"), null),
Arguments.of(null, null, Duration.parse("P90DT3H4M"))
Duration.parse("P90DT3H4M"))
);
}

Expand All @@ -161,6 +148,8 @@ private static Stream<Arguments> validBucketTimeRangeOptions() {
LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")),
Arguments.of(null, LocalDateTime.parse("2023-01-24T18:00:00"), Duration.ofDays(3L),
LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")),
Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), null, null, LocalDateTime.parse("2023-01-21T18:00:00"), null),
Arguments.of(null, LocalDateTime.parse("2023-01-21T18:00:00"), null, null, LocalDateTime.parse("2023-01-21T18:00:00")),
Arguments.of(null, null, null, null, null)
);
}
Expand All @@ -186,30 +175,20 @@ private static Stream<Arguments> validCombinedTimeRangeOptions() {
Arguments.of(
LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), null,
LocalDateTime.parse("2023-05-21T18:00:00"), null, Duration.ofDays(3L),
LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00"))
LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00")),
Arguments.of(
LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), null,
null, null, Duration.ofDays(3L),
LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")),
Arguments.of(
LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-12-24T18:00:00"), null,
LocalDateTime.parse("2023-05-21T18:00:00"), null, null,
LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-12-24T18:00:00"))
);
}

private static Stream<Arguments> invalidCombinedTimeRangeOptions() {
return Stream.of(
Arguments.of(
LocalDateTime.parse("2023-05-21T18:00:00"), null, null,
LocalDateTime.parse("2023-05-24T18:00:00"), null, null),
Arguments.of(
null, LocalDateTime.parse("2023-05-24T18:00:00"), null,
null, LocalDateTime.parse("2023-05-21T18:00:00"), null),
Arguments.of(
null, null, Duration.ofDays(3L),
null, null, Duration.ofDays(3L)),
Arguments.of(
LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), null,
null, null, Duration.ofDays(3L)),
Arguments.of(
LocalDateTime.parse("2023-01-21T18:00:00"), null, Duration.ofDays(3L),
null, LocalDateTime.parse("2023-05-24T18:00:00"), null),
Arguments.of(
null, LocalDateTime.parse("2023-01-24T18:00:00"), Duration.ofDays(3L),
LocalDateTime.parse("2023-05-21T18:00:00"), null, null),
Arguments.of(
LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), Duration.ofDays(3L),
LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00"), Duration.ofDays(3L))
Expand Down

0 comments on commit 52f9ea4

Please sign in to comment.