Skip to content

Commit

Permalink
Virtual threads test
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Oct 18, 2023
1 parent e3e3de4 commit 6ec79d3
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 204 deletions.
3 changes: 2 additions & 1 deletion kaldb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,8 @@
<fork>true</fork>
<compilerArgs>
<arg>-XDcompilePolicy=simple</arg>
<arg> -Xplugin:ErrorProne -XepDisableWarningsInGeneratedCode -XepExcludedPaths:.*/protobuf/.* -Xep:WildcardImport:ERROR -Xep:AssertEqualsArgumentOrderChecker:ERROR</arg>
<arg>--enable-preview</arg>
<arg>-Xplugin:ErrorProne -XepDisableWarningsInGeneratedCode -XepExcludedPaths:.*/protobuf/.* -Xep:WildcardImport:ERROR -Xep:AssertEqualsArgumentOrderChecker:ERROR</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED</arg>
Expand Down
11 changes: 3 additions & 8 deletions kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -67,8 +66,6 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {
private final SnapshotMetadataStore snapshotMetadataStore;
private final SearchMetadataStore searchMetadataStore;
private final MeterRegistry meterRegistry;

private final ExecutorService executorService;
private final BlobFs blobFs;

public static final String CHUNK_ASSIGNMENT_TIMER = "chunk_assignment_timer";
Expand All @@ -93,14 +90,12 @@ public ReadOnlyChunkImpl(
CacheSlotMetadataStore cacheSlotMetadataStore,
ReplicaMetadataStore replicaMetadataStore,
SnapshotMetadataStore snapshotMetadataStore,
SearchMetadataStore searchMetadataStore,
ExecutorService executorService)
SearchMetadataStore searchMetadataStore)
throws Exception {
this.meterRegistry = meterRegistry;
this.blobFs = blobFs;
this.s3Bucket = s3Bucket;
this.dataDirectoryPrefix = dataDirectoryPrefix;
this.executorService = executorService;
this.searchContext = searchContext;
this.slotId = UUID.randomUUID().toString();

Expand Down Expand Up @@ -144,7 +139,7 @@ private void cacheNodeListener(CacheSlotMetadata cacheSlotMetadata) {
newSlotState,
cacheSlotMetadata);
}
executorService.execute(() -> handleChunkAssignment(cacheSlotMetadata));
Thread.ofVirtual().start(() -> handleChunkAssignment(cacheSlotMetadata));
} else if (newSlotState.equals(Metadata.CacheSlotMetadata.CacheSlotState.EVICT)) {
LOG.info("Chunk - EVICT received - {}", cacheSlotMetadata);
if (!cacheSlotLastKnownState.equals(Metadata.CacheSlotMetadata.CacheSlotState.LIVE)) {
Expand All @@ -154,7 +149,7 @@ private void cacheNodeListener(CacheSlotMetadata cacheSlotMetadata) {
newSlotState,
cacheSlotMetadata);
}
executorService.execute(() -> handleChunkEviction(cacheSlotMetadata));
Thread.ofVirtual().start(() -> handleChunkEviction(cacheSlotMetadata));
}
cacheSlotLastKnownState = newSlotState;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.slack.kaldb.chunkManager;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.kaldb.blobfs.BlobFs;
import com.slack.kaldb.chunk.ReadOnlyChunkImpl;
import com.slack.kaldb.chunk.SearchContext;
Expand All @@ -12,8 +11,6 @@
import com.slack.kaldb.proto.config.KaldbConfigs;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,7 +34,6 @@ public class CachingChunkManager<T> extends ChunkManagerBase<T> {
private SnapshotMetadataStore snapshotMetadataStore;
private SearchMetadataStore searchMetadataStore;
private CacheSlotMetadataStore cacheSlotMetadataStore;
private final ExecutorService executorService;

public CachingChunkManager(
MeterRegistry registry,
Expand All @@ -56,18 +52,6 @@ public CachingChunkManager(
this.dataDirectoryPrefix = dataDirectoryPrefix;
this.replicaSet = replicaSet;
this.slotCountPerInstance = slotCountPerInstance;

// todo - consider making the thread count a config option; this would allow for more
// fine-grained tuning, but we might not need to expose this to the user if we can set sensible
// defaults
this.executorService =
Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() >= 4 ? 2 : 1,
new ThreadFactoryBuilder()
.setNameFormat("caching-chunk-manager-%d")
.setUncaughtExceptionHandler(
(t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e))
.build());
}

@Override
Expand All @@ -92,19 +76,14 @@ protected void startUp() throws Exception {
cacheSlotMetadataStore,
replicaMetadataStore,
snapshotMetadataStore,
searchMetadataStore,
ProtectedExecutorService.wrap(executorService)));
searchMetadataStore));
}
}

@Override
protected void shutDown() throws Exception {
LOG.info("Closing caching chunk manager.");

// Attempt to forcibly shutdown the executor service. This prevents any further downloading of
// data from S3 that would be unused.
executorService.shutdown();

chunkList.forEach(
(readonlyChunk) -> {
try {
Expand Down
146 changes: 46 additions & 100 deletions kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManagerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,21 @@
import brave.propagation.CurrentTraceContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.kaldb.chunk.Chunk;
import com.slack.kaldb.logstore.search.SearchQuery;
import com.slack.kaldb.logstore.search.SearchResult;
import com.slack.kaldb.logstore.search.SearchResultAggregator;
import com.slack.kaldb.logstore.search.SearchResultAggregatorImpl;
import com.slack.kaldb.metadata.schema.FieldType;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,37 +36,6 @@ public abstract class ChunkManagerBase<T> extends AbstractIdleService implements
// to the amount of reads, and it must be a threadsafe implementation
protected final List<Chunk<T>> chunkList = new CopyOnWriteArrayList<>();

private static final ListeningExecutorService queryExecutorService = queryThreadPool();

private static final ScheduledExecutorService queryCancellationService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("chunk-manager-query-cancellation-%d")
.setUncaughtExceptionHandler(
(t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e))
.build());

/**
 * Creates the executor that runs chunk queries.
 *
 * <p>The pool is sized to come close to saturating the CPU without fully doing so: all available
 * processors minus two, with a floor of one thread. Fully saturating the CPU can cause failed
 * healthchecks, which in turn can get the container killed.
 *
 * <p>Revisit these settings if this becomes a perf issue. Indexer and cache nodes may also need
 * separate thread pools in the future.
 *
 * @return a listening executor service backed by a fixed-size pool of "chunk-manager-query-%d"
 *     threads whose uncaught exceptions are logged
 */
private static ListeningExecutorService queryThreadPool() {
  // todo - consider making the thread count a config option; this would allow for more
  // fine-grained tuning, but we might not need to expose this to the user if we can set sensible
  // defaults
  int queryThreadCount = Math.max(1, Runtime.getRuntime().availableProcessors() - 2);
  return MoreExecutors.listeningDecorator(
      Executors.newFixedThreadPool(
          queryThreadCount,
          new ThreadFactoryBuilder()
              .setNameFormat("chunk-manager-query-%d")
              .setUncaughtExceptionHandler(
                  (t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e))
              .build()));
}

/*
* Query the chunks in the time range, aggregate the results per aggregation policy and return the results.
* We aggregate locally and then the query aggregator will aggregate again. This is OKAY for the current use-case we support
Expand Down Expand Up @@ -107,72 +68,57 @@ public SearchResult<T> query(SearchQuery query, Duration queryTimeout) {
// a single IndexSearcher.
Collections.shuffle(chunksMatchingQuery);

List<ListenableFuture<SearchResult<T>>> queries =
chunksMatchingQuery.stream()
.map(
(chunk) ->
queryExecutorService.submit(
currentTraceContext.wrap(
() -> {
try {
if (Thread.interrupted()) {
LOG.warn(
"Chunk query thread timed out without starting work, returning error result.");
return errorResult;
}
return chunk.query(query);
} catch (Exception err) {
// Only log the exception message as warn, and not the entire trace
// as this can cause performance issues if significant amounts of
// invalid queries are received
LOG.warn("Chunk Query Exception: {}", err.getMessage());
LOG.debug("Chunk Query Exception", err);
// We catch IllegalArgumentException ( and any other exception that
// represents a parse failure ) and instead of returning an empty
// result we throw back an error to the user
if (err instanceof IllegalArgumentException) {
throw err;
}
return errorResult;
}
})))
.peek(
(future) ->
queryCancellationService.schedule(
() -> future.cancel(true), queryTimeout.toMillis(), TimeUnit.MILLISECONDS))
.collect(Collectors.toList());

Future<List<SearchResult<T>>> searchResultFuture = Futures.successfulAsList(queries);
try {
List<SearchResult<T>> searchResults =
searchResultFuture.get(queryTimeout.toMillis(), TimeUnit.MILLISECONDS);

// check if all results are null, and if so return an error to the user
if (searchResults.size() > 0 && searchResults.stream().allMatch(Objects::isNull)) {
try (var scope = new StructuredTaskScope<SearchResult<T>>()) {
List<StructuredTaskScope.Subtask<SearchResult<T>>> chunkSubtasks =
chunksMatchingQuery.stream()
.map((chunk) -> scope.fork(currentTraceContext.wrap(() -> chunk.query(query))))
.toList();
try {
Futures.allAsList(queries).get(0, TimeUnit.SECONDS);
} catch (Exception e) {
throw new IllegalArgumentException(e);
// Build the deadline from the full Duration. Converting through toSeconds() truncates, so a
// timeout below one second collapsed to an already-expired deadline and any fractional
// seconds of a longer timeout were silently dropped.
scope.joinUntil(Instant.now().plus(queryTimeout));
} catch (TimeoutException timeoutException) {
scope.shutdown();
scope.join();
}
// not expected to happen - we should be guaranteed that the list has at least one failed
// future, which should throw when we try to get on allAsList
throw new IllegalArgumentException(
"Chunk query error - all results returned null values with no exceptions thrown");
}

//noinspection unchecked
SearchResult<T> aggregatedResults =
((SearchResultAggregator<T>) new SearchResultAggregatorImpl<>(query))
.aggregate(searchResults, false);
return incrementNodeCount(aggregatedResults);
// Collect one result per chunk subtask. Subtask.get() may only be called on a subtask that
// completed successfully; for a failed or unfinished subtask it throws IllegalStateException,
// which hides the real cause. Branch on the subtask state instead so the original exception
// (from Subtask.exception()) can be inspected and parse failures still reach the user.
List<SearchResult<T>> searchResults =
    chunkSubtasks.stream()
        .map(
            searchResultSubtask -> {
              StructuredTaskScope.Subtask.State subtaskState = searchResultSubtask.state();
              if (subtaskState == StructuredTaskScope.Subtask.State.SUCCESS) {
                return searchResultSubtask.get();
              }
              if (subtaskState == StructuredTaskScope.Subtask.State.FAILED) {
                Throwable err = searchResultSubtask.exception();
                // Only log the exception message as warn, and not the entire trace
                // as this can cause performance issues if significant amounts of
                // invalid queries are received
                LOG.warn("Chunk Query Exception: {}", err.getMessage());
                LOG.debug("Chunk Query Exception", err);
                // An IllegalArgumentException (or any other exception that represents a
                // parse failure) is thrown back to the user as an error instead of being
                // replaced with an empty result
                if (err instanceof IllegalArgumentException) {
                  throw (IllegalArgumentException) err;
                }
                return errorResult;
              }
              // UNAVAILABLE: the subtask did not finish, e.g. the scope was shut down after
              // the query timeout elapsed
              LOG.warn("Chunk query did not complete before the scope was shut down, returning error result.");
              return errorResult;
            })
        .toList();

// check if all results are null, and if so return an error to the user
if (!searchResults.isEmpty() && searchResults.stream().allMatch(Objects::isNull)) {
throw new IllegalArgumentException(
"Chunk query error - all results returned null values");
}

//noinspection unchecked
SearchResult<T> aggregatedResults =
((SearchResultAggregator<T>) new SearchResultAggregatorImpl<>(query))
.aggregate(searchResults, false);
return incrementNodeCount(aggregatedResults);
}
} catch (Exception e) {
LOG.error("Error searching across chunks ", e);
throw new RuntimeException(e);
} finally {
// always request future cancellation. This won't interrupt I/O or downstream futures,
// but is good practice. Since this is backed by a CompletableFuture
// mayInterruptIfRunning has no effect
searchResultFuture.cancel(true);
}
}

Expand Down
Loading

0 comments on commit 6ec79d3

Please sign in to comment.