diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index eff2b52ca22f2..46ee68cf7d488 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -202,6 +202,16 @@ public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings logger.debug("state: [CREATED]"); this.checkIndexOnStartup = indexSettings.get("index.shard.check_on_startup", "false"); + + // since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing + // so, make sure we periodically call it, this need to be a small enough value so merging will actually + // happen and reduce the number of segments + if (mergeInterval.millis() > 0) { + mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger()); + logger.debug("scheduling optimizer / merger every {}", mergeInterval); + } else { + logger.debug("scheduled optimizer / merger disabled"); + } } public MergeSchedulerProvider mergeScheduler() { @@ -731,7 +741,7 @@ public InternalIndexShard postRecovery(String reason) throws IndexShardStartedEx checkIndex(true); } engine.start(); - startScheduledTasksIfNeeded(); + startEngineRefresher(); changeState(IndexShardState.POST_RECOVERY, reason); } indicesLifecycle.afterIndexShardPostRecovery(this); @@ -778,7 +788,7 @@ public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchE changeState(IndexShardState.POST_RECOVERY, "post recovery"); } indicesLifecycle.afterIndexShardPostRecovery(this); - startScheduledTasksIfNeeded(); + startEngineRefresher(); engine.enableGcDeletes(true); } @@ -913,22 +923,13 @@ private void verifyStarted() throws IllegalIndexShardStateException { } } - private void startScheduledTasksIfNeeded() { + private void startEngineRefresher() { if (refreshInterval.millis() > 0) { refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); logger.debug("scheduling refresher every {}", refreshInterval); } else { logger.debug("scheduled refresher disabled"); } - // since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing - // so, make sure we periodically call it, this need to be a small enough value so mergine will actually - // happen and reduce the number of segments - if (mergeInterval.millis() > 0) { - mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger()); - logger.debug("scheduling optimizer / merger every {}", mergeInterval); - } else { - logger.debug("scheduled optimizer / merger disabled"); - } } private Query filterQueryIfNeeded(Query query, String[] types) { @@ -1020,7 +1021,8 @@ private void reschedule() { class EngineMerger implements Runnable { @Override public void run() { - if (!engine().possibleMergeNeeded()) { + final Engine engine = engine(); + if (engine == null || engine().possibleMergeNeeded() == false) { synchronized (mutex) { if (state != IndexShardState.CLOSED) { mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this);