From 2bafdef57074d5cccfeff7221d166502a398c4c6 Mon Sep 17 00:00:00 2001 From: Neetika Singhal Date: Tue, 1 Aug 2023 15:04:31 -0700 Subject: [PATCH] Make MultiBucketConsumerService thread safe to use across slices during search Signed-off-by: Neetika Singhal --- CHANGELOG.md | 2 +- .../MultiBucketConsumerService.java | 28 +++++++++++++++---- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a37976462b38e..80de8c4c9e342 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -109,7 +109,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807)) - Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801)) - [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005)) - +- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047)) ### Deprecated ### Removed diff --git a/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java index f1416fddebfa2..88cf96c31583c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java @@ -33,6 +33,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.common.settings.Setting; @@ -42,6 +43,7 @@ import org.opensearch.search.aggregations.bucket.BucketsAggregator; import java.io.IOException; +import java.util.concurrent.atomic.LongAdder; import java.util.function.IntConsumer; /** @@ -129,11 +131,13 @@ public static class MultiBucketConsumer implements IntConsumer { // aggregations execute in a single thread so no atomic here private int count; - private int callCount = 0; + private LongAdder callCount; + private volatile boolean circuitBreakerTripped; public MultiBucketConsumer(int limit, CircuitBreaker breaker) { this.limit = limit; this.breaker = breaker; + callCount = new LongAdder(); } @Override @@ -153,10 +157,24 @@ public void accept(int value) { ); } } - // check parent circuit breaker every 1024 calls - callCount++; - if ((callCount & 0x3FF) == 0) { - breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + callCount.increment(); + // tripping the circuit breaker for other threads in case of concurrent search + // if the circuit breaker has tripped for one of the threads already, more info + // can be found on: https://github.com/opensearch-project/OpenSearch/issues/7785 + if (circuitBreakerTripped) { + throw new CircuitBreakingException( + "Circuit breaker has tripped for one of the other threads", + CircuitBreaker.Durability.PERMANENT + ); + } + // check parent circuit breaker every 1024 to (1024 + available processors) calls + if ((callCount.sum() & 0x3FF) <= Runtime.getRuntime().availableProcessors()) { + try { + breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + } catch (CircuitBreakingException e) { + circuitBreakerTripped = true; + throw e; + } } }