Skip to content

Commit

Permalink
Core: Lucene merges must run on target shard during recovery
Browse files Browse the repository at this point in the history
This does not affect 2.0, where we let Lucene launch merges normally
(elastic#8643).

In 1.x, every 1 sec (default), we ask Lucene to kick off any new
merges, but we unfortunately don't turn that logic on in the target
shard until after recovery has finished.

This means if you have a large translog, and/or a smallish index
buffer, way too many segments can accumulate in the target shard
during recovery, making version lookups slower and slower (OI(N^2))
and possibly causing slow recovery issues like elastic#9226.

This fix changes IndexShard to launch merges as soon as the shard is
created, so merging runs during recovery.

Closes elastic#10463
  • Loading branch information
mikemccand committed Apr 11, 2015
1 parent a09a4d0 commit 1590c9c
Showing 1 changed file with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,16 @@ public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings
logger.debug("state: [CREATED]");

this.checkIndexOnStartup = indexSettings.get("index.shard.check_on_startup", "false");

// since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing
// so, make sure we periodically call it, this need to be a small enough value so merging will actually
// happen and reduce the number of segments
if (mergeInterval.millis() > 0) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger());
logger.debug("scheduling optimizer / merger every {}", mergeInterval);
} else {
logger.debug("scheduled optimizer / merger disabled");
}
}

public MergeSchedulerProvider mergeScheduler() {
Expand Down Expand Up @@ -731,7 +741,7 @@ public InternalIndexShard postRecovery(String reason) throws IndexShardStartedEx
checkIndex(true);
}
engine.start();
startScheduledTasksIfNeeded();
startEngineRefresher();
changeState(IndexShardState.POST_RECOVERY, reason);
}
indicesLifecycle.afterIndexShardPostRecovery(this);
Expand Down Expand Up @@ -778,7 +788,7 @@ public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchE
changeState(IndexShardState.POST_RECOVERY, "post recovery");
}
indicesLifecycle.afterIndexShardPostRecovery(this);
startScheduledTasksIfNeeded();
startEngineRefresher();
engine.enableGcDeletes(true);
}

Expand Down Expand Up @@ -913,22 +923,13 @@ private void verifyStarted() throws IllegalIndexShardStateException {
}
}

private void startScheduledTasksIfNeeded() {
private void startEngineRefresher() {
if (refreshInterval.millis() > 0) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
logger.debug("scheduling refresher every {}", refreshInterval);
} else {
logger.debug("scheduled refresher disabled");
}
// since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing
// so, make sure we periodically call it, this need to be a small enough value so mergine will actually
// happen and reduce the number of segments
if (mergeInterval.millis() > 0) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger());
logger.debug("scheduling optimizer / merger every {}", mergeInterval);
} else {
logger.debug("scheduled optimizer / merger disabled");
}
}

private Query filterQueryIfNeeded(Query query, String[] types) {
Expand Down Expand Up @@ -1020,7 +1021,8 @@ private void reschedule() {
class EngineMerger implements Runnable {
@Override
public void run() {
if (!engine().possibleMergeNeeded()) {
final Engine engine = engine();
if (engine == null || engine.possibleMergeNeeded() == false) {
synchronized (mutex) {
if (state != IndexShardState.CLOSED) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this);
Expand Down

0 comments on commit 1590c9c

Please sign in to comment.