From b4b20bd2db02e26557aae1a71fd7288d5acf4d66 Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Tue, 19 Oct 2021 11:21:14 +0200 Subject: [PATCH] Fix circuit breaker leak in MultiTerms aggregation (#79362) (#79422) The MultiTermsAggregator creates a BytesKeyedBucketOrds that never gets closed and therefore it might leak the memory allocated into the circuit breaker. --- .../aggregations/AggregatorTestCase.java | 2 +- x-pack/plugin/analytics/build.gradle | 2 + .../MultiTermsWithRequestBreakerIT.java | 100 ++++++++++++++++++ .../multiterms/MultiTermsAggregator.java | 8 +- 4 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index d9d4543ca8d3c..ef84c2e4271d9 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -459,7 +459,7 @@ protected A searchAndReduc indexSettings, query, breakerService, - builder.bytesToPreallocate(), + randomBoolean() ? 0 : builder.bytesToPreallocate(), maxBucket, fieldTypes ); diff --git a/x-pack/plugin/analytics/build.gradle b/x-pack/plugin/analytics/build.gradle index 50a87c35b439d..098a8805400ce 100644 --- a/x-pack/plugin/analytics/build.gradle +++ b/x-pack/plugin/analytics/build.gradle @@ -1,4 +1,6 @@ apply plugin: 'elasticsearch.internal-es-plugin' +apply plugin: 'elasticsearch.internal-cluster-test' + esplugin { name 'x-pack-analytics' description 'Elasticsearch Expanded Pack Plugin - Analytics' diff --git a/x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java b/x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java new file mode 100644 index 0000000000000..8fa34d7a29de0 --- /dev/null +++ b/x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.xpack.analytics.multiterms; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.analytics.AnalyticsPlugin; + +import java.util.Collection; +import java.util.stream.IntStream; + +/** + * test forked from CardinalityWithRequestBreakerIT + */ +public class MultiTermsWithRequestBreakerIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return org.elasticsearch.core.List.of(AnalyticsPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return org.elasticsearch.core.List.of(AnalyticsPlugin.class); + } + + /** + * Test that searches using multiterms aggregations returns all request breaker memory. + */ + public void testRequestBreaker() throws Exception { + final String requestBreaker = randomIntBetween(1, 10000) + "kb"; + logger.info("--> Using request breaker setting: {}", requestBreaker); + + indexRandom( + true, + IntStream.range(0, randomIntBetween(10, 1000)) + .mapToObj( + i -> client().prepareIndex("test", "_doc") + .setId("id_" + i) + .setSource(org.elasticsearch.core.Map.of("field0", randomAlphaOfLength(5), "field1", randomAlphaOfLength(5))) + ) + .toArray(IndexRequestBuilder[]::new) + ); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), requestBreaker) + ) + .get(); + + try { + client().prepareSearch("test") + .addAggregation( + new MultiTermsAggregationBuilder("xxx").terms( + org.elasticsearch.core.List.of( + new MultiValuesSourceFieldConfig.Builder().setFieldName("field0.keyword").build(), + new MultiValuesSourceFieldConfig.Builder().setFieldName("field1.keyword").build() + ) + ) + ) + .get(); + } catch (ElasticsearchException e) { + if (ExceptionsHelper.unwrap(e, CircuitBreakingException.class) == null) { + throw e; + } + } + + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey()) + ) + .get(); + + // validation done by InternalTestCluster.ensureEstimatedStats() + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java index cbd3389511a7b..939f55c5a8c26 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java @@ -14,11 +14,12 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.Releasables; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.DocValueFormat; @@ -220,6 +221,11 @@ public void accept(Integer start) throws IOException { }; } + @Override + protected void doClose() { + Releasables.close(bucketOrds); + } + @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][];