diff --git a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index 543ec9be75d17..d4e6362085238 100644 --- a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -248,6 +248,18 @@ public void beforeIndexAddedToCluster(Index index, Settings indexSettings) { } } + @Override + public void onStoreCreated(ShardId shardId) { + for (IndexEventListener listener : listeners) { + try { + listener.onStoreCreated(shardId); + } catch (Exception e) { + logger.warn("failed to invoke on store created", e); + throw e; + } + } + } + @Override public void onStoreClosed(ShardId shardId) { for (IndexEventListener listener : listeners) { diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 1b1784495e685..e3d8732c7fb81 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -388,6 +388,7 @@ public synchronized IndexShard createShard( DirectoryService directoryService = indexStore.newDirectoryService(path); store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock, new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))); + eventListener.onStoreCreated(shardId); indexShard = new IndexShard( routing, this.indexSettings, diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java index c0a89e7cf006c..982b42b2c3f66 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -160,6 +160,13 @@ default void afterIndexShardDeleted(ShardId shardId, Settings indexSettings) { default void beforeIndexAddedToCluster(Index index, Settings indexSettings) { } + /** + * Called when the given shards store is created. The shard store is created before the shard is created. + * + * @param shardId the shard ID the store belongs to + */ + default void onStoreCreated(ShardId shardId) {} + /** * Called when the given shards store is closed. The store is closed once all resource have been released on the store. * This implies that all index readers are closed and no recoveries are running. diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 32cbc4f70212b..2977cea2ae813 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -62,6 +62,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -125,6 +126,7 @@ import java.io.Closeable; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; @@ -195,6 +197,7 @@ public class IndicesService extends AbstractLifecycleComponent private final MetaStateService metaStateService; private final Collection>> engineFactoryProviders; private final Map> indexStoreFactories; + final AbstractRefCounted indicesRefCount; // pkg-private for testing @Override protected void doStart() { @@ -250,6 +253,27 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon } this.indexStoreFactories = indexStoreFactories; + // doClose() is called when shutting down a node, yet there might still be ongoing requests + // that we need to wait for before closing some resources such as the caches. In order to + // avoid closing these resources while ongoing requests are still being processed, we use a + // ref count which will only close them when both this service and all index services are + // actually closed + indicesRefCount = new AbstractRefCounted("indices") { + @Override + protected void closeInternal() { + try { + IOUtils.close( + analysisRegistry, + indexingMemoryController, + indicesFieldDataCache, + cacheCleaner, + indicesRequestCache, + indicesQueryCache); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }; } @Override @@ -281,14 +305,8 @@ protected void doStop() { } @Override - protected void doClose() { - IOUtils.closeWhileHandlingException( - analysisRegistry, - indexingMemoryController, - indicesFieldDataCache, - cacheCleaner, - indicesRequestCache, - indicesQueryCache); + protected void doClose() throws IOException { + indicesRefCount.decRef(); } /** @@ -456,9 +474,17 @@ public synchronized IndexService createIndex( } List finalListeners = new ArrayList<>(builtInListeners); final IndexEventListener onStoreClose = new IndexEventListener() { + @Override + public void onStoreCreated(ShardId shardId) { + indicesRefCount.incRef(); + } @Override public void onStoreClosed(ShardId shardId) { - indicesQueryCache.onClose(shardId); + try { + indicesRefCount.decRef(); + } finally { + indicesQueryCache.onClose(shardId); + } } }; finalListeners.add(onStoreClose); diff --git a/server/src/main/java/org/elasticsearch/indices/analysis/PreBuiltCacheFactory.java b/server/src/main/java/org/elasticsearch/indices/analysis/PreBuiltCacheFactory.java index 7539c1653cce4..bc71cb597b234 100644 --- a/server/src/main/java/org/elasticsearch/indices/analysis/PreBuiltCacheFactory.java +++ b/server/src/main/java/org/elasticsearch/indices/analysis/PreBuiltCacheFactory.java @@ -81,7 +81,7 @@ public void put(Version version, T model) { @Override public Collection values() { - return Collections.singleton(model); + return model == null ? Collections.emptySet() : Collections.singleton(model); } } diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java new file mode 100644 index 0000000000000..9990d7b082e5c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java @@ -0,0 +1,124 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.node.MockNode; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.MockHttpTransport; +import org.elasticsearch.transport.nio.MockNioTransportPlugin; + +import java.nio.file.Path; +import java.util.Arrays; + +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +public class IndicesServiceCloseTests extends ESTestCase { + + private Node startNode() throws NodeValidationException { + final Path tempDir = createTempDir(); + String nodeName = "node_s_0"; + Settings settings = Settings.builder() + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), InternalTestCluster.clusterName("single-node-cluster", random().nextLong())) + .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) + .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo")) + .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent()) + .put(Node.NODE_NAME_SETTING.getKey(), nodeName) + .put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m") + .put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created + .put("transport.type", getTestTransportType()) + .put(Node.NODE_DATA_SETTING.getKey(), true) + .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) + // default the watermarks low values to prevent tests from failing on nodes without enough disk space + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b") + // turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we + // turn it off for these tests. + .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) + .putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes + .putList(INITIAL_MASTER_NODES_SETTING.getKey(), nodeName) + .build(); + + Node node = new MockNode(settings, Arrays.asList(MockNioTransportPlugin.class, MockHttpTransport.TestPlugin.class), true); + node.start(); + return node; + } + + public void testCloseEmptyIndicesService() throws Exception { + Node node = startNode(); + IndicesService indicesService = node.injector().getInstance(IndicesService.class); + assertEquals(1, indicesService.indicesRefCount.refCount()); + node.close(); + assertEquals(0, indicesService.indicesRefCount.refCount()); + } + + public void testCloseNonEmptyIndicesService() throws Exception { + Node node = startNode(); + IndicesService indicesService = node.injector().getInstance(IndicesService.class); + assertEquals(1, indicesService.indicesRefCount.refCount()); + + assertAcked(node.client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))); + + assertEquals(2, indicesService.indicesRefCount.refCount()); + + node.close(); + assertEquals(0, indicesService.indicesRefCount.refCount()); + } + + public void testCloseWhileOngoingRequest() throws Exception { + Node node = startNode(); + IndicesService indicesService = node.injector().getInstance(IndicesService.class); + assertEquals(1, indicesService.indicesRefCount.refCount()); + + assertAcked(node.client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))); + + assertEquals(2, indicesService.indicesRefCount.refCount()); + + IndexService indexService = indicesService.iterator().next(); + IndexShard shard = indexService.getShard(0); + shard.store().incRef(); + + node.close(); + assertEquals(1, indicesService.indicesRefCount.refCount()); + + shard.store().decRef(); + assertEquals(0, indicesService.indicesRefCount.refCount()); + } + +}