From 523fd6bfe61e4c980fb9658c60a5127c6244d232 Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Tue, 19 Oct 2021 07:57:23 +0200 Subject: [PATCH 1/2] Fix circuit breaker leak in MultiTerms aggregation (#79362) The MultiTermsAggregator creates a BytesKeyedBucketOrds that never gets closed and therefore it might leak the memory allocated into the circuit breaker. --- .../aggregations/AggregatorTestCase.java | 2 +- x-pack/plugin/analytics/build.gradle | 2 + .../MultiTermsWithRequestBreakerIT.java | 96 +++++++++++++++++++ .../multiterms/MultiTermsAggregator.java | 7 +- .../CategorizeTextAggregator.java | 2 +- 5 files changed, 106 insertions(+), 3 deletions(-) create mode 100644 x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 286a2b0975115..f14a8242cfec0 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -477,7 +477,7 @@ protected A searchAndReduc indexSettings, query, breakerService, - builder.bytesToPreallocate(), + randomBoolean() ? 0 : builder.bytesToPreallocate(), maxBucket, fieldTypes ); diff --git a/x-pack/plugin/analytics/build.gradle b/x-pack/plugin/analytics/build.gradle index 3f1ba843af178..4318333698e75 100644 --- a/x-pack/plugin/analytics/build.gradle +++ b/x-pack/plugin/analytics/build.gradle @@ -1,4 +1,6 @@ apply plugin: 'elasticsearch.internal-es-plugin' +apply plugin: 'elasticsearch.internal-cluster-test' + esplugin { name 'x-pack-analytics' description 'Elasticsearch Expanded Pack Plugin - Analytics' diff --git a/x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java b/x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java new file mode 100644 index 0000000000000..d878ae93c52e7 --- /dev/null +++ b/x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.xpack.analytics.multiterms; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.analytics.AnalyticsPlugin; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +/** + * test forked from CardinalityWithRequestBreakerIT + */ +public class MultiTermsWithRequestBreakerIT extends ESIntegTestCase { + + protected Collection> nodePlugins() { + return List.of(AnalyticsPlugin.class); + } + + /** + * Test that searches using multiterms aggregations returns all request breaker memory. + */ + public void testRequestBreaker() throws Exception { + final String requestBreaker = randomIntBetween(1, 10000) + "kb"; + logger.info("--> Using request breaker setting: {}", requestBreaker); + + indexRandom( + true, + IntStream.range(0, randomIntBetween(10, 1000)) + .mapToObj( + i -> client().prepareIndex("test") + .setId("id_" + i) + .setSource(Map.of("field0", randomAlphaOfLength(5), "field1", randomAlphaOfLength(5))) + ) + .toArray(IndexRequestBuilder[]::new) + ); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), requestBreaker) + ) + .get(); + + try { + client().prepareSearch("test") + .addAggregation( + new MultiTermsAggregationBuilder("xxx").terms( + List.of( + new MultiValuesSourceFieldConfig.Builder().setFieldName("field0.keyword").build(), + new MultiValuesSourceFieldConfig.Builder().setFieldName("field1.keyword").build() + ) + ) + ) + .get(); + } catch (ElasticsearchException e) { + if (ExceptionsHelper.unwrap(e, CircuitBreakingException.class) == null) { + throw e; + } + } + + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey()) + ) + .get(); + + // validation done by InternalTestCluster.ensureEstimatedStats() + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java index f9ecfb94fcff8..33155a431176b 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.Releasables; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.DocValueFormat; @@ -110,7 +111,6 @@ protected MultiTermsAggregator( .collect(Collectors.toList()); keyConverters = values.stream().map(TermValuesSource::keyConverter).collect(Collectors.toList()); bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality); - } private boolean subAggsNeedScore() { @@ -220,6 +220,11 @@ public void accept(Integer start) throws IOException { }; } + @Override + protected void doClose() { + Releasables.close(bucketOrds); + } + @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][]; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java index 16058fbdae4f2..d413fb055dbbf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java @@ -105,7 +105,7 @@ protected CategorizeTextAggregator( @Override protected void doClose() { super.doClose(); - Releasables.close(this.analyzer, this.bytesRefHash); + Releasables.close(this.analyzer, this.bytesRefHash, this.bucketOrds, this.categorizers); } @Override From b46d980680bcad65060b7d393ec7c695f9c1c2f5 Mon Sep 17 00:00:00 2001 From: iverase Date: Tue, 19 Oct 2021 08:21:19 +0200 Subject: [PATCH 2/2] fix compile error --- .../multiterms/MultiTermsWithRequestBreakerIT.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java b/x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java index d878ae93c52e7..addaa4082f757 100644 --- a/x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java +++ b/x-pack/plugin/analytics/src/internalClusterTest/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsWithRequestBreakerIT.java @@ -27,8 +27,6 @@ import org.elasticsearch.xpack.analytics.AnalyticsPlugin; import java.util.Collection; -import java.util.List; -import java.util.Map; import java.util.stream.IntStream; /** @@ -37,7 +35,7 @@ public class MultiTermsWithRequestBreakerIT extends ESIntegTestCase { protected Collection> nodePlugins() { - return List.of(AnalyticsPlugin.class); + return org.elasticsearch.core.List.of(AnalyticsPlugin.class); } /** @@ -51,9 +49,9 @@ public void testRequestBreaker() throws Exception { true, IntStream.range(0, randomIntBetween(10, 1000)) .mapToObj( - i -> client().prepareIndex("test") + i -> client().prepareIndex("test", "_doc") .setId("id_" + i) - .setSource(Map.of("field0", randomAlphaOfLength(5), "field1", randomAlphaOfLength(5))) + .setSource(org.elasticsearch.core.Map.of("field0", randomAlphaOfLength(5), "field1", randomAlphaOfLength(5))) ) .toArray(IndexRequestBuilder[]::new) ); @@ -70,7 +68,7 @@ public void testRequestBreaker() throws Exception { client().prepareSearch("test") .addAggregation( new MultiTermsAggregationBuilder("xxx").terms( - List.of( + org.elasticsearch.core.List.of( new MultiValuesSourceFieldConfig.Builder().setFieldName("field0.keyword").build(), new MultiValuesSourceFieldConfig.Builder().setFieldName("field1.keyword").build() )