From 0e305e597009e724624a74b8f91ee7eeeeab5b48 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Wed, 13 Oct 2021 16:46:07 -0700 Subject: [PATCH 1/7] Add endpoint support for s3inputsource --- .../druid/common/aws/AWSClientConfig.java | 35 ++++ .../druid/common/aws/AWSEndpointConfig.java | 29 +++ .../druid/common/aws/AWSProxyConfig.java | 35 ++++ docs/ingestion/native-batch.md | 42 +++++ .../druid/data/input/s3/S3InputSource.java | 130 ++++++++++++-- .../storage/s3/S3StorageDruidModule.java | 58 +----- .../org/apache/druid/storage/s3/S3Utils.java | 58 ++++++ .../data/input/s3/S3InputSourceTest.java | 168 ++++++++++++++++-- 8 files changed, 462 insertions(+), 93 deletions(-) diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java index 5a0a8b0afbe3..5a5d59a8d8d8 100644 --- a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java @@ -22,6 +22,8 @@ import com.amazonaws.services.s3.S3ClientOptions; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; + public class AWSClientConfig { @JsonProperty @@ -55,4 +57,37 @@ public boolean isForceGlobalBucketAccessEnabled() { return forceGlobalBucketAccessEnabled; } + + @Override + public String toString() + { + return "AWSClientConfig{" + + "protocol='" + protocol + '\'' + + ", disableChunkedEncoding=" + disableChunkedEncoding + + ", enablePathStyleAccess=" + enablePathStyleAccess + + ", forceGlobalBucketAccessEnabled=" + forceGlobalBucketAccessEnabled + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AWSClientConfig that = (AWSClientConfig) o; + return disableChunkedEncoding == that.disableChunkedEncoding + && enablePathStyleAccess == 
that.enablePathStyleAccess + && forceGlobalBucketAccessEnabled == that.forceGlobalBucketAccessEnabled + && Objects.equals(protocol, that.protocol); + } + + @Override + public int hashCode() + { + return Objects.hash(protocol, disableChunkedEncoding, enablePathStyleAccess, forceGlobalBucketAccessEnabled); + } } diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java index 80216d9711f5..039e770fb960 100644 --- a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import javax.annotation.Nullable; +import java.util.Objects; public class AWSEndpointConfig { @@ -44,4 +45,32 @@ public String getSigningRegion() { return signingRegion; } + + @Override + public String toString() + { + return "AWSEndpointConfig{" + + "url='" + url + '\'' + + ", signingRegion='" + signingRegion + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AWSEndpointConfig that = (AWSEndpointConfig) o; + return Objects.equals(url, that.url) && Objects.equals(signingRegion, that.signingRegion); + } + + @Override + public int hashCode() + { + return Objects.hash(url, signingRegion); + } } diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java index 085e810834e5..b4bb4ab22419 100644 --- a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; + public 
class AWSProxyConfig { @JsonProperty @@ -54,4 +56,37 @@ public String getPassword() { return password; } + + @Override + public String toString() + { + return "AWSProxyConfig{" + + "host='" + host + '\'' + + ", port=" + port + + ", username='" + username + '\'' + + ", password='" + password + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AWSProxyConfig that = (AWSProxyConfig) o; + return port == that.port && Objects.equals(host, that.host) && Objects.equals( + username, + that.username + ) && Objects.equals(password, that.password); + } + + @Override + public int hashCode() + { + return Objects.hash(host, port, username, password); + } } diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 2059b631f66d..25c756fe2dca 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -943,6 +943,45 @@ Sample specs: }, ... ``` +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "s3", + "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"], + "endpointConfig": { + "url" : "s3-store.aws.com", + "signingRegion" : "us-west-2" + }, + "clientConfig": { + "protocol" : "http", + "disableChunkedEncoding" : true, + "enablePathStyleAccess" : true, + "forceGlobalBucketAccessEnabled" : false + }, + + "proxyConfig": { + "host" : "proxy-s3.aws.com", + "port" : 8888, + "username" : "admin", + "password" : "admin" + }, + + "properties": { + "accessKeyId": "KLJ78979SDFdS2", + "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd", + "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3" + } + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... 
+``` + |property|description|default|required?| |--------|-----------|-------|---------| @@ -950,6 +989,9 @@ Sample specs: |uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set| |prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set| |objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| +| endpointConfig |Config for overriding the default S3 endpoint and signing region. This would allow ingesting data from a different S3 store. See below for more information.|None|No (defaults will be used if not given) +| clientConfig |S3 client properties for the overridden S3 endpoint. This is used in conjunction with `endpointConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given) +| proxyConfig |Properties for specifying proxy information for the overridden S3 endpoint. This is used in conjunction with `clientConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given) |properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given) Note that the S3 input source will skip all empty objects only when `prefixes` is specified. 
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java index 24fd99cf5202..7d92c81554fd 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java @@ -19,10 +19,12 @@ package org.apache.druid.data.input.s3; +import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; @@ -33,6 +35,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import org.apache.druid.common.aws.AWSClientConfig; +import org.apache.druid.common.aws.AWSEndpointConfig; +import org.apache.druid.common.aws.AWSProxyConfig; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputSplit; @@ -66,23 +71,29 @@ public class S3InputSource extends CloudObjectInputSource @JsonProperty("properties") private final S3InputSourceConfig s3InputSourceConfig; private final S3InputDataConfig inputDataConfig; + private final AWSProxyConfig awsProxyConfig; + private final AWSClientConfig awsClientConfig; + private final AWSEndpointConfig awsEndpointConfig; private final AWSCredentialsProvider awsCredentialsProvider; /** * Constructor for S3InputSource - * @param s3Client The default ServerSideEncryptingAmazonS3 client 
built with all default configs - * from Guice. This injected singleton client is use when {@param s3InputSourceConfig} - * is not provided and hence, we can skip building a new client from - * {@param s3ClientBuilder} - * @param s3ClientBuilder Use for building a new s3Client to use instead of the default injected - * {@param s3Client}. The configurations of the client can be changed - * before being built - * @param inputDataConfig Stores the configuration for options related to reading input data - * @param uris User provided uris to read input data - * @param prefixes User provided prefixes to read input data - * @param objects User provided cloud objects values to read input data - * @param s3InputSourceConfig User provided properties for overriding the default S3 configuration * + * @param s3Client The default ServerSideEncryptingAmazonS3 client built with all default configs + * from Guice. This injected singleton client is use when {@param s3InputSourceConfig} + * is not provided and hence, we can skip building a new client from + * {@param s3ClientBuilder} + * @param s3ClientBuilder Use for building a new s3Client to use instead of the default injected + * {@param s3Client}. 
The configurations of the client can be changed + * before being built + * @param inputDataConfig Stores the configuration for options related to reading input data + * @param uris User provided uris to read input data + * @param prefixes User provided prefixes to read input data + * @param objects User provided cloud objects values to read input data + * @param s3InputSourceConfig User provided properties for overriding the default S3 credentials + * @param awsProxyConfig User provided proxy information for the overridden s3 client + * @param awsEndpointConfig User provided s3 endpoint and region for overriding the default S3 endpoint + * @param awsClientConfig User provided properties for the S3 client with the overridden endpoint */ @JsonCreator public S3InputSource( @@ -93,6 +104,9 @@ public S3InputSource( @JsonProperty("prefixes") @Nullable List prefixes, @JsonProperty("objects") @Nullable List objects, @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig, + @JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig, + @JsonProperty("endpointConfig") @Nullable AWSEndpointConfig awsEndpointConfig, + @JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig, @JacksonInject AWSCredentialsProvider awsCredentialsProvider ) { @@ -100,9 +114,39 @@ public S3InputSource( this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig"); Preconditions.checkNotNull(s3Client, "s3Client"); this.s3InputSourceConfig = s3InputSourceConfig; + this.awsProxyConfig = awsProxyConfig; + this.awsClientConfig = awsClientConfig; + this.awsEndpointConfig = awsEndpointConfig; + this.s3ClientSupplier = Suppliers.memoize( () -> { if (s3ClientBuilder != null && s3InputSourceConfig != null) { + if (awsEndpointConfig != null && awsEndpointConfig.getUrl() != null) { + s3ClientBuilder + .getAmazonS3ClientBuilder().setEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration( + awsEndpointConfig.getUrl(), + 
awsEndpointConfig.getSigningRegion() + )); + if (awsClientConfig != null) { + s3ClientBuilder + .getAmazonS3ClientBuilder() + .withChunkedEncodingDisabled(awsClientConfig.isDisableChunkedEncoding()) + .withPathStyleAccessEnabled(awsClientConfig.isEnablePathStyleAccess()) + .withForceGlobalBucketAccessEnabled(awsClientConfig.isForceGlobalBucketAccessEnabled()); + + if (awsProxyConfig != null) { + final Protocol protocol = S3Utils.determineProtocol(awsClientConfig, awsEndpointConfig); + s3ClientBuilder + .getAmazonS3ClientBuilder() + .withClientConfiguration(S3Utils.setProxyConfig( + s3ClientBuilder.getAmazonS3ClientBuilder() + .getClientConfiguration(), + awsProxyConfig + ).withProtocol(protocol)); + } + } + } if (s3InputSourceConfig.isCredentialsConfigured()) { if (s3InputSourceConfig.getAssumeRoleArn() == null) { s3ClientBuilder @@ -135,10 +179,25 @@ public S3InputSource( List uris, List prefixes, List objects, - S3InputSourceConfig s3InputSourceConfig + S3InputSourceConfig s3InputSourceConfig, + AWSProxyConfig awsProxyConfig, + AWSEndpointConfig awsEndpointConfig, + AWSClientConfig awsClientConfig ) { - this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null); + this( + s3Client, + s3ClientBuilder, + inputDataConfig, + uris, + prefixes, + objects, + s3InputSourceConfig, + awsProxyConfig, + awsEndpointConfig, + awsClientConfig, + null + ); } private void applyAssumeRole( @@ -151,8 +210,9 @@ private void applyAssumeRole( if (assumeRoleArn != null) { String roleSessionName = StringUtils.format("druid-s3-input-source-%s", UUID.randomUUID().toString()); AWSSecurityTokenService securityTokenService = AWSSecurityTokenServiceClientBuilder.standard() - .withCredentials(awsCredentialsProvider) - .build(); + .withCredentials( + awsCredentialsProvider) + .build(); STSAssumeRoleSessionCredentialsProvider.Builder roleCredentialsProviderBuilder; roleCredentialsProviderBuilder = new STSAssumeRoleSessionCredentialsProvider 
.Builder(assumeRoleArn, roleSessionName).withStsClient(securityTokenService); @@ -183,6 +243,27 @@ public S3InputSourceConfig getS3InputSourceConfig() return s3InputSourceConfig; } + @Nullable + @JsonProperty("proxyConfig") + public AWSProxyConfig getAwsProxyConfig() + { + return awsProxyConfig; + } + + @Nullable + @JsonProperty("clientConfig") + public AWSClientConfig getAwsClientConfig() + { + return awsClientConfig; + } + + @Nullable + @JsonProperty("endpointConfig") + public AWSEndpointConfig getAwsEndpointConfig() + { + return awsEndpointConfig; + } + @Override protected InputEntity createEntity(CloudObjectLocation location) { @@ -215,6 +296,9 @@ public SplittableInputSource> withSplit(InputSplit getJacksonModules() { @@ -180,11 +126,11 @@ public ServerSideEncryptingAmazonS3.Builder getServerSideEncryptingAmazonS3Build ) { final ClientConfiguration configuration = new ClientConfigurationFactory().getConfig(); - final Protocol protocol = determineProtocol(clientConfig, endpointConfig); + final Protocol protocol = S3Utils.determineProtocol(clientConfig, endpointConfig); final AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client .builder() .withCredentials(provider) - .withClientConfiguration(setProxyConfig(configuration, proxyConfig).withProtocol(protocol)) + .withClientConfiguration(S3Utils.setProxyConfig(configuration, proxyConfig).withProtocol(protocol)) .withChunkedEncodingDisabled(clientConfig.isDisableChunkedEncoding()) .withPathStyleAccessEnabled(clientConfig.isEnablePathStyleAccess()) .withForceGlobalBucketAccessEnabled(clientConfig.isForceGlobalBucketAccessEnabled()); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index 75c4e126c10c..0b5ea883b621 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -20,6 +20,8 @@ package org.apache.druid.storage.s3; import com.amazonaws.AmazonClientException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CanonicalGrantee; @@ -33,14 +35,20 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; +import org.apache.druid.common.aws.AWSClientConfig; import org.apache.druid.common.aws.AWSClientUtil; +import org.apache.druid.common.aws.AWSEndpointConfig; +import org.apache.druid.common.aws.AWSProxyConfig; import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.RetryUtils.Task; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.URIs; import org.apache.druid.java.util.common.logger.Logger; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.net.URI; @@ -277,4 +285,54 @@ static void uploadFileIfPossible( log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key); service.putObject(putObjectRequest); } + + @Nullable + private static Protocol parseProtocol(@Nullable String protocol) + { + if (protocol == null) { + return null; + } + + if (protocol.equalsIgnoreCase("http")) { + return Protocol.HTTP; + } else if (protocol.equalsIgnoreCase("https")) { + return Protocol.HTTPS; + } else { + throw new IAE("Unknown protocol[%s]", protocol); + } + } + + public static Protocol determineProtocol(AWSClientConfig clientConfig, AWSEndpointConfig endpointConfig) + { + final Protocol protocolFromClientConfig = 
parseProtocol(clientConfig.getProtocol()); + final String endpointUrl = endpointConfig.getUrl(); + if (org.apache.commons.lang.StringUtils.isNotEmpty(endpointUrl)) { + //noinspection ConstantConditions + final URI uri = URIs.parse(endpointUrl, protocolFromClientConfig.toString()); + final Protocol protocol = parseProtocol(uri.getScheme()); + if (protocol != null && (protocol != protocolFromClientConfig)) { + log.warn("[%s] protocol will be used for endpoint [%s]", protocol, endpointUrl); + } + return protocol; + } else { + return protocolFromClientConfig; + } + } + + public static ClientConfiguration setProxyConfig(ClientConfiguration conf, AWSProxyConfig proxyConfig) + { + if (org.apache.commons.lang.StringUtils.isNotEmpty(proxyConfig.getHost())) { + conf.setProxyHost(proxyConfig.getHost()); + } + if (proxyConfig.getPort() != -1) { + conf.setProxyPort(proxyConfig.getPort()); + } + if (org.apache.commons.lang.StringUtils.isNotEmpty(proxyConfig.getUsername())) { + conf.setProxyUsername(proxyConfig.getUsername()); + } + if (org.apache.commons.lang.StringUtils.isNotEmpty(proxyConfig.getPassword())) { + conf.setProxyPassword(proxyConfig.getPassword()); + } + return conf; + } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index 957dd4db632f..84324768bb69 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.s3; +import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; @@ -41,7 +42,10 @@ import com.google.inject.Guice; import com.google.inject.Injector; import 
com.google.inject.Provides; +import org.apache.druid.common.aws.AWSClientConfig; import org.apache.druid.common.aws.AWSCredentialsUtils; +import org.apache.druid.common.aws.AWSEndpointConfig; +import org.apache.druid.common.aws.AWSProxyConfig; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; @@ -91,6 +95,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest { private static final ObjectMapper MAPPER = createS3ObjectMapper(); private static final AmazonS3Client S3_CLIENT = EasyMock.createMock(AmazonS3Client.class); + private static final ClientConfiguration CLIENT_CONFIGURATION = EasyMock.createMock(ClientConfiguration.class); private static final ServerSideEncryptingAmazonS3.Builder SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER = EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class); private static final AmazonS3ClientBuilder AMAZON_S3_CLIENT_BUILDER = AmazonS3Client.builder(); @@ -123,6 +128,9 @@ public class S3InputSourceTest extends InitializedNullHandlingTest private static final S3InputSourceConfig CLOUD_CONFIG_PROPERTIES = new S3InputSourceConfig( new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"), null, null); + private static final AWSEndpointConfig ENDPOINT_CONFIG = new AWSEndpointConfig(); + private static final AWSProxyConfig PROXY_CONFIG = new AWSProxyConfig(); + private static final AWSClientConfig CLIENT_CONFIG = new AWSClientConfig(); private static final List EXPECTED_LOCATION = ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv")); @@ -152,6 +160,9 @@ public void testSerdeWithUris() throws Exception EXPECTED_URIS, null, null, + null, + null, + null, null ); final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class); @@ -168,6 +179,9 @@ public void testSerdeWithPrefixes() throws Exception null, PREFIXES, null, + null, + null, + null, null ); 
final S3InputSource serdeWithPrefixes = @@ -185,6 +199,9 @@ public void testSerdeWithObjects() throws Exception null, null, EXPECTED_LOCATION, + null, + null, + null, null ); final S3InputSource serdeWithPrefixes = @@ -198,6 +215,7 @@ public void testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws Exceptio EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getAmazonS3ClientBuilder()) .andStubReturn(AMAZON_S3_CLIENT_BUILDER); + AMAZON_S3_CLIENT_BUILDER.withClientConfiguration(CLIENT_CONFIGURATION); EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build()) .andReturn(SERVICE); EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); @@ -208,7 +226,10 @@ public void testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws Exceptio null, null, EXPECTED_LOCATION, - CLOUD_CONFIG_PROPERTIES + CLOUD_CONFIG_PROPERTIES, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); final S3InputSource serdeWithPrefixes = MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class); @@ -219,7 +240,72 @@ public void testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws Exceptio } @Test - public void testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCrediential() + public void testS3InputSourceUseEndPointClientProxy() + { + S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = EasyMock.createMock(S3InputSourceConfig.class); + AWSEndpointConfig mockAwsEndpointConfig = EasyMock.createMock(AWSEndpointConfig.class); + AWSClientConfig mockAwsClientConfig = EasyMock.createMock(AWSClientConfig.class); + AWSProxyConfig mockAwsProxyConfig = EasyMock.createMock(AWSProxyConfig.class); + + EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret); + EasyMock.reset(mockAwsEndpointConfig); + EasyMock.reset(mockAwsClientConfig); + EasyMock.reset(mockAwsProxyConfig); + + EasyMock.expect(mockAwsEndpointConfig.getUrl()).andStubReturn("endpoint"); + 
EasyMock.expect(mockAwsEndpointConfig.getSigningRegion()).andStubReturn("region"); + + EasyMock.expect(mockAwsClientConfig.isDisableChunkedEncoding()).andStubReturn(false); + EasyMock.expect(mockAwsClientConfig.isEnablePathStyleAccess()).andStubReturn(false); + EasyMock.expect(mockAwsClientConfig.isForceGlobalBucketAccessEnabled()).andStubReturn(true); + EasyMock.expect(mockAwsClientConfig.getProtocol()).andStubReturn("http"); + + EasyMock.expect(mockAwsProxyConfig.getHost()).andStubReturn(""); + EasyMock.expect(mockAwsProxyConfig.getPort()).andStubReturn(-1); + EasyMock.expect(mockAwsProxyConfig.getUsername()).andStubReturn(""); + EasyMock.expect(mockAwsProxyConfig.getPassword()).andStubReturn(""); + + EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null); + EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured()) + .andStubReturn(false); + EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret); + EasyMock.replay(mockAwsEndpointConfig); + EasyMock.replay(mockAwsClientConfig); + EasyMock.replay(mockAwsProxyConfig); + + EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); + + EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getAmazonS3ClientBuilder()) + .andStubReturn(AMAZON_S3_CLIENT_BUILDER); + + AMAZON_S3_CLIENT_BUILDER.withClientConfiguration(CLIENT_CONFIGURATION); + + EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build()) + .andReturn(SERVICE); + EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); + final S3InputSource withPrefixes = new S3InputSource( + SERVICE, + SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER, + INPUT_DATA_CONFIG, + null, + null, + EXPECTED_LOCATION, + mockConfigPropertiesWithoutKeyAndSecret, + mockAwsProxyConfig, + mockAwsEndpointConfig, + mockAwsClientConfig + ); + Assert.assertNotNull(withPrefixes); + // This is to force the s3ClientSupplier to initialize the ServerSideEncryptingAmazonS3 + withPrefixes.createEntity(new 
CloudObjectLocation("bucket", "path")); + EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); + EasyMock.verify(mockAwsEndpointConfig); + EasyMock.verify(mockAwsClientConfig); + EasyMock.verify(mockAwsProxyConfig); + } + + @Test + public void testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCredential() { S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = EasyMock.createMock(S3InputSourceConfig.class); EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret); @@ -238,7 +324,10 @@ public void testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutC null, null, EXPECTED_LOCATION, - mockConfigPropertiesWithoutKeyAndSecret + mockConfigPropertiesWithoutKeyAndSecret, + null, + null, + null ); Assert.assertNotNull(withPrefixes); // This is to force the s3ClientSupplier to initialize the ServerSideEncryptingAmazonS3 @@ -248,7 +337,7 @@ public void testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutC } @Test - public void testSerdeS3ClientLazyInitializedWithCrediential() throws Exception + public void testSerdeS3ClientLazyInitializedWithCredential() throws Exception { // Amazon S3 builder should not build anything as we did not make any call that requires the S3 client EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); @@ -260,7 +349,10 @@ public void testSerdeS3ClientLazyInitializedWithCrediential() throws Exception null, null, EXPECTED_LOCATION, - CLOUD_CONFIG_PROPERTIES + CLOUD_CONFIG_PROPERTIES, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); final S3InputSource serdeWithPrefixes = MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class); @@ -269,7 +361,7 @@ public void testSerdeS3ClientLazyInitializedWithCrediential() throws Exception } @Test - public void testSerdeS3ClientLazyInitializedWithoutCrediential() throws Exception + public void testSerdeS3ClientLazyInitializedWithoutCredential() throws Exception { // Amazon S3 builder should not build anything as we did not make 
any call that requires the S3 client EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); @@ -281,7 +373,10 @@ public void testSerdeS3ClientLazyInitializedWithoutCrediential() throws Exceptio null, null, EXPECTED_LOCATION, - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); final S3InputSource serdeWithPrefixes = @@ -300,6 +395,9 @@ public void testSerdeWithExtraEmptyLists() throws Exception ImmutableList.of(), ImmutableList.of(), EXPECTED_LOCATION, + null, + null, + null, null ); final S3InputSource serdeWithPrefixes = @@ -319,7 +417,10 @@ public void testSerdeWithInvalidArgs() EXPECTED_URIS, PREFIXES, EXPECTED_LOCATION, - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); } @@ -335,7 +436,10 @@ public void testSerdeWithOtherInvalidArgs() EXPECTED_URIS, PREFIXES, ImmutableList.of(), - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); } @@ -351,7 +455,10 @@ public void testSerdeWithOtherOtherInvalidArgs() ImmutableList.of(), PREFIXES, EXPECTED_LOCATION, - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); } @@ -365,7 +472,10 @@ public void testWithUrisSplit() EXPECTED_URIS, null, null, - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); Stream>> splits = inputSource.createSplits( @@ -391,7 +501,10 @@ public void testWithPrefixesSplit() null, PREFIXES, null, - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); Stream>> splits = inputSource.createSplits( @@ -418,7 +531,10 @@ public void testCreateSplitsWithSplitHintSpecRespectingHint() null, PREFIXES, null, - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); Stream>> splits = inputSource.createSplits( @@ -448,7 +564,10 @@ public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects() null, PREFIXES, null, - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); Stream>> splits = inputSource.createSplits( @@ -477,7 +596,10 @@ public void testAccessDeniedWhileListingPrefix() 
null, ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)), null, - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); expectedException.expectMessage("Failed to get object summaries from S3 bucket[bar], prefix[foo/file2.csv]"); @@ -508,7 +630,10 @@ public void testReader() throws IOException null, ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)), null, - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); InputRowSchema someSchema = new InputRowSchema( @@ -552,7 +677,10 @@ public void testCompressedReader() throws IOException null, ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)), null, - null + null, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); InputRowSchema someSchema = new InputRowSchema( @@ -677,12 +805,14 @@ public AWSCredentialsProvider getAWSCredentialsProvider() return AWSCredentialsUtils.defaultAWSCredentialsProviderChain(null); } - @Override public List getJacksonModules() + @Override + public List getJacksonModules() { return Collections.emptyList(); } - @Override public void configure(Binder binder) + @Override + public void configure(Binder binder) { } From 4be3d05fd3f6d8b6d5ea00aa8dd44175e86749bd Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Sun, 10 Jul 2022 16:56:43 -0700 Subject: [PATCH 2/7] Changes to tests --- .../druid/data/input/s3/S3InputSource.java | 40 +++++++- .../org/apache/druid/storage/s3/S3Utils.java | 2 + .../data/input/s3/S3InputSourceTest.java | 99 ++++++++++++++++++- 3 files changed, 135 insertions(+), 6 deletions(-) diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java index 852a2a9882fc..549df7247e7e 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java @@ -37,6 
+37,9 @@ import com.google.common.base.Suppliers; import com.google.common.collect.Iterators; import org.apache.commons.io.FilenameUtils; +import org.apache.druid.common.aws.AWSClientConfig; +import org.apache.druid.common.aws.AWSEndpointConfig; +import org.apache.druid.common.aws.AWSProxyConfig; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputSplit; @@ -183,10 +186,25 @@ public S3InputSource( List prefixes, List objects, String filter, - S3InputSourceConfig s3InputSourceConfig + S3InputSourceConfig s3InputSourceConfig, + AWSProxyConfig awsProxyConfig, + AWSEndpointConfig awsEndpointConfig, + AWSClientConfig awsClientConfig ) { - this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, filter, s3InputSourceConfig, null); + this(s3Client, + s3ClientBuilder, + inputDataConfig, + uris, + prefixes, + objects, + filter, + s3InputSourceConfig, + awsProxyConfig, + awsEndpointConfig, + awsClientConfig, + null + ); } @VisibleForTesting @@ -199,10 +217,26 @@ public S3InputSource( List objects, String filter, S3InputSourceConfig s3InputSourceConfig, + AWSProxyConfig awsProxyConfig, + AWSEndpointConfig awsEndpointConfig, + AWSClientConfig awsClientConfig, int maxRetries ) { - this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, filter, s3InputSourceConfig, null); + this( + s3Client, + s3ClientBuilder, + inputDataConfig, + uris, + prefixes, + objects, + filter, + s3InputSourceConfig, + awsProxyConfig, + awsEndpointConfig, + awsClientConfig, + null + ); this.maxRetries = maxRetries; } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index 8a6cb0d56cd8..a0176984970f 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -20,6 +20,8 @@ package org.apache.druid.storage.s3; import com.amazonaws.AmazonClientException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.AmazonS3Exception; diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index 8da9a3e796c8..eb6c1f87958c 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.s3; +import com.amazonaws.ClientConfiguration; import com.amazonaws.SdkClientException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; @@ -182,6 +183,9 @@ public void testGetUris() null, null, null, + null, + null, + null, null ); @@ -202,6 +206,9 @@ public void testGetPrefixes() PREFIXES, null, null, + null, + null, + null, null ); @@ -222,7 +229,11 @@ public void testGetFilter() null, null, "*.parquet", + null, + null, + null, null + ); Assert.assertEquals( @@ -242,6 +253,9 @@ public void testSerdeWithUris() throws Exception null, null, null, + null, + null, + null, null ); final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class); @@ -259,6 +273,9 @@ public void testSerdeWithPrefixes() throws Exception PREFIXES, null, null, + null, + null, + null, null ); final S3InputSource serdeWithPrefixes = @@ -277,6 +294,9 @@ public void testSerdeWithObjects() throws Exception null, EXPECTED_LOCATION, null, + null, + null, + null, null ); final S3InputSource 
serdeWithPrefixes = @@ -302,7 +322,10 @@ public void testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws Exceptio null, EXPECTED_LOCATION, null, - CLOUD_CONFIG_PROPERTIES + CLOUD_CONFIG_PROPERTIES, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); final S3InputSource serdeWithPrefixes = MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class); @@ -363,6 +386,7 @@ public void testS3InputSourceUseEndPointClientProxy() null, null, EXPECTED_LOCATION, + null, mockConfigPropertiesWithoutKeyAndSecret, mockAwsProxyConfig, mockAwsEndpointConfig, @@ -398,7 +422,10 @@ public void testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutC null, EXPECTED_LOCATION, null, - mockConfigPropertiesWithoutKeyAndSecret + mockConfigPropertiesWithoutKeyAndSecret, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); Assert.assertNotNull(withPrefixes); // This is to force the s3ClientSupplier to initialize the ServerSideEncryptingAmazonS3 @@ -421,7 +448,10 @@ public void testSerdeS3ClientLazyInitializedWithCredential() throws Exception null, EXPECTED_LOCATION, null, - CLOUD_CONFIG_PROPERTIES + CLOUD_CONFIG_PROPERTIES, + PROXY_CONFIG, + ENDPOINT_CONFIG, + CLIENT_CONFIG ); final S3InputSource serdeWithPrefixes = MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class); @@ -443,6 +473,9 @@ public void testSerdeS3ClientLazyInitializedWithoutCredential() throws Exception null, EXPECTED_LOCATION, null, + null, + null, + null, null ); final S3InputSource serdeWithPrefixes = @@ -462,6 +495,9 @@ public void testSerdeWithExtraEmptyLists() throws Exception ImmutableList.of(), EXPECTED_LOCATION, null, + null, + null, + null, null ); final S3InputSource serdeWithPrefixes = @@ -482,6 +518,9 @@ public void testWithNullJsonProps() null, null, null, + null, + null, + null, null ); } @@ -499,6 +538,9 @@ public void testIllegalObjectsAndUris() null, EXPECTED_OBJECTS, null, + null, + null, + null, null ); } @@ -516,6 +558,9 @@ public void 
testIllegalObjectsAndPrefixes() PREFIXES, EXPECTED_OBJECTS, null, + null, + null, + null, null ); } @@ -533,6 +578,9 @@ public void testIllegalUrisAndPrefixes() PREFIXES, null, null, + null, + null, + null, null ); } @@ -550,6 +598,9 @@ public void testSerdeWithInvalidArgs() PREFIXES, EXPECTED_LOCATION, null, + null, + null, + null, null ); } @@ -567,6 +618,9 @@ public void testSerdeWithOtherInvalidArgs() PREFIXES, ImmutableList.of(), null, + null, + null, + null, null ); } @@ -584,6 +638,9 @@ public void testSerdeWithOtherOtherInvalidArgs() PREFIXES, EXPECTED_LOCATION, null, + null, + null, + null, null ); } @@ -599,6 +656,9 @@ public void testWithUrisSplit() null, null, null, + null, + null, + null, null ); @@ -621,6 +681,9 @@ public void testWithUrisFilter() null, null, "*.csv", + null, + null, + null, null ); @@ -643,6 +706,9 @@ public void testWithObjectsFilter() null, OBJECTS_BEFORE_FILTER, "*.csv", + null, + null, + null, null ); @@ -665,6 +731,9 @@ public void testWithoutObjectsFilter() null, EXPECTED_OBJECTS, null, + null, + null, + null, null ); @@ -692,6 +761,9 @@ public void testWithPrefixesSplit() PREFIXES, null, null, + null, + null, + null, null ); @@ -720,6 +792,9 @@ public void testGetPrefixesSplitStreamWithFilter() PREFIXES, null, "*.csv", + null, + null, + null, null ); @@ -748,6 +823,9 @@ public void testCreateSplitsWithSplitHintSpecRespectingHint() PREFIXES, null, null, + null, + null, + null, null ); @@ -779,6 +857,9 @@ public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects() PREFIXES, null, null, + null, + null, + null, null ); @@ -809,6 +890,9 @@ public void testAccessDeniedWhileListingPrefix() ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)), null, null, + null, + null, + null, null ); @@ -841,6 +925,9 @@ public void testReader() throws IOException ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)), null, null, + null, + null, + null, null ); @@ -885,6 +972,9 @@ public void 
testReaderRetriesOnSdkClientExceptionButNeverSucceedsThenThrows() th null, null, null, + null, + null, + null, 3 // only have three retries since they are slow ); @@ -929,6 +1019,9 @@ public void testCompressedReader() throws IOException ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)), null, null, + null, + null, + null, null ); From a19a2cdb308f3c3560691c376f94e143fc087ccb Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Mon, 11 Jul 2022 09:41:24 -0700 Subject: [PATCH 3/7] Fix docs --- docs/ingestion/native-batch-input-source.md | 42 +++ docs/ingestion/native-batch.md | 277 -------------------- 2 files changed, 42 insertions(+), 277 deletions(-) diff --git a/docs/ingestion/native-batch-input-source.md b/docs/ingestion/native-batch-input-source.md index 8ce42073dcf9..f4b92bdfe7a5 100644 --- a/docs/ingestion/native-batch-input-source.md +++ b/docs/ingestion/native-batch-input-source.md @@ -138,6 +138,45 @@ Sample specs: ... ``` +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "s3", + "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"], + "endpointConfig": { + "url" : "s3-store.aws.com", + "signingRegion" : "us-west-2" + }, + "clientConfig": { + "protocol" : "http", + "disableChunkedEncoding" : true, + "enablePathStyleAccess" : true, + "forceGlobalBucketAccessEnabled" : false + }, + "proxyConfig": { + "host" : "proxy-s3.aws.com", + "port" : 8888, + "username" : "admin", + "password" : "admin" + }, + + "properties": { + "accessKeyId": "KLJ78979SDFdS2", + "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd", + "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3" + } + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... +``` + + |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `s3`.|None|yes| @@ -145,6 +184,9 @@ Sample specs: |prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. 
Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set| |objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| |filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|None|no| +| endpointConfig |Config for overriding the default S3 endpoint and signing region. This would allow ingesting data from a different S3 store. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given) +| clientConfig |S3 client properties for the overridden S3 endpoint. This is used in conjunction with `endpointConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given) +| proxyConfig |Properties for specifying proxy information for the overridden S3 endpoint. This is used in conjunction with `clientConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given) |properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given) Note that the S3 input source will skip all empty objects only when `prefixes` is specified. 
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 456e27f25991..c441c39aeb13 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -719,281 +719,4 @@ For details on available input sources see: For information on how to combine input sources, see [Combining input sources](./native-batch-input-source.md#combining-input-sources). -While ingesting data using the Index task, it creates segments from the input data and pushes them. For segment pushing, -the Index task supports two segment pushing modes, i.e., _bulk pushing mode_ and _incremental pushing mode_ for -[perfect rollup and best-effort rollup](rollup.md), respectively. - -In the bulk pushing mode, every segment is pushed at the very end of the index task. Until then, created segments -are stored in the memory and local storage of the process running the index task. As a result, this mode might cause a -problem due to limited storage capacity, and is not recommended to use in production. - -On the contrary, in the incremental pushing mode, segments are incrementally pushed, that is they can be pushed -in the middle of the index task. More precisely, the index task collects data and stores created segments in the memory -and disks of the process running that task until the total number of collected rows exceeds `maxTotalRows`. Once it exceeds, -the index task immediately pushes all segments created until that moment, cleans all pushed segments up, and -continues to ingest remaining data. - -To enable bulk pushing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot -be used with `appendToExisting` of IOConfig. - -## Input Sources - -The input source is the place to define from where your index task reads data. -Only the native Parallel task and Simple task support the input source. 
- -### S3 Input Source - -> You need to include the [`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension to use the S3 input source. - -The S3 input source is to support reading objects directly from S3. -Objects can be specified either via a list of S3 URI strings or a list of -S3 location prefixes, which will attempt to list the contents and ingest -all objects contained in the locations. The S3 input source is splittable -and can be used by the [Parallel task](#parallel-task), -where each worker task of `index_parallel` will read one or multiple objects. - -Sample specs: - -```json -... - "ioConfig": { - "type": "index_parallel", - "inputSource": { - "type": "s3", - "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"] - }, - "inputFormat": { - "type": "json" - }, - ... - }, -... -``` - -```json -... - "ioConfig": { - "type": "index_parallel", - "inputSource": { - "type": "s3", - "prefixes": ["s3://foo/bar/", "s3://bar/foo/"] - }, - "inputFormat": { - "type": "json" - }, - ... - }, -... -``` - - -```json -... - "ioConfig": { - "type": "index_parallel", - "inputSource": { - "type": "s3", - "objects": [ - { "bucket": "foo", "path": "bar/file1.json"}, - { "bucket": "bar", "path": "foo/file2.json"} - ] - }, - "inputFormat": { - "type": "json" - }, - ... - }, -... -``` - -```json -... - "ioConfig": { - "type": "index_parallel", - "inputSource": { - "type": "s3", - "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"], - "properties": { - "accessKeyId": "KLJ78979SDFdS2", - "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd" - } - }, - "inputFormat": { - "type": "json" - }, - ... - }, -... -``` - -```json -... 
- "ioConfig": { - "type": "index_parallel", - "inputSource": { - "type": "s3", - "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"], - "properties": { - "accessKeyId": "KLJ78979SDFdS2", - "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd", - "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3" - } - }, - "inputFormat": { - "type": "json" - }, - ... - }, -... -``` -```json -... - "ioConfig": { - "type": "index_parallel", - "inputSource": { - "type": "s3", - "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"], - "endpointConfig": { - "url" : "s3-store.aws.com", - "signingRegion" : "us-west-2" - }, - "clientConfig": { - "protocol" : "http", - "disableChunkedEncoding" : true, - "enablePathStyleAccess" : true, - "forceGlobalBucketAccessEnabled" : false - }, - - "proxyConfig": { - "host" : "proxy-s3.aws.com", - "port" : 8888, - "username" : "admin", - "password" : "admin" - }, - - "properties": { - "accessKeyId": "KLJ78979SDFdS2", - "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd", - "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3" - } - }, - "inputFormat": { - "type": "json" - }, - ... - }, -... -``` - - -|property|description|default|required?| -|--------|-----------|-------|---------| -|type|This should be `s3`.|None|yes| -|uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set| -|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set| -|objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| -| endpointConfig |Config for overriding the default S3 endpoint and signing region. This would allow ingesting data from a different S3 store. See below for more information.|None|No (defaults will be used if not given) -| clientConfig |S3 client properties for the overridden s3 endpoint. 
This is used in conjunction with `endPointConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given) -| proxyConfig |Properties for specifying proxy information for the overridden s3 endpoint. This is used in conjunction with `clientConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given) -|properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given) - -Note that the S3 input source will skip all empty objects only when `prefixes` is specified. - -S3 Object: - -|property|description|default|required?| -|--------|-----------|-------|---------| -|bucket|Name of the S3 bucket|None|yes| -|path|The path where data is located.|None|yes| - -Properties Object: - -|property|description|default|required?| -|--------|-----------|-------|---------| -|accessKeyId|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's access key|None|yes if secretAccessKey is given| -|secretAccessKey|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's secret key|None|yes if accessKeyId is given| -|assumeRoleArn|AWS ARN of the role to assume [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html). 
**assumeRoleArn** can be used either with the ingestion spec AWS credentials or with the default S3 credentials|None|no| -|assumeRoleExternalId|A unique identifier that might be required when you assume a role in another account [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html)|None|no| - -**Note :** *If accessKeyId and secretAccessKey are not given, the default [S3 credentials provider chain](../development/extensions-core/s3.md#s3-authentication-methods) is used.* - -### Google Cloud Storage Input Source - -> You need to include the [`druid-google-extensions`](../development/extensions-core/google.md) as an extension to use the Google Cloud Storage input source. - -The Google Cloud Storage input source is to support reading objects directly -from Google Cloud Storage. Objects can be specified as list of Google -Cloud Storage URI strings. The Google Cloud Storage input source is splittable -and can be used by the [Parallel task](#parallel-task), where each worker task of `index_parallel` will read -one or multiple objects. - -Sample specs: - -```json -... - "ioConfig": { - "type": "index_parallel", - "inputSource": { - "type": "google", - "uris": ["gs://foo/bar/file.json", "gs://bar/foo/file2.json"] - }, - "inputFormat": { - "type": "json" - }, - ... - }, -... -``` - -```json -... - "ioConfig": { - "type": "index_parallel", - "inputSource": { - "type": "google", - "prefixes": ["gs://foo/bar/", "gs://bar/foo/"] - }, - "inputFormat": { - "type": "json" - }, - ... - }, -... -``` - - -```json -... - "ioConfig": { - "type": "index_parallel", - "inputSource": { - "type": "google", - "objects": [ - { "bucket": "foo", "path": "bar/file1.json"}, - { "bucket": "bar", "path": "foo/file2.json"} - ] - }, - "inputFormat": { - "type": "json" - }, - ... - }, -... 
-``` - -|property|description|default|required?| -|--------|-----------|-------|---------| -|type|This should be `google`.|None|yes| -|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set| -|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set| -|objects|JSON array of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set| - -Note that the Google Cloud Storage input source will skip all empty objects only when `prefixes` is specified. - -Google Cloud Storage object: - -|property|description|default|required?| -|--------|-----------|-------|---------| -|bucket|Name of the Google Cloud Storage bucket|None|yes| -|path|The path where data is located.|None|yes| From f0d0e47e4385c41dc52b96cf204658d28351d2fb Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Mon, 11 Jul 2022 13:48:00 -0700 Subject: [PATCH 4/7] Fix config --- .../druid/common/aws/AWSClientConfig.java | 35 ------------------- .../druid/common/aws/AWSEndpointConfig.java | 29 --------------- .../druid/common/aws/AWSProxyConfig.java | 35 ------------------- .../data/input/s3/S3InputSourceTest.java | 12 +++---- 4 files changed, 6 insertions(+), 105 deletions(-) diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java index 5a5d59a8d8d8..5a0a8b0afbe3 100644 --- a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java @@ -22,8 +22,6 @@ import com.amazonaws.services.s3.S3ClientOptions; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.Objects; - public class AWSClientConfig { 
@JsonProperty @@ -57,37 +55,4 @@ public boolean isForceGlobalBucketAccessEnabled() { return forceGlobalBucketAccessEnabled; } - - @Override - public String toString() - { - return "AWSClientConfig{" + - "protocol='" + protocol + '\'' + - ", disableChunkedEncoding=" + disableChunkedEncoding + - ", enablePathStyleAccess=" + enablePathStyleAccess + - ", forceGlobalBucketAccessEnabled=" + forceGlobalBucketAccessEnabled + - '}'; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - AWSClientConfig that = (AWSClientConfig) o; - return disableChunkedEncoding == that.disableChunkedEncoding - && enablePathStyleAccess == that.enablePathStyleAccess - && forceGlobalBucketAccessEnabled == that.forceGlobalBucketAccessEnabled - && Objects.equals(protocol, that.protocol); - } - - @Override - public int hashCode() - { - return Objects.hash(protocol, disableChunkedEncoding, enablePathStyleAccess, forceGlobalBucketAccessEnabled); - } } diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java index 039e770fb960..80216d9711f5 100644 --- a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import javax.annotation.Nullable; -import java.util.Objects; public class AWSEndpointConfig { @@ -45,32 +44,4 @@ public String getSigningRegion() { return signingRegion; } - - @Override - public String toString() - { - return "AWSEndpointConfig{" + - "url='" + url + '\'' + - ", signingRegion='" + signingRegion + '\'' + - '}'; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } 
- AWSEndpointConfig that = (AWSEndpointConfig) o; - return Objects.equals(url, that.url) && Objects.equals(signingRegion, that.signingRegion); - } - - @Override - public int hashCode() - { - return Objects.hash(url, signingRegion); - } } diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java index b4bb4ab22419..085e810834e5 100644 --- a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.Objects; - public class AWSProxyConfig { @JsonProperty @@ -56,37 +54,4 @@ public String getPassword() { return password; } - - @Override - public String toString() - { - return "AWSProxyConfig{" + - "host='" + host + '\'' + - ", port=" + port + - ", username='" + username + '\'' + - ", password='" + password + '\'' + - '}'; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - AWSProxyConfig that = (AWSProxyConfig) o; - return port == that.port && Objects.equals(host, that.host) && Objects.equals( - username, - that.username - ) && Objects.equals(password, that.password); - } - - @Override - public int hashCode() - { - return Objects.hash(host, port, username, password); - } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index eb6c1f87958c..359de18039bd 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -323,9 +323,9 @@ public void 
testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws Exceptio EXPECTED_LOCATION, null, CLOUD_CONFIG_PROPERTIES, - PROXY_CONFIG, - ENDPOINT_CONFIG, - CLIENT_CONFIG + null, + null, + null ); final S3InputSource serdeWithPrefixes = MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class); @@ -449,9 +449,9 @@ public void testSerdeS3ClientLazyInitializedWithCredential() throws Exception EXPECTED_LOCATION, null, CLOUD_CONFIG_PROPERTIES, - PROXY_CONFIG, - ENDPOINT_CONFIG, - CLIENT_CONFIG + null, + null, + null ); final S3InputSource serdeWithPrefixes = MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class); From c3c29a56c4a6ff5613744d88638da70787f7f471 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Tue, 12 Jul 2022 12:23:39 -0700 Subject: [PATCH 5/7] Fix inspections --- .../org/apache/druid/common/aws/AWSClientConfig.java | 11 +++++++++++ .../apache/druid/common/aws/AWSEndpointConfig.java | 9 +++++++++ .../org/apache/druid/common/aws/AWSProxyConfig.java | 11 +++++++++++ 3 files changed, 31 insertions(+) diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java index 5a0a8b0afbe3..7886bd0a9e60 100644 --- a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java @@ -55,4 +55,15 @@ public boolean isForceGlobalBucketAccessEnabled() { return forceGlobalBucketAccessEnabled; } + + @Override + public String toString() + { + return "AWSClientConfig{" + + "protocol='" + protocol + '\'' + + ", disableChunkedEncoding=" + disableChunkedEncoding + + ", enablePathStyleAccess=" + enablePathStyleAccess + + ", forceGlobalBucketAccessEnabled=" + forceGlobalBucketAccessEnabled + + '}'; + } } diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java 
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java index 80216d9711f5..302ca0982115 100644 --- a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java @@ -44,4 +44,13 @@ public String getSigningRegion() { return signingRegion; } + + @Override + public String toString() + { + return "AWSEndpointConfig{" + + "url='" + url + '\'' + + ", signingRegion='" + signingRegion + '\'' + + '}'; + } } diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java index 085e810834e5..b11f0beae1c8 100644 --- a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java @@ -54,4 +54,15 @@ public String getPassword() { return password; } + + @Override + public String toString() + { + return "AWSProxyConfig{" + + "host='" + host + '\'' + + ", port=" + port + + ", username='" + username + '\'' + + ", password='" + password + '\'' + + '}'; + } } From e6d2a9f8450f00fcbb728c511300883621fd2a22 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Tue, 12 Jul 2022 18:57:09 -0700 Subject: [PATCH 6/7] Fix spelling --- website/.spelling | 3 +++ 1 file changed, 3 insertions(+) diff --git a/website/.spelling b/website/.spelling index ab4a41cd1adc..cc739805782c 100644 --- a/website/.spelling +++ b/website/.spelling @@ -232,6 +232,7 @@ broadcasted checksums classpath clickstream +clientConfig codebase codec colocated @@ -266,6 +267,7 @@ druid–kubernetes-extensions e.g. 
encodings endian +endpointConfig enum expr failover @@ -378,6 +380,7 @@ procs programmatically proto proxied +proxyConfig QPS quantile quantiles From d413f22f4fd3f4800721972b320fb99054c91e9e Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Thu, 14 Jul 2022 18:54:07 -0700 Subject: [PATCH 7/7] Remove password from toString --- .../main/java/org/apache/druid/common/aws/AWSProxyConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java index b11f0beae1c8..5153a74239d3 100644 --- a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java @@ -62,7 +62,6 @@ public String toString() "host='" + host + '\'' + ", port=" + port + ", username='" + username + '\'' + - ", password='" + password + '\'' + '}'; } }