From d4d3a3e46750b0a9ed078864bb6111882946b687 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 6 Dec 2018 12:52:04 -0500 Subject: [PATCH] Remove license state listeners on closables (#36308) We have a few places where we register license state listeners on transient components (i.e., resources that can be open and closed during the lifecycle of the server). In one case (the opt-out query cache) we were never removing the registered listener, effectively a terrible memory leak. In another case, we were not un-registered the listener that we registered, since we were not referencing the same instance of Runnable. This commit does two things: - introduces a marker interface LicenseStateListener so that it is easier to identify these listeners in the codebase and avoid classes that need to register a license state listener from having to implement Runnable which carries a different semantic meaning than we want here - fixes the two places where we are currently leaking license state listeners --- .../license/LicenseStateListener.java | 22 +++++++++++ .../license/XPackLicenseState.java | 12 +++--- .../exporter/local/LocalExporter.java | 11 ++++-- .../exporter/local/LocalExporterTests.java | 37 +++++++++++++++++++ .../authz/accesscontrol/OptOutQueryCache.java | 11 +++++- .../accesscontrol/OptOutQueryCacheTests.java | 18 ++++++++- 6 files changed, 98 insertions(+), 13 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseStateListener.java create mode 100644 x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseStateListener.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseStateListener.java new file mode 100644 index 0000000000000..ef3302613c333 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseStateListener.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.license; + +import org.elasticsearch.Version; + +/** + * Marker interface for callbacks that are invoked when the license state changes. + */ +@FunctionalInterface +public interface LicenseStateListener { + + /** + * Callback when the license state changes. See {@link XPackLicenseState#update(License.OperationMode, boolean, Version)}. + */ + void licenseStateChanged(); + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java index 3cb189b5795d0..0b9640839202b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java @@ -266,7 +266,7 @@ private static class Status { } } - private final List listeners; + private final List listeners; private final boolean isSecurityEnabled; private final boolean isSecurityExplicitlyEnabled; @@ -315,17 +315,17 @@ void update(OperationMode mode, boolean active, @Nullable Version mostRecentTria } } } - listeners.forEach(Runnable::run); + listeners.forEach(LicenseStateListener::licenseStateChanged); } /** Add a listener to be notified on license change */ - public void addListener(Runnable runnable) { - listeners.add(Objects.requireNonNull(runnable)); + public void addListener(final LicenseStateListener listener) { + listeners.add(Objects.requireNonNull(listener)); } /** Remove a listener */ - public void removeListener(Runnable runnable) { - listeners.remove(runnable); + public void removeListener(final LicenseStateListener listener) { + listeners.remove(Objects.requireNonNull(listener)); } /** Return the current license type. */ diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java index 908919ed2ce08..70ac5a0f7a730 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.PipelineConfiguration; +import org.elasticsearch.license.LicenseStateListener; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest; import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest; @@ -78,7 +79,7 @@ import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.pipelineName; import static org.elasticsearch.xpack.monitoring.Monitoring.CLEAN_WATCHER_HISTORY; -public class LocalExporter extends Exporter implements ClusterStateListener, CleanerService.Listener { +public class LocalExporter extends Exporter implements ClusterStateListener, CleanerService.Listener, LicenseStateListener { private static final Logger logger = LogManager.getLogger(LocalExporter.class); @@ -106,9 +107,10 @@ public LocalExporter(Exporter.Config config, Client client, CleanerService clean this.clusterAlertBlacklist = ClusterAlertsUtil.getClusterAlertsBlacklist(config); this.cleanerService = cleanerService; this.dateTimeFormatter = dateTimeFormatter(config); + // if additional listeners are added here, adjust LocalExporterTests#testLocalExporterRemovesListenersOnClose accordingly clusterService.addListener(this); cleanerService.add(this); - licenseState.addListener(this::licenseChanged); + licenseState.addListener(this); } @Override @@ -121,7 +123,8 @@ public void clusterChanged(ClusterChangedEvent event) { /** * When the license changes, we need to ensure that Watcher is setup properly. */ - private void licenseChanged() { + @Override + public void licenseStateChanged() { watcherSetup.set(false); } @@ -153,7 +156,7 @@ public void doClose() { // we also remove the listener in resolveBulk after we get to RUNNING, but it's okay to double-remove clusterService.removeListener(this); cleanerService.remove(this); - licenseState.removeListener(this::licenseChanged); + licenseState.removeListener(this); } } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java new file mode 100644 index 0000000000000..84d143a2c5b18 --- /dev/null +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.exporter.local; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.monitoring.cleaner.CleanerService; +import org.elasticsearch.xpack.monitoring.exporter.Exporter; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class LocalExporterTests extends ESTestCase { + + public void testLocalExporterRemovesListenersOnClose() { + final ClusterService clusterService = mock(ClusterService.class); + final XPackLicenseState licenseState = mock(XPackLicenseState.class); + final Exporter.Config config = new Exporter.Config("name", "type", Settings.EMPTY, clusterService, licenseState); + final CleanerService cleanerService = mock(CleanerService.class); + final LocalExporter exporter = new LocalExporter(config, mock(Client.class), cleanerService); + verify(clusterService).addListener(exporter); + verify(cleanerService).add(exporter); + verify(licenseState).addListener(exporter); + exporter.close(); + verify(clusterService).removeListener(exporter); + verify(cleanerService).remove(exporter); + verify(licenseState).removeListener(exporter); + } + +} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/accesscontrol/OptOutQueryCache.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/accesscontrol/OptOutQueryCache.java index 1ace72a1da03f..78058080e5b17 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/accesscontrol/OptOutQueryCache.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/accesscontrol/OptOutQueryCache.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.indices.IndicesQueryCache; +import org.elasticsearch.license.LicenseStateListener; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField; import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl; @@ -26,7 +27,7 @@ * Opts out of the query cache if field level security is active for the current request, * and its unsafe to cache. */ -public final class OptOutQueryCache extends AbstractIndexComponent implements QueryCache { +public final class OptOutQueryCache extends AbstractIndexComponent implements LicenseStateListener, QueryCache { private final IndicesQueryCache indicesQueryCache; private final ThreadContext context; @@ -43,14 +44,20 @@ public OptOutQueryCache( this.context = Objects.requireNonNull(context, "threadContext must not be null"); this.indexName = indexSettings.getIndex().getName(); this.licenseState = Objects.requireNonNull(licenseState, "licenseState"); - licenseState.addListener(() -> this.clear("license state changed")); + licenseState.addListener(this); } @Override public void close() throws ElasticsearchException { + licenseState.removeListener(this); clear("close"); } + @Override + public void licenseStateChanged() { + clear("license state changed"); + } + @Override public void clear(String reason) { logger.debug("full cache clear, reason [{}]", reason); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/accesscontrol/OptOutQueryCacheTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/accesscontrol/OptOutQueryCacheTests.java index d2b6c736fd877..3eab571437191 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/accesscontrol/OptOutQueryCacheTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/accesscontrol/OptOutQueryCacheTests.java @@ -11,8 +11,8 @@ import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; @@ -184,6 +184,22 @@ public void testOptOutQueryCacheIndexDoesNotHaveFieldLevelSecurity() { verify(indicesQueryCache).doCache(same(weight), same(policy)); } + public void testOptOutQueryCacheRemovesLicenseStateListenerOnClose() { + final Settings.Builder settings = Settings.builder() + .put("index.version.created", Version.CURRENT) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0); + final IndexMetaData indexMetaData = IndexMetaData.builder("index").settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, Settings.EMPTY); + final IndicesQueryCache indicesQueryCache = mock(IndicesQueryCache.class); + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final XPackLicenseState licenseState = mock(XPackLicenseState.class); + final OptOutQueryCache cache = new OptOutQueryCache(indexSettings, indicesQueryCache, threadContext, licenseState); + verify(licenseState).addListener(cache); + cache.close(); + verify(licenseState).removeListener(cache); + } + private static FieldPermissionsDefinition fieldPermissionDef(String[] granted, String[] denied) { return new FieldPermissionsDefinition(granted, denied); }