add google as external storage for msq export #16051

Merged · 8 commits · Apr 5, 2024
33 changes: 33 additions & 0 deletions docs/multi-stage-query/reference.md
@@ -149,6 +149,39 @@ The following runtime parameters must be configured to export into an S3 destination
| `druid.export.storage.s3.maxRetry` | No | Defines the max number of times to attempt S3 API calls, to avoid failures due to transient errors. | 10 |
| `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3; however, it requires more disk space to store the temporary chunks. | 100MiB |


##### GS

Export results to GCS by passing the function `google()` as an argument to the `EXTERN` function. Note that this requires the `druid-google-extensions` extension to be loaded.
The `google()` function is a Druid function that configures the connection. Pass arguments to `google()` as named parameters with values in single quotes, as in the following example:

```sql
INSERT INTO
EXTERN(
google(bucket => 'your_bucket', prefix => 'prefix/to/files')
)
AS CSV
SELECT
<column>
FROM <table>
```

Supported arguments for the function:

| Parameter | Required | Description | Default |
|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `bucket` | Yes | The GCS bucket to which the files are exported. The bucket and prefix combination must be whitelisted in `druid.export.storage.google.allowedExportPaths`. | n/a |
| `prefix` | Yes | Path where the exported files are created. The export query expects the destination to be empty; if the location includes other files, the query fails. The bucket and prefix combination must be whitelisted in `druid.export.storage.google.allowedExportPaths`. | n/a |

The following runtime parameters must be configured to export into a GCS destination:

| Runtime Parameter | Required | Description | Default |
|--------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `druid.export.storage.google.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a |
| `druid.export.storage.google.allowedExportPaths` | Yes | An array of GS prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"gs://bucket1/export/\", \"gs://bucket2/export/\"]` | n/a |
| `druid.export.storage.google.maxRetry` | No | Defines the max number of times to attempt GCS API calls, to avoid failures due to transient errors. | 10 |
| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempLocalDir`. A large chunk size reduces the API calls to GCS; however, it requires more disk space to store the temporary chunks. | 4MiB |
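
For example, a minimal worker runtime configuration for GCS export might look like the following sketch. The directory and bucket name here are placeholders, and the last two properties are shown at their default values:

```properties
druid.export.storage.google.tempLocalDir=/tmp/druid-gcs-export
druid.export.storage.google.allowedExportPaths=["gs://example-bucket/export/"]
druid.export.storage.google.maxRetry=10
druid.export.storage.google.chunkSize=4MiB
```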

##### LOCAL

You can export to local storage, which writes the results to the filesystem of the MSQ worker.
org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
@@ -50,7 +50,7 @@

 public class GoogleCloudStorageInputSource extends CloudObjectInputSource
 {
-  static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME;
+  public static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME;
   private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class);

   private final GoogleStorage storage;
org/apache/druid/storage/google/output/GoogleExportConfig.java (new file)
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.google.output;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.HumanReadableBytes;

import javax.annotation.Nullable;
import java.util.List;

> **Contributor:** What is the need to have this class rather than passing these properties directly into GoogleExportStorageConnector? It would be nice to have uniformity across the various cloud provider implementations.
>
> **Member (Author):** Just followed what S3ExportStorageProvider does. I think the reason for the separate config is that these configs need to be injected while creating the ExportStorageProvider instance in the SQL planning phase.
>
> **Contributor:** Ah right. I see it now.

public class GoogleExportConfig
{
  @JsonProperty("tempLocalDir")
  private final String tempLocalDir;
  @JsonProperty("chunkSize")
  private final HumanReadableBytes chunkSize;
  @JsonProperty("maxRetry")
  private final Integer maxRetry;
  @JsonProperty("allowedExportPaths")
  private final List<String> allowedExportPaths;

  @JsonCreator
  public GoogleExportConfig(
      @JsonProperty("tempLocalDir") final String tempLocalDir,
      @JsonProperty("chunkSize") @Nullable final HumanReadableBytes chunkSize,
      @JsonProperty("maxRetry") @Nullable final Integer maxRetry,
      @JsonProperty("allowedExportPaths") final List<String> allowedExportPaths)
  {
    this.tempLocalDir = tempLocalDir;
    this.chunkSize = chunkSize;
    this.maxRetry = maxRetry;
    this.allowedExportPaths = allowedExportPaths;
  }

  public String getTempLocalDir()
  {
    return tempLocalDir;
  }

  public HumanReadableBytes getChunkSize()
  {
    return chunkSize;
  }

  public Integer getMaxRetry()
  {
    return maxRetry;
  }

  public List<String> getAllowedExportPaths()
  {
    return allowedExportPaths;
  }
}
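
As a rough standalone illustration of the injection mechanism described in the review thread above, the following sketch shows how Jackson fills an `@JacksonInject` field from registered injectable values while `@JsonProperty` fields come from the per-query JSON. The class names are illustrative stand-ins, not Druid's actual Guice wiring:

```java
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;

public class InjectionSketch
{
  // Stand-in for a cluster-level config bound from runtime properties.
  static class ClusterConfig
  {
    final String tempLocalDir = "/tmp/druid-gcs-export";
  }

  // Stand-in for an ExportStorageProvider implementation.
  static class Destination
  {
    @JacksonInject
    ClusterConfig clusterConfig; // supplied by the mapper, not by the JSON

    @JsonProperty("bucket")
    String bucket;               // supplied per query, e.g. by the google() call
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    // Druid registers the bound config through Guice; here it is registered by hand.
    mapper.setInjectableValues(
        new InjectableValues.Std().addValue(ClusterConfig.class, new ClusterConfig())
    );
    final Destination d = mapper.readValue("{\"bucket\": \"example-bucket\"}", Destination.class);
    System.out.println(d.bucket + " -> " + d.clusterConfig.tempLocalDir);
  }
}
```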
org/apache/druid/storage/google/output/GoogleExportStorageProvider.java (new file)
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.google.output;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.google.GoogleInputDataConfig;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleStorageDruidModule;

import javax.validation.constraints.NotNull;
import java.io.File;
import java.net.URI;
import java.util.List;

@JsonTypeName(GoogleExportStorageProvider.TYPE_NAME)
public class GoogleExportStorageProvider implements ExportStorageProvider
{
  public static final String TYPE_NAME = GoogleCloudStorageInputSource.TYPE_KEY;
  @JsonProperty
  private final String bucket;
  @JsonProperty
  private final String prefix;

  @JacksonInject
  GoogleExportConfig googleExportConfig;
  @JacksonInject
  GoogleStorage googleStorage;
  @JacksonInject
  GoogleInputDataConfig googleInputDataConfig;

  @JsonCreator
  public GoogleExportStorageProvider(
      @JsonProperty(value = "bucket", required = true) String bucket,
      @JsonProperty(value = "prefix", required = true) String prefix
  )
  {
    this.bucket = bucket;
    this.prefix = prefix;
  }

  @Override
  public StorageConnector get()
  {
    final String tempDir = googleExportConfig.getTempLocalDir();
    if (tempDir == null) {
      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                          .ofCategory(DruidException.Category.NOT_FOUND)
                          .build("The runtime property `druid.export.storage.google.tempLocalDir` must be configured for GCS export.");
    }
    final List<String> allowedExportPaths = googleExportConfig.getAllowedExportPaths();
    if (allowedExportPaths == null) {
      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                          .ofCategory(DruidException.Category.NOT_FOUND)
                          .build(
                              "The runtime property `druid.export.storage.google.allowedExportPaths` must be configured for GCS export.");
    }
    validatePrefix(allowedExportPaths, bucket, prefix);
    final GoogleOutputConfig googleOutputConfig = new GoogleOutputConfig(
        bucket,
        prefix,
        new File(tempDir),
        googleExportConfig.getChunkSize(),
        googleExportConfig.getMaxRetry()
    );
    return new GoogleStorageConnector(googleOutputConfig, googleStorage, googleInputDataConfig);
  }

  @VisibleForTesting
  static void validatePrefix(@NotNull final List<String> allowedExportPaths, final String bucket, final String prefix)
  {
    final URI providedUri = new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS);
    for (final String path : allowedExportPaths) {
      final URI allowedUri = URI.create(path);
      if (validateUri(allowedUri, providedUri)) {
        return;
      }
    }
    throw DruidException.forPersona(DruidException.Persona.USER)
                        .ofCategory(DruidException.Category.INVALID_INPUT)
                        .build("None of the allowed prefixes matched the input path [%s]. "
                               + "Please reach out to the cluster admin for the whitelisted paths for export. "
                               + "The paths are controlled via the property `druid.export.storage.google.allowedExportPaths`.",
                               providedUri);
  }

  private static boolean validateUri(final URI allowedUri, final URI providedUri)
  {
    if (!allowedUri.getHost().equals(providedUri.getHost())) {
      return false;
    }
    final String allowedPath = StringUtils.maybeAppendTrailingSlash(allowedUri.getPath());
    final String providedPath = StringUtils.maybeAppendTrailingSlash(providedUri.getPath());
    return providedPath.startsWith(allowedPath);
  }

  @JsonProperty("bucket")
  public String getBucket()
  {
    return bucket;
  }

  @JsonProperty("prefix")
  public String getPrefix()
  {
    return prefix;
  }

  @Override
  @JsonIgnore
  public String getResourceType()
  {
    return TYPE_NAME;
  }

  @Override
  @JsonIgnore
  public String getBasePath()
  {
    return new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS).toString();
  }
}
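
A note on the validation semantics above: `validateUri` normalizes both the allowed path and the provided path with a trailing slash before the prefix check, so an allowed entry such as `gs://bucket-name/validPath1` matches that prefix and anything under it, but not a lookalike sibling such as `validPath123`. The unit tests below exercise exactly these cases.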
org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java

@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
+import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.initialization.DruidModule;

 import java.util.Collections;
@@ -33,12 +34,15 @@ public class GoogleStorageConnectorModule implements DruidModule
   public List<? extends Module> getJacksonModules()
   {
     return Collections.singletonList(
-        new SimpleModule(this.getClass().getSimpleName()).registerSubtypes(GoogleStorageConnectorProvider.class));
+        new SimpleModule(this.getClass().getSimpleName())
+            .registerSubtypes(GoogleStorageConnectorProvider.class)
+            .registerSubtypes(GoogleExportStorageProvider.class)
+    );
   }

   @Override
   public void configure(Binder binder)
   {
-
+    JsonConfigProvider.bind(binder, "druid.export.storage.google", GoogleExportConfig.class);
   }
 }
org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java (new file)
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.google.output;

import com.google.common.collect.ImmutableList;
import org.apache.druid.error.DruidException;
import org.apache.druid.storage.StorageConnector;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class GoogleExportStorageProviderTest
{
  private final List<String> validPrefixes = ImmutableList.of(
      "gs://bucket-name/validPath1",
      "gs://bucket-name/validPath2"
  );

  @Test
  public void testGoogleExportStorageProvider()
  {
    GoogleExportStorageProvider googleExportStorageProvider = new GoogleExportStorageProvider("bucket-name", "validPath1");
    googleExportStorageProvider.googleExportConfig = new GoogleExportConfig("tempLocalDir", null, null, validPrefixes);
    StorageConnector storageConnector = googleExportStorageProvider.get();
    Assert.assertNotNull(storageConnector);
    Assert.assertTrue(storageConnector instanceof GoogleStorageConnector);

    Assert.assertEquals("gs://bucket-name/validPath1", googleExportStorageProvider.getBasePath());
  }

  @Test
  public void testValidatePaths()
  {
    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1/");
    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1");
    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1/validSubPath/");

    GoogleExportStorageProvider.validatePrefix(ImmutableList.of("gs://bucket-name"), "bucket-name", "");
    GoogleExportStorageProvider.validatePrefix(ImmutableList.of("gs://bucket-name"), "bucket-name", "validPath");
    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1/../validPath2/");

    Assert.assertThrows(
        DruidException.class,
        () -> GoogleExportStorageProvider.validatePrefix(validPrefixes, "incorrect-bucket", "validPath1/")
    );
    Assert.assertThrows(
        DruidException.class,
        () -> GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "invalidPath1")
    );
    Assert.assertThrows(
        DruidException.class,
        () -> GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath123")
    );
  }
}
1 change: 1 addition & 0 deletions website/.spelling
@@ -21,6 +21,7 @@
 1M
 100MiB
 32-bit
+4MiB
 500MiB
 64-bit
 ACL