Skip to content

Commit

Permalink
S3 scan enhancements
Browse files Browse the repository at this point in the history
Signed-off-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
asifsmohammed committed Jul 20, 2023
1 parent 90ee648 commit 2dd22f3
Show file tree
Hide file tree
Showing 24 changed files with 583 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ public class SourcePartition<T> {

private final String partitionKey;
private final T partitionState;
private final Long partitionClosedCount;

/**
 * Creates a partition from the values collected by the given builder.
 *
 * @param builder the builder holding the partition key, state and closed count
 * @throws NullPointerException if the builder has no partition key
 */
private SourcePartition(final Builder<T> builder) {
    // The key is the only mandatory value; state and closed count may be null.
    this.partitionKey = Objects.requireNonNull(builder.partitionKey);
    this.partitionState = builder.partitionState;
    this.partitionClosedCount = builder.partitionClosedCount;
}

public String getPartitionKey() {
Expand All @@ -34,6 +36,10 @@ public Optional<T> getPartitionState() {
return Optional.ofNullable(partitionState);
}

/**
 * Returns the closed count that was supplied to the builder.
 *
 * @return the closed count, or {@code null} if none was set on the builder
 */
public Long getPartitionClosedCount() {
    return this.partitionClosedCount;
}

/**
 * Creates a new builder for a partition whose state is of the given type.
 *
 * @param clazz the class of the partition state
 * @param <T> the partition state type
 * @return a fresh, empty builder
 */
public static <T> Builder<T> builder(Class<T> clazz) {
    final Builder<T> partitionBuilder = new Builder<>(clazz);
    return partitionBuilder;
}
Expand All @@ -42,6 +48,7 @@ public static class Builder<T> {

private String partitionKey;
private T partitionState;
private Long partitionClosedCount;

public Builder(Class<T> clazz) {

Expand All @@ -57,6 +64,11 @@ public Builder<T> withPartitionState(final T partitionState) {
return this;
}

/**
 * Sets the closed count to be carried by the built partition.
 *
 * @param partitionClosedCount the closed count; stored as given, without validation
 * @return this builder, to allow call chaining
 */
public Builder<T> withPartitionClosedCount(final Long partitionClosedCount) {
    this.partitionClosedCount = partitionClosedCount;
    return this;
}

/**
 * Builds the partition from the values set on this builder.
 *
 * @return a new {@link SourcePartition}
 * @throws NullPointerException if no partition key was set
 */
public SourcePartition<T> build() {
    return new SourcePartition<>(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.Test;

import java.util.Random;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -31,15 +32,18 @@ void sourcePartitionBuilderWithNullPartitionThrowsNullPointerException() {
void sourcePartitionBuilder_returns_expected_SourcePartition() {
    // Arrange: random values so the assertions cannot pass by coincidence.
    final String expectedKey = UUID.randomUUID().toString();
    final String expectedState = UUID.randomUUID().toString();
    final Long expectedClosedCount = new Random().nextLong();

    // Act: populate every builder property and build the partition.
    final SourcePartition<String> objectUnderTest = SourcePartition.builder(String.class)
            .withPartitionKey(expectedKey)
            .withPartitionState(expectedState)
            .withPartitionClosedCount(expectedClosedCount)
            .build();

    // Assert: each getter hands back exactly what the builder was given.
    assertThat(objectUnderTest, notNullValue());
    assertThat(objectUnderTest.getPartitionKey(), equalTo(expectedKey));
    assertThat(objectUnderTest.getPartitionState().isPresent(), equalTo(true));
    assertThat(objectUnderTest.getPartitionState().get(), equalTo(expectedState));
    assertThat(objectUnderTest.getPartitionClosedCount(), equalTo(expectedClosedCount));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String,
final SourcePartition<T> sourcePartition = SourcePartition.builder(partitionProgressStateClass)
.withPartitionKey(ownedPartitions.get().getSourcePartitionKey())
.withPartitionState(convertStringToPartitionProgressStateClass(ownedPartitions.get().getPartitionProgressState()))
.withPartitionClosedCount(ownedPartitions.get().getClosedCount())
.build();

partitionManager.setActivePartition(sourcePartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private String getKeyString(final RecordsGenerator recordsGenerator, final int n

/**
 * Parses the object stored under {@code key} in the test bucket with the given worker.
 *
 * @param key the S3 object key to parse
 * @param objectUnderTest the worker that performs the parsing
 * @throws IOException if the worker fails to parse the object
 */
private void parseObject(final String key, final S3ObjectWorker objectUnderTest) throws IOException {
    final S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(bucket, key).build();
    // Single call using the four-argument signature; no acknowledgement set,
    // source coordinator or partition key is supplied here.
    objectUnderTest.parseS3Object(s3ObjectReference, null, null, null);
}

static class IntegrationTestArguments implements ArgumentsProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
Expand All @@ -31,7 +32,6 @@
import org.opensearch.dataprepper.parser.model.SourceCoordinationConfig;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanSchedulingOptions;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectJsonOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectSerializationFormatOption;
Expand Down Expand Up @@ -90,7 +90,11 @@ public class S3ScanObjectWorkerIT {

private SourceCoordinator<S3SourceProgressState> sourceCoordinator;
@Mock
private S3ScanSchedulingOptions s3ScanSchedulingOptions;
private S3SourceConfig s3SourceConfig;
@Mock
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
private S3ObjectDeleteWorker s3ObjectDeleteWorker;

private S3ObjectHandler createObjectUnderTest(final S3ObjectRequest s3ObjectRequest){
if(Objects.nonNull(s3ObjectRequest.getExpression()))
Expand Down Expand Up @@ -177,7 +181,7 @@ private ScanObjectWorker createObjectUnderTest(final RecordsGenerator recordsGen
.compressionType(shouldCompress ? CompressionType.GZIP : CompressionType.NONE)
.s3SelectResponseHandlerFactory(new S3SelectResponseHandlerFactory()).build();
return new ScanObjectWorker(s3Client,List.of(scanOptions),createObjectUnderTest(s3ObjectRequest)
,bucketOwnerProvider, sourceCoordinator, s3ScanSchedulingOptions);
,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker);
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private String getKeyString(final RecordsGenerator recordsGenerator, final int n

/**
 * Parses the object stored under {@code key} in the test bucket with the given S3 Select worker.
 *
 * @param key the S3 object key to parse
 * @param objectUnderTest the worker that performs the parsing
 * @throws IOException if the worker fails to parse the object
 */
private void parseObject(final String key, final S3SelectObjectWorker objectUnderTest) throws IOException {
    final S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(bucket, key).build();
    // Single call using the four-argument signature; no acknowledgement set,
    // source coordinator or partition key is supplied here.
    objectUnderTest.parseS3Object(s3ObjectReference, null, null, null);
}

static class IntegrationTestArguments implements ArgumentsProvider {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.opensearch.dataprepper.plugins.source;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;

/**
 * Deletes objects from S3 and records the outcome of every attempt in plugin metrics.
 */
public class S3ObjectDeleteWorker {
    private static final Logger LOG = LoggerFactory.getLogger(S3ObjectDeleteWorker.class);
    static final String S3_OBJECTS_DELETED_METRIC_NAME = "s3ObjectsDeleted";
    static final String S3_OBJECTS_DELETE_FAILED_METRIC_NAME = "s3ObjectsDeleteFailed";
    private final S3Client s3Client;
    private final Counter s3ObjectsDeletedCounter;
    private final Counter s3ObjectsDeleteFailedCounter;

    /**
     * Creates a worker that deletes through the given client and registers its counters
     * with the given metrics.
     *
     * @param s3Client the client used to issue delete requests
     * @param pluginMetrics the metrics registry the success and failure counters are created from
     */
    public S3ObjectDeleteWorker(final S3Client s3Client, final PluginMetrics pluginMetrics) {
        this.s3Client = s3Client;

        s3ObjectsDeletedCounter = pluginMetrics.counter(S3_OBJECTS_DELETED_METRIC_NAME);
        s3ObjectsDeleteFailedCounter = pluginMetrics.counter(S3_OBJECTS_DELETE_FAILED_METRIC_NAME);
    }

    /**
     * Issues the delete request and increments the success or failure counter.
     * An {@link SdkException} raised by the client is logged and counted as a failure;
     * it is not propagated to the caller.
     *
     * @param deleteObjectRequest the request identifying the bucket and key to delete
     */
    public void deleteS3Object(final DeleteObjectRequest deleteObjectRequest) {
        try {
            final DeleteObjectResponse deleteObjectResponse = s3Client.deleteObject(deleteObjectRequest);
            if (deleteObjectResponse.sdkHttpResponse().isSuccessful()) {
                LOG.info("Deleted object: {} in S3 bucket: {}. ", deleteObjectRequest.key(), deleteObjectRequest.bucket());
                s3ObjectsDeletedCounter.increment();
            } else {
                // An unsuccessful response without an exception would otherwise only show up
                // in the failure metric; log it so the affected object can be identified.
                LOG.error("Failed to delete object: {} from S3 bucket: {}. HTTP status code: {}",
                        deleteObjectRequest.key(), deleteObjectRequest.bucket(),
                        deleteObjectResponse.sdkHttpResponse().statusCode());
                s3ObjectsDeleteFailedCounter.increment();
            }
        } catch (final SdkException e) {
            LOG.error("Failed to delete object: {} from S3 bucket: {}. ", deleteObjectRequest.key(), deleteObjectRequest.bucket(), e);
            s3ObjectsDeleteFailedCounter.increment();
        }
    }

    /**
     * Builds a delete request for the given bucket and key.
     *
     * @param bucketName the name of the bucket holding the object
     * @param key the key of the object to delete
     * @return the delete request
     */
    public DeleteObjectRequest buildDeleteObjectRequest(final String bucketName, final String key) {
        return DeleteObjectRequest.builder()
                .bucket(bucketName)
                .key(key)
                .build();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;

import java.io.IOException;

Expand All @@ -21,5 +22,7 @@ public interface S3ObjectHandler {
* @throws IOException if reading or parsing the S3 object fails
*/
void parseS3Object(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet) throws IOException;
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
import org.slf4j.Logger;
Expand All @@ -20,6 +21,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/**
Expand All @@ -28,6 +30,7 @@
*/
class S3ObjectWorker implements S3ObjectHandler {
private static final Logger LOG = LoggerFactory.getLogger(S3ObjectWorker.class);
static final int RECORDS_TO_ACCUMULATE_TO_SAVE_STATE = 10_000;
private final S3Client s3Client;
private final Buffer<Record<Event>> buffer;

Expand All @@ -51,11 +54,14 @@ public S3ObjectWorker(final S3ObjectRequest s3ObjectRequest) {
this.s3ObjectPluginMetrics = s3ObjectRequest.getS3ObjectPluginMetrics();
}

public void parseS3Object(final S3ObjectReference s3ObjectReference, final AcknowledgementSet acknowledgementSet) throws IOException {
public void parseS3Object(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException {
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, numberOfRecordsToAccumulate, bufferTimeout);
try {
s3ObjectPluginMetrics.getS3ObjectReadTimer().recordCallable((Callable<Void>) () -> {
doParseObject(acknowledgementSet, s3ObjectReference, bufferAccumulator);
doParseObject(acknowledgementSet, s3ObjectReference, bufferAccumulator, sourceCoordinator, partitionKey);
return null;
});
} catch (final IOException | RuntimeException e) {
Expand All @@ -68,9 +74,12 @@ public void parseS3Object(final S3ObjectReference s3ObjectReference, final Ackno
s3ObjectPluginMetrics.getS3ObjectsSucceededCounter().increment();
}

private void doParseObject(final AcknowledgementSet acknowledgementSet, final S3ObjectReference s3ObjectReference, final BufferAccumulator<Record<Event>> bufferAccumulator) throws IOException {
private void doParseObject(final AcknowledgementSet acknowledgementSet,
final S3ObjectReference s3ObjectReference,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException {
final long s3ObjectSize;
final long totalBytesRead;

LOG.info("Read S3 object: {}", s3ObjectReference);

Expand All @@ -79,6 +88,7 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, final S3
final CompressionOption fileCompressionOption = compressionOption != CompressionOption.AUTOMATIC ?
compressionOption : CompressionOption.fromFileName(s3ObjectReference.getKey());

final AtomicInteger saveStateCounter = new AtomicInteger();
try {
s3ObjectSize = inputFile.getLength();

Expand All @@ -89,6 +99,12 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, final S3
if (acknowledgementSet != null) {
acknowledgementSet.add(record.getData());
}
int recordsWrittenAfterLastSaveState = bufferAccumulator.getTotalWritten() - saveStateCounter.get() * RECORDS_TO_ACCUMULATE_TO_SAVE_STATE;
// Save state every 10,000 records to renew source coordination ownership; the ownership timeout is 10 minutes.
if (recordsWrittenAfterLastSaveState >= RECORDS_TO_ACCUMULATE_TO_SAVE_STATE && sourceCoordinator != null && partitionKey != null) {
sourceCoordinator.saveProgressStateForPartition(partitionKey, null);
saveStateCounter.getAndIncrement();
}
} catch (final Exception e) {
LOG.error("Failed writing S3 objects to buffer due to: {}", e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
*/
package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOptions;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanSchedulingOptions;
import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;

import java.time.Duration;
Expand All @@ -19,8 +19,8 @@
* objects and spawn a thread {@link S3SelectObjectWorker}
*/
public class S3ScanService {
private final S3SourceConfig s3SourceConfig;
private final List<S3ScanBucketOptions> s3ScanBucketOptions;
private final S3ScanSchedulingOptions s3ScanSchedulingOptions;
private final S3ClientBuilderFactory s3ClientBuilderFactory;
private final LocalDateTime endDateTime;

Expand All @@ -32,26 +32,32 @@ public class S3ScanService {

private final BucketOwnerProvider bucketOwnerProvider;
private final SourceCoordinator<S3SourceProgressState> sourceCoordinator;
private final AcknowledgementSetManager acknowledgementSetManager;
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;

public S3ScanService(final S3SourceConfig s3SourceConfig,
final S3ClientBuilderFactory s3ClientBuilderFactory,
final S3ObjectHandler s3ObjectHandler,
final BucketOwnerProvider bucketOwnerProvider,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator) {
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final AcknowledgementSetManager acknowledgementSetManager,
final S3ObjectDeleteWorker s3ObjectDeleteWorker) {
this.s3SourceConfig = s3SourceConfig;
this.s3ScanBucketOptions = s3SourceConfig.getS3ScanScanOptions().getBuckets();
this.s3ClientBuilderFactory = s3ClientBuilderFactory;
this.endDateTime = s3SourceConfig.getS3ScanScanOptions().getEndTime();
this.startDateTime = s3SourceConfig.getS3ScanScanOptions().getStartTime();
this.range = s3SourceConfig.getS3ScanScanOptions().getRange();
this.s3ScanSchedulingOptions = s3SourceConfig.getS3ScanScanOptions().getSchedulingOptions();
this.s3ObjectHandler = s3ObjectHandler;
this.bucketOwnerProvider = bucketOwnerProvider;
this.sourceCoordinator = sourceCoordinator;
this.acknowledgementSetManager = acknowledgementSetManager;
this.s3ObjectDeleteWorker = s3ObjectDeleteWorker;
}

public void start() {
scanObjectWorkerThread = new Thread(new ScanObjectWorker(s3ClientBuilderFactory.getS3Client(),
getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator, s3ScanSchedulingOptions));
getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker));
scanObjectWorkerThread.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectJsonOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectSerializationFormatOption;
Expand Down Expand Up @@ -99,7 +100,10 @@ public S3SelectObjectWorker(final S3ObjectRequest s3ObjectRequest) {
this.bucketOwnerProvider = s3ObjectRequest.getBucketOwnerProvider();
}

public void parseS3Object(final S3ObjectReference s3ObjectReference, final AcknowledgementSet acknowledgementSet) throws IOException {
public void parseS3Object(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException {
try{
LOG.info("Read S3 object: {}", s3ObjectReference);
selectObject(s3ObjectReference, acknowledgementSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class S3Service {
}

/**
 * Hands the referenced S3 object to the configured handler for parsing.
 *
 * @param s3ObjectReference the object to parse
 * @param acknowledgementSet the acknowledgement set passed through to the handler
 * @throws IOException if the handler fails to parse the object
 */
void addS3Object(final S3ObjectReference s3ObjectReference, AcknowledgementSet acknowledgementSet) throws IOException {
    // Single call using the four-argument signature; no source coordinator
    // or partition key is supplied on this path.
    s3ObjectHandler.parseS3Object(s3ObjectReference, acknowledgementSet, null, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public void start(Buffer<Record<Event>> buffer) {
s3SourceConfig.getBufferTimeout(), s3ObjectPluginMetrics);
final BiConsumer<Event, S3ObjectReference> eventMetadataModifier = new EventMetadataModifier(
s3SourceConfig.getMetadataRootKey());
final S3ObjectDeleteWorker s3ObjectDeleteWorker = new S3ObjectDeleteWorker(s3ClientBuilderFactory.getS3Client(), pluginMetrics);

if (s3SelectOptional.isPresent()) {
S3SelectCSVOption csvOption = (s3SelectOptional.get().getS3SelectCSVOption() != null) ?
s3SelectOptional.get().getS3SelectCSVOption() : new S3SelectCSVOption();
Expand Down Expand Up @@ -122,7 +124,7 @@ public void start(Buffer<Record<Event>> buffer) {
sqsService.start();
}
if(s3ScanScanOptional.isPresent()) {
s3ScanService = new S3ScanService(s3SourceConfig,s3ClientBuilderFactory,s3Handler,bucketOwnerProvider, sourceCoordinator);
s3ScanService = new S3ScanService(s3SourceConfig, s3ClientBuilderFactory, s3Handler, bucketOwnerProvider, sourceCoordinator, acknowledgementSetManager, s3ObjectDeleteWorker);
s3ScanService.start();
}
}
Expand Down
Loading

0 comments on commit 2dd22f3

Please sign in to comment.