pre
@Override
public long getObjectSize(CloudObjectLocation location) throws IOException
{
- final StorageObject storageObject = storage.getMetadata(location.getBucket(), location.getPath());
+ final GoogleStorageObjectMetadata storageObject = storage.getMetadata(location.getBucket(), location.getPath());
return getSize(storageObject);
}
}
@@ -147,15 +146,15 @@ public long getObjectSize(CloudObjectLocation location) throws IOException
return new SplitWidget();
}
- private static long getSize(final StorageObject object)
+ private static long getSize(final GoogleStorageObjectMetadata object)
{
- final BigInteger sizeInBigInteger = object.getSize();
+ final Long sizeInLong = object.getSize();
- if (sizeInBigInteger == null) {
+ if (sizeInLong == null) {
return Long.MAX_VALUE;
} else {
try {
- return sizeInBigInteger.longValueExact();
+ return sizeInLong;
}
catch (ArithmeticException e) {
LOG.warn(
@@ -164,7 +163,7 @@ private static long getSize(final StorageObject object)
+ "The max long value will be used for its size instead.",
object.getBucket(),
object.getName(),
- sizeInBigInteger
+ sizeInLong
);
return Long.MAX_VALUE;
}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java
index 977353f9f204..03554fc5c639 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java
@@ -51,12 +51,12 @@ public String getPath()
@Override
public InputStream openStream() throws IOException
{
- return storage.get(bucket, path);
+ return storage.getInputStream(bucket, path);
}
public InputStream openStream(long start) throws IOException
{
- return storage.get(bucket, path, start);
+ return storage.getInputStream(bucket, path, start);
}
@Override
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java
index fc3f7d371f44..61595264fc67 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java
@@ -83,7 +83,7 @@ FileUtils.FileCopyResult getSegmentFiles(final String bucket, final String path,
public InputStream getInputStream(URI uri) throws IOException
{
String path = StringUtils.maybeRemoveLeadingSlash(uri.getPath());
- return storage.get(uri.getHost() != null ? uri.getHost() : uri.getAuthority(), path);
+ return storage.getInputStream(uri.getHost() != null ? uri.getHost() : uri.getAuthority(), path);
}
@Override
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java
index f181d08f443c..91d290b17856 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java
@@ -20,13 +20,26 @@
package org.apache.druid.storage.google;
import com.google.api.client.http.AbstractInputStreamContent;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.Storage.Objects.Get;
-import com.google.api.services.storage.model.StorageObject;
+import com.google.api.gax.paging.Page;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.IOE;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
public class GoogleStorage
{
@@ -36,69 +49,204 @@ public class GoogleStorage
* if we have a Storage instead of a supplier of it, it can cause unnecessary config validation
* against Google storage even when it's not used at all. To perform the config validation
* only when it is actually used, we use a supplier.
- *
+ *
* See OmniDataSegmentKiller for how DataSegmentKillers are initialized.
*/
private final Supplier storage;
- public GoogleStorage(Supplier storage)
+ private final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB");
+
+ public GoogleStorage(final Supplier storage)
{
this.storage = storage;
}
public void insert(final String bucket, final String path, AbstractInputStreamContent mediaContent) throws IOException
{
- Storage.Objects.Insert insertObject = storage.get().objects().insert(bucket, null, mediaContent);
- insertObject.setName(path);
- insertObject.getMediaHttpUploader().setDirectUploadEnabled(false);
- insertObject.execute();
+ storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream());
}
- public InputStream get(final String bucket, final String path) throws IOException
+ public InputStream getInputStream(final String bucket, final String path) throws IOException
{
- return get(bucket, path, 0);
+ return getInputStream(bucket, path, 0, null, null);
}
- public InputStream get(final String bucket, final String path, long start) throws IOException
+ public InputStream getInputStream(
+ final String bucket,
+ final String path,
+ long start
+ ) throws IOException
{
- final Get get = storage.get().objects().get(bucket, path);
- InputStream inputStream = get.executeMediaAsInputStream();
- inputStream.skip(start);
- return inputStream;
+ return getInputStream(bucket, path, start, null, null);
}
- public StorageObject getMetadata(final String bucket, final String path) throws IOException
+ public InputStream getInputStream(
+ final String bucket,
+ final String path,
+ long start,
+ Long length
+ ) throws IOException
{
- return storage.get().objects().get(bucket, path).execute();
+ return getInputStream(bucket, path, start, length, null);
}
- public void delete(final String bucket, final String path) throws IOException
+ public InputStream getInputStream(
+ final String bucket,
+ final String path,
+ long start,
+ @Nullable Long length,
+ @Nullable final Integer chunkSize
+ )
+ throws IOException
{
- storage.get().objects().delete(bucket, path).execute();
+ ReadChannel reader = storage.get().reader(bucket, path);
+ reader.seek(start);
+ if (length != null) {
+ reader.limit(start + length);
+ }
+ if (chunkSize != null) {
+ reader.setChunkSize(chunkSize);
+ }
+ // Using default read buffer size (2 MB)
+ return Channels.newInputStream(reader);
}
- public boolean exists(final String bucket, final String path)
+ public OutputStream getObjectOutputStream(
+ final String bucket,
+ final String path,
+ @Nullable final Integer chunkSize
+ )
+ {
+ WriteChannel writer = storage.get().writer(getBlobInfo(bucket, path));
+ // Limit GCS internal write buffer memory to prevent OOM errors
+ writer.setChunkSize(chunkSize == null ? DEFAULT_WRITE_CHUNK_SIZE.getBytesInInt() : chunkSize);
+
+ return Channels.newOutputStream(writer);
+ }
+
+ public GoogleStorageObjectMetadata getMetadata(
+ final String bucket,
+ final String path
+ ) throws IOException
+ {
+ Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values()));
+ if (blob == null) {
+ throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path);
+ }
+ return new GoogleStorageObjectMetadata(
+ blob.getBucket(),
+ blob.getName(),
+ blob.getSize(),
+ blob.getUpdateTimeOffsetDateTime()
+ .toEpochSecond()
+ );
+ }
+
+ public void delete(final String bucket, final String path) throws IOException
{
- try {
- return storage.get().objects().get(bucket, path).executeUsingHead().isSuccessStatusCode();
+ if (!storage.get().delete(bucket, path)) {
+ throw new IOE(
+ "Failed deleting google cloud storage object [bucket: %s path: %s]",
+ bucket,
+ path
+ );
}
- catch (Exception e) {
- return false;
+ }
+
+ /**
+ * Deletes a list of objects in a bucket
+ *
+ * @param bucket GCS bucket
+ * @param paths Iterable for absolute paths of objects to be deleted inside the bucket
+ */
+ public void batchDelete(final String bucket, final Iterable paths) throws IOException
+ {
+ List statuses = storage.get().delete(Iterables.transform(paths, input -> BlobId.of(bucket, input)));
+ if (statuses.contains(false)) {
+ throw new IOE("Failed deleting google cloud storage object(s)");
}
}
-
+
+ public boolean exists(final String bucket, final String path)
+ {
+ Blob blob = storage.get().get(bucket, path);
+ return blob != null;
+ }
+
public long size(final String bucket, final String path) throws IOException
{
- return storage.get().objects().get(bucket, path).execute().getSize().longValue();
+ Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+ if (blob == null) {
+ throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path);
+ }
+ return blob.getSize();
}
public String version(final String bucket, final String path) throws IOException
{
- return storage.get().objects().get(bucket, path).execute().getEtag();
+ Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.GENERATION));
+ if (blob == null) {
+ throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path);
+ }
+ return blob.getGeneratedId();
}
- public Storage.Objects.List list(final String bucket) throws IOException
+ /***
+ * Provides a paged listing of objects for a given bucket and prefix
+ * @param bucket GCS bucket
+ * @param prefix Path prefix
+ * @param pageSize Number of objects per page
+ * @param pageToken Continuation token for the next page; use null for the first page
+ * or the nextPageToken from the previous {@link GoogleStorageObjectPage}
+ */
+ public GoogleStorageObjectPage list(
+ final String bucket,
+ @Nullable final String prefix,
+ @Nullable final Long pageSize,
+ @Nullable final String pageToken
+ ) throws IOException
{
- return storage.get().objects().list(bucket);
+ List options = new ArrayList<>();
+
+ if (prefix != null) {
+ options.add(Storage.BlobListOption.prefix(prefix));
+ }
+
+ if (pageSize != null) {
+ options.add(Storage.BlobListOption.pageSize(pageSize));
+ }
+
+ if (pageToken != null) {
+ options.add(Storage.BlobListOption.pageToken(pageToken));
+ }
+
+ Page blobPage = storage.get().list(bucket, options.toArray(new Storage.BlobListOption[0]));
+
+ if (blobPage == null) {
+ throw new IOE("Failed fetching google cloud storage object [bucket: %s, prefix: %s]", bucket, prefix);
+ }
+
+
+ List googleStorageObjectMetadataList =
+ blobPage.streamValues()
+ .map(blob -> new GoogleStorageObjectMetadata(
+ blob.getBucket(),
+ blob.getName(),
+ blob.getSize(),
+ blob.getUpdateTimeOffsetDateTime()
+ .toEpochSecond()
+ ))
+ .collect(Collectors.toList());
+
+ return new GoogleStorageObjectPage(googleStorageObjectMetadataList, blobPage.getNextPageToken());
+
+ }
+
+
+ private BlobInfo getBlobInfo(final String bucket, final String path)
+ {
+ BlobId blobId = BlobId.of(bucket, path);
+ return BlobInfo.newBuilder(blobId).build();
+
}
}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
index cb90cb51fb81..0467906a6ca4 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
@@ -23,10 +23,8 @@
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.services.storage.Storage;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Provider;
@@ -86,6 +84,7 @@ public void configure(Binder binder)
{
LOG.info("Configuring GoogleStorageDruidModule...");
+ JsonConfigProvider.bind(binder, "druid.google", GoogleInputDataConfig.class);
JsonConfigProvider.bind(binder, "druid.google", GoogleAccountConfig.class);
Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentPusher.class)
@@ -104,16 +103,9 @@ public void configure(Binder binder)
@Provides
@LazySingleton
- public Storage getGcpStorage(
- HttpTransport httpTransport,
- JsonFactory jsonFactory,
- HttpRequestInitializer requestInitializer
- )
+ public Storage getGcpStorage()
{
- return new Storage
- .Builder(httpTransport, jsonFactory, requestInitializer)
- .setApplicationName(APPLICATION_NAME)
- .build();
+ return StorageOptions.getDefaultInstance().getService();
}
/**
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java
new file mode 100644
index 000000000000..87feb774a5d8
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google;
+
+import java.util.Objects;
+
+public class GoogleStorageObjectMetadata
+{
+ final String bucket;
+ final String name;
+ final Long size;
+ Long lastUpdateTime;
+
+ public GoogleStorageObjectMetadata(final String bucket, final String name, final Long size, final Long lastUpdateTime)
+ {
+ this.bucket = bucket;
+ this.name = name;
+ this.size = size;
+ this.lastUpdateTime = lastUpdateTime;
+ }
+
+ public void setLastUpdateTime(Long lastUpdateTime)
+ {
+ this.lastUpdateTime = lastUpdateTime;
+ }
+
+
+ public String getBucket()
+ {
+ return bucket;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public Long getSize()
+ {
+ return size;
+ }
+
+ public Long getLastUpdateTime()
+ {
+ return lastUpdateTime;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GoogleStorageObjectMetadata that = (GoogleStorageObjectMetadata) o;
+ return Objects.equals(bucket, that.bucket)
+ && Objects.equals(name, that.name)
+ && Objects.equals(size, that.size);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(bucket, name, size);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "GoogleStorageObjectMetadata{" +
+ "bucket='" + bucket + '\'' +
+ ", name='" + name + '\'' +
+ ", size=" + size +
+ ", lastUpdateTime=" + lastUpdateTime +
+ '}';
+ }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java
new file mode 100644
index 000000000000..e58059125a15
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class GoogleStorageObjectPage
+{
+ final List objectList;
+
+ @Nullable
+ final String nextPageToken;
+
+ public GoogleStorageObjectPage(
+ List objectList,
+ String nextPageToken
+ )
+ {
+ this.objectList = objectList;
+ this.nextPageToken = nextPageToken;
+ }
+
+ public List getObjectList()
+ {
+ return objectList;
+ }
+
+ @Nullable
+ public String getNextPageToken()
+ {
+ return nextPageToken;
+ }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
index ae4024172a6f..4f7444f8ea9e 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
@@ -204,7 +204,7 @@ public void killOlderThan(long timestamp) throws IOException
inputDataConfig,
config.getBucket(),
config.getPrefix(),
- (object) -> object.getUpdated().getValue() < timestamp
+ (object) -> object.getLastUpdateTime() < timestamp
);
}
catch (Exception e) {
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java
index d1ed8a7ef6a2..b93128cc2fa4 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java
@@ -19,8 +19,6 @@
package org.apache.druid.storage.google;
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
import com.google.inject.Inject;
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.data.input.impl.CloudObjectLocation;
@@ -49,21 +47,27 @@ public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern)
long mostRecent = Long.MIN_VALUE;
URI latest = null;
final CloudObjectLocation baseLocation = new CloudObjectLocation(descriptorBase);
- final Objects objects = storage.list(baseLocation.getBucket()).setPrefix(baseLocation.getPath()).setMaxResults(MAX_LISTING_KEYS).execute();
- for (StorageObject storageObject : objects.getItems()) {
- if (GoogleUtils.isDirectoryPlaceholder(storageObject)) {
+ final GoogleStorageObjectPage googleStorageObjectPage = storage.list(
+ baseLocation.getBucket(),
+ baseLocation.getPath(),
+ MAX_LISTING_KEYS,
+ null
+ );
+ for (GoogleStorageObjectMetadata objectMetadata : googleStorageObjectPage.getObjectList()) {
+ if (GoogleUtils.isDirectoryPlaceholder(objectMetadata)) {
continue;
}
// remove path prefix from file name
- final CloudObjectLocation objectLocation = new CloudObjectLocation(storageObject.getBucket(),
- storageObject.getName()
+ final CloudObjectLocation objectLocation = new CloudObjectLocation(
+ objectMetadata.getBucket(),
+ objectMetadata.getName()
);
final String keyString = StringUtils
- .maybeRemoveLeadingSlash(storageObject.getName().substring(baseLocation.getPath().length()));
+ .maybeRemoveLeadingSlash(objectMetadata.getName().substring(baseLocation.getPath().length()));
if (pattern != null && !pattern.matcher(keyString).matches()) {
continue;
}
- final long latestModified = storageObject.getUpdated().getValue();
+ final long latestModified = objectMetadata.getLastUpdateTime();
if (latestModified >= mostRecent) {
mostRecent = latestModified;
latest = objectLocation.toUri(GoogleStorageDruidModule.SCHEME_GS);
@@ -72,7 +76,7 @@ public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern)
return latest;
}
catch (IOException e) {
- throw new RuntimeException(e);
+ throw new RuntimeException();
}
}
}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java
index 25b4f3286ea7..a819442ef351 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java
@@ -20,7 +20,6 @@
package org.apache.druid.storage.google;
import com.google.api.client.http.HttpResponseException;
-import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.CloudObjectLocation;
@@ -45,22 +44,22 @@ public static boolean isRetryable(Throwable t)
return t instanceof IOException;
}
- static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception
+ public static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception
{
return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES);
}
- public static URI objectToUri(StorageObject object)
+ public static URI objectToUri(GoogleStorageObjectMetadata object)
{
return objectToCloudObjectLocation(object).toUri(GoogleStorageDruidModule.SCHEME_GS);
}
- public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object)
+ public static CloudObjectLocation objectToCloudObjectLocation(GoogleStorageObjectMetadata object)
{
return new CloudObjectLocation(object.getBucket(), object.getName());
}
- public static Iterator lazyFetchingStorageObjectsIterator(
+ public static Iterator lazyFetchingStorageObjectsIterator(
final GoogleStorage storage,
final Iterator uris,
final long maxListingLength
@@ -85,18 +84,18 @@ public static void deleteObjectsInPath(
GoogleInputDataConfig config,
String bucket,
String prefix,
- Predicate filter
+ Predicate filter
)
throws Exception
{
- final Iterator iterator = lazyFetchingStorageObjectsIterator(
+ final Iterator iterator = lazyFetchingStorageObjectsIterator(
storage,
ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(),
config.getMaxListingLength()
);
while (iterator.hasNext()) {
- final StorageObject nextObject = iterator.next();
+ final GoogleStorageObjectMetadata nextObject = iterator.next();
if (filter.apply(nextObject)) {
retryGoogleCloudStorageOperation(() -> {
storage.delete(nextObject.getBucket(), nextObject.getName());
@@ -110,13 +109,13 @@ public static void deleteObjectsInPath(
* Similar to {@link org.apache.druid.storage.s3.ObjectSummaryIterator#isDirectoryPlaceholder}
* Copied to avoid creating dependency on s3 extensions
*/
- public static boolean isDirectoryPlaceholder(final StorageObject storageObject)
+ public static boolean isDirectoryPlaceholder(final GoogleStorageObjectMetadata objectMetadata)
{
// Recognize "standard" directory place-holder indications
- if (storageObject.getName().endsWith("/") && storageObject.getSize().intValue() == 0) {
+ if (objectMetadata.getName().endsWith("/") && objectMetadata.getSize().intValue() == 0) {
return true;
}
// Recognize place-holder objects created by the Google Storage console or S3 Organizer Firefox extension.
- return storageObject.getName().endsWith("_$folder$") && storageObject.getSize().intValue() == 0;
+ return objectMetadata.getName().endsWith("_$folder$") && objectMetadata.getSize().intValue() == 0;
}
}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java
index 10275112f6f4..b1ad9871776d 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java
@@ -19,9 +19,6 @@
package org.apache.druid.storage.google;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
import org.apache.druid.java.util.common.StringUtils;
import java.io.IOException;
@@ -29,61 +26,48 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
-public class ObjectStorageIterator implements Iterator
+public class ObjectStorageIterator implements Iterator
{
private final GoogleStorage storage;
private final Iterator uris;
private final long maxListingLength;
-
- private Storage.Objects.List listRequest;
- private Objects results;
+ private GoogleStorageObjectPage googleStorageObjectPage;
private URI currentUri;
private String nextPageToken;
- private Iterator storageObjectsIterator;
- private StorageObject currentObject;
+ private Iterator blobIterator;
+ private GoogleStorageObjectMetadata currentObject;
public ObjectStorageIterator(GoogleStorage storage, Iterator uris, long maxListingLength)
{
this.storage = storage;
this.uris = uris;
this.maxListingLength = maxListingLength;
- this.nextPageToken = null;
- prepareNextRequest();
- fetchNextBatch();
+ advanceURI();
+ fetchNextPage();
advanceStorageObject();
}
- private void prepareNextRequest()
+
+ private void advanceURI()
+ {
+ currentUri = uris.next();
+ }
+
+ private void fetchNextPage()
{
try {
- currentUri = uris.next();
String currentBucket = currentUri.getAuthority();
String currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath());
- nextPageToken = null;
- listRequest = storage.list(currentBucket)
- .setPrefix(currentPrefix)
- .setMaxResults(maxListingLength);
-
+ googleStorageObjectPage = storage.list(currentBucket, currentPrefix, maxListingLength, nextPageToken);
+ blobIterator = googleStorageObjectPage.getObjectList().iterator();
+ nextPageToken = googleStorageObjectPage.getNextPageToken();
}
catch (IOException io) {
throw new RuntimeException(io);
}
}
- private void fetchNextBatch()
- {
- try {
- listRequest.setPageToken(nextPageToken);
- results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute());
- storageObjectsIterator = results.getItems().iterator();
- nextPageToken = results.getNextPageToken();
- }
- catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
@Override
public boolean hasNext()
{
@@ -91,35 +75,35 @@ public boolean hasNext()
}
@Override
- public StorageObject next()
+ public GoogleStorageObjectMetadata next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
- final StorageObject retVal = currentObject;
+ final GoogleStorageObjectMetadata retVal = currentObject;
advanceStorageObject();
return retVal;
}
private void advanceStorageObject()
{
- while (storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext()) {
- while (storageObjectsIterator.hasNext()) {
- final StorageObject next = storageObjectsIterator.next();
+ while (blobIterator.hasNext() || nextPageToken != null || uris.hasNext()) {
+ while (blobIterator.hasNext()) {
+ final GoogleStorageObjectMetadata next = blobIterator.next();
// list with prefix can return directories, but they should always end with `/`, ignore them.
// also skips empty objects.
- if (!next.getName().endsWith("/") && next.getSize().signum() > 0) {
+ if (!next.getName().endsWith("/") && Long.signum(next.getSize()) > 0) {
currentObject = next;
return;
}
}
if (nextPageToken != null) {
- fetchNextBatch();
+ fetchNextPage();
} else if (uris.hasNext()) {
- prepareNextRequest();
- fetchNextBatch();
+ advanceURI();
+ fetchNextPage();
}
}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java
new file mode 100644
index 000000000000..a3d1c863a755
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import java.util.Objects;
+
+public class GoogleInputRange
+{
+ private final long start;
+ private final long size;
+ private final String bucket;
+ private final String path;
+
+ public GoogleInputRange(long start, long size, String bucket, String path)
+ {
+ this.start = start;
+ this.size = size;
+ this.bucket = bucket;
+ this.path = path;
+ }
+
+ public long getStart()
+ {
+ return start;
+ }
+
+ public long getSize()
+ {
+ return size;
+ }
+
+ public String getBucket()
+ {
+ return bucket;
+ }
+
+ public String getPath()
+ {
+ return path;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GoogleInputRange that = (GoogleInputRange) o;
+ return start == that.start
+ && size == that.size
+ && Objects.equals(bucket, that.bucket)
+ && Objects.equals(path, that.path);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(start, size, bucket, path);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "GoogleInputRange{" +
+ "start=" + start +
+ ", size=" + size +
+ ", bucket='" + bucket + '\'' +
+ ", path='" + path + '\'' +
+ '}';
+ }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java
new file mode 100644
index 000000000000..c9c78151ae99
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.RetryUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Objects;
+
+public class GoogleOutputConfig
+{
+
+ @JsonProperty
+ private final String bucket;
+
+ @JsonProperty
+ private final String prefix;
+
+ @JsonProperty
+ private final File tempDir;
+
+ @JsonProperty
+ private HumanReadableBytes chunkSize;
+
+ private static final HumanReadableBytes DEFAULT_CHUNK_SIZE = new HumanReadableBytes("4MiB");
+
+ // GCS imposed minimum chunk size
+ private static final long GOOGLE_MIN_CHUNK_SIZE_BYTES = new HumanReadableBytes("256KiB").getBytes();
+
+ // Self-imposed max chunk size since this size is allocated per open file consuming significant memory.
+ private static final long GOOGLE_MAX_CHUNK_SIZE_BYTES = new HumanReadableBytes("16MiB").getBytes();
+
+
+ @JsonProperty
+ private int maxRetry;
+
+ public GoogleOutputConfig(
+ final String bucket,
+ final String prefix,
+ final File tempDir,
+ @Nullable final HumanReadableBytes chunkSize,
+ @Nullable final Integer maxRetry
+ )
+ {
+ this.bucket = bucket;
+ this.prefix = prefix;
+ this.tempDir = tempDir;
+ this.chunkSize = chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE;
+ this.maxRetry = maxRetry != null ? maxRetry : RetryUtils.DEFAULT_MAX_TRIES;
+
+ validateFields();
+ }
+
+ public String getBucket()
+ {
+ return bucket;
+ }
+
+ public String getPrefix()
+ {
+ return prefix;
+ }
+
+ public File getTempDir()
+ {
+ return tempDir;
+ }
+
+ public HumanReadableBytes getChunkSize()
+ {
+ return chunkSize;
+ }
+
+ public Integer getMaxRetry()
+ {
+ return maxRetry;
+ }
+
+ private void validateFields()
+ {
+ if (chunkSize.getBytes() < GOOGLE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > GOOGLE_MAX_CHUNK_SIZE_BYTES) {
+ throw InvalidInput.exception(
+ "'chunkSize' [%d] bytes to the GoogleConfig should be between [%d] bytes and [%d] bytes",
+ chunkSize.getBytes(),
+ GOOGLE_MIN_CHUNK_SIZE_BYTES,
+ GOOGLE_MAX_CHUNK_SIZE_BYTES
+ );
+ }
+ }
+
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GoogleOutputConfig that = (GoogleOutputConfig) o;
+ return Objects.equals(bucket, that.bucket)
+ && Objects.equals(prefix, that.prefix)
+ && Objects.equals(tempDir, that.tempDir)
+ && Objects.equals(chunkSize, that.chunkSize)
+ && Objects.equals(maxRetry, that.maxRetry);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(bucket, prefix, tempDir, chunkSize, maxRetry);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "GoogleOutputConfig{" +
+ "container='" + bucket + '\'' +
+ ", prefix='" + prefix + '\'' +
+ ", tempDir=" + tempDir +
+ ", chunkSize=" + chunkSize +
+ ", maxRetry=" + maxRetry +
+ '}';
+ }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java
new file mode 100644
index 000000000000..6edbad3beaf1
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
+import org.apache.druid.storage.google.GoogleStorage;
+import org.apache.druid.storage.google.GoogleStorageDruidModule;
+import org.apache.druid.storage.google.GoogleStorageObjectMetadata;
+import org.apache.druid.storage.google.GoogleUtils;
+import org.apache.druid.storage.remote.ChunkingStorageConnector;
+import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+public class GoogleStorageConnector extends ChunkingStorageConnector
+{
+
+ private static final String DELIM = "/";
+ private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
+ private static final Logger log = new Logger(GoogleStorageConnector.class);
+
+ private final GoogleStorage storage;
+ private final GoogleOutputConfig config;
+ private final GoogleInputDataConfig inputDataConfig;
+
+ public GoogleStorageConnector(
+ GoogleOutputConfig config,
+ GoogleStorage googleStorage,
+ GoogleInputDataConfig inputDataConfig
+ )
+ {
+ this.storage = googleStorage;
+ this.config = config;
+ this.inputDataConfig = inputDataConfig;
+
+ Preconditions.checkNotNull(config, "config is null");
+ Preconditions.checkNotNull(config.getTempDir(), "tempDir is null in google config");
+
+ try {
+ FileUtils.mkdirp(config.getTempDir());
+ }
+ catch (IOException e) {
+ throw new RE(
+ e,
+ StringUtils.format("Cannot create tempDir : [%s] for google storage connector", config.getTempDir())
+ );
+ }
+ }
+
+ @Override
+ public boolean pathExists(String path)
+ {
+ return storage.exists(config.getBucket(), objectPath(path));
+ }
+
+ @Override
+ public OutputStream write(String path)
+ {
+ return storage.getObjectOutputStream(config.getBucket(), objectPath(path), config.getChunkSize().getBytesInInt());
+ }
+
+ @Override
+ public void deleteFile(String path) throws IOException
+ {
+ try {
+ final String fullPath = objectPath(path);
+ log.debug("Deleting file at bucket: [%s], path: [%s]", config.getBucket(), fullPath);
+
+ GoogleUtils.retryGoogleCloudStorageOperation(
+ () -> {
+ storage.delete(config.getBucket(), fullPath);
+ return null;
+ }
+ );
+ }
+ catch (Exception e) {
+ log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage());
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void deleteFiles(Iterable paths) throws IOException
+ {
+ storage.batchDelete(config.getBucket(), Iterables.transform(paths, this::objectPath));
+ }
+
+ @Override
+ public void deleteRecursively(String path) throws IOException
+ {
+ final String fullPath = objectPath(path);
+ Iterator storageObjects = GoogleUtils.lazyFetchingStorageObjectsIterator(
+ storage,
+ ImmutableList.of(new CloudObjectLocation(config.getBucket(), fullPath)
+ .toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(),
+ inputDataConfig.getMaxListingLength()
+ );
+
+ storage.batchDelete(
+ config.getBucket(),
+ () -> Iterators.transform(storageObjects, GoogleStorageObjectMetadata::getName)
+ );
+ }
+
+ @Override
+ public Iterator listDir(String dirName)
+ {
+ final String fullPath = objectPath(dirName);
+ Iterator storageObjects = GoogleUtils.lazyFetchingStorageObjectsIterator(
+ storage,
+ ImmutableList.of(new CloudObjectLocation(config.getBucket(), fullPath)
+ .toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(),
+ inputDataConfig.getMaxListingLength()
+ );
+
+ return Iterators.transform(
+ storageObjects,
+ storageObject -> {
+ String[] split = storageObject.getName().split(fullPath, 2);
+ if (split.length > 1) {
+ return split[1];
+ } else {
+ return "";
+ }
+ }
+ );
+ }
+
+ @Override
+ public ChunkingStorageConnectorParameters buildInputParams(String path) throws IOException
+ {
+ long size = storage.size(config.getBucket(), objectPath(path));
+ return buildInputParams(path, 0, size);
+ }
+
+ @Override
+ public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size)
+ {
+ ChunkingStorageConnectorParameters.Builder builder = new ChunkingStorageConnectorParameters.Builder<>();
+ builder.start(from);
+ builder.end(from + size);
+ builder.cloudStoragePath(objectPath(path));
+ builder.tempDirSupplier(config::getTempDir);
+ builder.maxRetry(config.getMaxRetry());
+ builder.retryCondition(GoogleUtils.GOOGLE_RETRY);
+ builder.objectSupplier(((start, end) -> new GoogleInputRange(
+ start,
+ end - start,
+ config.getBucket(),
+ objectPath(path)
+ )));
+ builder.objectOpenFunction(new ObjectOpenFunction()
+ {
+ @Override
+ public InputStream open(GoogleInputRange googleInputRange) throws IOException
+ {
+ return storage.getInputStream(
+ googleInputRange.getBucket(),
+ googleInputRange.getPath(),
+ googleInputRange.getStart(),
+ googleInputRange.getSize()
+ );
+ }
+
+ @Override
+ public InputStream open(GoogleInputRange googleInputRange, long offset) throws IOException
+ {
+ long rangeStart = googleInputRange.getStart() + offset;
+ return storage.getInputStream(
+ googleInputRange.getBucket(),
+ googleInputRange.getPath(),
+ rangeStart,
+ googleInputRange.getSize()
+ );
+ }
+ });
+
+ return builder.build();
+ }
+
+ private String objectPath(String path)
+ {
+ return JOINER.join(config.getPrefix(), path);
+ }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java
new file mode 100644
index 000000000000..cba33b5804c0
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.Collections;
+import java.util.List;
+
+public class GoogleStorageConnectorModule implements DruidModule
+{
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return Collections.singletonList(
+ new SimpleModule(this.getClass().getSimpleName()).registerSubtypes(GoogleStorageConnectorProvider.class));
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+
+ }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java
new file mode 100644
index 000000000000..f33a3b1f44db
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.StorageConnectorProvider;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
+import org.apache.druid.storage.google.GoogleStorage;
+import org.apache.druid.storage.google.GoogleStorageDruidModule;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+@JsonTypeName(GoogleStorageDruidModule.SCHEME)
+public class GoogleStorageConnectorProvider extends GoogleOutputConfig implements StorageConnectorProvider
+{
+
+ @JacksonInject
+ GoogleStorage googleStorage;
+
+ @JacksonInject
+ GoogleInputDataConfig googleInputDataConfig;
+
+ @JsonCreator
+ public GoogleStorageConnectorProvider(
+ @JsonProperty(value = "bucket", required = true) String bucket,
+ @JsonProperty(value = "prefix", required = true) String prefix,
+ @JsonProperty(value = "tempDir", required = true) File tempDir,
+ @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize,
+ @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
+ )
+ {
+ super(bucket, prefix, tempDir, chunkSize, maxRetry);
+ }
+
+ @Override
+ public StorageConnector get()
+ {
+ return new GoogleStorageConnector(this, googleStorage, googleInputDataConfig);
+ }
+
+}
diff --git a/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index 3d89b7320313..92cb05897cc3 100644
--- a/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++ b/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.druid.storage.google.GoogleStorageDruidModule
+org.apache.druid.storage.google.output.GoogleStorageConnectorModule
+org.apache.druid.storage.google.GoogleStorageDruidModule
\ No newline at end of file
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
index 556eb840ea9f..ae968aa3d6fa 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
@@ -23,9 +23,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.module.guice.ObjectMapperModule;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Guice;
@@ -56,6 +53,8 @@
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.storage.google.GoogleInputDataConfig;
import org.apache.druid.storage.google.GoogleStorage;
+import org.apache.druid.storage.google.GoogleStorageObjectMetadata;
+import org.apache.druid.storage.google.GoogleStorageObjectPage;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CompressionUtils;
import org.easymock.EasyMock;
@@ -68,7 +67,6 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.math.BigInteger;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@@ -114,6 +112,10 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
private static final byte[] CONTENT =
StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
+ private static final String BUCKET = "TEST_BUCKET";
+ private static final String OBJECT_NAME = "TEST_NAME";
+ private static final Long UPDATE_TIME = 111L;
+
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -207,21 +209,31 @@ public void testGetTypes()
}
@Test
- public void testWithUrisSplit() throws Exception
+ public void testWithUrisSplit() throws IOException
{
EasyMock.reset(STORAGE);
+
+ GoogleStorageObjectMetadata objectMetadata = new GoogleStorageObjectMetadata(
+ BUCKET,
+ OBJECT_NAME,
+ (long) CONTENT.length,
+ UPDATE_TIME
+ );
+
EasyMock.expect(
STORAGE.getMetadata(
EXPECTED_URIS.get(0).getAuthority(),
StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(0).getPath())
)
- ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length)));
+ ).andReturn(objectMetadata);
+
EasyMock.expect(
STORAGE.getMetadata(
EXPECTED_URIS.get(1).getAuthority(),
StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(1).getPath())
)
- ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length)));
+ ).andReturn(objectMetadata);
+
EasyMock.replay(STORAGE);
GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(
@@ -243,21 +255,28 @@ public void testWithUrisSplit() throws Exception
}
@Test
- public void testWithUrisGlob() throws Exception
+ public void testWithUrisGlob() throws IOException
{
+ GoogleStorageObjectMetadata objectMetadata = new GoogleStorageObjectMetadata(
+ BUCKET,
+ OBJECT_NAME,
+ (long) CONTENT.length,
+ UPDATE_TIME
+ );
+
EasyMock.reset(STORAGE);
EasyMock.expect(
STORAGE.getMetadata(
EXPECTED_URIS.get(0).getAuthority(),
StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(0).getPath())
)
- ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length)));
+ ).andReturn(objectMetadata);
EasyMock.expect(
STORAGE.getMetadata(
EXPECTED_URIS.get(1).getAuthority(),
StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(1).getPath())
)
- ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length)));
+ ).andReturn(objectMetadata);
EasyMock.replay(STORAGE);
GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
STORAGE,
@@ -488,28 +507,30 @@ private static void addExpectedPrefixObjects(URI prefix, List uris) throws
{
final String bucket = prefix.getAuthority();
- Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class);
- EasyMock.expect(STORAGE.list(EasyMock.eq(bucket))).andReturn(listRequest).once();
- EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).once();
- EasyMock.expect(listRequest.setMaxResults((long) MAX_LISTING_LENGTH)).andReturn(listRequest).once();
- EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath()))))
- .andReturn(listRequest)
- .once();
+ GoogleStorageObjectPage response = EasyMock.createMock(GoogleStorageObjectPage.class);
- List mockObjects = new ArrayList<>();
+ List mockObjects = new ArrayList<>();
for (URI uri : uris) {
- StorageObject s = new StorageObject();
- s.setBucket(bucket);
- s.setName(uri.getPath());
- s.setSize(BigInteger.valueOf(CONTENT.length));
+ GoogleStorageObjectMetadata s = new GoogleStorageObjectMetadata(
+ bucket,
+ uri.getPath(),
+ (long) CONTENT.length,
+ UPDATE_TIME
+ );
mockObjects.add(s);
}
- Objects response = new Objects();
- response.setItems(mockObjects);
- EasyMock.expect(listRequest.execute()).andReturn(response).once();
- EasyMock.expect(response.getItems()).andReturn(mockObjects).once();
- EasyMock.replay(listRequest);
+ EasyMock.expect(STORAGE.list(
+ EasyMock.eq(bucket),
+ EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath())),
+ EasyMock.eq((long) MAX_LISTING_LENGTH),
+ EasyMock.eq(null)
+ )).andReturn(response).once();
+
+ EasyMock.expect(response.getObjectList()).andReturn(mockObjects).once();
+ EasyMock.expect(response.getNextPageToken()).andReturn(null).once();
+
+ EasyMock.replay(response);
}
private static void addExpectedGetObjectMock(URI uri) throws IOException
@@ -517,7 +538,7 @@ private static void addExpectedGetObjectMock(URI uri) throws IOException
CloudObjectLocation location = new CloudObjectLocation(uri);
EasyMock.expect(
- STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L))
+ STORAGE.getInputStream(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L))
).andReturn(new ByteArrayInputStream(CONTENT)).once();
}
@@ -529,7 +550,7 @@ private static void addExpectedGetCompressedObjectMock(URI uri) throws IOExcepti
CompressionUtils.gzip(new ByteArrayInputStream(CONTENT), gzipped);
EasyMock.expect(
- STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L))
+ STORAGE.getInputStream(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L))
).andReturn(new ByteArrayInputStream(gzipped.toByteArray())).once();
}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java
index a65f4750d3b6..db5c0e724bd3 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java
@@ -36,7 +36,7 @@ public void openStreamTest() throws IOException
GoogleStorage storage = createMock(GoogleStorage.class);
InputStream stream = createMock(InputStream.class);
- EasyMock.expect(storage.get(bucket, path)).andReturn(stream);
+ EasyMock.expect(storage.getInputStream(bucket, path)).andReturn(stream);
replayAll();
@@ -54,7 +54,7 @@ public void openStreamWithRecoverableErrorTest() throws IOException
final String path = "/path/to/file";
GoogleStorage storage = createMock(GoogleStorage.class);
- EasyMock.expect(storage.get(bucket, path)).andThrow(new IOException(""));
+ EasyMock.expect(storage.getInputStream(bucket, path)).andThrow(new IOException(""));
replayAll();
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java
index 97a0e61dce3f..8d9612f4d8d9 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java
@@ -24,8 +24,6 @@
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
@@ -169,17 +167,10 @@ public void test_killAll_accountConfigWithNullBucketAndPrefix_throwsISEException
@Test
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
{
- StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
- StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1);
+ GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
+ GoogleStorageObjectMetadata object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1);
- Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
-
- GoogleTestUtils.expectListObjects(
- listRequest,
- PREFIX_URI,
- MAX_KEYS,
- ImmutableList.of(object1, object2)
- );
+ GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1, object2));
GoogleTestUtils.expectDeleteObjects(
storage,
@@ -190,29 +181,22 @@ public void test_killAll_noException_deletesAllTaskLogs() throws IOException
EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
- EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage);
+ EasyMock.replay(accountConfig, inputDataConfig, storage);
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
killer.killAll();
- EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage);
+ EasyMock.verify(accountConfig, inputDataConfig, storage);
}
@Test
public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException
{
- StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
+ GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
- Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
-
- GoogleTestUtils.expectListObjects(
- listRequest,
- PREFIX_URI,
- MAX_KEYS,
- ImmutableList.of(object1)
- );
+ GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1));
GoogleTestUtils.expectDeleteObjects(
storage,
@@ -224,30 +208,22 @@ public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskL
EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
- EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage);
+ EasyMock.replay(accountConfig, inputDataConfig, storage);
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
killer.killAll();
- EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage);
+ EasyMock.verify(accountConfig, inputDataConfig, storage);
}
@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
boolean ioExceptionThrown = false;
- Storage.Objects.List listRequest = null;
try {
- StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
-
- listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
+ GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
- GoogleTestUtils.expectListObjects(
- listRequest,
- PREFIX_URI,
- MAX_KEYS,
- ImmutableList.of(object1)
- );
+ GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1));
GoogleTestUtils.expectDeleteObjects(
storage,
@@ -259,7 +235,7 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA
EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
- EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage);
+ EasyMock.replay(accountConfig, inputDataConfig, storage);
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
killer.killAll();
@@ -270,6 +246,6 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA
Assert.assertTrue(ioExceptionThrown);
- EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage);
+ EasyMock.verify(accountConfig, inputDataConfig, storage);
}
}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java
index deb2383fd6c9..059432ba8188 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java
@@ -52,7 +52,7 @@ public void testDeleteOutputDirectoryWhenErrorIsRaisedPullingSegmentFiles()
300,
"test"
);
- EasyMock.expect(storage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andThrow(exception);
+ EasyMock.expect(storage.getInputStream(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andThrow(exception);
replayAll();
@@ -93,7 +93,7 @@ public void testGetInputStreamBucketNameWithUnderscores() throws IOException
String prefix = "prefix/";
GoogleStorage storage = createMock(GoogleStorage.class);
- EasyMock.expect(storage.get(EasyMock.eq(bucket), EasyMock.eq(prefix))).andReturn(EasyMock.createMock(InputStream.class));
+ EasyMock.expect(storage.getInputStream(EasyMock.eq(bucket), EasyMock.eq(prefix))).andReturn(EasyMock.createMock(InputStream.class));
EasyMock.replay(storage);
GoogleDataSegmentPuller puller = new GoogleDataSegmentPuller(storage);
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java
index 9ca384cf4682..e45877c89f4d 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java
@@ -19,14 +19,9 @@
package org.apache.druid.storage.google;
-import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.services.storage.Storage;
+import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
-import org.apache.druid.common.gcp.GcpMockModule;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
import org.junit.Assert;
@@ -42,23 +37,7 @@ public void testSegmentKillerBoundedSingleton()
// HttpRquestInitializer, the test throws an exception from that method, meaning that if they are not loaded
// lazily, the exception should end up thrown.
// 2. That the same object is returned.
- Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
- ImmutableList.of(
- new GcpMockModule()
- {
-
- @Override
- public HttpRequestInitializer mockRequestInitializer(
- HttpTransport transport,
- JsonFactory factory
- )
- {
- return new MockGoogleCredential.Builder().setTransport(transport).setJsonFactory(factory).build();
- }
- },
- new GoogleStorageDruidModule()
- )
- );
+ Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new GoogleStorageDruidModule()));
OmniDataSegmentKiller killer = injector.getInstance(OmniDataSegmentKiller.class);
Assert.assertTrue(killer.getKillers().containsKey(GoogleStorageDruidModule.SCHEME));
Assert.assertSame(
@@ -78,23 +57,7 @@ public void testLazyInstantiation()
// HttpRquestInitializer, the test throws an exception from that method, meaning that if they are not loaded
// lazily, the exception should end up thrown.
// 2. That the same object is returned.
- Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
- ImmutableList.of(
- new GcpMockModule()
- {
-
- @Override
- public HttpRequestInitializer mockRequestInitializer(
- HttpTransport transport,
- JsonFactory factory
- )
- {
- throw new UnsupportedOperationException("should not be called, because this should be lazy");
- }
- },
- new GoogleStorageDruidModule()
- )
- );
+ Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new GoogleStorageDruidModule()));
final GoogleStorage instance = injector.getInstance(GoogleStorage.class);
Assert.assertSame(instance, injector.getInstance(GoogleStorage.class));
}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java
index 848cf97fed17..d92339f53c79 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java
@@ -19,73 +19,243 @@
package org.apache.druid.storage.google;
-import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential;
-import com.google.api.client.http.ByteArrayContent;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import com.google.api.services.storage.Storage;
-import com.google.common.base.Suppliers;
-import org.apache.druid.java.util.common.StringUtils;
-import org.junit.Assert;
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.common.collect.ImmutableList;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.io.InputStream;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class GoogleStorageTest
{
+ Storage mockStorage;
+ GoogleStorage googleStorage;
+
+ Blob blob;
+
+ static final String BUCKET = "bucket";
+ static final String PATH = "/path";
+ static final long SIZE = 100;
+ static final OffsetDateTime UPDATE_TIME = OffsetDateTime.MIN;
+
+ @Before
+ public void setUp()
+ {
+ mockStorage = EasyMock.mock(Storage.class);
+
+ googleStorage = new GoogleStorage(() -> mockStorage);
+
+ blob = EasyMock.mock(Blob.class);
+ }
+
+ @Test
+ public void testDeleteSuccess() throws IOException
+ {
+ EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(true);
+ EasyMock.replay(mockStorage);
+ googleStorage.delete(BUCKET, PATH);
+ }
+
+ @Test
+ public void testDeleteFailure()
+ {
+ EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(false);
+ EasyMock.replay(mockStorage);
+ boolean thrownIOException = false;
+ try {
+ googleStorage.delete(BUCKET, PATH);
+
+ }
+ catch (IOException e) {
+ thrownIOException = true;
+ }
+ assertTrue(thrownIOException);
+ }
+
@Test
- public void testGet() throws IOException
+ public void testBatchDeleteSuccess() throws IOException
{
- String content = "abcdefghij";
- MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
- response.setContent(content);
- GoogleStorage googleStorage = makeGoogleStorage(response);
- InputStream is = googleStorage.get("bucket", "path");
- String actual = GoogleTestUtils.readAsString(is);
- Assert.assertEquals(content, actual);
+ List paths = ImmutableList.of("/path1", "/path2");
+ final Capture> pathIterable = Capture.newInstance();
+ EasyMock.expect(mockStorage.delete(EasyMock.capture(pathIterable))).andReturn(ImmutableList.of(true, true));
+ EasyMock.replay(mockStorage);
+
+ googleStorage.batchDelete(BUCKET, paths);
+
+ List recordedBlobIds = new ArrayList<>();
+ pathIterable.getValue().iterator().forEachRemaining(recordedBlobIds::add);
+
+ List recordedPaths = recordedBlobIds.stream().map(BlobId::getName).collect(Collectors.toList());
+
+ assertTrue(paths.size() == recordedPaths.size() && paths.containsAll(recordedPaths) && recordedPaths.containsAll(
+ paths));
+ assertEquals(BUCKET, recordedBlobIds.get(0).getBucket());
}
@Test
- public void testGetWithOffset() throws IOException
+ public void testBatchDeleteFailure()
{
- String content = "abcdefghij";
- MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
- response.setContent(content);
- GoogleStorage googleStorage = makeGoogleStorage(response);
- InputStream is = googleStorage.get("bucket", "path", 2);
- String actual = GoogleTestUtils.readAsString(is);
- Assert.assertEquals(content.substring(2), actual);
+ List paths = ImmutableList.of("/path1", "/path2");
+ EasyMock.expect(mockStorage.delete((Iterable) EasyMock.anyObject()))
+ .andReturn(ImmutableList.of(false, true));
+ EasyMock.replay(mockStorage);
+ boolean thrownIOException = false;
+ try {
+ googleStorage.batchDelete(BUCKET, paths);
+
+ }
+ catch (IOException e) {
+ thrownIOException = true;
+ }
+ assertTrue(thrownIOException);
}
@Test
- public void testInsert() throws IOException
+ public void testGetMetadata() throws IOException
{
- String content = "abcdefghij";
- MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
- response.addHeader("Location", "http://random-path");
- response.setContent("{}");
- MockHttpTransport transport = new MockHttpTransport.Builder().setLowLevelHttpResponse(response).build();
- GoogleStorage googleStorage = makeGoogleStorage(transport);
- googleStorage.insert("bucket", "path", new ByteArrayContent("text/html", StringUtils.toUtf8(content)));
- MockLowLevelHttpRequest request = transport.getLowLevelHttpRequest();
- String actual = request.getContentAsString();
- Assert.assertEquals(content, actual);
+ EasyMock.expect(mockStorage.get(
+ EasyMock.eq(BUCKET),
+ EasyMock.eq(PATH),
+ EasyMock.anyObject(Storage.BlobGetOption.class)
+ )).andReturn(blob);
+
+ EasyMock.expect(blob.getBucket()).andReturn(BUCKET);
+ EasyMock.expect(blob.getName()).andReturn(PATH);
+ EasyMock.expect(blob.getSize()).andReturn(SIZE);
+ EasyMock.expect(blob.getUpdateTimeOffsetDateTime()).andReturn(UPDATE_TIME);
+
+ EasyMock.replay(mockStorage, blob);
+
+ GoogleStorageObjectMetadata objectMetadata = googleStorage.getMetadata(BUCKET, PATH);
+ assertEquals(objectMetadata, new GoogleStorageObjectMetadata(BUCKET, PATH, SIZE, UPDATE_TIME.toEpochSecond()));
+
}
- private GoogleStorage makeGoogleStorage(MockLowLevelHttpResponse response)
+ @Test
+ public void testExistsTrue()
{
- MockHttpTransport transport = new MockHttpTransport.Builder().setLowLevelHttpResponse(response).build();
- return makeGoogleStorage(transport);
+ EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(blob);
+ EasyMock.replay(mockStorage);
+ assertTrue(googleStorage.exists(BUCKET, PATH));
}
- private GoogleStorage makeGoogleStorage(MockHttpTransport transport)
+ @Test
+ public void testExistsFalse()
{
- HttpRequestInitializer initializer = new MockGoogleCredential.Builder().build();
- Storage storage = new Storage(transport, JacksonFactory.getDefaultInstance(), initializer);
- return new GoogleStorage(Suppliers.ofInstance(storage));
+ EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(null);
+ EasyMock.replay(mockStorage);
+ assertFalse(googleStorage.exists(BUCKET, PATH));
+ }
+
+ @Test
+ public void testSize() throws IOException
+ {
+ EasyMock.expect(mockStorage.get(
+ EasyMock.eq(BUCKET),
+ EasyMock.eq(PATH),
+ EasyMock.anyObject(Storage.BlobGetOption.class)
+ )).andReturn(blob);
+
+ EasyMock.expect(blob.getSize()).andReturn(SIZE);
+
+ EasyMock.replay(mockStorage, blob);
+
+ long size = googleStorage.size(BUCKET, PATH);
+
+ assertEquals(size, SIZE);
+ }
+
+ @Test
+ public void testVersion() throws IOException
+ {
+ final String version = "7";
+ EasyMock.expect(mockStorage.get(
+ EasyMock.eq(BUCKET),
+ EasyMock.eq(PATH),
+ EasyMock.anyObject(Storage.BlobGetOption.class)
+ )).andReturn(blob);
+
+ EasyMock.expect(blob.getGeneratedId()).andReturn(version);
+
+ EasyMock.replay(mockStorage, blob);
+
+ assertEquals(version, googleStorage.version(BUCKET, PATH));
+ }
+
+ @Test
+ public void testList() throws IOException
+ {
+ Page blobPage = EasyMock.mock(Page.class);
+ EasyMock.expect(mockStorage.list(
+ EasyMock.eq(BUCKET),
+ EasyMock.anyObject()
+ )).andReturn(blobPage);
+
+ Blob blob1 = EasyMock.mock(Blob.class);
+ Blob blob2 = EasyMock.mock(Blob.class);
+
+ final String bucket1 = "BUCKET_1";
+ final String path1 = "PATH_1";
+ final long size1 = 7;
+ final OffsetDateTime updateTime1 = OffsetDateTime.MIN;
+
+ final String bucket2 = "BUCKET_2";
+ final String path2 = "PATH_2";
+ final long size2 = 9;
+ final OffsetDateTime updateTime2 = OffsetDateTime.MIN;
+
+ final String nextPageToken = "TOKEN";
+
+ EasyMock.expect(blob1.getBucket()).andReturn(bucket1);
+ EasyMock.expect(blob1.getName()).andReturn(path1);
+ EasyMock.expect(blob1.getSize()).andReturn(size1);
+ EasyMock.expect(blob1.getUpdateTimeOffsetDateTime()).andReturn(updateTime1);
+
+ EasyMock.expect(blob2.getBucket()).andReturn(bucket2);
+ EasyMock.expect(blob2.getName()).andReturn(path2);
+ EasyMock.expect(blob2.getSize()).andReturn(size2);
+ EasyMock.expect(blob2.getUpdateTimeOffsetDateTime()).andReturn(updateTime2);
+
+
+ List blobs = ImmutableList.of(blob1, blob2);
+
+ EasyMock.expect(blobPage.streamValues()).andReturn(blobs.stream());
+
+ EasyMock.expect(blobPage.getNextPageToken()).andReturn(nextPageToken);
+
+
+ EasyMock.replay(mockStorage, blobPage, blob1, blob2);
+
+ GoogleStorageObjectMetadata objectMetadata1 = new GoogleStorageObjectMetadata(
+ bucket1,
+ path1,
+ size1,
+ updateTime1.toEpochSecond()
+ );
+ GoogleStorageObjectMetadata objectMetadata2 = new GoogleStorageObjectMetadata(
+ bucket2,
+ path2,
+ size2,
+ updateTime2.toEpochSecond()
+ );
+
+ GoogleStorageObjectPage objectPage = googleStorage.list(BUCKET, PATH, null, null);
+
+ assertEquals(objectPage.getObjectList().get(0), objectMetadata1);
+ assertEquals(objectPage.getObjectList().get(1), objectMetadata2);
+ assertEquals(objectPage.getNextPageToken(), nextPageToken);
}
}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
index 9bfe2706f803..a0f17c97d910 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
@@ -22,8 +22,6 @@
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.InputStreamContent;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -146,7 +144,7 @@ public void testStreamTaskLogWithoutOffset() throws Exception
final String logPath = PREFIX + "/" + TASKID;
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length());
- EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
+ EasyMock.expect(storage.getInputStream(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
replayAll();
@@ -168,7 +166,7 @@ public void testStreamTaskLogWithPositiveOffset() throws Exception
final String logPath = PREFIX + "/" + TASKID;
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length());
- EasyMock.expect(storage.get(BUCKET, logPath, offset))
+ EasyMock.expect(storage.getInputStream(BUCKET, logPath, offset))
.andReturn(new ByteArrayInputStream(StringUtils.toUtf8(expectedLog)));
replayAll();
@@ -192,7 +190,7 @@ public void testStreamTaskLogWithNegative() throws Exception
final String logPath = PREFIX + "/" + TASKID;
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length());
- EasyMock.expect(storage.get(BUCKET, logPath, internalOffset))
+ EasyMock.expect(storage.getInputStream(BUCKET, logPath, internalOffset))
.andReturn(new ByteArrayInputStream(StringUtils.toUtf8(expectedLog)));
replayAll();
@@ -214,7 +212,7 @@ public void testStreamTaskStatus() throws Exception
final String logPath = PREFIX + "/" + TASKID + ".status.json";
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) taskStatus.length());
- EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(taskStatus)));
+ EasyMock.expect(storage.getInputStream(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(taskStatus)));
replayAll();
@@ -230,18 +228,11 @@ public void testStreamTaskStatus() throws Exception
@Test
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
{
- StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
- StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1);
+ GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
+ GoogleStorageObjectMetadata object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1);
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
- Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
-
- GoogleTestUtils.expectListObjects(
- listRequest,
- PREFIX_URI,
- MAX_KEYS,
- ImmutableList.of(object1, object2)
- );
+ GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1, object2));
GoogleTestUtils.expectDeleteObjects(
storage,
@@ -250,29 +241,22 @@ public void test_killAll_noException_deletesAllTaskLogs() throws IOException
);
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
- EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier);
+ EasyMock.replay(inputDataConfig, storage, timeSupplier);
googleTaskLogs.killAll();
- EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier);
+ EasyMock.verify(inputDataConfig, storage, timeSupplier);
}
@Test
public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException
{
- StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
+ GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
- Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
-
- GoogleTestUtils.expectListObjects(
- listRequest,
- PREFIX_URI,
- MAX_KEYS,
- ImmutableList.of(object1)
- );
+ GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1));
GoogleTestUtils.expectDeleteObjects(
storage,
@@ -282,31 +266,23 @@ public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskL
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
- EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier);
+ EasyMock.replay(inputDataConfig, storage, timeSupplier);
googleTaskLogs.killAll();
- EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier);
+ EasyMock.verify(inputDataConfig, storage, timeSupplier);
}
@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
boolean ioExceptionThrown = false;
- Storage.Objects.List listRequest = null;
try {
- StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
+ GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
- listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
-
- GoogleTestUtils.expectListObjects(
- listRequest,
- PREFIX_URI,
- MAX_KEYS,
- ImmutableList.of(object1)
- );
+ GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1));
GoogleTestUtils.expectDeleteObjects(
storage,
@@ -316,7 +292,7 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
- EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier);
+ EasyMock.replay(inputDataConfig, storage, timeSupplier);
googleTaskLogs.killAll();
}
@@ -326,23 +302,16 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA
Assert.assertTrue(ioExceptionThrown);
- EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier);
+ EasyMock.verify(inputDataConfig, storage, timeSupplier);
}
@Test
public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException
{
- StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
- StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_FUTURE);
+ GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
+ GoogleStorageObjectMetadata object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_FUTURE);
- Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
-
- GoogleTestUtils.expectListObjects(
- listRequest,
- PREFIX_URI,
- MAX_KEYS,
- ImmutableList.of(object1, object2)
- );
+ GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1, object2));
GoogleTestUtils.expectDeleteObjects(
storage,
@@ -351,25 +320,18 @@ public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws
);
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
- EasyMock.replay(listRequest, inputDataConfig, storage);
+ EasyMock.replay(inputDataConfig, storage);
googleTaskLogs.killOlderThan(TIME_NOW);
- EasyMock.verify(listRequest, inputDataConfig, storage);
+ EasyMock.verify(inputDataConfig, storage);
}
@Test
public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException
{
- StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
-
- Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
+ GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
- GoogleTestUtils.expectListObjects(
- listRequest,
- PREFIX_URI,
- MAX_KEYS,
- ImmutableList.of(object1)
- );
+ GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1));
GoogleTestUtils.expectDeleteObjects(
storage,
@@ -379,29 +341,21 @@ public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAll
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
- EasyMock.replay(listRequest, inputDataConfig, storage);
+ EasyMock.replay(inputDataConfig, storage);
googleTaskLogs.killOlderThan(TIME_NOW);
- EasyMock.verify(listRequest, inputDataConfig, storage);
+ EasyMock.verify(inputDataConfig, storage);
}
@Test
public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
boolean ioExceptionThrown = false;
- Storage.Objects.List listRequest = null;
try {
- StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
-
- listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);
+ GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
- GoogleTestUtils.expectListObjects(
- listRequest,
- PREFIX_URI,
- MAX_KEYS,
- ImmutableList.of(object1)
- );
+ GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1));
GoogleTestUtils.expectDeleteObjects(
storage,
@@ -411,7 +365,7 @@ public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntD
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
- EasyMock.replay(listRequest, inputDataConfig, storage);
+ EasyMock.replay(inputDataConfig, storage);
googleTaskLogs.killOlderThan(TIME_NOW);
}
@@ -421,6 +375,6 @@ public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntD
Assert.assertTrue(ioExceptionThrown);
- EasyMock.verify(listRequest, inputDataConfig, storage);
+ EasyMock.verify(inputDataConfig, storage);
}
}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java
index 219d96c21662..c68911448e26 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java
@@ -19,21 +19,17 @@
package org.apache.druid.storage.google;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IExpectationSetters;
+import org.joda.time.DateTime;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
-import java.math.BigInteger;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
@@ -41,79 +37,60 @@
public class GoogleTestUtils extends EasyMockSupport
{
- private static final org.joda.time.DateTime NOW = DateTimes.nowUtc();
+ private static final DateTime NOW = DateTimes.nowUtc();
private static final byte[] CONTENT =
StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
- public static StorageObject newStorageObject(
+ public static GoogleStorageObjectMetadata newStorageObject(
String bucket,
String key,
long lastModifiedTimestamp
)
{
- StorageObject object = new StorageObject();
- object.setBucket(bucket);
- object.setName(key);
- object.setUpdated(new DateTime(lastModifiedTimestamp));
- object.setEtag("etag");
- object.setSize(BigInteger.valueOf(CONTENT.length));
+ GoogleStorageObjectMetadata object = new GoogleStorageObjectMetadata(bucket, key, (long) CONTENT.length,
+ lastModifiedTimestamp
+ );
return object;
}
- public static Storage.Objects.List expectListRequest(
+ public static void expectListObjectsPageRequest(
GoogleStorage storage,
- URI prefix
- ) throws IOException
- {
- Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class);
- String bucket = prefix.getAuthority();
- EasyMock.expect(
- storage.list(bucket)
- ).andReturn(listRequest).once();
- return listRequest;
- }
-
- public static void expectListObjects(
- Storage.Objects.List listRequest,
URI prefix,
long maxListingLength,
- List objects
+ List objectMetadataList
) throws IOException
{
- EasyMock.expect(listRequest.setPrefix(StringUtils.maybeRemoveLeadingSlash(prefix.getPath()))).andReturn(listRequest);
- EasyMock.expect(listRequest.setMaxResults(maxListingLength)).andReturn(listRequest);
- EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).anyTimes();
-
- Objects resultObjects = new Objects();
- resultObjects.setItems(objects);
-
- EasyMock.expect(
- listRequest.execute()
- ).andReturn(resultObjects).once();
+ GoogleStorageObjectPage objectMetadataPage = new GoogleStorageObjectPage(objectMetadataList, null);
+ String bucket = prefix.getAuthority();
+ EasyMock.expect(storage.list(bucket, StringUtils.maybeRemoveLeadingSlash(prefix.getPath()), maxListingLength, null))
+ .andReturn(objectMetadataPage)
+ .once();
}
public static void expectDeleteObjects(
GoogleStorage storage,
- List deleteObjectExpected,
- Map deleteObjectToException
+ List deleteObjectExpected,
+ Map deleteObjectToException
) throws IOException
{
- Map> requestToResultExpectationSetter = new HashMap<>();
- for (Map.Entry deleteObjectAndException : deleteObjectToException.entrySet()) {
- StorageObject deleteObject = deleteObjectAndException.getKey();
+ Map> requestToResultExpectationSetter = new HashMap<>();
+ for (Map.Entry deleteObjectAndException : deleteObjectToException.entrySet()) {
+ GoogleStorageObjectMetadata deleteObject = deleteObjectAndException.getKey();
Exception exception = deleteObjectAndException.getValue();
- IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject);
+ IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(
+ deleteObject);
if (resultExpectationSetter == null) {
storage.delete(deleteObject.getBucket(), deleteObject.getName());
- resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception);
+ resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception);
requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter);
} else {
resultExpectationSetter.andThrow(exception);
}
}
- for (StorageObject deleteObject : deleteObjectExpected) {
- IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject);
+ for (GoogleStorageObjectMetadata deleteObject : deleteObjectExpected) {
+ IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(
+ deleteObject);
if (resultExpectationSetter == null) {
storage.delete(deleteObject.getBucket(), deleteObject.getName());
resultExpectationSetter = EasyMock.expectLastCall();
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java
index 408033db053e..b9417b7f7f0e 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java
@@ -19,8 +19,6 @@
package org.apache.druid.storage.google;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
@@ -38,14 +36,14 @@ public void getLatestVersion()
String keyPrefix = "prefix/dir/0";
// object for directory prefix/dir/0/
- final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "//", 0);
- storageObject1.setUpdated(new DateTime(System.currentTimeMillis()));
- final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v1", 1);
- storageObject2.setUpdated(new DateTime(System.currentTimeMillis()));
- final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v2", 1);
- storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100));
- final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/other", 4);
- storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100));
+ final GoogleStorageObjectMetadata storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "//", 0);
+ storageObject1.setLastUpdateTime(System.currentTimeMillis());
+ final GoogleStorageObjectMetadata storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v1", 1);
+ storageObject2.setLastUpdateTime(System.currentTimeMillis());
+ final GoogleStorageObjectMetadata storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v2", 1);
+ storageObject3.setLastUpdateTime(System.currentTimeMillis() + 100);
+ final GoogleStorageObjectMetadata storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/other", 4);
+ storageObject4.setLastUpdateTime(System.currentTimeMillis() + 100);
final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4));
final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage);
@@ -62,14 +60,14 @@ public void getLatestVersionTrailingSlashKeyPrefix()
String keyPrefix = "prefix/dir/0/";
// object for directory prefix/dir/0/
- final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/", 0);
- storageObject1.setUpdated(new DateTime(System.currentTimeMillis()));
- final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v1", 1);
- storageObject2.setUpdated(new DateTime(System.currentTimeMillis()));
- final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v2", 1);
- storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100));
- final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "other", 4);
- storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100));
+ final GoogleStorageObjectMetadata storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/", 0);
+ storageObject1.setLastUpdateTime(System.currentTimeMillis());
+ final GoogleStorageObjectMetadata storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v1", 1);
+ storageObject2.setLastUpdateTime(System.currentTimeMillis());
+ final GoogleStorageObjectMetadata storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v2", 1);
+ storageObject3.setLastUpdateTime(System.currentTimeMillis() + 100);
+ final GoogleStorageObjectMetadata storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "other", 4);
+ storageObject4.setLastUpdateTime(System.currentTimeMillis() + 100);
final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4));
final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage);
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java
index a0b2bd8f2973..a1f227ab5d48 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java
@@ -19,19 +19,11 @@
package org.apache.druid.storage.google;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import org.apache.druid.storage.google.ObjectStorageIteratorTest.MockStorage.MockObjects.MockList;
-import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
-import java.math.BigInteger;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -39,7 +31,7 @@
public class ObjectStorageIteratorTest
{
- private static final ImmutableList TEST_OBJECTS =
+ private static final ImmutableList TEST_OBJECTS =
ImmutableList.of(
makeStorageObject("b", "foo", 10L),
makeStorageObject("b", "foo/", 0L), // directory
@@ -163,11 +155,11 @@ private static void test(
final int maxListingLength
)
{
- final List expectedObjects = new ArrayList<>();
+ final List expectedObjects = new ArrayList<>();
// O(N^2) but who cares -- the list is short.
for (final String uri : expectedUris) {
- final List matches = TEST_OBJECTS
+ final List matches = TEST_OBJECTS
.stream()
.filter(storageObject -> GoogleUtils.objectToUri(storageObject).toString().equals(uri))
.collect(Collectors.toList());
@@ -175,7 +167,7 @@ private static void test(
expectedObjects.add(Iterables.getOnlyElement(matches));
}
- final List actualObjects = ImmutableList.copyOf(
+ final List actualObjects = ImmutableList.copyOf(
GoogleUtils.lazyFetchingStorageObjectsIterator(
makeMockClient(TEST_OBJECTS),
prefixes.stream().map(URI::create).iterator(),
@@ -194,70 +186,33 @@ private static void test(
* Makes a mock Google Storage client that handles enough of "List" to test the functionality of the
* {@link ObjectStorageIterator} class.
*/
- static GoogleStorage makeMockClient(final List storageObjects)
+ static GoogleStorage makeMockClient(final List storageObjects)
{
return new GoogleStorage(null)
{
@Override
- public Storage.Objects.List list(final String bucket)
+ public GoogleStorageObjectPage list(
+ final String bucket,
+ final String prefix,
+ final Long pageSize,
+ final String pageToken
+ )
{
- return mockList(bucket, storageObjects);
- }
- };
- }
-
- @SuppressWarnings("UnnecessaryFullyQualifiedName")
- static class MockStorage extends Storage
- {
- private MockStorage()
- {
- super(
- EasyMock.niceMock(HttpTransport.class),
- EasyMock.niceMock(JsonFactory.class),
- EasyMock.niceMock(HttpRequestInitializer.class)
- );
- }
-
- private MockList mockList(String bucket, java.util.List storageObjects)
- {
- return new MockObjects().mockList(bucket, storageObjects);
- }
-
- class MockObjects extends Storage.Objects
- {
- private MockList mockList(String bucket, java.util.List storageObjects)
- {
- return new MockList(bucket, storageObjects);
- }
-
- class MockList extends Objects.List
- {
- private final java.util.List storageObjects;
-
- private MockList(String bucket, java.util.List storageObjects)
- {
- super(bucket);
- this.storageObjects = storageObjects;
- }
-
- @Override
- public com.google.api.services.storage.model.Objects execute()
{
// Continuation token is an index in the "objects" list.
- final String continuationToken = getPageToken();
- final int startIndex = continuationToken == null ? 0 : Integer.parseInt(continuationToken);
+ final int startIndex = pageToken == null ? 0 : Integer.parseInt(pageToken);
// Find matching objects.
- java.util.List objects = new ArrayList<>();
+ List objects = new ArrayList<>();
int nextIndex = -1;
for (int i = startIndex; i < storageObjects.size(); i++) {
- final StorageObject storageObject = storageObjects.get(i);
+ final GoogleStorageObjectMetadata storageObject = storageObjects.get(i);
- if (storageObject.getBucket().equals(getBucket())
- && storageObject.getName().startsWith(getPrefix())) {
+ if (storageObject.getBucket().equals(bucket)
+ && storageObject.getName().startsWith(prefix)) {
- if (objects.size() == getMaxResults()) {
+ if (objects.size() == pageSize) {
// We reached our max key limit; set nextIndex (which will lead to a result with truncated = true).
nextIndex = i;
break;
@@ -268,30 +223,18 @@ public com.google.api.services.storage.model.Objects execute()
}
}
- com.google.api.services.storage.model.Objects retVal = new com.google.api.services.storage.model.Objects();
- retVal.setItems(objects);
- if (nextIndex >= 0) {
- retVal.setNextPageToken(String.valueOf(nextIndex));
- } else {
- retVal.setNextPageToken(null);
- }
+ GoogleStorageObjectPage retVal = new GoogleStorageObjectPage(
+ objects,
+ nextIndex >= 0 ? String.valueOf(nextIndex) : null
+ );
return retVal;
}
}
- }
- }
-
- private static MockList mockList(String bucket, List storageObjects)
- {
- return new MockStorage().mockList(bucket, storageObjects);
+ };
}
- static StorageObject makeStorageObject(final String bucket, final String key, final long size)
+ static GoogleStorageObjectMetadata makeStorageObject(final String bucket, final String key, final long size)
{
- final StorageObject summary = new StorageObject();
- summary.setBucket(bucket);
- summary.setName(key);
- summary.setSize(BigInteger.valueOf(size));
- return summary;
+ return new GoogleStorageObjectMetadata(bucket, key, size, null);
}
}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java
new file mode 100644
index 000000000000..1c3dabcf984e
--- /dev/null
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class GoogleInputRangeTest
+{
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(GoogleInputRange.class)
+ .usingGetClass()
+ .verify();
+ }
+}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java
new file mode 100644
index 000000000000..59081d96149b
--- /dev/null
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class GoogleOutputConfigTest
+{
+
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final String BUCKET = "bucket";
+ private static final String PREFIX = "prefix";
+ private static final int MAX_RETRY_COUNT = 0;
+
+ @Test
+ public void testTooLargeChunkSize()
+ {
+ HumanReadableBytes chunkSize = new HumanReadableBytes("17MiB");
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new GoogleOutputConfig(BUCKET, PREFIX, temporaryFolder.newFolder(), chunkSize, MAX_RETRY_COUNT)
+ );
+ }
+}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java
new file mode 100644
index 000000000000..df6c66e84c3f
--- /dev/null
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.ProvisionException;
+import com.google.inject.name.Names;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.StorageConnectorModule;
+import org.apache.druid.storage.StorageConnectorProvider;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
+import org.apache.druid.storage.google.GoogleStorage;
+import org.apache.druid.storage.google.GoogleStorageDruidModule;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Properties;
+
+public class GoogleStorageConnectorProviderTest
+{
+ private static final String CUSTOM_NAMESPACE = "custom";
+
+ @Test
+ public void createGoogleStorageFactoryWithRequiredProperties()
+ {
+
+ final Properties properties = new Properties();
+ properties.setProperty(CUSTOM_NAMESPACE + ".type", "google");
+ properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket");
+ properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
+ properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
+ StorageConnectorProvider googleStorageConnectorProvider = getStorageConnectorProvider(properties);
+
+ Assert.assertTrue(googleStorageConnectorProvider instanceof GoogleStorageConnectorProvider);
+ Assert.assertTrue(googleStorageConnectorProvider.get() instanceof GoogleStorageConnector);
+ Assert.assertEquals("bucket", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getBucket());
+ Assert.assertEquals("prefix", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getPrefix());
+ Assert.assertEquals(new File("/tmp"), ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getTempDir());
+
+ }
+
+ @Test
+ public void createGoogleStorageFactoryWithMissingPrefix()
+ {
+
+ final Properties properties = new Properties();
+ properties.setProperty(CUSTOM_NAMESPACE + ".type", "bucket");
+ properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket");
+ properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
+ Assert.assertThrows(
+ "Missing required creator property 'prefix'",
+ ProvisionException.class,
+ () -> getStorageConnectorProvider(properties)
+ );
+ }
+
+
+ @Test
+ public void createGoogleStorageFactoryWithMissingbucket()
+ {
+
+ final Properties properties = new Properties();
+ properties.setProperty(CUSTOM_NAMESPACE + ".type", "Google");
+ properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
+ properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
+ Assert.assertThrows(
+ "Missing required creator property 'bucket'",
+ ProvisionException.class,
+ () -> getStorageConnectorProvider(properties)
+ );
+ }
+
+ @Test
+ public void createGoogleStorageFactoryWithMissingTempDir()
+ {
+
+ final Properties properties = new Properties();
+ properties.setProperty(CUSTOM_NAMESPACE + ".type", "Google");
+ properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket");
+ properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
+
+ Assert.assertThrows(
+ "Missing required creator property 'tempDir'",
+ ProvisionException.class,
+ () -> getStorageConnectorProvider(properties)
+ );
+ }
+
+ private StorageConnectorProvider getStorageConnectorProvider(Properties properties)
+ {
+ StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add(
+ new GoogleStorageDruidModule(),
+ new StorageConnectorModule(),
+ new GoogleStorageConnectorModule(),
+ binder -> {
+ JsonConfigProvider.bind(
+ binder,
+ CUSTOM_NAMESPACE,
+ StorageConnectorProvider.class,
+ Names.named(CUSTOM_NAMESPACE)
+ );
+
+ binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE)))
+ .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE)))
+ .in(LazySingleton.class);
+ }
+ ).withProperties(properties);
+
+ Injector injector = startupInjectorBuilder.build();
+ injector.getInstance(ObjectMapper.class).registerModules(new GoogleStorageConnectorModule().getJacksonModules());
+ injector.getInstance(ObjectMapper.class).setInjectableValues(
+ new InjectableValues.Std()
+ .addValue(
+ GoogleStorage.class,
+ EasyMock.mock(GoogleStorage.class)
+ ).addValue(
+ GoogleInputDataConfig.class,
+ EasyMock.mock(GoogleInputDataConfig.class)
+ ));
+
+
+ return injector.getInstance(Key.get(
+ StorageConnectorProvider.class,
+ Names.named(CUSTOM_NAMESPACE)
+ ));
+ }
+}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java
new file mode 100644
index 000000000000..7a5e6ba107b8
--- /dev/null
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
+import org.apache.druid.storage.google.GoogleStorage;
+import org.apache.druid.storage.google.GoogleStorageObjectMetadata;
+import org.apache.druid.storage.google.GoogleStorageObjectPage;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class GoogleStorageConnectorTest
+{
+ private static final String BUCKET = "BUCKET";
+ private static final String PREFIX = "PREFIX";
+ private static final String TEST_FILE = "TEST_FILE";
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final int MAX_LISTING_LEN = 10;
+
+ private static final HumanReadableBytes CHUNK_SIZE = new HumanReadableBytes("4MiB");
+
+ GoogleStorageConnector googleStorageConnector;
+ private final GoogleStorage googleStorage = EasyMock.createMock(GoogleStorage.class);
+
+ @Before
+ public void setUp() throws IOException
+ {
+ GoogleOutputConfig config = new GoogleOutputConfig(BUCKET, PREFIX, temporaryFolder.newFolder(), CHUNK_SIZE, null);
+ GoogleInputDataConfig inputDataConfig = new GoogleInputDataConfig();
+ inputDataConfig.setMaxListingLength(MAX_LISTING_LEN);
+ googleStorageConnector = new GoogleStorageConnector(config, googleStorage, inputDataConfig);
+ }
+
+ @Test
+ public void testPathExistsSuccess()
+ {
+ final Capture bucket = Capture.newInstance();
+ final Capture path = Capture.newInstance();
+ EasyMock.expect(googleStorage.exists(EasyMock.capture(bucket), EasyMock.capture(path))).andReturn(true);
+ EasyMock.replay(googleStorage);
+ Assert.assertTrue(googleStorageConnector.pathExists(TEST_FILE));
+ Assert.assertEquals(BUCKET, bucket.getValue());
+ Assert.assertEquals(PREFIX + "/" + TEST_FILE, path.getValue());
+ EasyMock.verify(googleStorage);
+ }
+
+ @Test
+ public void testPathExistsFailure()
+ {
+ final Capture bucket = Capture.newInstance();
+ final Capture path = Capture.newInstance();
+ EasyMock.expect(googleStorage.exists(EasyMock.capture(bucket), EasyMock.capture(path))).andReturn(false);
+ EasyMock.replay(googleStorage);
+ Assert.assertFalse(googleStorageConnector.pathExists(TEST_FILE));
+ Assert.assertEquals(BUCKET, bucket.getValue());
+ Assert.assertEquals(PREFIX + "/" + TEST_FILE, path.getValue());
+ EasyMock.verify(googleStorage);
+ }
+
+ @Test
+ public void testDeleteFile() throws IOException
+ {
+ Capture bucketCapture = EasyMock.newCapture();
+ Capture pathCapture = EasyMock.newCapture();
+ googleStorage.delete(
+ EasyMock.capture(bucketCapture),
+ EasyMock.capture(pathCapture)
+ );
+
+ EasyMock.replay(googleStorage);
+ googleStorageConnector.deleteFile(TEST_FILE);
+ Assert.assertEquals(BUCKET, bucketCapture.getValue());
+ Assert.assertEquals(PREFIX + "/" + TEST_FILE, pathCapture.getValue());
+ }
+
+ @Test
+ public void testDeleteFiles() throws IOException
+ {
+ Capture containerCapture = EasyMock.newCapture();
+ Capture> pathsCapture = EasyMock.newCapture();
+ googleStorage.batchDelete(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture));
+ EasyMock.replay(googleStorage);
+ googleStorageConnector.deleteFiles(ImmutableList.of(TEST_FILE + "_1.part", TEST_FILE + "_2.json"));
+ Assert.assertEquals(BUCKET, containerCapture.getValue());
+ Assert.assertEquals(
+ ImmutableList.of(
+ PREFIX + "/" + TEST_FILE + "_1.part",
+ PREFIX + "/" + TEST_FILE + "_2.json"
+ ),
+ Lists.newArrayList(pathsCapture.getValue())
+ );
+ EasyMock.reset(googleStorage);
+ }
+
+ @Test
+ public void testListDir() throws IOException
+ {
+ GoogleStorageObjectMetadata objectMetadata1 = new GoogleStorageObjectMetadata(
+ BUCKET,
+ PREFIX + "/x/y" + TEST_FILE,
+ (long) 3,
+ null
+ );
+ GoogleStorageObjectMetadata objectMetadata2 = new GoogleStorageObjectMetadata(
+ BUCKET,
+ PREFIX + "/p/q/r/" + TEST_FILE,
+ (long) 4,
+ null
+ );
+ Capture maxListingCapture = EasyMock.newCapture();
+ Capture pageTokenCapture = EasyMock.newCapture();
+ EasyMock.expect(googleStorage.list(
+ EasyMock.anyString(),
+ EasyMock.anyString(),
+ EasyMock.capture(maxListingCapture),
+ EasyMock.capture(pageTokenCapture)
+ ))
+ .andReturn(new GoogleStorageObjectPage(ImmutableList.of(objectMetadata1, objectMetadata2), null));
+ EasyMock.replay(googleStorage);
+ List ret = Lists.newArrayList(googleStorageConnector.listDir(""));
+ Assert.assertEquals(ImmutableList.of("x/y" + TEST_FILE, "p/q/r/" + TEST_FILE), ret);
+ Assert.assertEquals(MAX_LISTING_LEN, maxListingCapture.getValue().intValue());
+ Assert.assertEquals(null, pageTokenCapture.getValue());
+
+ }
+
+ @Test
+ public void testRead() throws IOException
+ {
+ String data = "test";
+ EasyMock.expect(googleStorage.size(EasyMock.anyString(), EasyMock.anyString()))
+ .andReturn(4L);
+ EasyMock.expect(
+ googleStorage.getInputStream(
+ EasyMock.anyString(),
+ EasyMock.anyString(),
+ EasyMock.anyLong(),
+ EasyMock.anyLong()
+ )
+ ).andReturn(IOUtils.toInputStream(data, StandardCharsets.UTF_8));
+
+ EasyMock.replay(googleStorage);
+ InputStream is = googleStorageConnector.read(TEST_FILE);
+ byte[] dataBytes = new byte[data.length()];
+ Assert.assertEquals(data.length(), is.read(dataBytes));
+ Assert.assertEquals(-1, is.read());
+ Assert.assertEquals(data, new String(dataBytes, StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testReadRange() throws IOException
+ {
+ String data = "test";
+
+ for (int start = 0; start < data.length(); ++start) {
+ for (long length = 1; length <= data.length() - start; ++length) {
+ String dataQueried = data.substring(start, start + ((Long) length).intValue());
+ EasyMock.expect(googleStorage.getInputStream(
+ EasyMock.anyString(),
+ EasyMock.anyString(),
+ EasyMock.anyLong(),
+ EasyMock.anyLong()
+ ))
+ .andReturn(IOUtils.toInputStream(dataQueried, StandardCharsets.UTF_8));
+ EasyMock.replay(googleStorage);
+
+ InputStream is = googleStorageConnector.readRange(TEST_FILE, start, length);
+ byte[] dataBytes = new byte[((Long) length).intValue()];
+ Assert.assertEquals(length, is.read(dataBytes));
+ Assert.assertEquals(-1, is.read());
+ Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8));
+ EasyMock.reset(googleStorage);
+ }
+ }
+
+ }
+}
diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml
index a2b5e596671a..4eb51f9b75d1 100644
--- a/integration-tests-ex/cases/pom.xml
+++ b/integration-tests-ex/cases/pom.xml
@@ -145,23 +145,6 @@
aws-java-sdk-core
${aws.sdk.version}