Skip to content

Commit

Permalink
Remove license state listeners on closables (#36308)
Browse files Browse the repository at this point in the history
We have a few places where we register license state listeners on
transient components (i.e., resources that can be open and closed during
the lifecycle of the server). In one case (the opt-out query cache) we
were never removing the registered listener, effectively a terrible
memory leak. In another case, we were not un-registered the listener
that we registered, since we were not referencing the same instance of
Runnable. This commit does two things:
  - introduces a marker interface LicenseStateListener so that it is
    easier to identify these listeners in the codebase and avoid classes
    that need to register a license state listener from having to
    implement Runnable which carries a different semantic meaning than
    we want here
  - fixes the two places where we are currently leaking license state
    listeners
  • Loading branch information
jasontedor authored Dec 6, 2018
1 parent adc8355 commit d4d3a3e
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.license;

import org.elasticsearch.Version;

/**
* Marker interface for callbacks that are invoked when the license state changes.
*/
@FunctionalInterface
public interface LicenseStateListener {

/**
* Callback when the license state changes. See {@link XPackLicenseState#update(License.OperationMode, boolean, Version)}.
*/
void licenseStateChanged();

}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private static class Status {
}
}

private final List<Runnable> listeners;
private final List<LicenseStateListener> listeners;
private final boolean isSecurityEnabled;
private final boolean isSecurityExplicitlyEnabled;

Expand Down Expand Up @@ -315,17 +315,17 @@ void update(OperationMode mode, boolean active, @Nullable Version mostRecentTria
}
}
}
listeners.forEach(Runnable::run);
listeners.forEach(LicenseStateListener::licenseStateChanged);
}

/** Add a listener to be notified on license change */
public void addListener(Runnable runnable) {
listeners.add(Objects.requireNonNull(runnable));
public void addListener(final LicenseStateListener listener) {
listeners.add(Objects.requireNonNull(listener));
}

/** Remove a listener */
public void removeListener(Runnable runnable) {
listeners.remove(runnable);
public void removeListener(final LicenseStateListener listener) {
listeners.remove(Objects.requireNonNull(listener));
}

/** Return the current license type. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.license.LicenseStateListener;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
Expand Down Expand Up @@ -78,7 +79,7 @@
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.pipelineName;
import static org.elasticsearch.xpack.monitoring.Monitoring.CLEAN_WATCHER_HISTORY;

public class LocalExporter extends Exporter implements ClusterStateListener, CleanerService.Listener {
public class LocalExporter extends Exporter implements ClusterStateListener, CleanerService.Listener, LicenseStateListener {

private static final Logger logger = LogManager.getLogger(LocalExporter.class);

Expand Down Expand Up @@ -106,9 +107,10 @@ public LocalExporter(Exporter.Config config, Client client, CleanerService clean
this.clusterAlertBlacklist = ClusterAlertsUtil.getClusterAlertsBlacklist(config);
this.cleanerService = cleanerService;
this.dateTimeFormatter = dateTimeFormatter(config);
// if additional listeners are added here, adjust LocalExporterTests#testLocalExporterRemovesListenersOnClose accordingly
clusterService.addListener(this);
cleanerService.add(this);
licenseState.addListener(this::licenseChanged);
licenseState.addListener(this);
}

@Override
Expand All @@ -121,7 +123,8 @@ public void clusterChanged(ClusterChangedEvent event) {
/**
* When the license changes, we need to ensure that Watcher is setup properly.
*/
private void licenseChanged() {
@Override
public void licenseStateChanged() {
watcherSetup.set(false);
}

Expand Down Expand Up @@ -153,7 +156,7 @@ public void doClose() {
// we also remove the listener in resolveBulk after we get to RUNNING, but it's okay to double-remove
clusterService.removeListener(this);
cleanerService.remove(this);
licenseState.removeListener(this::licenseChanged);
licenseState.removeListener(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.monitoring.exporter.local;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class LocalExporterTests extends ESTestCase {

public void testLocalExporterRemovesListenersOnClose() {
final ClusterService clusterService = mock(ClusterService.class);
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
final Exporter.Config config = new Exporter.Config("name", "type", Settings.EMPTY, clusterService, licenseState);
final CleanerService cleanerService = mock(CleanerService.class);
final LocalExporter exporter = new LocalExporter(config, mock(Client.class), cleanerService);
verify(clusterService).addListener(exporter);
verify(cleanerService).add(exporter);
verify(licenseState).addListener(exporter);
exporter.close();
verify(clusterService).removeListener(exporter);
verify(cleanerService).remove(exporter);
verify(licenseState).removeListener(exporter);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.license.LicenseStateListener;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
Expand All @@ -26,7 +27,7 @@
* Opts out of the query cache if field level security is active for the current request,
* and its unsafe to cache.
*/
public final class OptOutQueryCache extends AbstractIndexComponent implements QueryCache {
public final class OptOutQueryCache extends AbstractIndexComponent implements LicenseStateListener, QueryCache {

private final IndicesQueryCache indicesQueryCache;
private final ThreadContext context;
Expand All @@ -43,14 +44,20 @@ public OptOutQueryCache(
this.context = Objects.requireNonNull(context, "threadContext must not be null");
this.indexName = indexSettings.getIndex().getName();
this.licenseState = Objects.requireNonNull(licenseState, "licenseState");
licenseState.addListener(() -> this.clear("license state changed"));
licenseState.addListener(this);
}

@Override
public void close() throws ElasticsearchException {
licenseState.removeListener(this);
clear("close");
}

@Override
public void licenseStateChanged() {
clear("license state changed");
}

@Override
public void clear(String reason) {
logger.debug("full cache clear, reason [{}]", reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
Expand Down Expand Up @@ -184,6 +184,22 @@ public void testOptOutQueryCacheIndexDoesNotHaveFieldLevelSecurity() {
verify(indicesQueryCache).doCache(same(weight), same(policy));
}

public void testOptOutQueryCacheRemovesLicenseStateListenerOnClose() {
final Settings.Builder settings = Settings.builder()
.put("index.version.created", Version.CURRENT)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0);
final IndexMetaData indexMetaData = IndexMetaData.builder("index").settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, Settings.EMPTY);
final IndicesQueryCache indicesQueryCache = mock(IndicesQueryCache.class);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
final OptOutQueryCache cache = new OptOutQueryCache(indexSettings, indicesQueryCache, threadContext, licenseState);
verify(licenseState).addListener(cache);
cache.close();
verify(licenseState).removeListener(cache);
}

private static FieldPermissionsDefinition fieldPermissionDef(String[] granted, String[] denied) {
return new FieldPermissionsDefinition(granted, denied);
}
Expand Down

0 comments on commit d4d3a3e

Please sign in to comment.