Bucket aggregation circuit breaker optimization. (elastic#46751)
howardhuanghua authored Jan 31, 2020
1 parent 378b27b commit f93b392
Showing 10 changed files with 84 additions and 25 deletions.

@@ -302,6 +302,10 @@ long currentMemoryUsage() {
         }
     }
 
+    public long getParentLimit() {
+        return this.parentSettings.getLimit();
+    }
+
     /**
      * Checks whether the parent breaker has been tripped
      */

server/src/main/java/org/elasticsearch/node/Node.java (4 additions, 3 deletions)
@@ -529,7 +529,7 @@ protected Node(
 
         final SearchService searchService = newSearchService(clusterService, indicesService,
             threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
-            responseCollectorService);
+            responseCollectorService, circuitBreakerService);
 
         final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
             .filterPlugins(PersistentTaskPlugin.class).stream()
@@ -1025,9 +1025,10 @@ PageCacheRecycler createPageCacheRecycler(Settings settings) {
      */
     protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
                                              ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
-                                             FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
+                                             FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
+                                             CircuitBreakerService circuitBreakerService) {
         return new SearchService(clusterService, indicesService, threadPool,
-            scriptService, bigArrays, fetchPhase, responseCollectorService);
+            scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService);
     }
 
     /**

@@ -32,6 +32,7 @@
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -59,6 +60,7 @@
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.SearchOperationListener;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
 import org.elasticsearch.node.ResponseCollectorService;
 import org.elasticsearch.script.FieldScript;
@@ -197,7 +199,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {
 
     public SearchService(ClusterService clusterService, IndicesService indicesService,
                          ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
-                         ResponseCollectorService responseCollectorService) {
+                         ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService) {
         Settings settings = clusterService.getSettings();
         this.threadPool = threadPool;
         this.clusterService = clusterService;
@@ -207,7 +209,8 @@ public SearchService(ClusterService clusterService, IndicesService indicesService,
         this.bigArrays = bigArrays;
         this.queryPhase = new QueryPhase();
         this.fetchPhase = fetchPhase;
-        this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings);
+        this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings,
+            circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
 
         TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
         setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));

@@ -19,6 +19,7 @@
 package org.elasticsearch.search.aggregations;
 
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Setting;
@@ -41,11 +42,14 @@ public class MultiBucketConsumerService {
     public static final Setting<Integer> MAX_BUCKET_SETTING =
         Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
 
+    private final CircuitBreaker breaker;
+
     private volatile int maxBucket;
 
-    public MultiBucketConsumerService(ClusterService clusterService, Settings settings) {
+    public MultiBucketConsumerService(ClusterService clusterService, Settings settings, CircuitBreaker breaker) {
+        this.breaker = breaker;
         this.maxBucket = MAX_BUCKET_SETTING.get(settings);
         clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket);
     }
 
     private void setMaxBucket(int maxBucket) {
@@ -94,11 +98,14 @@ protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
      */
     public static class MultiBucketConsumer implements IntConsumer {
         private final int limit;
+        private final CircuitBreaker breaker;
+
         // aggregations execute in a single thread so no atomic here
         private int count;
 
-        public MultiBucketConsumer(int limit) {
+        public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
             this.limit = limit;
+            this.breaker = breaker;
         }
 
         @Override
@@ -109,6 +116,11 @@ public void accept(int value) {
                     + "] but was [" + count + "]. This limit can be set by changing the [" +
                     MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
             }
+
+            // check parent circuit breaker every 1024 buckets
+            if (value > 0 && (count & 0x3FF) == 0) {
+                breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
+            }
         }
 
         public void reset() {
@@ -125,6 +137,6 @@ public int getLimit() {
     }
 
     public MultiBucketConsumer create() {
-        return new MultiBucketConsumer(maxBucket);
+        return new MultiBucketConsumer(maxBucket, breaker);
     }
 }
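
The heart of this commit is the probe added to accept() above: instead of accounting bytes for each bucket, the consumer calls addEstimateBytesAndMaybeBreak(0, "allocated_buckets") once every 1024 buckets, forcing the parent breaker to re-check its limit mid-aggregation at a bounded cost. Below is a minimal standalone sketch of the same pattern; the Breaker interface and class names are illustrative stand-ins rather than Elasticsearch API, and the count-accumulation step is assumed from the surrounding class rather than shown in this hunk.

import java.util.function.IntConsumer;

// Illustrative stand-in for Elasticsearch's CircuitBreaker; only the probe method matters here.
interface Breaker {
    // Adding 0 bytes still forces a limit check and may throw.
    void addEstimateBytesAndMaybeBreak(long bytes, String label);
}

// Hypothetical consumer mirroring MultiBucketConsumer's counting logic.
class BucketProbe implements IntConsumer {
    private final int limit;
    private final Breaker breaker;
    private int count; // runs single-threaded, so a plain int suffices

    BucketProbe(int limit, Breaker breaker) {
        this.limit = limit;
        this.breaker = breaker;
    }

    @Override
    public void accept(int newBuckets) {
        count += newBuckets;
        if (count > limit) {
            // stand-in for TooManyBucketsException
            throw new IllegalStateException("too many buckets: " + count + " > " + limit);
        }
        // (count & 0x3FF) == 0 holds only when count is a multiple of 1024,
        // so the comparatively expensive breaker check runs on a sampled basis.
        if (newBuckets > 0 && (count & 0x3FF) == 0) {
            breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
        }
    }
}

The zero-byte probe adds nothing to the breaker's own accounting; its only effect is to re-trigger the parent limit check (against actual heap use when real-memory tracking is enabled) far more often than once per request.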

@@ -27,6 +27,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.test.ESTestCase;
 
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -293,6 +294,32 @@ public void testTrippedCircuitBreakerDurability() {
         }
     }
 
+    public void testAllocationBucketsBreaker() throws Exception {
+        Settings clusterSettings = Settings.builder()
+            .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "100b")
+            .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "false")
+            .build();
+
+        try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
+                new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
+
+            long parentLimitBytes = ((HierarchyCircuitBreakerService) service).getParentLimit();
+            assertEquals(new ByteSizeValue(100, ByteSizeUnit.BYTES).getBytes(), parentLimitBytes);
+
+            CircuitBreaker breaker = service.getBreaker(CircuitBreaker.REQUEST);
+            MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer =
+                new MultiBucketConsumerService.MultiBucketConsumer(10000, breaker);
+
+            // make sure used bytes is greater than the total circuit breaker limit
+            breaker.addWithoutBreaking(200);
+
+            CircuitBreakingException exception =
+                expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(1024));
+            assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [allocated_buckets] would be"));
+            assertThat(exception.getMessage(), containsString("which is larger than the limit of [100/100b]"));
+        }
+    }
+
     private long mb(long size) {
         return new ByteSizeValue(size, ByteSizeUnit.MB).getBytes();
     }
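
Note that this test never reaches its 10000-bucket limit: accept(1024) raises count to 1024, (1024 & 0x3FF) == 0, so the consumer probes the REQUEST breaker with zero additional bytes. Since 200 bytes were already reserved against the 100b parent limit (and real-memory tracking is disabled), the parent check fails with the [parent] Data too large message asserted above.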

@@ -1355,7 +1355,7 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction,
         final SearchTransportService searchTransportService = new SearchTransportService(transportService,
             SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
         final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService,
-            bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService);
+            bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService());
         actions.put(SearchAction.INSTANCE,
             new TransportSearchAction(threadPool, transportService, searchService,
                 searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService,

@@ -124,12 +124,14 @@ PageCacheRecycler createPageCacheRecycler(Settings settings) {
     @Override
     protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
                                              ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
-                                             FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
+                                             FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
+                                             CircuitBreakerService circuitBreakerService) {
         if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) {
             return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase,
-                responseCollectorService);
+                responseCollectorService, circuitBreakerService);
         }
-        return new MockSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase);
+        return new MockSearchService(clusterService, indicesService, threadPool, scriptService,
+            bigArrays, fetchPhase, circuitBreakerService);
     }
 
     @Override

@@ -22,6 +22,7 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.node.MockNode;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.script.ScriptService;
@@ -68,8 +69,8 @@ static void removeActiveContext(SearchContext context) {
 
     public MockSearchService(ClusterService clusterService,
             IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService,
-            BigArrays bigArrays, FetchPhase fetchPhase) {
-        super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null);
+            BigArrays bigArrays, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService) {
+        super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null, circuitBreakerService);
     }
 
     @Override

@@ -34,6 +34,7 @@
 import org.apache.lucene.search.Weight;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@@ -131,7 +132,7 @@ protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
                                                         IndexSearcher indexSearcher,
                                                         MappedFieldType... fieldTypes) throws IOException {
         return createAggregator(aggregationBuilder, indexSearcher, createIndexSettings(),
-            new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes);
+            new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)), fieldTypes);
     }
 
     protected <A extends Aggregator> A createAggregator(Query query,
@@ -140,7 +141,7 @@ protected <A extends Aggregator> A createAggregator(Query query,
                                                         IndexSettings indexSettings,
                                                         MappedFieldType... fieldTypes) throws IOException {
         return createAggregator(query, aggregationBuilder, indexSearcher, indexSettings,
-            new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes);
+            new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)), fieldTypes);
     }
 
     protected <A extends Aggregator> A createAggregator(Query query, AggregationBuilder aggregationBuilder,
@@ -331,7 +332,8 @@ protected <A extends InternalAggregation, C extends Aggregator> A search(IndexSearcher searcher,
                                                                              AggregationBuilder builder,
                                                                              int maxBucket,
                                                                              MappedFieldType... fieldTypes) throws IOException {
-        MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket);
+        MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket,
+            new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
         C a = createAggregator(query, builder, searcher, indexSettings, bucketConsumer, fieldTypes);
         a.preCollection();
         searcher.search(query, a);
@@ -395,11 +397,13 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
         List<InternalAggregation> aggs = new ArrayList<>();
         Query rewritten = searcher.rewrite(query);
         Weight weight = searcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f);
-        MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket);
+        MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket,
+            new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
         C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes);
 
         for (ShardSearcher subSearcher : subSearchers) {
-            MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket);
+            MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket,
+                new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
             C a = createAggregator(query, builder, subSearcher, indexSettings, shardBucketConsumer, fieldTypes);
             a.preCollection();
             subSearcher.search(weight, a);
@@ -417,7 +421,8 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
                 Collections.shuffle(aggs, random());
                 int r = randomIntBetween(1, toReduceSize);
                 List<InternalAggregation> toReduce = aggs.subList(0, r);
-                MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket);
+                MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket,
+                    new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
                 InternalAggregation.ReduceContext context =
                     new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(),
                         reduceBucketConsumer, false);
@@ -427,7 +432,8 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
                 aggs.add(reduced);
             }
             // now do the final reduce
-            MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket);
+            MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket,
+                new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
             InternalAggregation.ReduceContext context =
                 new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, true);
 
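
The test-framework wiring here and in the next file passes NoneCircuitBreakerService's REQUEST breaker, a no-op breaker that never trips, so existing unit tests keep their behavior while production search requests get the real parent-breaker probe.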

@@ -21,6 +21,7 @@
 
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.Settings;
@@ -269,7 +270,8 @@ public void testReduceRandom() {
             Collections.shuffle(toReduce, random());
             int r = randomIntBetween(1, toReduceSize);
             List<InternalAggregation> internalAggregations = toReduce.subList(0, r);
-            MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS);
+            MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS,
+                new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
             InternalAggregation.ReduceContext context =
                 new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer, false);
             @SuppressWarnings("unchecked")
@@ -285,7 +287,8 @@ public void testReduceRandom() {
             toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize));
             toReduce.add(reduced);
         }
-        MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS);
+        MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS,
+            new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
         InternalAggregation.ReduceContext context =
             new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer, true);
         @SuppressWarnings("unchecked")
