Skip to content
This repository has been archived by the owner on Nov 8, 2024. It is now read-only.

Commit

Permalink
refactor: Short, fixed length keep alive, retry and stale checking (#14)
Browse files Browse the repository at this point in the history
* refactor: Short, fixed length keep alive, retry and stale checking

This change addresses the issue reported in #13 and similar related issues to HTTP transport by:

* Attempting to keep alive connections if possible,
* Performing stale checking on the connection to avoid using a pooled connection that's been closed,
* and retrying a request for a handful of connection layer issues (see DefaultHttpRequestRetryHandler).
  • Loading branch information
nblair authored Oct 5, 2018
1 parent efbdf42 commit b042916
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,89 @@
*/
package org.sonatype.nexus.blobstore.gcloud.internal;

import java.net.ProxySelector;

import com.google.api.client.http.apache.ApacheHttpTransport;
import com.google.cloud.TransportOptions;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.params.ConnManagerParams;
import org.apache.http.conn.params.ConnPerRouteBean;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.conn.ProxySelectorRoutePlanner;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;

/**
* Abstract supertype for Factory classes that generate Google Clients (for Storage, Datastore, etc).
*/
public abstract class AbstractGoogleClientFactory
{
/**
* This method overrides the default {@link com.google.auth.http.HttpTransportFactory} with the Apache HTTP Client
* backed implementation. In addition, it modifies the {@link HttpClient} used internally to use a
* {@link PoolingClientConnectionManager}.
*
* Note: at time of writing, this method uses deprecated classes that have been replaced in HttpClient with
* {@link HttpClientBuilder}. We cannot use {@link HttpClientBuilder} currently because of a problem with the
* Google Cloud Storage library's {@link ApacheHttpTransport} constructor; the {@link HttpClient} instance
* returned by {@link HttpClientBuilder#build()} throws an {@link UnsupportedOperationException} for
* {@link HttpClient#getParams()}.
* Fixed keep-alive for HTTP connections of 1 minute.
*/
public static final long KEEP_ALIVE_DURATION = 60_000L;

/**
* Provide a {@link TransportOptions} backed by Apache HTTP Client.
*
* @see PoolingHttpClientConnectionManager
* @see HttpClientBuilder
* @return customized {@link TransportOptions} to use for our {@link Storage} client instance
* @see ApacheHttpTransport
* @return customized {@link TransportOptions} to use for our Google client instances
*/
TransportOptions transportOptions() {
// replicate default connection and protocol parameters used within {@link ApacheHttpTransport}
PoolingClientConnectionManager connManager = new PoolingClientConnectionManager();
connManager.setDefaultMaxPerRoute(20);
connManager.setMaxTotal(200);
BasicHttpParams params = new BasicHttpParams();
params.setParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false);
params.setParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8192);
DefaultHttpClient client = new DefaultHttpClient(connManager, params);

return HttpTransportOptions.newBuilder()
.setHttpTransportFactory(() -> new ApacheHttpTransport(client))
.setHttpTransportFactory(() -> new ApacheHttpTransport(newHttpClient()))
.build();
}

HttpClient newHttpClient() {
return newDefaultHttpClient(
SSLSocketFactory.getSocketFactory(), newDefaultHttpParams(), ProxySelector.getDefault());
}

/**
* Replicates default connection and protocol parameters used within
* {@link ApacheHttpTransport#newDefaultHttpClient()} with one exception:
*
* Stale checking is enabled.
*/
HttpParams newDefaultHttpParams() {
HttpParams params = new BasicHttpParams();
HttpConnectionParams.setStaleCheckingEnabled(params, true);
HttpConnectionParams.setSocketBufferSize(params, 8192);
ConnManagerParams.setMaxTotalConnections(params, 200);
ConnManagerParams.setMaxConnectionsPerRoute(params, new ConnPerRouteBean(20));
return params;
}

/**
* Replicates {@link ApacheHttpTransport#newDefaultHttpClient()} with one exception:
*
* 1 retry is allowed.
*
* @see DefaultHttpRequestRetryHandler
*/
DefaultHttpClient newDefaultHttpClient(
SSLSocketFactory socketFactory, HttpParams params, ProxySelector proxySelector) {
SchemeRegistry registry = new SchemeRegistry();
registry.register(new Scheme("http", PlainSocketFactory.getSocketFactory(), 80));
registry.register(new Scheme("https", socketFactory, 443));
ClientConnectionManager connectionManager = new ThreadSafeClientConnManager(params, registry);
DefaultHttpClient defaultHttpClient = new DefaultHttpClient(connectionManager, params);
// retry only once
defaultHttpClient.setHttpRequestRetryHandler(new DefaultHttpRequestRetryHandler(1, true));
if (proxySelector != null) {
defaultHttpClient.setRoutePlanner(new ProxySelectorRoutePlanner(registry, proxySelector));
}
defaultHttpClient.setKeepAliveStrategy((response, context) -> KEEP_ALIVE_DURATION);
return defaultHttpClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,6 @@ Blob createInternal(final Map<String, String> headers, BlobIngester ingester) {
final String blobPath = contentPath(blobId);
final String attributePath = attributePath(blobId);
final GoogleCloudStorageBlob blob = liveBlobs.getUnchecked(blobId);
GoogleCloudBlobAttributes blobAttributes = null;
Lock lock = blob.lock();
try {
log.debug("Writing blob {} to {}", blobId, blobPath);
Expand All @@ -526,7 +525,7 @@ Blob createInternal(final Map<String, String> headers, BlobIngester ingester) {
final BlobMetrics metrics = new BlobMetrics(new DateTime(), streamMetrics.getSha1(), streamMetrics.getSize());
blob.refresh(headers, metrics);

blobAttributes = new GoogleCloudBlobAttributes(bucket, attributePath, headers, metrics);
GoogleCloudBlobAttributes blobAttributes = new GoogleCloudBlobAttributes(bucket, attributePath, headers, metrics);

blobAttributes.store();
metricsStore.recordAddition(blobAttributes.getMetrics().getContentSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,9 @@

import org.sonatype.nexus.blobstore.api.BlobStoreConfiguration;

import com.google.api.client.http.apache.ApacheHttpTransport;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.TransportOptions;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.shiro.util.StringUtils;

import static org.sonatype.nexus.blobstore.gcloud.internal.GoogleCloudBlobStore.CONFIG_KEY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package org.sonatype.nexus.blobstore.gcloud.internal

import java.util.stream.Collectors
import java.util.stream.Stream
import java.util.stream.StreamSupport

import org.sonatype.nexus.blobstore.BlobIdLocationResolver
import org.sonatype.nexus.blobstore.DefaultBlobIdLocationResolver
Expand All @@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory
import spock.lang.Specification

import static org.sonatype.nexus.blobstore.DirectPathLocationStrategy.DIRECT_PATH_PREFIX
import static org.sonatype.nexus.blobstore.gcloud.internal.AbstractGoogleClientFactory.KEEP_ALIVE_DURATION

class GoogleCloudBlobStoreIT
extends Specification
Expand Down Expand Up @@ -101,8 +103,12 @@ class GoogleCloudBlobStoreIT
Storage storage = new GoogleCloudStorageFactory().create(config)
log.debug("Tests complete, deleting files from ${bucketName}")
// must delete all the files within the bucket before we can delete the bucket
storage.list(bucketName,
Iterator<com.google.cloud.storage.Blob> list = storage.list(bucketName,
Storage.BlobListOption.prefix("")).iterateAll()
.iterator()

Iterable<com.google.cloud.storage.Blob> iterable = { _ -> list }
StreamSupport.stream(iterable.spliterator(), true)
.forEach({ b -> b.delete(BlobSourceOption.generationMatch()) })
storage.delete(bucketName)
log.info("Integration test complete, bucket ${bucketName} deleted")
Expand Down Expand Up @@ -144,6 +150,7 @@ class GoogleCloudBlobStoreIT
[ (BlobStore.BLOB_NAME_HEADER): 'foo',
(BlobStore.CREATED_BY_HEADER): 'someuser' ] )
assert blob != null
assert blobStore.getBlobAttributes(blob.id) != null
assert blobStore.delete(blob.id, 'testing')
BlobAttributes deletedAttributes = blobStore.getBlobAttributes(blob.id)
assert deletedAttributes.deleted
Expand All @@ -166,6 +173,7 @@ class GoogleCloudBlobStoreIT
(BlobStore.CREATED_BY_HEADER): 'someuser' ] )
assert blob != null
BlobAttributes attributes = blobStore.getBlobAttributes(blob.id)
assert attributes != null
assert blobStore.delete(blob.id, 'testing')
BlobAttributes deletedAttributes = blobStore.getBlobAttributes(blob.id)
assert deletedAttributes.deleted
Expand All @@ -186,6 +194,22 @@ class GoogleCloudBlobStoreIT
!blobStore.undelete(usageChecker, new BlobId("nonexistent"), attributes, false)
}

def "create after keep-alive window closes is still successful"() {
expect:
Blob blob = blobStore.create(new ByteArrayInputStream('hello'.getBytes()),
[ (BlobStore.BLOB_NAME_HEADER): 'foo1',
(BlobStore.CREATED_BY_HEADER): 'someuser' ] )
assert blob != null
// sit for at least the time on our keep alives, so that any held connections close
log.info("waiting for ${(KEEP_ALIVE_DURATION + 1000L) / 1000L} seconds any stale connections to close")
sleep(KEEP_ALIVE_DURATION + 1000L)

Blob blob2 = blobStore.create(new ByteArrayInputStream('hello'.getBytes()),
[ (BlobStore.BLOB_NAME_HEADER): 'foo2',
(BlobStore.CREATED_BY_HEADER): 'someuser' ] )
assert blob2 != null
}

def createFile(Storage storage, String path) {
storage.create(BlobInfo.newBuilder(bucketName, path).build(),
"content".bytes)
Expand Down
18 changes: 18 additions & 0 deletions src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<logger name="com.google.api.client.http.apache" level="DEBUG" />
<!-- <logger name="org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager" level="DEBUG" /> -->
<logger name="org.sonatype" level="DEBUG" />

<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>

0 comments on commit b042916

Please sign in to comment.