Skip to content

Commit

Permalink
Add bucket ownership validation support to s3 sink
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>

Add bucket ownership validations for s3 sink

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed May 6, 2024
1 parent f6a06a0 commit 842e130
Show file tree
Hide file tree
Showing 32 changed files with 624 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand Down Expand Up @@ -140,6 +141,9 @@ class S3SinkServiceIT {
@Mock
private ExpressionEvaluator expressionEvaluator;

@Mock
private BucketOwnerProvider bucketOwnerProvider;

private OutputCodec codec;
private KeyGenerator keyGenerator;

Expand Down Expand Up @@ -270,7 +274,7 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx
private S3SinkService createObjectUnderTest() {
OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3AsyncClient);
s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3AsyncClient, bucketOwnerProvider);

return new S3SinkService(s3SinkConfig, codecContext, Duration.ofSeconds(5), pluginMetrics, s3GroupManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@


import org.apache.parquet.io.PositionOutputStream;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
Expand Down Expand Up @@ -59,6 +60,9 @@ public class S3OutputStream extends PositionOutputStream {
private final byte[] buf;

private final S3AsyncClient s3Client;

private final BucketOwnerProvider bucketOwnerProvider;

/**
* Collection of the etags for the parts that have been uploaded
*/
Expand Down Expand Up @@ -93,7 +97,8 @@ public class S3OutputStream extends PositionOutputStream {
public S3OutputStream(final S3AsyncClient s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {
this.s3Client = s3Client;
this.bucket = bucketSupplier.get();
this.key = keySupplier.get();
Expand All @@ -103,6 +108,7 @@ public S3OutputStream(final S3AsyncClient s3Client,
open = true;
this.defaultBucket = defaultBucket;
this.executorService = Executors.newSingleThreadExecutor();
this.bucketOwnerProvider = bucketOwnerProvider;
}

@Override
Expand Down Expand Up @@ -191,6 +197,7 @@ public CompletableFuture<?> close(final Consumer<Boolean> runOnCompletion, final
.build();
CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucket)
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(defaultBucket).orElse(null))
.key(key)
.uploadId(uploadId)
.multipartUpload(completedMultipartUpload)
Expand Down Expand Up @@ -250,6 +257,7 @@ private void uploadPart() {
int partNumber = etags.size() + 1;
UploadPartRequest uploadRequest = UploadPartRequest.builder()
.bucket(bucket)
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(defaultBucket).orElse(null))
.key(key)
.uploadId(uploadId)
.partNumber(partNumber)
Expand Down Expand Up @@ -278,6 +286,7 @@ private void createMultipartUpload() {
CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(key)
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(bucket).orElse(null))
.build();
CompletableFuture<CreateMultipartUploadResponse> multipartUpload = s3Client.createMultipartUpload(uploadRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.ConfigBucketOwnerProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand Down Expand Up @@ -71,6 +73,8 @@ public S3Sink(final PluginSetting pluginSetting,
this.sinkContext = sinkContext;
final PluginModel codecConfiguration = s3SinkConfig.getCodec();
final CodecFactory codecFactory = new CodecFactory(pluginFactory, codecConfiguration);
final ConfigBucketOwnerProviderFactory configBucketOwnerProviderFactory = new ConfigBucketOwnerProviderFactory();
final BucketOwnerProvider bucketOwnerProvider = configBucketOwnerProviderFactory.createBucketOwnerProvider(s3SinkConfig);

final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
codecConfiguration.getPluginSettings());
Expand Down Expand Up @@ -112,7 +116,7 @@ public S3Sink(final PluginSetting pluginSetting,
testCodec.validateAgainstCodecContext(s3OutputCodecContext);

final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client);
final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client, bucketOwnerProvider);


s3SinkService = new S3SinkService(s3SinkConfig, s3OutputCodecContext, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.aws.validator.AwsAccountId;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
Expand All @@ -18,6 +19,8 @@
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;

import java.util.Map;

/**
* s3 sink configuration class contains properties, used to read yaml configuration.
*/
Expand Down Expand Up @@ -74,6 +77,16 @@ public class S3SinkConfig {
@JsonProperty("max_retries")
private int maxUploadRetries = DEFAULT_UPLOAD_RETRIES;

@JsonProperty("bucket_owners")
private Map<String, @AwsAccountId String> bucketOwners;

@JsonProperty("default_bucket_owner")
@AwsAccountId
private String defaultBucketOwner;

@JsonProperty("disable_bucket_ownership_validation")
private boolean disableBucketOwnershipValidation = false;

/**
* Aws Authentication configuration Options.
* @return aws authentication options.
Expand Down Expand Up @@ -154,4 +167,16 @@ public CompressionOption getCompression() {
}

public String getDefaultBucket() { return defaultBucket; }

public Map<String, String> getBucketOwners() {
return bucketOwners;
}

public String getDefaultBucketOwner() {
return defaultBucketOwner;
}

public boolean isDisableBucketOwnershipValidation() {
return disableBucketOwnershipValidation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.function.Supplier;

public interface BufferFactory {
Buffer getBuffer(S3AsyncClient s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier, String defaultBucket);
Buffer getBuffer(S3AsyncClient s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier, String defaultBucket, BucketOwnerProvider bucketOwnerProvider);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
Expand All @@ -25,26 +26,35 @@ class BufferUtilities {
static final String INVALID_BUCKET = "The specified bucket is not valid";

static CompletableFuture<PutObjectResponse> putObjectOrSendToDefaultBucket(final S3AsyncClient s3Client,
final AsyncRequestBody requestBody,
final Consumer<Boolean> runOnCompletion,
final Consumer<Throwable> runOnFailure,
final String objectKey,
final String targetBucket,
final String defaultBucket) {
final AsyncRequestBody requestBody,
final Consumer<Boolean> runOnCompletion,
final Consumer<Throwable> runOnFailure,
final String objectKey,
final String targetBucket,
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {

final boolean[] defaultBucketAttempted = new boolean[1];
return s3Client.putObject(
PutObjectRequest.builder().bucket(targetBucket).key(objectKey).build(), requestBody)
PutObjectRequest.builder()
.bucket(targetBucket)
.key(objectKey)
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(targetBucket).orElse(null))
.build(), requestBody)
.handle((result, ex) -> {
if (ex != null) {
runOnFailure.accept(ex);

if (defaultBucket != null &&
(ex instanceof NoSuchBucketException || ex.getMessage().contains(ACCESS_DENIED) || ex.getMessage().contains(INVALID_BUCKET))) {
(ex.getCause() instanceof NoSuchBucketException || ex.getMessage().contains(ACCESS_DENIED) || ex.getMessage().contains(INVALID_BUCKET))) {
LOG.warn("Bucket {} could not be accessed, attempting to send to default_bucket {}", targetBucket, defaultBucket);
defaultBucketAttempted[0] = true;
return s3Client.putObject(
PutObjectRequest.builder().bucket(defaultBucket).key(objectKey).build(),
PutObjectRequest.builder()
.bucket(defaultBucket)
.key(objectKey)
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(defaultBucket).orElse(null))
.build(),
requestBody);
} else {
runOnCompletion.accept(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.opensearch.dataprepper.plugins.sink.s3.codec.BufferedCodec;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.function.Supplier;
Expand All @@ -18,8 +19,9 @@ public CodecBufferFactory(BufferFactory innerBufferFactory, BufferedCodec codec)
public Buffer getBuffer(final S3AsyncClient s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket);
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {
Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider);
return new CodecBuffer(innerBuffer, bufferedCodec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Objects;
Expand All @@ -17,7 +18,9 @@ public class CompressionBufferFactory implements BufferFactory {
private final CompressionEngine compressionEngine;
private final boolean compressionInternal;

public CompressionBufferFactory(final BufferFactory innerBufferFactory, final CompressionEngine compressionEngine, final OutputCodec codec) {
public CompressionBufferFactory(final BufferFactory innerBufferFactory,
final CompressionEngine compressionEngine,
final OutputCodec codec) {
this.innerBufferFactory = Objects.requireNonNull(innerBufferFactory);
this.compressionEngine = Objects.requireNonNull(compressionEngine);
compressionInternal = Objects.requireNonNull(codec).isCompressionInternal();
Expand All @@ -27,8 +30,9 @@ public CompressionBufferFactory(final BufferFactory innerBufferFactory, final Co
public Buffer getBuffer(final S3AsyncClient s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket);
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {
final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider);
if(compressionInternal)
return internalBuffer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.apache.commons.lang3.time.StopWatch;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;

Expand All @@ -28,6 +29,8 @@ public class InMemoryBuffer implements Buffer {
private final S3AsyncClient s3Client;
private final Supplier<String> bucketSupplier;
private final Supplier<String> keySupplier;

private final BucketOwnerProvider bucketOwnerProvider;
private int eventCount;
private final StopWatch watch;
private boolean isCodecStarted;
Expand All @@ -39,7 +42,8 @@ public class InMemoryBuffer implements Buffer {
InMemoryBuffer(final S3AsyncClient s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {
this.s3Client = s3Client;
this.bucketSupplier = bucketSupplier;
this.keySupplier = keySupplier;
Expand All @@ -49,6 +53,7 @@ public class InMemoryBuffer implements Buffer {
watch.start();
isCodecStarted = false;
this.defaultBucket = defaultBucket;
this.bucketOwnerProvider = bucketOwnerProvider;
}

@Override
Expand All @@ -73,7 +78,7 @@ public Optional<CompletableFuture<?>> flushToS3(final Consumer<Boolean> consumeO
final byte[] byteArray = byteArrayOutputStream.toByteArray();
return Optional.ofNullable(BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, AsyncRequestBody.fromBytes(byteArray),
consumeOnCompletion, consumeOnException,
getKey(), getBucket(), defaultBucket));
getKey(), getBucket(), defaultBucket, bucketOwnerProvider));
}

private String getBucket() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.function.Supplier;
Expand All @@ -14,7 +15,8 @@ public class InMemoryBufferFactory implements BufferFactory {
public Buffer getBuffer(final S3AsyncClient s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
return new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket);
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {
return new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider);
}
}
Loading

0 comments on commit 842e130

Please sign in to comment.