From 462e2c1a1e4ad3f7a8f5fc70d7ee722623e72657 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 11 Apr 2014 15:40:07 +0200 Subject: [PATCH] Ensure pending merges are updated on segment flushes Due to the default of `async_merge` to `true` we never run the merge policy on a segment flush which prevented the pending merges from being updated and that caused actual pending merges not to contribute to the merge decision. This commit also removes the `index.async.merge` setting which is actually misleading since we take care of merges not being executed on the indexing threads on a different level (the merge scheduler) since 1.1. This commit also adds an additional check when to run a refresh since solely relying on the dirty flag might leave merges un-refreshed which can cause search slowdowns and higher memory consumption. Closes #5779 --- .../index/engine/internal/InternalEngine.java | 14 ++- .../LogByteSizeMergePolicyProvider.java | 34 ++----- .../policy/LogDocMergePolicyProvider.java | 43 ++------- .../policy/TieredMergePolicyProvider.java | 31 +----- .../InternalEngineIntegrationTest.java | 1 + .../internal/InternalEngineMergeTests.java | 94 +++++++++++++++++++ .../hamcrest/ElasticsearchAssertions.java | 8 ++ 7 files changed, 129 insertions(+), 96 deletions(-) create mode 100644 src/test/java/org/elasticsearch/index/engine/internal/InternalEngineMergeTests.java diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 7fe8849ce27d4..d9cacd94f302c 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -674,7 +674,15 @@ protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherMa @Override public boolean refreshNeeded() { - return dirty; + try { + // we are either dirty due to a document added or due to a + // finished 
merge - either way we should refresh + return dirty || !searcherManager.isSearcherCurrent(); + } catch (IOException e) { + logger.error("failed to access searcher manager", e); + failEngine(e); + throw new EngineException(shardId, "failed to access searcher manager",e); + } } @Override @@ -706,7 +714,7 @@ public void refresh(Refresh refresh) throws EngineException { // maybeRefresh will only allow one refresh to execute, and the rest will "pass through", // but, we want to make sure not to loose ant refresh calls, if one is taking time synchronized (refreshMutex) { - if (dirty || refresh.force()) { + if (refreshNeeded() || refresh.force()) { // we set dirty to false, even though the refresh hasn't happened yet // as the refresh only holds for data indexed before it. Any data indexed during // the refresh will not be part of it and will set the dirty flag back to true @@ -926,7 +934,7 @@ private void refreshVersioningTable(long time) { @Override public void maybeMerge() throws EngineException { - if (!possibleMergeNeeded) { + if (!possibleMergeNeeded()) { return; } possibleMergeNeeded = false; diff --git a/src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java b/src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java index a20c2ae61a614..c59669ae5382b 100644 --- a/src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java @@ -21,7 +21,6 @@ import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.inject.Inject; @@ -31,7 +30,6 @@ import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.store.Store; -import java.io.IOException; import 
java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -41,13 +39,14 @@ public class LogByteSizeMergePolicyProvider extends AbstractMergePolicyProvider { private final IndexSettingsService indexSettingsService; - + public static final String MAX_MERGE_BYTE_SIZE_KEY = "index.merge.policy.max_merge_sizes"; + public static final String MIN_MERGE_BYTE_SIZE_KEY = "index.merge.policy.min_merge_size"; + public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor"; private volatile ByteSizeValue minMergeSize; private volatile ByteSizeValue maxMergeSize; private volatile int mergeFactor; private volatile int maxMergeDocs; private final boolean calibrateSizeByDeletes; - private boolean asyncMerge; private final Set policies = new CopyOnWriteArraySet<>(); @@ -63,21 +62,15 @@ public LogByteSizeMergePolicyProvider(Store store, IndexSettingsService indexSet this.mergeFactor = componentSettings.getAsInt("merge_factor", LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR); this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS); this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true); - this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true); - logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]", - mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes, asyncMerge); + logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]", + mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes); indexSettingsService.addListener(applySettings); } @Override public LogByteSizeMergePolicy newMergePolicy() { - CustomLogByteSizeMergePolicy mergePolicy; - if (asyncMerge) { - mergePolicy = 
new EnableMergeLogByteSizeMergePolicy(this); - } else { - mergePolicy = new CustomLogByteSizeMergePolicy(this); - } + final CustomLogByteSizeMergePolicy mergePolicy = new CustomLogByteSizeMergePolicy(this); mergePolicy.setMinMergeMB(minMergeSize.mbFrac()); mergePolicy.setMaxMergeMB(maxMergeSize.mbFrac()); mergePolicy.setMergeFactor(mergeFactor); @@ -173,19 +166,4 @@ public MergePolicy clone() { } } - public static class EnableMergeLogByteSizeMergePolicy extends CustomLogByteSizeMergePolicy { - - public EnableMergeLogByteSizeMergePolicy(LogByteSizeMergePolicyProvider provider) { - super(provider); - } - - @Override - public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException { - // we don't enable merges while indexing documents, we do them in the background - if (trigger == MergeTrigger.SEGMENT_FLUSH) { - return null; - } - return super.findMerges(trigger, infos); - } - } } diff --git a/src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java b/src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java index bdb9ad4268f5d..94e8573459f51 100644 --- a/src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java @@ -20,8 +20,6 @@ package org.elasticsearch.index.merge.policy; import org.apache.lucene.index.LogDocMergePolicy; -import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.inject.Inject; @@ -29,7 +27,6 @@ import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.store.Store; -import java.io.IOException; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -39,12 +36,13 @@ public class LogDocMergePolicyProvider extends AbstractMergePolicyProvider { private 
final IndexSettingsService indexSettingsService; - + public static final String MAX_MERGE_DOCS_KEY = "index.merge.policy.max_merge_docs"; + public static final String MIN_MERGE_DOCS_KEY = "index.merge.policy.min_merge_docs"; + public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor"; private volatile int minMergeDocs; private volatile int maxMergeDocs; private volatile int mergeFactor; private final boolean calibrateSizeByDeletes; - private boolean asyncMerge; private final Set policies = new CopyOnWriteArraySet<>(); @@ -60,9 +58,8 @@ public LogDocMergePolicyProvider(Store store, IndexSettingsService indexSettings this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS); this.mergeFactor = componentSettings.getAsInt("merge_factor", LogDocMergePolicy.DEFAULT_MERGE_FACTOR); this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true); - this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true); - logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]", - mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes, asyncMerge); + logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]", + mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes); indexSettingsService.addListener(applySettings); } @@ -74,12 +71,7 @@ public void close() throws ElasticsearchException { @Override public LogDocMergePolicy newMergePolicy() { - CustomLogDocMergePolicy mergePolicy; - if (asyncMerge) { - mergePolicy = new EnableMergeLogDocMergePolicy(this); - } else { - mergePolicy = new CustomLogDocMergePolicy(this); - } + final CustomLogDocMergePolicy mergePolicy = new CustomLogDocMergePolicy(this); mergePolicy.setMinMergeDocs(minMergeDocs); mergePolicy.setMaxMergeDocs(maxMergeDocs); 
mergePolicy.setMergeFactor(mergeFactor); @@ -150,27 +142,4 @@ public void close() { provider.policies.remove(this); } } - - public static class EnableMergeLogDocMergePolicy extends CustomLogDocMergePolicy { - - public EnableMergeLogDocMergePolicy(LogDocMergePolicyProvider provider) { - super(provider); - } - - @Override - public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException { - // we don't enable merges while indexing documents, we do them in the background - if (trigger == MergeTrigger.SEGMENT_FLUSH) { - return null; - } - return super.findMerges(trigger, infos); - } - - @Override - public MergePolicy clone() { - // Lucene IW makes a clone internally but since we hold on to this instance - // the clone will just be the identity. - return this; - } - } } diff --git a/src/main/java/org/elasticsearch/index/merge/policy/TieredMergePolicyProvider.java b/src/main/java/org/elasticsearch/index/merge/policy/TieredMergePolicyProvider.java index d68e7f5239512..06aef2548fddd 100644 --- a/src/main/java/org/elasticsearch/index/merge/policy/TieredMergePolicyProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/policy/TieredMergePolicyProvider.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.merge.policy; import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.TieredMergePolicy; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.inject.Inject; @@ -30,7 +29,6 @@ import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.store.Store; -import java.io.IOException; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -47,7 +45,6 @@ public class TieredMergePolicyProvider extends AbstractMergePolicyProvider() { + @Override + public boolean apply(Object input) { + IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get(); + 
logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent()); + long current = stats.getPrimaries().getMerge().getCurrent(); + long count = stats.getPrimaries().getSegments().getCount(); + return count < 50 && current == 0; + } + }); + IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get(); + logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent()); + long count = stats.getPrimaries().getSegments().getCount(); + assertThat(count, Matchers.lessThanOrEqualTo(50l)); + } + +} diff --git a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 2bf5257510420..4deb0e467dc41 100644 --- a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.percolate.PercolateResponse; @@ -202,6 +203,12 @@ public static void assertFailures(SearchResponse searchResponse) { assertVersionSerializable(searchResponse); } + public static void assertNoFailures(BulkResponse response) { + assertThat("Unexpected ShardFailures: " + response.buildFailureMessage(), + response.hasFailures(), 
is(false)); + assertVersionSerializable(response); + } + public static void assertFailures(SearchRequestBuilder searchRequestBuilder, RestStatus restStatus, Matcher reasonMatcher) { //when the number for shards is randomized and we expect failures //we can either run into partial or total failures depending on the current number of shards @@ -513,4 +520,5 @@ public boolean apply(Object input) { MockDirectoryHelper.wrappers.clear(); } } + }