diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
index 9a1f9d24a0938..8550a41efbde2 100644
--- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
@@ -669,7 +669,15 @@ protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherMa
     @Override
     public boolean refreshNeeded() {
-        return dirty;
+        try {
+            // we are either dirty due to a document added or due to a
+            // finished merge - either way we should refresh
+            return dirty || !searcherManager.isSearcherCurrent();
+        } catch (IOException e) {
+            logger.error("failed to access searcher manager", e);
+            failEngine(e);
+            throw new EngineException(shardId, "failed to access searcher manager", e);
+        }
     }

     @Override
@@ -701,7 +709,10 @@ public void refresh(Refresh refresh) throws EngineException {
             // maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
             // but, we want to make sure not to lose any refresh calls, if one is taking time
             synchronized (refreshMutex) {
-                if (dirty || refresh.force()) {
+                if (refreshNeeded() || refresh.force()) {
+                    // we set dirty to false, even though the refresh hasn't happened yet
+                    // as the refresh only holds for data indexed before it. Any data indexed during
+                    // the refresh will not be part of it and will set the dirty flag back to true
                     dirty = false;
                     searcherManager.maybeRefresh();
                 }
@@ -916,7 +927,7 @@ private void refreshVersioningTable(long time) {

     @Override
     public void maybeMerge() throws EngineException {
-        if (!possibleMergeNeeded) {
+        if (!possibleMergeNeeded()) {
             return;
         }
         possibleMergeNeeded = false;
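The refreshNeeded() change above leans on a detail of Lucene's SearcherManager: the managed searcher goes stale not only when a document is indexed but also when a background merge finishes, so consulting isSearcherCurrent() catches merges that the dirty flag alone would miss. Below is a minimal standalone sketch of that behavior against the Lucene 4.x API this code builds on; the demo class is not part of the patch.

```java
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class SearcherCurrentDemo {
    public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir,
                new IndexWriterConfig(Version.LUCENE_46, new WhitespaceAnalyzer(Version.LUCENE_46)));
        SearcherManager manager = new SearcherManager(writer, true, null);

        // Any change through the writer - an added document here, but a
        // finished merge has the same effect - makes the managed searcher stale.
        writer.addDocument(new Document());
        System.out.println("current? " + manager.isSearcherCurrent()); // false -> "refresh needed"

        manager.maybeRefresh(); // swap in a fresh point-in-time searcher
        System.out.println("current? " + manager.isSearcherCurrent()); // true

        manager.close();
        writer.close();
        dir.close();
    }
}
```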
diff --git a/src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java b/src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java
index 85f3d9a8d426b..6a79c9b3e5ee4 100644
--- a/src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java
+++ b/src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java
@@ -21,7 +21,6 @@

 import org.apache.lucene.index.LogByteSizeMergePolicy;
 import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.Preconditions;
 import org.elasticsearch.common.inject.Inject;
@@ -31,7 +30,6 @@
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.store.Store;

-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;

@@ -41,13 +39,14 @@ public class LogByteSizeMergePolicyProvider extends AbstractMergePolicyProvider<LogByteSizeMergePolicy> {

     private final IndexSettingsService indexSettingsService;
-
+    public static final String MAX_MERGE_BYTE_SIZE_KEY = "index.merge.policy.max_merge_sizes";
+    public static final String MIN_MERGE_BYTE_SIZE_KEY = "index.merge.policy.min_merge_size";
+    public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor";
     private volatile ByteSizeValue minMergeSize;
     private volatile ByteSizeValue maxMergeSize;
     private volatile int mergeFactor;
     private volatile int maxMergeDocs;
     private final boolean calibrateSizeByDeletes;
-    private boolean asyncMerge;

     private final Set<CustomLogByteSizeMergePolicy> policies = new CopyOnWriteArraySet<CustomLogByteSizeMergePolicy>();

@@ -63,21 +62,15 @@ public LogByteSizeMergePolicyProvider(Store store, IndexSettingsService indexSet
         this.mergeFactor = componentSettings.getAsInt("merge_factor", LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR);
         this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS);
         this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true);
-        this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
-        logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]",
-                mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes, asyncMerge);
+        logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
+                mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes);

         indexSettingsService.addListener(applySettings);
     }

     @Override
     public LogByteSizeMergePolicy newMergePolicy() {
-        CustomLogByteSizeMergePolicy mergePolicy;
-        if (asyncMerge) {
-            mergePolicy = new EnableMergeLogByteSizeMergePolicy(this);
-        } else {
-            mergePolicy = new CustomLogByteSizeMergePolicy(this);
-        }
+        final CustomLogByteSizeMergePolicy mergePolicy = new CustomLogByteSizeMergePolicy(this);
         mergePolicy.setMinMergeMB(minMergeSize.mbFrac());
         mergePolicy.setMaxMergeMB(maxMergeSize.mbFrac());
         mergePolicy.setMergeFactor(mergeFactor);
@@ -173,19 +166,4 @@ public MergePolicy clone() {
         }
     }

-    public static class EnableMergeLogByteSizeMergePolicy extends CustomLogByteSizeMergePolicy {
-
-        public EnableMergeLogByteSizeMergePolicy(LogByteSizeMergePolicyProvider provider) {
-            super(provider);
-        }
-
-        @Override
-        public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
-            // we don't enable merges while indexing documents, we do them in the background
-            if (trigger == MergeTrigger.SEGMENT_FLUSH) {
-                return null;
-            }
-            return super.findMerges(trigger, infos);
-        }
-    }
 }
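The EnableMergeLogByteSizeMergePolicy being deleted existed only to veto merges that Lucene proposes at segment-flush time while index.merge.async was set; with async merging gone, the stock policy suffices. Stripped of the provider plumbing, the removed pattern looks like this (a sketch against the Lucene 4.x findMerges signature shown above; the class name is illustrative):

```java
import java.io.IOException;

import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentInfos;

// Illustrative only: refuse merge selection when the trigger is a segment
// flush, so an indexing thread never picks up a merge; every other trigger
// (explicit calls, commits) falls through to the normal selection.
public class NoMergeOnFlushPolicy extends LogByteSizeMergePolicy {
    @Override
    public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
        if (trigger == MergeTrigger.SEGMENT_FLUSH) {
            return null; // a background thread will revisit these segments later
        }
        return super.findMerges(trigger, infos);
    }
}
```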
diff --git a/src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java b/src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java
index 2e5396e978d4c..f3f8ab3f0a14d 100644
--- a/src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java
+++ b/src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java
@@ -20,8 +20,6 @@
 package org.elasticsearch.index.merge.policy;

 import org.apache.lucene.index.LogDocMergePolicy;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.Preconditions;
 import org.elasticsearch.common.inject.Inject;
@@ -29,7 +27,6 @@
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.store.Store;

-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;

@@ -39,12 +36,13 @@ public class LogDocMergePolicyProvider extends AbstractMergePolicyProvider<LogDocMergePolicy> {

     private final IndexSettingsService indexSettingsService;
-
+    public static final String MAX_MERGE_DOCS_KEY = "index.merge.policy.max_merge_docs";
+    public static final String MIN_MERGE_DOCS_KEY = "index.merge.policy.min_merge_docs";
+    public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor";
     private volatile int minMergeDocs;
     private volatile int maxMergeDocs;
     private volatile int mergeFactor;
     private final boolean calibrateSizeByDeletes;
-    private boolean asyncMerge;

     private final Set<CustomLogDocMergePolicy> policies = new CopyOnWriteArraySet<CustomLogDocMergePolicy>();

@@ -60,9 +58,8 @@ public LogDocMergePolicyProvider(Store store, IndexSettingsService indexSettings
         this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS);
         this.mergeFactor = componentSettings.getAsInt("merge_factor", LogDocMergePolicy.DEFAULT_MERGE_FACTOR);
         this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true);
-        this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
-        logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]",
-                mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes, asyncMerge);
+        logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
+                mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes);

         indexSettingsService.addListener(applySettings);
     }

@@ -74,12 +71,7 @@ public void close() throws ElasticsearchException {

     @Override
     public LogDocMergePolicy newMergePolicy() {
-        CustomLogDocMergePolicy mergePolicy;
-        if (asyncMerge) {
-            mergePolicy = new EnableMergeLogDocMergePolicy(this);
-        } else {
-            mergePolicy = new CustomLogDocMergePolicy(this);
-        }
+        final CustomLogDocMergePolicy mergePolicy = new CustomLogDocMergePolicy(this);
         mergePolicy.setMinMergeDocs(minMergeDocs);
         mergePolicy.setMaxMergeDocs(maxMergeDocs);
         mergePolicy.setMergeFactor(mergeFactor);
@@ -150,27 +142,4 @@ public void close() {
             provider.policies.remove(this);
         }
     }
-
-    public static class EnableMergeLogDocMergePolicy extends CustomLogDocMergePolicy {
-
-        public EnableMergeLogDocMergePolicy(LogDocMergePolicyProvider provider) {
-            super(provider);
-        }
-
-        @Override
-        public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
-            // we don't enable merges while indexing documents, we do them in the background
-            if (trigger == MergeTrigger.SEGMENT_FLUSH) {
-                return null;
-            }
-            return super.findMerges(trigger, infos);
-        }
-
-        @Override
-        public MergePolicy clone() {
-            // Lucene IW makes a clone internally but since we hold on to this instance
-            // the clone will just be the identity.
-            return this;
-        }
-    }
 }
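Both log providers now expose their dynamic setting names as constants (MERGE_FACTORY_KEY keeps its spelling from the source). A hypothetical snippet updating them on a live index through the listener registered above; the index name and values are made up:

```java
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;

public class UpdateMergeSettingsExample {
    // Hypothetical: push new merge-policy values to an existing index; the
    // provider's applySettings listener picks them up and reconfigures the
    // live LogDocMergePolicy instances.
    public static void tighten(Client client) {
        client.admin().indices().prepareUpdateSettings("test")
                .setSettings(ImmutableSettings.settingsBuilder()
                        .put(LogDocMergePolicyProvider.MERGE_FACTORY_KEY, 20)
                        .put(LogDocMergePolicyProvider.MIN_MERGE_DOCS_KEY, 2000)
                        .build())
                .get();
    }
}
```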
diff --git a/src/main/java/org/elasticsearch/index/merge/policy/TieredMergePolicyProvider.java b/src/main/java/org/elasticsearch/index/merge/policy/TieredMergePolicyProvider.java
index 7f9b1af7a12bb..7e4c68fd03f81 100644
--- a/src/main/java/org/elasticsearch/index/merge/policy/TieredMergePolicyProvider.java
+++ b/src/main/java/org/elasticsearch/index/merge/policy/TieredMergePolicyProvider.java
@@ -20,7 +20,6 @@
 package org.elasticsearch.index.merge.policy;

 import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.inject.Inject;
@@ -30,7 +29,6 @@
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.store.Store;

-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;

@@ -47,7 +45,6 @@ public class TieredMergePolicyProvider extends AbstractMergePolicyProvider<TieredMergePolicy> {

+        awaitBusy(new Predicate<Object>() {
+            @Override
+            public boolean apply(Object input) {
+                IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get();
+                logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
+                long current = stats.getPrimaries().getMerge().getCurrent();
+                long count = stats.getPrimaries().getSegments().getCount();
+                return count < 50 && current == 0;
+            }
+        });
+        IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get();
+        logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
+        long count = stats.getPrimaries().getSegments().getCount();
+        assertThat(count, Matchers.lessThanOrEqualTo(50l));
+    }
+
+}
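The test fragment above polls index stats until running merges drain and the segment count settles under 50. awaitBusy is the test framework's polling helper; a self-contained sketch of its semantics (the exact back-off schedule is an assumption):

```java
import java.util.concurrent.TimeUnit;

import com.google.common.base.Predicate;

public final class AwaitBusySketch {
    // Evaluate the predicate with a growing sleep between attempts until it
    // holds or the timeout elapses; returns whether it ever held.
    public static boolean awaitBusy(Predicate<Object> predicate, long maxWaitMillis) throws InterruptedException {
        long slept = 0;
        long sleep = 1;
        while (slept < maxWaitMillis) {
            if (predicate.apply(null)) {
                return true;
            }
            TimeUnit.MILLISECONDS.sleep(sleep);
            slept += sleep;
            sleep = Math.min(sleep * 2, 100); // cap the back-off at 100ms
        }
        return predicate.apply(null); // one final check at the deadline
    }
}
```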
diff --git a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
index c03a1f3c36a5b..972abe1754531 100644
--- a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
+++ b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
@@ -29,6 +29,7 @@
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.percolate.PercolateResponse;
@@ -191,6 +192,12 @@ public static void assertFailures(SearchResponse searchResponse) {
         assertVersionSerializable(searchResponse);
     }

+    public static void assertNoFailures(BulkResponse response) {
+        assertThat("Unexpected ShardFailures: " + response.buildFailureMessage(),
+                response.hasFailures(), is(false));
+        assertVersionSerializable(response);
+    }
+
     public static void assertFailures(SearchRequestBuilder searchRequestBuilder, RestStatus restStatus, Matcher<String> reasonMatcher) {
         //when the number for shards is randomized and we expect failures
         //we can either run into partial or total failures depending on the current number of shards
@@ -480,4 +487,5 @@ public static void assertAllFilesClosed() throws IOException {
         MockDirectoryHelper.wrappers.clear();
     }
 }
+
 }
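The new assertNoFailures(BulkResponse) overload gives bulk-heavy tests a one-line guard that fails with the bulk's own aggregated message. Hypothetical usage inside an integration test; the index, type, and field names are made up:

```java
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;

public class BulkAssertionExampleTests extends ElasticsearchIntegrationTest {
    public void testBulkIndexingHasNoFailures() {
        BulkRequestBuilder bulk = client().prepareBulk();
        for (int i = 0; i < 100; i++) {
            bulk.add(client().prepareIndex("test", "type", Integer.toString(i))
                    .setSource("field", "value" + i));
        }
        // Fails with response.buildFailureMessage() if any item failed.
        assertNoFailures(bulk.get());
    }
}
```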