Skip to content

Commit

Permalink
Update AWS SDK to disable temp proxy bypass, remove fixed executor for cache assignments (#703)
Browse files Browse the repository at this point in the history

Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Oct 19, 2023
1 parent e279059 commit 01e8b95
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 64 deletions.
4 changes: 2 additions & 2 deletions kaldb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<opensearch.version>2.7.0</opensearch.version>
<curator.version>5.5.0</curator.version>
<log4j.version>2.20.0</log4j.version>
<aws.sdk.version>2.20.116</aws.sdk.version>
<aws.sdk.version>2.21.2</aws.sdk.version>
<error.prone.version>2.16</error.prone.version>
<junit.jupiter.version>5.10.0</junit.jupiter.version>
</properties>
Expand Down Expand Up @@ -323,7 +323,7 @@
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.22.1</version>
<version>0.27.4</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
Expand Down
25 changes: 2 additions & 23 deletions kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,14 @@ public static S3AsyncClient initS3Client(KaldbConfigs.S3Config config) {
// continue to attempt to read data from a socket that is no longer returning data
S3CrtHttpConfiguration.Builder httpConfigurationBuilder =
S3CrtHttpConfiguration.builder()
.proxyConfiguration(
S3CrtProxyConfiguration.builder().useEnvironmentVariableValues(false).build())
.connectionTimeout(Duration.ofSeconds(5))
.connectionHealthConfiguration(
S3CrtConnectionHealthConfiguration.builder()
.minimumThroughputTimeout(Duration.ofSeconds(3))
.minimumThroughputInBps(32000L)
.build());

S3CrtProxyConfiguration proxyConfiguration = getProxyConfiguration();
if (proxyConfiguration != null) {
httpConfigurationBuilder.proxyConfiguration(proxyConfiguration);
}
s3AsyncClient.httpConfiguration(httpConfigurationBuilder.build());

if (!isNullOrEmpty(config.getS3EndPoint())) {
Expand All @@ -140,24 +137,6 @@ public static S3AsyncClient initS3Client(KaldbConfigs.S3Config config) {
}
}

/**
 * Temporary system properties override for setting the aws crt proxy due to lack of <a
 * href="https://github.com/awslabs/aws-c-http/issues/413">NO_PROXY support</a>. This can be
 * bypassed by providing a "valid" proxy that doesn't do anything, such as a no-op squid sidecar.
 *
 * <p>Reads the {@code aws.s3.crt.proxy.scheme}, {@code aws.s3.crt.proxy.host} and {@code
 * aws.s3.crt.proxy.port} system properties and builds a proxy configuration from them.
 *
 * @return the proxy configuration built from the system properties, or {@code null} when reading
 *     the properties or building the configuration throws (the exception is logged at error
 *     level and not propagated)
 */
private static S3CrtProxyConfiguration getProxyConfiguration() {
try {
String scheme = System.getProperty("aws.s3.crt.proxy.scheme");
String host = System.getProperty("aws.s3.crt.proxy.host");
// Integer.parseInt throws NumberFormatException when the port property is unset (null) or
// not numeric; that exception is handled by the catch below.
int port = Integer.parseInt(System.getProperty("aws.s3.crt.proxy.port"));
LOG.info("Using proxy for AWS S3 CRT client - scheme/host/port {}/{}/{}", scheme, host, port);
return S3CrtProxyConfiguration.builder().scheme(scheme).host(host).port(port).build();
} catch (Exception e) {
// Any failure is logged and swallowed so execution falls through to the null return.
LOG.error("Error getting proxy config", e);
}
// Callers must null-check: null means no proxy configuration could be built.
return null;
}

@Override
public void init(BlobFsConfig config) {
// Not sure if this interface works for a library. So on ice for now.
Expand Down
11 changes: 3 additions & 8 deletions kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -67,8 +66,6 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {
private final SnapshotMetadataStore snapshotMetadataStore;
private final SearchMetadataStore searchMetadataStore;
private final MeterRegistry meterRegistry;

private final ExecutorService executorService;
private final BlobFs blobFs;

public static final String CHUNK_ASSIGNMENT_TIMER = "chunk_assignment_timer";
Expand All @@ -93,14 +90,12 @@ public ReadOnlyChunkImpl(
CacheSlotMetadataStore cacheSlotMetadataStore,
ReplicaMetadataStore replicaMetadataStore,
SnapshotMetadataStore snapshotMetadataStore,
SearchMetadataStore searchMetadataStore,
ExecutorService executorService)
SearchMetadataStore searchMetadataStore)
throws Exception {
this.meterRegistry = meterRegistry;
this.blobFs = blobFs;
this.s3Bucket = s3Bucket;
this.dataDirectoryPrefix = dataDirectoryPrefix;
this.executorService = executorService;
this.searchContext = searchContext;
this.slotId = UUID.randomUUID().toString();

Expand Down Expand Up @@ -144,7 +139,7 @@ private void cacheNodeListener(CacheSlotMetadata cacheSlotMetadata) {
newSlotState,
cacheSlotMetadata);
}
executorService.execute(() -> handleChunkAssignment(cacheSlotMetadata));
Thread.ofVirtual().start(() -> handleChunkAssignment(cacheSlotMetadata));
} else if (newSlotState.equals(Metadata.CacheSlotMetadata.CacheSlotState.EVICT)) {
LOG.info("Chunk - EVICT received - {}", cacheSlotMetadata);
if (!cacheSlotLastKnownState.equals(Metadata.CacheSlotMetadata.CacheSlotState.LIVE)) {
Expand All @@ -154,7 +149,7 @@ private void cacheNodeListener(CacheSlotMetadata cacheSlotMetadata) {
newSlotState,
cacheSlotMetadata);
}
executorService.execute(() -> handleChunkEviction(cacheSlotMetadata));
Thread.ofVirtual().start(() -> handleChunkEviction(cacheSlotMetadata));
}
cacheSlotLastKnownState = newSlotState;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.slack.kaldb.chunkManager;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.kaldb.blobfs.BlobFs;
import com.slack.kaldb.chunk.ReadOnlyChunkImpl;
import com.slack.kaldb.chunk.SearchContext;
Expand All @@ -12,8 +11,6 @@
import com.slack.kaldb.proto.config.KaldbConfigs;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,7 +34,6 @@ public class CachingChunkManager<T> extends ChunkManagerBase<T> {
private SnapshotMetadataStore snapshotMetadataStore;
private SearchMetadataStore searchMetadataStore;
private CacheSlotMetadataStore cacheSlotMetadataStore;
private final ExecutorService executorService;

public CachingChunkManager(
MeterRegistry registry,
Expand All @@ -56,18 +52,6 @@ public CachingChunkManager(
this.dataDirectoryPrefix = dataDirectoryPrefix;
this.replicaSet = replicaSet;
this.slotCountPerInstance = slotCountPerInstance;

// todo - consider making the thread count a config option; this would allow for more
// fine-grained tuning, but we might not need to expose this to the user if we can set sensible
// defaults
this.executorService =
Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() >= 4 ? 2 : 1,
new ThreadFactoryBuilder()
.setNameFormat("caching-chunk-manager-%d")
.setUncaughtExceptionHandler(
(t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e))
.build());
}

@Override
Expand All @@ -92,19 +76,14 @@ protected void startUp() throws Exception {
cacheSlotMetadataStore,
replicaMetadataStore,
snapshotMetadataStore,
searchMetadataStore,
ProtectedExecutorService.wrap(executorService)));
searchMetadataStore));
}
}

@Override
protected void shutDown() throws Exception {
LOG.info("Closing caching chunk manager.");

// Attempt to forcibly shutdown the executor service. This prevents any further downloading of
// data from S3 that would be unused.
executorService.shutdown();

chunkList.forEach(
(readonlyChunk) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import brave.Tracing;
import com.adobe.testing.s3mock.junit5.S3MockExtension;
import com.google.common.util.concurrent.MoreExecutors;
import com.slack.kaldb.blobfs.LocalBlobFs;
import com.slack.kaldb.blobfs.s3.S3CrtBlobFs;
import com.slack.kaldb.blobfs.s3.S3TestUtils;
Expand Down Expand Up @@ -138,8 +137,7 @@ public void shouldHandleChunkLivecycle() throws Exception {
cacheSlotMetadataStore,
replicaMetadataStore,
snapshotMetadataStore,
searchMetadataStore,
MoreExecutors.newDirectExecutorService());
searchMetadataStore);

// wait for chunk to register
await()
Expand Down Expand Up @@ -268,8 +266,7 @@ public void shouldHandleMissingS3Assets() throws Exception {
cacheSlotMetadataStore,
replicaMetadataStore,
snapshotMetadataStore,
searchMetadataStore,
MoreExecutors.newDirectExecutorService());
searchMetadataStore);

// wait for chunk to register
await()
Expand Down Expand Up @@ -335,8 +332,7 @@ public void shouldHandleMissingZkData() throws Exception {
cacheSlotMetadataStore,
replicaMetadataStore,
snapshotMetadataStore,
searchMetadataStore,
MoreExecutors.newDirectExecutorService());
searchMetadataStore);

// wait for chunk to register
await()
Expand Down Expand Up @@ -403,8 +399,7 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception {
cacheSlotMetadataStore,
replicaMetadataStore,
snapshotMetadataStore,
searchMetadataStore,
MoreExecutors.newDirectExecutorService());
searchMetadataStore);

// wait for chunk to register
await()
Expand Down

0 comments on commit 01e8b95

Please sign in to comment.