Skip to content
This repository has been archived by the owner on Nov 8, 2024. It is now read-only.

refactor: customize HTTP transport for GCS client #4

Merged
merged 1 commit into from
Jul 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.28.0</version>
<version>1.35.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -139,6 +141,12 @@ protected void doStart() throws Exception {
metricsStore.start();
}

@Override
protected void doStop() throws Exception {
liveBlobs = null;
metricsStore.stop();
}

@Override
@Guarded(by = STARTED)
public Blob create(final InputStream inputStream, final Map<String, String> headers) {
Expand All @@ -162,7 +170,7 @@ public Blob create(final Path path, final Map<String, String> map, final long si
@Override
@Guarded(by = STARTED)
public Blob copy(final BlobId blobId, final Map<String, String> headers) {
GoogleCloudStorageBlob sourceBlob = checkNotNull(getInternal(blobId));
GoogleCloudStorageBlob sourceBlob = (GoogleCloudStorageBlob) checkNotNull(get(blobId));

return createInternal(headers, destination -> {
sourceBlob.getBlob().copyTo(getConfiguredBucketName(), destination);
Expand All @@ -175,17 +183,12 @@ public Blob copy(final BlobId blobId, final Map<String, String> headers) {
@Override
@Guarded(by = STARTED)
public Blob get(final BlobId blobId) {
return getInternal(blobId);
return get(blobId, false);
}

@Nullable
@Override
public Blob get(final BlobId blobId, final boolean includeDeleted) {
// TODO implement soft-delete
return getInternal(blobId);
}

GoogleCloudStorageBlob getInternal(final BlobId blobId) {
checkNotNull(blobId);

final GoogleCloudStorageBlob blob = liveBlobs.getUnchecked(blobId);
Expand All @@ -194,14 +197,14 @@ GoogleCloudStorageBlob getInternal(final BlobId blobId) {
Lock lock = blob.lock();
try {
if (blob.isStale()) {
GoogleCloudBlobAttributes blobAttributes = new GoogleCloudBlobAttributes(bucket, attributePath(blobId).toString());
GoogleCloudBlobAttributes blobAttributes = new GoogleCloudBlobAttributes(bucket, attributePath(blobId));
boolean loaded = blobAttributes.load();
if (!loaded) {
log.warn("Attempt to access non-existent blob {} ({})", blobId, blobAttributes);
return null;
}

if (blobAttributes.isDeleted()) {
if (blobAttributes.isDeleted() && !includeDeleted) {
log.warn("Attempt to access soft-deleted blob {} ({})", blobId, blobAttributes);
return null;
}
Expand All @@ -226,7 +229,7 @@ GoogleCloudStorageBlob getInternal(final BlobId blobId) {

@Override
@Guarded(by = STARTED)
public boolean delete(final BlobId blobId, final String s) {
public boolean delete(final BlobId blobId, final String reason) {
// FIXME: implement soft delete
return deleteHard(blobId);
}
Expand All @@ -243,13 +246,8 @@ public boolean deleteHard(final BlobId blobId) {
GoogleCloudBlobAttributes blobAttributes = new GoogleCloudBlobAttributes(bucket, attributePath);
Long contentSize = getContentSizeForDeletion(blobAttributes);

GoogleCloudStorageBlob blob = getInternal(blobId);
boolean blobDeleted = false;
if (blob != null) {
blobDeleted = blob.getBlob().delete();
}

blobAttributes.setDeleted(blobDeleted);
boolean blobDeleted = storage.delete(getConfiguredBucketName(), contentPath(blobId));
storage.delete(getConfiguredBucketName(), attributePath);

if (blobDeleted && contentSize != null) {
metricsStore.recordDeletion(contentSize);
Expand Down Expand Up @@ -310,7 +308,7 @@ protected Bucket getOrCreateStorageBucket() {
@Override
@Guarded(by = {NEW, STOPPED, FAILED})
public void remove() {
// TODO delete bucket?
// TODO delete bucket only if it is empty
}

@Override
Expand Down Expand Up @@ -413,17 +411,29 @@ Blob createInternal(final Map<String, String> headers, BlobIngester ingester) {
return blob;
}
catch (IOException e) {
// TODO delete what we created?
blob.getBlob().delete();
if (blobAttributes != null) {
blobAttributes.setDeleted(true);
}
deleteNonExplosively(attributePath);
deleteNonExplosively(blobPath);
throw new BlobStoreException(e, blobId);
}
finally {
lock.unlock();
}
}

/**
* Intended for use only within catch blocks that intend to throw their own {@link BlobStoreException}
* for another good reason.
*
* @param contentPath the path within the configured bucket to delete
*/
private void deleteNonExplosively(final String contentPath) {
try {
storage.delete(getConfiguredBucketName(), contentPath);
} catch (Exception e) {
log.warn("caught exception attempting to delete during cleanup", e);
}
}

/**
* Returns path for blob-id content file relative to root directory.
*/
Expand Down Expand Up @@ -473,7 +483,7 @@ public InputStream getInputStream() {
}

com.google.cloud.storage.Blob getBlob() {
return bucket.get(contentPath(getId()));
return bucket.get(contentPath(getId()), BlobGetOption.fields(BlobField.MEDIA_LINK));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,7 +47,7 @@ public GoogleCloudPropertiesFile(final Bucket bucket, final String key) {
public void load() throws IOException {
log.debug("Loading properties: {}", key);

Blob blob = bucket.get(key);
Blob blob = bucket.get(key, BlobGetOption.fields(BlobField.MEDIA_LINK));
try (ReadChannel channel = blob.reader()) {
load(Channels.newInputStream(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.shiro.util.StringUtils;

import static org.sonatype.nexus.blobstore.gcloud.internal.GoogleCloudBlobStore.CONFIG_KEY;
Expand All @@ -44,9 +51,33 @@ Storage create(final BlobStoreConfiguration configuration) throws Exception {
return builder.build().getService();
}

/**
* This method overrides the default {@link com.google.auth.http.HttpTransportFactory} with the Apache HTTP Client
* backed implementation. In addition, it modifies the {@link HttpClient} used internally to use a
* {@link PoolingClientConnectionManager}.
*
* Note: at time of writing, this method uses deprecated classes that have been replaced in HttpClient with
* {@link HttpClientBuilder}. We cannot use {@link HttpClientBuilder} currently because of a problem with the
* Google Cloud Storage library's {@link ApacheHttpTransport} constructor; the {@link HttpClient} instance
* returned by {@link HttpClientBuilder#build()} throws an {@link UnsupportedOperationException} for
* {@link HttpClient#getParams()}.
*
* @see PoolingHttpClientConnectionManager
* @see HttpClientBuilder
* @return customized {@link TransportOptions} to use for our {@link Storage} client instance
*/
TransportOptions transportOptions() {
return HttpTransportOptions.newBuilder()
.setHttpTransportFactory(() -> new ApacheHttpTransport())
.build();
// replicate default connection and protocol parameters used within {@link ApacheHttpTransport}
PoolingClientConnectionManager connManager = new PoolingClientConnectionManager();
connManager.setDefaultMaxPerRoute(20);
connManager.setMaxTotal(200);
BasicHttpParams params = new BasicHttpParams();
params.setParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false);
params.setParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8192);
DefaultHttpClient client = new DefaultHttpClient(connManager, params);

return HttpTransportOptions.newBuilder()
.setHttpTransportFactory(() -> new ApacheHttpTransport(client))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.google.api.gax.paging.Page
import com.google.cloud.storage.Bucket
import com.google.cloud.storage.Storage
import com.google.cloud.storage.Storage.BlobListOption
import org.apache.commons.io.IOUtils
import spock.lang.Specification

class GoogleCloudBlobStoreTest
Expand Down Expand Up @@ -131,8 +132,8 @@ class GoogleCloudBlobStoreTest
storage.get('mybucket') >> bucket
blobStore.init(config)
blobStore.doStart()
bucket.get('content/existing.properties') >> mockGoogleObject(tempFileAttributes)
bucket.get('content/existing.bytes') >> mockGoogleObject(tempFileBytes)
bucket.get('content/existing.properties', _) >> mockGoogleObject(tempFileAttributes)
bucket.get('content/existing.bytes', _) >> mockGoogleObject(tempFileBytes)

when: 'call create'
Blob blob = blobStore.get(new BlobId('existing'))
Expand Down Expand Up @@ -168,7 +169,7 @@ class GoogleCloudBlobStoreTest
def 'start will accept a metadata.properties originally created with file blobstore'() {
given: 'metadata.properties comes from a file blobstore'
storage.get('mybucket') >> bucket
2 * bucket.get('metadata.properties') >> mockGoogleObject(fileMetadata)
2 * bucket.get('metadata.properties', _) >> mockGoogleObject(fileMetadata)

when: 'doStart is called'
blobStore.init(config)
Expand All @@ -182,7 +183,7 @@ class GoogleCloudBlobStoreTest
given: 'metadata.properties comes from some unknown blobstore'
storage.get('mybucket') >> bucket
storage.get('mybucket') >> bucket
2 * bucket.get('metadata.properties') >> mockGoogleObject(otherMetadata)
2 * bucket.get('metadata.properties', _) >> mockGoogleObject(otherMetadata)

when: 'doStart is called'
blobStore.init(config)
Expand All @@ -197,7 +198,7 @@ class GoogleCloudBlobStoreTest
storage.get('mybucket') >> bucket
blobStore.init(config)
blobStore.doStart()
bucket.get('content/existing.properties') >> mockGoogleObject(tempFileAttributes)
bucket.get('content/existing.properties', _) >> mockGoogleObject(tempFileAttributes)

when: 'call exists'
boolean exists = blobStore.exists(new BlobId('existing'))
Expand All @@ -224,7 +225,7 @@ class GoogleCloudBlobStoreTest
storage.get('mybucket') >> bucket
blobStore.init(config)
blobStore.doStart()
bucket.get('content/existing.properties') >> { throw new IOException("this is a test") }
bucket.get('content/existing.properties', _) >> { throw new IOException("this is a test") }

when: 'call exists'
blobStore.exists(new BlobId('existing'))
Expand All @@ -236,6 +237,7 @@ class GoogleCloudBlobStoreTest
private mockGoogleObject(File file) {
com.google.cloud.storage.Blob blob = Mock()
blob.reader() >> new DelegatingReadChannel(FileChannel.open(file.toPath()))
blob.getContent() >> IOUtils.toByteArray(new FileInputStream(file))
blob
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import java.nio.channels.FileChannel

import com.google.cloud.storage.Blob
import com.google.cloud.storage.Bucket
import org.apache.commons.io.IOUtils
import spock.lang.Specification

/**
Expand All @@ -42,7 +43,8 @@ class GoogleCloudPropertiesFileTest
def setup() {
Blob blob = Mock()
blob.reader() >> new DelegatingReadChannel(FileChannel.open(tempFile.toPath()))
bucket.get('mykey') >> blob
blob.getContent() >> IOUtils.toByteArray(new FileInputStream(tempFile))
bucket.get('mykey', _) >> blob
}

def "Load ingests properties from google cloud storage object"() {
Expand Down