diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index f03ac25b1575..7a9cb17acb7f 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -149,6 +149,39 @@ The following runtime parameters must be configured to export into an S3 destina
 | `druid.export.storage.s3.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 |
 | `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB |
 
+
+##### GS
+
+Export results to GCS by passing the function `google()` as an argument to the `EXTERN` function. Note that this requires the `druid-google-extensions` extension.
+The `google()` function is a Druid function that configures the connection. Pass arguments to `google()` as named parameters, with each value in single quotes, as in the following example:
+
+```sql
+INSERT INTO
+  EXTERN(
+    google(bucket => 'your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments for the function:
+
+| Parameter | Required | Description | Default |
+|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket` | Yes | The GCS bucket to which the files are exported. The bucket and prefix combination should be whitelisted in `druid.export.storage.google.allowedExportPaths`. | n/a |
+| `prefix` | Yes | Path where the exported files are created. The export query expects the destination to be empty. If the location includes other files, then the query will fail. The bucket and prefix combination should be whitelisted in `druid.export.storage.google.allowedExportPaths`. | n/a |
+
+The following runtime parameters must be configured to export into a GCS destination:
+
+| Runtime Parameter | Required | Description | Default |
+|--------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `druid.export.storage.google.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a |
+| `druid.export.storage.google.allowedExportPaths` | Yes | An array of GCS prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `["gs://bucket1/export/", "gs://bucket2/export/"]` | n/a |
+| `druid.export.storage.google.maxRetry` | No | Defines the maximum number of times to attempt GCS API calls to avoid failures due to transient errors. | 10 |
+| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `druid.export.storage.google.tempLocalDir`. A large chunk size reduces the API calls to GCS; however, it requires more disk space to store the temporary chunks. | 4MiB |
+
 ##### LOCAL
 
 You can export to the local storage, which exports the results to the filesystem of the MSQ worker.
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
index b99e8a36df94..c241d02fc427 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
@@ -50,7 +50,7 @@
 
 public class GoogleCloudStorageInputSource extends CloudObjectInputSource
 {
-  static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME;
+  public static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME;
   private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class);
 
   private final GoogleStorage storage;
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportConfig.java
new file mode 100644
index 000000000000..ccb38851ec47
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Operator-supplied settings for exporting query results to Google Cloud Storage. The module
+ * binds this class to the {@code druid.export.storage.google} runtime property prefix.
+ *
+ * <p>No validation happens here: every value is stored exactly as deserialized, so any of them
+ * may be {@code null} when the corresponding property is not set.
+ */
+public class GoogleExportConfig
+{
+  /** Local directory in which temporary files are staged during an export. */
+  @JsonProperty("tempLocalDir")
+  private final String tempLocalDir;
+
+  /** Size of each temporary chunk; {@code null} when not configured. */
+  @JsonProperty("chunkSize")
+  @Nullable
+  private final HumanReadableBytes chunkSize;
+
+  /** Number of attempts for storage API calls; {@code null} when not configured. */
+  @JsonProperty("maxRetry")
+  @Nullable
+  private final Integer maxRetry;
+
+  /** {@code gs://} prefixes that export queries are allowed to write to. */
+  @JsonProperty("allowedExportPaths")
+  private final List<String> allowedExportPaths;
+
+  @JsonCreator
+  public GoogleExportConfig(
+      @JsonProperty("tempLocalDir") final String tempLocalDir,
+      @JsonProperty("chunkSize") @Nullable final HumanReadableBytes chunkSize,
+      @JsonProperty("maxRetry") @Nullable final Integer maxRetry,
+      @JsonProperty("allowedExportPaths") final List<String> allowedExportPaths
+  )
+  {
+    this.tempLocalDir = tempLocalDir;
+    this.chunkSize = chunkSize;
+    this.maxRetry = maxRetry;
+    this.allowedExportPaths = allowedExportPaths;
+  }
+
+  public String getTempLocalDir()
+  {
+    return tempLocalDir;
+  }
+
+  @Nullable
+  public HumanReadableBytes getChunkSize()
+  {
+    return chunkSize;
+  }
+
+  @Nullable
+  public Integer getMaxRetry()
+  {
+    return maxRetry;
+  }
+
+  public List<String> getAllowedExportPaths()
+  {
+    return allowedExportPaths;
+  }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java
new file mode 100644
index 000000000000..480b80e118a7
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
+import org.apache.druid.storage.google.GoogleStorage;
+import org.apache.druid.storage.google.GoogleStorageDruidModule;
+
+import javax.validation.constraints.NotNull;
+import java.io.File;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * {@link ExportStorageProvider} that writes export results under a bucket and prefix in Google
+ * Cloud Storage.
+ *
+ * <p>The bucket and prefix come from the query, while {@link GoogleExportConfig} is injected and
+ * supplies the operator-controlled settings. A connector is only handed out when the requested
+ * location falls under one of the configured {@code allowedExportPaths}.
+ */
+@JsonTypeName(GoogleExportStorageProvider.TYPE_NAME)
+public class GoogleExportStorageProvider implements ExportStorageProvider
+{
+  public static final String TYPE_NAME = GoogleCloudStorageInputSource.TYPE_KEY;
+
+  @JsonProperty
+  private final String bucket;
+  @JsonProperty
+  private final String prefix;
+
+  // Injected by Jackson after construction; left non-final and package-private so tests can set them.
+  @JacksonInject
+  GoogleExportConfig googleExportConfig;
+  @JacksonInject
+  GoogleStorage googleStorage;
+  @JacksonInject
+  GoogleInputDataConfig googleInputDataConfig;
+
+  @JsonCreator
+  public GoogleExportStorageProvider(
+      @JsonProperty(value = "bucket", required = true) String bucket,
+      @JsonProperty(value = "prefix", required = true) String prefix
+  )
+  {
+    this.bucket = bucket;
+    this.prefix = prefix;
+  }
+
+  /**
+   * Builds the connector used to write the exported files.
+   *
+   * @return a {@link GoogleStorageConnector} rooted at the requested bucket and prefix
+   * @throws DruidException if {@code tempLocalDir} or {@code allowedExportPaths} is not configured,
+   *                        or if the requested location is not covered by any allowed path
+   */
+  @Override
+  public StorageConnector get()
+  {
+    final String tempDir = googleExportConfig.getTempLocalDir();
+    if (tempDir == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build("The runtime property `druid.export.storage.google.tempLocalDir` must be configured for GCS export.");
+    }
+    final List<String> allowedExportPaths = googleExportConfig.getAllowedExportPaths();
+    if (allowedExportPaths == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build(
+                              "The runtime property `druid.export.storage.google.allowedExportPaths` must be configured for GCS export.");
+    }
+    validatePrefix(allowedExportPaths, bucket, prefix);
+    final GoogleOutputConfig googleOutputConfig = new GoogleOutputConfig(
+        bucket,
+        prefix,
+        new File(tempDir),
+        googleExportConfig.getChunkSize(),
+        googleExportConfig.getMaxRetry()
+    );
+    return new GoogleStorageConnector(googleOutputConfig, googleStorage, googleInputDataConfig);
+  }
+
+  /**
+   * Checks that {@code gs://bucket/prefix} lies under at least one allowed export path.
+   *
+   * @param allowedExportPaths URIs such as {@code gs://bucket/export/} that exports may target
+   * @param bucket             bucket requested by the query
+   * @param prefix             object prefix requested by the query
+   * @throws DruidException if none of the allowed paths covers the requested location
+   */
+  @VisibleForTesting
+  static void validatePrefix(@NotNull final List<String> allowedExportPaths, final String bucket, final String prefix)
+  {
+    final URI providedUri = new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS);
+    for (final String path : allowedExportPaths) {
+      final URI allowedUri = URI.create(path);
+      if (validateUri(allowedUri, providedUri)) {
+        return;
+      }
+    }
+    throw DruidException.forPersona(DruidException.Persona.USER)
+                        .ofCategory(DruidException.Category.INVALID_INPUT)
+                        .build("None of the allowed prefixes matched the input path [%s]. "
+                               + "Please reach out to the cluster admin for the whitelisted paths for export. "
+                               + "The paths are controlled via the property `druid.export.storage.google.allowedExportPaths`.",
+                               providedUri);
+  }
+
+  /**
+   * Returns whether {@code providedUri} names the same bucket as {@code allowedUri} and a path
+   * at or below it. Both paths are compared with a trailing slash so that an allowed
+   * {@code /validPath1} does not also admit {@code /validPath123}.
+   */
+  private static boolean validateUri(final URI allowedUri, final URI providedUri)
+  {
+    final String allowedBucket = bucketOf(allowedUri);
+    // An allowed entry without any authority (for example one missing the gs:// scheme) can
+    // never match; treat it as a non-match instead of failing with a NullPointerException.
+    if (allowedBucket == null || !allowedBucket.equals(bucketOf(providedUri))) {
+      return false;
+    }
+    final String allowedPath = StringUtils.maybeAppendTrailingSlash(allowedUri.getPath());
+    final String providedPath = StringUtils.maybeAppendTrailingSlash(providedUri.getPath());
+    return providedPath.startsWith(allowedPath);
+  }
+
+  /**
+   * Extracts the bucket from a {@code gs://bucket/...} URI. {@link URI#getHost()} is null when
+   * the authority is not a valid hostname, which is the case for bucket names containing an
+   * underscore, so fall back to the raw authority.
+   */
+  private static String bucketOf(final URI uri)
+  {
+    final String host = uri.getHost();
+    return host != null ? host : uri.getAuthority();
+  }
+
+  @JsonProperty("bucket")
+  public String getBucket()
+  {
+    return bucket;
+  }
+
+  @JsonProperty("prefix")
+  public String getPrefix()
+  {
+    return prefix;
+  }
+
+  @Override
+  @JsonIgnore
+  public String getResourceType()
+  {
+    return TYPE_NAME;
+  }
+
+  @Override
+  @JsonIgnore
+  public String getBasePath()
+  {
+    return new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS).toString();
+  }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java
index cba33b5804c0..452ccb1524af 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
+import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.initialization.DruidModule;
 
 import java.util.Collections;
@@ -33,12 +34,15 @@ public class GoogleStorageConnectorModule implements DruidModule
   public List getJacksonModules()
   {
     return Collections.singletonList(
-        new
SimpleModule(this.getClass().getSimpleName()).registerSubtypes(GoogleStorageConnectorProvider.class));
+        new SimpleModule(this.getClass().getSimpleName())
+            .registerSubtypes(GoogleStorageConnectorProvider.class)
+            .registerSubtypes(GoogleExportStorageProvider.class)
+    );
   }
 
   @Override
   public void configure(Binder binder)
   {
-
+    JsonConfigProvider.bind(binder, "druid.export.storage.google", GoogleExportConfig.class);
   }
 }
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java
new file mode 100644
index 000000000000..e40846a848d7
--- /dev/null
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.storage.StorageConnector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests for {@link GoogleExportStorageProvider}: connector creation and allow-list validation.
+ */
+public class GoogleExportStorageProviderTest
+{
+  private final List<String> validPrefixes = ImmutableList.of(
+      "gs://bucket-name/validPath1",
+      "gs://bucket-name/validPath2"
+  );
+
+  @Test
+  public void testGoogleExportStorageProvider()
+  {
+    GoogleExportStorageProvider googleExportStorageProvider = new GoogleExportStorageProvider("bucket-name", "validPath1");
+    googleExportStorageProvider.googleExportConfig = new GoogleExportConfig("tempLocalDir", null, null, validPrefixes);
+    StorageConnector storageConnector = googleExportStorageProvider.get();
+    Assert.assertNotNull(storageConnector);
+    Assert.assertTrue(storageConnector instanceof GoogleStorageConnector);
+
+    Assert.assertEquals("gs://bucket-name/validPath1", googleExportStorageProvider.getBasePath());
+  }
+
+  @Test
+  public void testValidatePaths()
+  {
+    // Exact match, with and without a trailing slash, and a nested path are all accepted.
+    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1/");
+    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1");
+    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1/validSubPath/");
+
+    // Allowing a bare bucket permits every prefix inside it, including the empty one.
+    GoogleExportStorageProvider.validatePrefix(ImmutableList.of("gs://bucket-name"), "bucket-name", "");
+    GoogleExportStorageProvider.validatePrefix(ImmutableList.of("gs://bucket-name"), "bucket-name", "validPath");
+    // A prefix containing ".." segments is accepted as well.
+    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1/../validPath2/");
+
+    // Wrong bucket.
+    Assert.assertThrows(
+        DruidException.class,
+        () -> GoogleExportStorageProvider.validatePrefix(validPrefixes, "incorrect-bucket", "validPath1/")
+    );
+    // Path outside every allowed prefix.
+    Assert.assertThrows(
+        DruidException.class,
+        () -> GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "invalidPath1")
+    );
+    // Shares a textual prefix with validPath1 but is a different directory.
+    Assert.assertThrows(
+        DruidException.class,
+        () -> GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath123")
+    );
+  }
+}
diff --git a/website/.spelling b/website/.spelling
index 037e6b50bc15..404755ebdca3 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -21,6 +21,7 @@
 1M
 100MiB
 32-bit
+4MiB
 500MiB
 64-bit
 ACL