diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ownership/ConfigBucketOwnerProviderFactory.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ownership/ConfigBucketOwnerProviderFactory.java index f059e2d952..a93c3b6ebb 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ownership/ConfigBucketOwnerProviderFactory.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ownership/ConfigBucketOwnerProviderFactory.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.s3.ownership; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.source.s3.S3SourceConfig; import org.opensearch.dataprepper.plugins.source.s3.SqsQueueUrl; import org.opensearch.dataprepper.plugins.source.s3.StsArnRole; @@ -24,7 +25,7 @@ public class ConfigBucketOwnerProviderFactory { public BucketOwnerProvider createBucketOwnerProvider(final S3SourceConfig s3SourceConfig) { if(s3SourceConfig.isDisableBucketOwnershipValidation()) return new NoOwnershipBucketOwnerProvider(); - StaticBucketOwnerProvider staticBucketOwnerProvider = getStaticBucketOwnerProvider(s3SourceConfig); + final StaticBucketOwnerProvider staticBucketOwnerProvider = getStaticBucketOwnerProvider(s3SourceConfig); if(s3SourceConfig.getBucketOwners() != null && !s3SourceConfig.getBucketOwners().isEmpty()) { return new MappedBucketOwnerProvider(s3SourceConfig.getBucketOwners(), staticBucketOwnerProvider); @@ -33,15 +34,18 @@ public BucketOwnerProvider createBucketOwnerProvider(final S3SourceConfig s3Sour } } - private StaticBucketOwnerProvider getStaticBucketOwnerProvider(S3SourceConfig s3SourceConfig) { + private StaticBucketOwnerProvider getStaticBucketOwnerProvider(final S3SourceConfig s3SourceConfig) { final String accountId; if(s3SourceConfig.getDefaultBucketOwner() != null) accountId = s3SourceConfig.getDefaultBucketOwner(); else if(s3SourceConfig.getSqsOptions() != null) accountId = extractQueueAccountId(s3SourceConfig); - else + else if(s3SourceConfig.getAwsAuthenticationOptions() != null && s3SourceConfig.getAwsAuthenticationOptions().getAwsStsRoleArn() != null) accountId = extractStsRoleArnAccountId(s3SourceConfig); + else + throw new InvalidPluginConfigurationException( + "The S3 source is unable to determine a bucket owner. Configure the default_bucket_owner for the account Id that owns the bucket. You may also want to configure bucket_owners if you read from S3 buckets in different accounts."); return new StaticBucketOwnerProvider(accountId); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/ownership/ConfigBucketOwnerProviderFactoryTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/ownership/ConfigBucketOwnerProviderFactoryTest.java index 409631d47f..50e6455eed 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/ownership/ConfigBucketOwnerProviderFactoryTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/ownership/ConfigBucketOwnerProviderFactoryTest.java @@ -12,6 +12,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.source.s3.S3SourceConfig; import org.opensearch.dataprepper.plugins.source.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.SqsOptions; @@ -20,10 +21,12 @@ import java.util.Optional; import java.util.UUID; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -88,6 +91,14 @@ void createBucketOwnerProvider_returns_ownership_using_default_when_bucket_mappi assertThat(optionalOwner.get(), equalTo(accountId)); } + @Test + void createBucketOwnerProvider_throws_exception_when_ownership_cannot_be_determined() { + final ConfigBucketOwnerProviderFactory objectUnderTest = createObjectUnderTest(); + final InvalidPluginConfigurationException actualException = assertThrows(InvalidPluginConfigurationException.class, () -> objectUnderTest.createBucketOwnerProvider(s3SourceConfig)); + + assertThat(actualException.getMessage(), containsString("default_bucket_owner")); + } + @Nested class WithSqsQueueUrl {