Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't close caches while there might still be in-flight requests. #38958

Merged
merged 2 commits into from
Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,18 @@ public void beforeIndexAddedToCluster(Index index, Settings indexSettings) {
}
}

/**
 * Forwards the store-created notification for the given shard to each registered listener in turn.
 * If a listener throws, the failure is logged at WARN level and rethrown, so the remaining
 * listeners are not notified.
 *
 * @param shardId the shard ID the store belongs to
 */
@Override
public void onStoreCreated(ShardId shardId) {
    for (final IndexEventListener delegate : listeners) {
        try {
            delegate.onStoreCreated(shardId);
        } catch (Exception failure) {
            // log for diagnosis, then propagate to the caller unchanged
            logger.warn("failed to invoke on store created", failure);
            throw failure;
        }
    }
}

@Override
public void onStoreClosed(ShardId shardId) {
for (IndexEventListener listener : listeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ public synchronized IndexShard createShard(
DirectoryService directoryService = indexStore.newDirectoryService(path);
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
eventListener.onStoreCreated(shardId);
indexShard = new IndexShard(
routing,
this.indexSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ default void afterIndexShardDeleted(ShardId shardId, Settings indexSettings) {
default void beforeIndexAddedToCluster(Index index, Settings indexSettings) {
// intentionally empty: the default implementation does nothing
}

/**
 * Invoked once the store of the given shard has been created. The shard's store is created
 * before the shard itself is created. The default implementation does nothing.
 *
 * @param shardId the shard ID the store belongs to
 */
default void onStoreCreated(ShardId shardId) {
}

/**
* Called when the given shard's store is closed. The store is closed once all resources have been released on the store.
* This implies that all index readers are closed and no recoveries are running.
Expand Down
44 changes: 35 additions & 9 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
Expand Down Expand Up @@ -125,6 +126,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -195,6 +197,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final MetaStateService metaStateService;
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
// Reference count guarding the shared resources released in its closeInternal(): it is incremented
// when a shard store is created, and decremented when a shard store is closed and in doClose().
final AbstractRefCounted indicesRefCount; // pkg-private for testing

@Override
protected void doStart() {
Expand Down Expand Up @@ -250,6 +253,27 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
}

this.indexStoreFactories = indexStoreFactories;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
// ref count which will only close them when both this service and all index services are
// actually closed
indicesRefCount = new AbstractRefCounted("indices") {
@Override
protected void closeInternal() {
try {
// release all of the deferred resources in a single call
IOUtils.close(
analysisRegistry,
indexingMemoryController,
indicesFieldDataCache,
cacheCleaner,
indicesRequestCache,
indicesQueryCache);
} catch (IOException e) {
// this override declares no checked exceptions, so the I/O failure is rethrown unchecked
throw new UncheckedIOException(e);
}
}
};
}

@Override
Expand Down Expand Up @@ -281,14 +305,8 @@ protected void doStop() {
}

@Override
protected void doClose() {
IOUtils.closeWhileHandlingException(
analysisRegistry,
indexingMemoryController,
indicesFieldDataCache,
cacheCleaner,
indicesRequestCache,
indicesQueryCache);
protected void doClose() throws IOException {
indicesRefCount.decRef();
}

/**
Expand Down Expand Up @@ -456,9 +474,17 @@ public synchronized IndexService createIndex(
}
List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
final IndexEventListener onStoreClose = new IndexEventListener() {
@Override
public void onStoreCreated(ShardId shardId) {
indicesRefCount.incRef();
}
@Override
public void onStoreClosed(ShardId shardId) {
indicesQueryCache.onClose(shardId);
try {
indicesRefCount.decRef();
} finally {
indicesQueryCache.onClose(shardId);
}
}
};
finalListeners.add(onStoreClose);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void put(Version version, T model) {

@Override
public Collection<T> values() {
return Collections.singleton(model);
return model == null ? Collections.emptySet() : Collections.singleton(model);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.transport.nio.MockNioTransportPlugin;

import java.nio.file.Path;
import java.util.Arrays;

import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

public class IndicesServiceCloseTests extends ESTestCase {

private Node startNode() throws NodeValidationException {
final Path tempDir = createTempDir();
String nodeName = "node_s_0";
Settings settings = Settings.builder()
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), InternalTestCluster.clusterName("single-node-cluster", random().nextLong()))
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo"))
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
.put("transport.type", getTestTransportType())
.put(Node.NODE_DATA_SETTING.getKey(), true)
.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
// default the watermarks low values to prevent tests from failing on nodes without enough disk space
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b")
// turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we
// turn it off for these tests.
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes
.putList(INITIAL_MASTER_NODES_SETTING.getKey(), nodeName)
.build();

Node node = new MockNode(settings, Arrays.asList(MockNioTransportPlugin.class, MockHttpTransport.TestPlugin.class), true);
node.start();
return node;
}

public void testCloseEmptyIndicesService() throws Exception {
Node node = startNode();
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
assertEquals(1, indicesService.indicesRefCount.refCount());
node.close();
assertEquals(0, indicesService.indicesRefCount.refCount());
}

public void testCloseNonEmptyIndicesService() throws Exception {
Node node = startNode();
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
assertEquals(1, indicesService.indicesRefCount.refCount());

assertAcked(node.client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));

assertEquals(2, indicesService.indicesRefCount.refCount());

node.close();
assertEquals(0, indicesService.indicesRefCount.refCount());
}

public void testCloseWhileOngoingRequest() throws Exception {
Node node = startNode();
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
assertEquals(1, indicesService.indicesRefCount.refCount());

assertAcked(node.client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));

assertEquals(2, indicesService.indicesRefCount.refCount());

IndexService indexService = indicesService.iterator().next();
IndexShard shard = indexService.getShard(0);
shard.store().incRef();

node.close();
assertEquals(1, indicesService.indicesRefCount.refCount());

shard.store().decRef();
assertEquals(0, indicesService.indicesRefCount.refCount());
}

}