diff --git a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index e19cbe38c7e37..03f15d7a85d8e 100644 --- a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -118,18 +118,6 @@ public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSha } } - @Override - public void onShardInactive(IndexShard indexShard) { - for (IndexEventListener listener : listeners) { - try { - listener.onShardInactive(indexShard); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to invoke on shard inactive callback", - indexShard.shardId().getId()), e); - throw e; - } - } - } @Override public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java index 982b42b2c3f66..9cbd9ce4df253 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -84,13 +84,6 @@ default void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh default void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {} - /** - * Called when a shard is marked as inactive - * - * @param indexShard The shard that was marked inactive - */ - default void onShardInactive(IndexShard indexShard) {} - /** * Called before the index gets created. Note that this is also called * when the index is created on data nodes diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d160d2035bff8..8185d8fad5f16 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1578,7 +1578,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t onNewEngine(newEngine); currentEngineReference.set(newEngine); // We set active because we are now writing operations to the engine; this way, - // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. + // we can flush if we go idle after some time and become inactive. active.set(true); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during @@ -1758,19 +1758,28 @@ public void addShardFailureCallback(Consumer onShardFailure) { /** * Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last - * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. + * indexing operation, so we can flush the index. */ - public void checkIdle(long inactiveTimeNS) { + public void flushOnIdle(long inactiveTimeNS) { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) { boolean wasActive = active.getAndSet(false); if (wasActive) { - logger.debug("shard is now inactive"); - try { - indexEventListener.onShardInactive(this); - } catch (Exception e) { - logger.warn("failed to notify index event listener", e); - } + logger.debug("flushing shard on inactive"); + threadPool.executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + if (state != IndexShardState.CLOSED) { + logger.warn("failed to flush shard on inactive", e); + } + } + + @Override + protected void doRun() { + flush(new FlushRequest().waitIfOngoing(false).force(false)); + periodicFlushMetric.inc(); + } + }); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index e358bc57798b4..8344355a0f372 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -303,7 +303,7 @@ private void runUnlocked() { long totalBytesWriting = 0; for (IndexShard shard : availableShards()) { - // Give shard a chance to transition to inactive so sync'd flush can happen: + // Give shard a chance to transition to inactive so we can flush checkIdle(shard, inactiveTime.nanos()); // How many bytes this shard is currently (async'd) moving from heap to disk: @@ -400,7 +400,7 @@ private void runUnlocked() { */ protected void checkIdle(IndexShard shard, long inactiveTimeNS) { try { - shard.checkIdle(inactiveTimeNS); + shard.flushOnIdle(inactiveTimeNS); } catch (AlreadyClosedException e) { logger.trace(() -> new ParameterizedMessage("ignore exception while checking if shard {} is inactive", shard.shardId()), e); } diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 50a3e2bea351f..b72a56b4a0543 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -177,13 +177,7 @@ public IndicesClusterStateService( final RetentionLeaseSyncer retentionLeaseSyncer, final NodeClient client) { this.settings = settings; - this.buildInIndexListener = - Arrays.asList( - peerRecoverySourceService, - recoveryTargetService, - searchService, - syncedFlushService, - snapshotShardsService); + this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService); this.indicesService = indicesService; this.clusterService = clusterService; this.threadPool = threadPool; diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index fee11a0a9f54b..c0e0d513b33d7 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -47,7 +47,6 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -71,7 +70,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; -public class SyncedFlushService implements IndexEventListener { +public class SyncedFlushService { private static final Logger logger = LogManager.getLogger(SyncedFlushService.class); @@ -101,25 +100,6 @@ public SyncedFlushService(IndicesService indicesService, new InFlightOpCountTransportHandler()); } - @Override - public void onShardInactive(final IndexShard indexShard) { - // we only want to call sync flush once, so only trigger it when we are on a primary - if (indexShard.routingEntry().primary()) { - attemptSyncedFlush(indexShard.shardId(), new ActionListener() { - @Override - public void onResponse(ShardsSyncedFlushResult syncedFlushResult) { - logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", - syncedFlushResult.getShardId(), syncedFlushResult.syncId()); - } - - @Override - public void onFailure(Exception e) { - logger.debug(() -> new ParameterizedMessage("{} sync flush on inactive shard failed", indexShard.shardId()), e); - } - }); - } - } - /** * a utility method to perform a synced flush for all shards of multiple indices. * see {@link #attemptSyncedFlush(ShardId, ActionListener)} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java index f24bbf5b999b2..88a090757d5bc 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java @@ -533,7 +533,7 @@ public void testShrinkCommitsMergeOnIdle() throws Exception { IndexService indexShards = service.indexService(target.getIndex()); IndexShard shard = indexShards.getShard(0); assertTrue(shard.isActive()); - shard.checkIdle(0); + shard.flushOnIdle(0); assertFalse(shard.isActive()); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index e524e7bad5d49..86fa8c613b628 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -167,23 +166,6 @@ public void testLockTryingToDelete() throws Exception { } } - public void testMarkAsInactiveTriggersSyncedFlush() throws Exception { - assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))); - client().prepareIndex("test").setSource("{}", XContentType.JSON).get(); - ensureGreen("test"); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0); - assertBusy(() -> { - IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test"); - assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); - indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0); - } - ); - IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test"); - assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); - } - public void testDurableFlagHasEffect() throws Exception { createIndex("test"); ensureGreen(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 59e437da4f397..93867f4aa4c77 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3419,7 +3419,7 @@ public void testScheduledRefresh() throws Exception { indexDoc(primary, "_doc", "2", "{\"foo\" : \"bar\"}"); assertFalse(primary.scheduledRefresh()); assertTrue(primary.isSearchIdle()); - primary.checkIdle(0); + primary.flushOnIdle(0); assertTrue(primary.scheduledRefresh()); // make sure we refresh once the shard is inactive try (Engine.Searcher searcher = primary.acquireSearcher("test")) { assertEquals(3, searcher.getIndexReader().numDocs()); @@ -3629,38 +3629,13 @@ public void testSegmentMemoryTrackedWithRandomSearchers() throws Exception { assertThat(breaker.getUsed(), equalTo(0L)); } - public void testFlushOnInactive() throws Exception { - Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .build(); - IndexMetaData metaData = IndexMetaData.builder("test") - .putMapping("{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") - .settings(settings) - .primaryTerm(0, 1).build(); - ShardRouting shardRouting = - TestShardRouting.newShardRouting(new ShardId(metaData.getIndex(), 0), "n1", true, - ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE); - final ShardId shardId = shardRouting.shardId(); - final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); - ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); - AtomicBoolean markedInactive = new AtomicBoolean(); - AtomicReference primaryRef = new AtomicReference<>(); - IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> { }, - RetentionLeaseSyncer.EMPTY, new IndexEventListener() { - @Override - public void onShardInactive(IndexShard indexShard) { - markedInactive.set(true); - primaryRef.get().flush(new FlushRequest()); - } - }); - primaryRef.set(primary); - recoverShardFromStore(primary); + public void testFlushOnIdle() throws Exception { + IndexShard shard = newStartedShard(); for (int i = 0; i < 3; i++) { - indexDoc(primary, "_doc", "" + i, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}"); - primary.refresh("test"); // produce segments + indexDoc(shard, "_doc", Integer.toString(i)); + shard.refresh("test"); // produce segments } - List segments = primary.segments(false); + List segments = shard.segments(false); Set names = new HashSet<>(); for (Segment segment : segments) { assertFalse(segment.committed); @@ -3668,10 +3643,10 @@ public void onShardInactive(IndexShard indexShard) { names.add(segment.getName()); } assertEquals(3, segments.size()); - primary.flush(new FlushRequest()); - primary.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(false)); - primary.refresh("test"); - segments = primary.segments(false); + shard.flush(new FlushRequest()); + shard.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(false)); + shard.refresh("test"); + segments = shard.segments(false); for (Segment segment : segments) { if (names.contains(segment.getName())) { assertTrue(segment.committed); @@ -3683,20 +3658,18 @@ public void onShardInactive(IndexShard indexShard) { } assertEquals(4, segments.size()); - assertFalse(markedInactive.get()); - assertBusy(() -> { - primary.checkIdle(0); - assertFalse(primary.isActive()); - }); + shard.flushOnIdle(0); + assertFalse(shard.isActive()); - assertTrue(markedInactive.get()); - segments = primary.segments(false); - assertEquals(1, segments.size()); - for (Segment segment : segments) { - assertTrue(segment.committed); - assertTrue(segment.search); - } - closeShards(primary); + assertBusy(() -> { // flush happens in the background using the flush threadpool + List segmentsAfterFlush = shard.segments(false); + assertEquals(1, segmentsAfterFlush.size()); + for (Segment segment : segmentsAfterFlush) { + assertTrue(segment.committed); + assertTrue(segment.search); + } + }); + closeShards(shard); } public void testOnCloseStats() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index 79211202a14bd..5b4b80aab8c57 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; @@ -47,15 +48,22 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.InternalTestCluster; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -71,6 +79,11 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) public class FlushIT extends ESIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(InternalSettingsPlugin.class); + } + public void testWaitIfOngoing() throws InterruptedException { createIndex("test"); ensureGreen("test"); @@ -369,4 +382,29 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception { assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1)); assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId()))); } + + public void testFlushOnInactive() throws Exception { + final String indexName = "flush_on_inactive"; + List dataNodes = internalCluster().startDataOnlyNodes(2, Settings.builder() + .put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), randomTimeValue(10, 1000, "ms")).build()); + assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), randomTimeValue(50, 200, "ms")) + .put("index.routing.allocation.include._name", String.join(",", dataNodes)) + .build())); + ensureGreen(indexName); + int numDocs = randomIntBetween(1, 10); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex(indexName).setSource("f", "v").get(); + } + if (randomBoolean()) { + internalCluster().restartNode(randomFrom(dataNodes), new InternalTestCluster.RestartCallback()); + ensureGreen(indexName); + } + assertBusy(() -> { + for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) { + assertThat(shardStats.getStats().getTranslog().getUncommittedOperations(), equalTo(0)); + } + }, 30, TimeUnit.SECONDS); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index dc119f940de21..5f6143d3ccf39 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1983,9 +1983,13 @@ public List startMasterOnlyNodes(int numNodes, Settings settings) { } public List startDataOnlyNodes(int numNodes) { + return startDataOnlyNodes(numNodes, Settings.EMPTY); + } + + public List startDataOnlyNodes(int numNodes, Settings settings) { return startNodes( numNodes, - Settings.builder().put(Settings.EMPTY).put(Node.NODE_MASTER_SETTING.getKey(), false) + Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false) .put(Node.NODE_DATA_SETTING.getKey(), true).build()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/MockIndexEventListener.java b/test/framework/src/main/java/org/elasticsearch/test/MockIndexEventListener.java index 77d1c315b6970..81178c9019845 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/MockIndexEventListener.java +++ b/test/framework/src/main/java/org/elasticsearch/test/MockIndexEventListener.java @@ -126,11 +126,6 @@ public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardSt delegate.indexShardStateChanged(indexShard, previousState, currentState, reason); } - @Override - public void onShardInactive(IndexShard indexShard) { - delegate.onShardInactive(indexShard); - } - @Override public void beforeIndexCreated(Index index, Settings indexSettings) { delegate.beforeIndexCreated(index, indexSettings);