diff --git a/kaldb/pom.xml b/kaldb/pom.xml index db0e38b73a..c4adf9a18e 100644 --- a/kaldb/pom.xml +++ b/kaldb/pom.xml @@ -562,7 +562,8 @@ true -XDcompilePolicy=simple - -Xplugin:ErrorProne -XepDisableWarningsInGeneratedCode -XepExcludedPaths:.*/protobuf/.* -Xep:WildcardImport:ERROR -Xep:AssertEqualsArgumentOrderChecker:ERROR + --enable-preview + -Xplugin:ErrorProne -XepDisableWarningsInGeneratedCode -XepExcludedPaths:.*/protobuf/.* -Xep:WildcardImport:ERROR -Xep:AssertEqualsArgumentOrderChecker:ERROR -J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED -J--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED -J--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java index c0170b20d7..c255fcd3e0 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java @@ -29,7 +29,6 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -67,8 +66,6 @@ public class ReadOnlyChunkImpl implements Chunk { private final SnapshotMetadataStore snapshotMetadataStore; private final SearchMetadataStore searchMetadataStore; private final MeterRegistry meterRegistry; - - private final ExecutorService executorService; private final BlobFs blobFs; public static final String CHUNK_ASSIGNMENT_TIMER = "chunk_assignment_timer"; @@ -93,14 +90,12 @@ public ReadOnlyChunkImpl( CacheSlotMetadataStore cacheSlotMetadataStore, ReplicaMetadataStore replicaMetadataStore, SnapshotMetadataStore snapshotMetadataStore, - SearchMetadataStore searchMetadataStore, - ExecutorService executorService) + SearchMetadataStore searchMetadataStore) throws Exception { this.meterRegistry = meterRegistry; this.blobFs = blobFs; this.s3Bucket = s3Bucket; this.dataDirectoryPrefix = dataDirectoryPrefix; - this.executorService = executorService; this.searchContext = searchContext; this.slotId = UUID.randomUUID().toString(); @@ -144,7 +139,7 @@ private void cacheNodeListener(CacheSlotMetadata cacheSlotMetadata) { newSlotState, cacheSlotMetadata); } - executorService.execute(() -> handleChunkAssignment(cacheSlotMetadata)); + Thread.ofVirtual().start(() -> handleChunkAssignment(cacheSlotMetadata)); } else if (newSlotState.equals(Metadata.CacheSlotMetadata.CacheSlotState.EVICT)) { LOG.info("Chunk - EVICT received - {}", cacheSlotMetadata); if (!cacheSlotLastKnownState.equals(Metadata.CacheSlotMetadata.CacheSlotState.LIVE)) { @@ -154,7 +149,7 @@ private void cacheNodeListener(CacheSlotMetadata cacheSlotMetadata) { newSlotState, cacheSlotMetadata); } - executorService.execute(() -> handleChunkEviction(cacheSlotMetadata)); + Thread.ofVirtual().start(() -> handleChunkEviction(cacheSlotMetadata)); } cacheSlotLastKnownState = newSlotState; } else { diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java index ad84052f58..524fe6c95d 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java @@ -1,6 +1,5 @@ package com.slack.kaldb.chunkManager; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.slack.kaldb.blobfs.BlobFs; import com.slack.kaldb.chunk.ReadOnlyChunkImpl; import com.slack.kaldb.chunk.SearchContext; @@ -12,8 +11,6 @@ import com.slack.kaldb.proto.config.KaldbConfigs; import io.micrometer.core.instrument.MeterRegistry; import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.curator.x.async.AsyncCuratorFramework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +34,6 @@ public class CachingChunkManager extends ChunkManagerBase { private SnapshotMetadataStore snapshotMetadataStore; private SearchMetadataStore searchMetadataStore; private CacheSlotMetadataStore cacheSlotMetadataStore; - private final ExecutorService executorService; public CachingChunkManager( MeterRegistry registry, @@ -56,18 +52,6 @@ public CachingChunkManager( this.dataDirectoryPrefix = dataDirectoryPrefix; this.replicaSet = replicaSet; this.slotCountPerInstance = slotCountPerInstance; - - // todo - consider making the thread count a config option; this would allow for more - // fine-grained tuning, but we might not need to expose this to the user if we can set sensible - // defaults - this.executorService = - Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors() >= 4 ? 2 : 1, - new ThreadFactoryBuilder() - .setNameFormat("caching-chunk-manager-%d") - .setUncaughtExceptionHandler( - (t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e)) - .build()); } @Override @@ -92,8 +76,7 @@ protected void startUp() throws Exception { cacheSlotMetadataStore, replicaMetadataStore, snapshotMetadataStore, - searchMetadataStore, - ProtectedExecutorService.wrap(executorService))); + searchMetadataStore)); } } @@ -101,10 +84,6 @@ protected void startUp() throws Exception { protected void shutDown() throws Exception { LOG.info("Closing caching chunk manager."); - // Attempt to forcibly shutdown the executor service. This prevents any further downloading of - // data from S3 that would be unused. - executorService.shutdown(); - chunkList.forEach( (readonlyChunk) -> { try { diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManagerBase.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManagerBase.java index c4cc793480..fa9cd7bd17 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManagerBase.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManagerBase.java @@ -4,11 +4,6 @@ import brave.propagation.CurrentTraceContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AbstractIdleService; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.slack.kaldb.chunk.Chunk; import com.slack.kaldb.logstore.search.SearchQuery; import com.slack.kaldb.logstore.search.SearchResult; @@ -16,17 +11,14 @@ import com.slack.kaldb.logstore.search.SearchResultAggregatorImpl; import com.slack.kaldb.metadata.schema.FieldType; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,37 +36,6 @@ public abstract class ChunkManagerBase extends AbstractIdleService implements // to the amount of reads, and it must be a threadsafe implementation protected final List> chunkList = new CopyOnWriteArrayList<>(); - private static final ListeningExecutorService queryExecutorService = queryThreadPool(); - - private static final ScheduledExecutorService queryCancellationService = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat("chunk-manager-query-cancellation-%d") - .setUncaughtExceptionHandler( - (t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e)) - .build()); - - /* - * We want to provision the chunk query capacity such that we can almost saturate the CPU. In the event we allow - * these to saturate the CPU it can result in the container being killed due to failed healthchecks. - * - * Revisit the thread pool settings if this becomes a perf issue. Also, we may need - * different thread pools for indexer and cache nodes in the future. - */ - private static ListeningExecutorService queryThreadPool() { - // todo - consider making the thread count a config option; this would allow for more - // fine-grained tuning, but we might not need to expose this to the user if we can set sensible - // defaults - return MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool( - Math.max(1, Runtime.getRuntime().availableProcessors() - 2), - new ThreadFactoryBuilder() - .setNameFormat("chunk-manager-query-%d") - .setUncaughtExceptionHandler( - (t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e)) - .build())); - } - /* * Query the chunks in the time range, aggregate the results per aggregation policy and return the results. * We aggregate locally and then the query aggregator will aggregate again. This is OKAY for the current use-case we support @@ -107,72 +68,57 @@ public SearchResult query(SearchQuery query, Duration queryTimeout) { // a single IndexSearcher. Collections.shuffle(chunksMatchingQuery); - List>> queries = - chunksMatchingQuery.stream() - .map( - (chunk) -> - queryExecutorService.submit( - currentTraceContext.wrap( - () -> { - try { - if (Thread.interrupted()) { - LOG.warn( - "Chunk query thread timed out without starting work, returning error result."); - return errorResult; - } - return chunk.query(query); - } catch (Exception err) { - // Only log the exception message as warn, and not the entire trace - // as this can cause performance issues if significant amounts of - // invalid queries are received - LOG.warn("Chunk Query Exception: {}", err.getMessage()); - LOG.debug("Chunk Query Exception", err); - // We catch IllegalArgumentException ( and any other exception that - // represents a parse failure ) and instead of returning an empty - // result we throw back an error to the user - if (err instanceof IllegalArgumentException) { - throw err; - } - return errorResult; - } - }))) - .peek( - (future) -> - queryCancellationService.schedule( - () -> future.cancel(true), queryTimeout.toMillis(), TimeUnit.MILLISECONDS)) - .collect(Collectors.toList()); - - Future>> searchResultFuture = Futures.successfulAsList(queries); try { - List> searchResults = - searchResultFuture.get(queryTimeout.toMillis(), TimeUnit.MILLISECONDS); - - // check if all results are null, and if so return an error to the user - if (searchResults.size() > 0 && searchResults.stream().allMatch(Objects::isNull)) { + try (var scope = new StructuredTaskScope>()) { + List>> chunkSubtasks = + chunksMatchingQuery.stream() + .map((chunk) -> scope.fork(currentTraceContext.wrap(() -> chunk.query(query)))) + .toList(); try { - Futures.allAsList(queries).get(0, TimeUnit.SECONDS); - } catch (Exception e) { - throw new IllegalArgumentException(e); + scope.joinUntil(Instant.now().plusSeconds(queryTimeout.toSeconds())); + } catch (TimeoutException timeoutException) { + scope.shutdown(); + scope.join(); } - // not expected to happen - we should be guaranteed that the list has at least one failed - // future, which should throw when we try to get on allAsList - throw new IllegalArgumentException( - "Chunk query error - all results returned null values with no exceptions thrown"); - } - //noinspection unchecked - SearchResult aggregatedResults = - ((SearchResultAggregator) new SearchResultAggregatorImpl<>(query)) - .aggregate(searchResults, false); - return incrementNodeCount(aggregatedResults); + List> searchResults = + chunkSubtasks.stream() + .map( + searchResultSubtask -> { + try { + return searchResultSubtask.get(); + } catch (Exception err) { + // Only log the exception message as warn, and not the entire trace + // as this can cause performance issues if significant amounts of + // invalid queries are received + LOG.warn("Chunk Query Exception: {}", err.getMessage()); + LOG.debug("Chunk Query Exception", err); + // We catch IllegalArgumentException ( and any other exception that + // represents a parse failure ) and instead of returning an empty + // result we throw back an error to the user + if (err instanceof IllegalArgumentException) { + throw err; + } + return errorResult; + } + }) + .toList(); + + // check if all results are null, and if so return an error to the user + if (!searchResults.isEmpty() && searchResults.stream().allMatch(Objects::isNull)) { + throw new IllegalArgumentException( + "Chunk query error - all results returned null values"); + } + + //noinspection unchecked + SearchResult aggregatedResults = + ((SearchResultAggregator) new SearchResultAggregatorImpl<>(query)) + .aggregate(searchResults, false); + return incrementNodeCount(aggregatedResults); + } } catch (Exception e) { LOG.error("Error searching across chunks ", e); throw new RuntimeException(e); - } finally { - // always request future cancellation. This won't interrupt I/O or downstream futures, - // but is good practice. Since this is backed by a CompletableFuture - // mayInterruptIfRunning has no effect - searchResultFuture.cancel(true); } } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java index 27ac96c860..449743facf 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java @@ -8,7 +8,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.linecorp.armeria.client.grpc.GrpcClients; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener; @@ -25,22 +24,10 @@ import io.micrometer.core.instrument.MeterRegistry; import java.io.Closeable; import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -196,6 +183,7 @@ private void doStubUpdate() { private KaldbServiceGrpc.KaldbServiceFutureStub getKaldbServiceGrpcClient(String server) { return GrpcClients.builder(server) .build(KaldbServiceGrpc.KaldbServiceFutureStub.class) + .withExecutor(Executors.newVirtualThreadPerTaskExecutor()) // This enables compression for requests // Independent of this setting, servers choose whether to compress responses .withCompression("gzip"); @@ -370,63 +358,61 @@ private List> distributedSearch( span.tag("queryServerCount", String.valueOf(nodesAndSnapshotsToQuery.size())); List>> queryServers = new ArrayList<>(stubs.size()); - for (Map.Entry> searchNode : nodesAndSnapshotsToQuery.entrySet()) { - KaldbServiceGrpc.KaldbServiceFutureStub stub = getStub(searchNode.getKey()); - if (stub == null) { - // TODO: insert a failed result in the results object that we return from this method - // mimicing - continue; - } - - KaldbSearch.SearchRequest localSearchReq = - distribSearchReq.toBuilder().addAllChunkIds(searchNode.getValue()).build(); - - // make sure all underlying futures finish executing (successful/cancelled/failed/other) - // and cannot be pending when the successfulAsList.get(SAME_TIMEOUT_MS) runs - ListenableFuture searchRequest = - stub.withDeadlineAfter(defaultQueryTimeout.toMillis(), TimeUnit.MILLISECONDS) - .withInterceptors( - GrpcTracing.newBuilder(Tracing.current()).build().newClientInterceptor()) - .search(localSearchReq); - Function> searchRequestTransform = - SearchResultUtils::fromSearchResultProtoOrEmpty; - - queryServers.add( - Futures.transform( - searchRequest, searchRequestTransform::apply, MoreExecutors.directExecutor())); - } - Future>> searchFuture = Futures.successfulAsList(queryServers); try { - List> searchResults = - searchFuture.get(requestTimeout.toMillis(), TimeUnit.MILLISECONDS); - LOG.debug("searchResults.size={} searchResults={}", searchResults.size(), searchResults); + try (var scope = new StructuredTaskScope>()) { + List>> searchSubtasks = + nodesAndSnapshotsToQuery.entrySet().stream() + .map( + (searchNode) -> + scope.fork( + () -> { + KaldbServiceGrpc.KaldbServiceFutureStub stub = + getStub(searchNode.getKey()); + + if (stub == null) { + // TODO: insert a failed result in the results object that we return + // from this method + return null; + } + + KaldbSearch.SearchRequest localSearchReq = + distribSearchReq.toBuilder() + .addAllChunkIds(searchNode.getValue()) + .build(); + return SearchResultUtils.fromSearchResultProtoOrEmpty( + stub.withInterceptors( + GrpcTracing.newBuilder(Tracing.current()) + .build() + .newClientInterceptor()) + .search(localSearchReq) + .get()); + })) + .toList(); + + try { + scope.joinUntil(Instant.now().plusSeconds(requestTimeout.toSeconds())); + } catch (TimeoutException timeoutException) { + scope.shutdown(); + scope.join(); + } - List> response = new ArrayList(searchResults.size()); - for (SearchResult searchResult : searchResults) { - response.add(searchResult == null ? SearchResult.empty() : searchResult); + List> response = new ArrayList(searchSubtasks.size()); + for (StructuredTaskScope.Subtask> searchResult : searchSubtasks) { + try { + response.add(searchResult.get() == null ? SearchResult.empty() : searchResult.get()); + } catch (Exception e) { + LOG.error("Error fetching search result", e); + response.add(SearchResult.empty()); + } + } + return response; } - return response; - } catch (TimeoutException e) { - // We provide a deadline to the stub of "defaultQueryTimeout" - if this is sufficiently lower - // than the request timeout, we would expect searchFuture.get(requestTimeout) to never throw - // an exception. This however doesn't necessarily hold true if the query node is CPU - // saturated, and there is not enough cpu time to fail the pending stub queries that have - // exceeded their deadline - causing the searchFuture get to fail with a timeout. - LOG.error( - "Search failed with timeout exception. This is potentially due to CPU saturation of the query node.", - e); - span.error(e); - return List.of(SearchResult.empty()); } catch (Exception e) { LOG.error("Search failed with ", e); span.error(e); return List.of(SearchResult.empty()); } finally { - // always request future cancellation, so that any exceptions or incomplete futures don't - // continue to consume CPU on work that will not be used - searchFuture.cancel(false); - LOG.debug("Finished distributed search for request: {}", distribSearchReq); span.finish(); } } diff --git a/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java b/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java index 598a34c1a8..d016e6dfa2 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,6 +170,8 @@ public ArmeriaService build() { .build()); spanHandlers.forEach(tracingBuilder::addSpanHandler); serverBuilder.decorator(BraveService.newDecorator(tracingBuilder.build())); + serverBuilder.blockingTaskExecutor( + new ScheduledThreadPoolExecutor(0, Thread.ofVirtual().factory()), false); return new ArmeriaService(serverBuilder.build(), serviceName); } diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java index 7b703d09ac..2a8dd4a516 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java @@ -17,7 +17,6 @@ import brave.Tracing; import com.adobe.testing.s3mock.junit5.S3MockExtension; -import com.google.common.util.concurrent.MoreExecutors; import com.slack.kaldb.blobfs.LocalBlobFs; import com.slack.kaldb.blobfs.s3.S3CrtBlobFs; import com.slack.kaldb.blobfs.s3.S3TestUtils; @@ -138,8 +137,7 @@ public void shouldHandleChunkLivecycle() throws Exception { cacheSlotMetadataStore, replicaMetadataStore, snapshotMetadataStore, - searchMetadataStore, - MoreExecutors.newDirectExecutorService()); + searchMetadataStore); // wait for chunk to register await() @@ -268,8 +266,7 @@ public void shouldHandleMissingS3Assets() throws Exception { cacheSlotMetadataStore, replicaMetadataStore, snapshotMetadataStore, - searchMetadataStore, - MoreExecutors.newDirectExecutorService()); + searchMetadataStore); // wait for chunk to register await() @@ -335,8 +332,7 @@ public void shouldHandleMissingZkData() throws Exception { cacheSlotMetadataStore, replicaMetadataStore, snapshotMetadataStore, - searchMetadataStore, - MoreExecutors.newDirectExecutorService()); + searchMetadataStore); // wait for chunk to register await() @@ -403,8 +399,7 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception { cacheSlotMetadataStore, replicaMetadataStore, snapshotMetadataStore, - searchMetadataStore, - MoreExecutors.newDirectExecutorService()); + searchMetadataStore); // wait for chunk to register await() diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/KaldbSearchUtils.java b/kaldb/src/test/java/com/slack/kaldb/testlib/KaldbSearchUtils.java index 2d32681a73..cea40ac7ef 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/KaldbSearchUtils.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/KaldbSearchUtils.java @@ -5,6 +5,7 @@ import com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder; import com.slack.kaldb.proto.service.KaldbSearch; import com.slack.kaldb.proto.service.KaldbServiceGrpc; +import java.util.concurrent.Executors; public class KaldbSearchUtils { @@ -13,6 +14,7 @@ public static KaldbSearch.SearchResult searchUsingGrpcApi( KaldbServiceGrpc.KaldbServiceBlockingStub kaldbService = GrpcClients.builder(uri(port)) .build(KaldbServiceGrpc.KaldbServiceBlockingStub.class) + .withExecutor(Executors.newVirtualThreadPerTaskExecutor()) .withCompression("gzip"); return kaldbService.search(