
Ensure pending merges are updated on segment flushes
Due to `async_merge` defaulting to `true`, we never ran the
merge policy on a segment flush, which prevented the pending
merges from being updated and caused actual pending merges not
to contribute to the merge decision.
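
The culprit is the merge-policy wrapper pattern removed below: it returns no merges for the `SEGMENT_FLUSH` trigger, so `IndexWriter` never updates its pending merges at flush time. A minimal sketch of that pattern against the Lucene 4.x `MergePolicy` API (the class name here is illustrative, not part of this commit):

```java
import java.io.IOException;

import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentInfos;

// Hypothetical name; mirrors the EnableMerge* wrappers this commit deletes.
class FlushSuppressingMergePolicy extends LogByteSizeMergePolicy {
    @Override
    public MergePolicy.MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
        if (trigger == MergeTrigger.SEGMENT_FLUSH) {
            // Returning null means IndexWriter registers no merges when a segment
            // is flushed, so pending merges are never updated at flush time --
            // exactly the behaviour this commit removes.
            return null;
        }
        return super.findMerges(trigger, infos);
    }
}
```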

This commit also removes the `index.merge.async` setting, which
is misleading: since 1.1 we already make sure merges are not
executed on the indexing threads at a different level (the merge
scheduler).
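
That level is the merge scheduler: the policy only selects merges (including on `SEGMENT_FLUSH`), while the scheduler decides where they run. A rough sketch of the relevant Lucene 4.x wiring (analyzer, directory and version are placeholders, not the engine's actual setup):

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class MergeSchedulerSketch {
    public static void main(String[] args) throws Exception {
        Directory dir = new RAMDirectory();
        IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_47,
                new StandardAnalyzer(Version.LUCENE_47));
        // The merge policy only selects merges; it does not execute them.
        conf.setMergePolicy(new TieredMergePolicy());
        // ConcurrentMergeScheduler runs the selected merges on background threads,
        // so indexing threads are not blocked even when a flush triggers a merge decision.
        conf.setMergeScheduler(new ConcurrentMergeScheduler());
        IndexWriter writer = new IndexWriter(dir, conf);
        writer.close();
        dir.close();
    }
}
```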

This commit also adds an additional check for when to run a
refresh, since solely relying on the dirty flag might leave
merges un-refreshed, which can cause search slowdowns and higher
memory consumption.
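
Concretely, the check refreshes when documents were indexed (the dirty flag) or when the searcher is no longer current, e.g. after a merge finished. A simplified sketch of that logic, assuming Lucene's `SearcherManager` API (field and method names are illustrative, not the engine's actual code):

```java
import java.io.IOException;

import org.apache.lucene.search.SearcherManager;

class RefreshCheckSketch {
    private volatile boolean dirty;          // set whenever documents are indexed
    private final SearcherManager searcherManager;

    RefreshCheckSketch(SearcherManager searcherManager) {
        this.searcherManager = searcherManager;
    }

    boolean refreshNeeded() throws IOException {
        // dirty covers indexed documents; isSearcherCurrent() == false covers
        // changes the searcher has not seen yet, such as a finished merge.
        return dirty || !searcherManager.isSearcherCurrent();
    }

    void maybeRefresh() throws IOException {
        if (refreshNeeded()) {
            dirty = false;
            searcherManager.maybeRefresh();  // reopen the searcher if needed
        }
    }
}
```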

Closes #5779
s1monw committed Apr 12, 2014
1 parent b93c003 commit 462e2c1
Showing 7 changed files with 129 additions and 96 deletions.
@@ -674,7 +674,15 @@ protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherMa

@Override
public boolean refreshNeeded() {
return dirty;
try {
// we are either dirty due to a document added or due to a
// finished merge - either way we should refresh
return dirty || !searcherManager.isSearcherCurrent();
} catch (IOException e) {
logger.error("failed to access searcher manager", e);
failEngine(e);
throw new EngineException(shardId, "failed to access searcher manager",e);
}
}

@Override
@@ -706,7 +714,7 @@ public void refresh(Refresh refresh) throws EngineException {
// maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
// but we want to make sure not to lose any refresh calls, if one is taking time
synchronized (refreshMutex) {
if (dirty || refresh.force()) {
if (refreshNeeded() || refresh.force()) {
// we set dirty to false, even though the refresh hasn't happened yet
// as the refresh only holds for data indexed before it. Any data indexed during
// the refresh will not be part of it and will set the dirty flag back to true
Expand Down Expand Up @@ -926,7 +934,7 @@ private void refreshVersioningTable(long time) {

@Override
public void maybeMerge() throws EngineException {
if (!possibleMergeNeeded) {
if (!possibleMergeNeeded()) {
return;
}
possibleMergeNeeded = false;
@@ -21,7 +20,6 @@

import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.inject.Inject;
@@ -31,7 +30,6 @@
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.store.Store;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

@@ -41,13 +39,14 @@
public class LogByteSizeMergePolicyProvider extends AbstractMergePolicyProvider<LogByteSizeMergePolicy> {

private final IndexSettingsService indexSettingsService;

public static final String MAX_MERGE_BYTE_SIZE_KEY = "index.merge.policy.max_merge_sizes";
public static final String MIN_MERGE_BYTE_SIZE_KEY = "index.merge.policy.min_merge_size";
public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor";
private volatile ByteSizeValue minMergeSize;
private volatile ByteSizeValue maxMergeSize;
private volatile int mergeFactor;
private volatile int maxMergeDocs;
private final boolean calibrateSizeByDeletes;
private boolean asyncMerge;

private final Set<CustomLogByteSizeMergePolicy> policies = new CopyOnWriteArraySet<>();

@@ -63,21 +62,15 @@ public LogByteSizeMergePolicyProvider(Store store, IndexSettingsService indexSet
this.mergeFactor = componentSettings.getAsInt("merge_factor", LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR);
this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS);
this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true);
this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]",
mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes, asyncMerge);
logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes);

indexSettingsService.addListener(applySettings);
}

@Override
public LogByteSizeMergePolicy newMergePolicy() {
CustomLogByteSizeMergePolicy mergePolicy;
if (asyncMerge) {
mergePolicy = new EnableMergeLogByteSizeMergePolicy(this);
} else {
mergePolicy = new CustomLogByteSizeMergePolicy(this);
}
final CustomLogByteSizeMergePolicy mergePolicy = new CustomLogByteSizeMergePolicy(this);
mergePolicy.setMinMergeMB(minMergeSize.mbFrac());
mergePolicy.setMaxMergeMB(maxMergeSize.mbFrac());
mergePolicy.setMergeFactor(mergeFactor);
@@ -173,19 +166,4 @@ public MergePolicy clone() {
}
}

public static class EnableMergeLogByteSizeMergePolicy extends CustomLogByteSizeMergePolicy {

public EnableMergeLogByteSizeMergePolicy(LogByteSizeMergePolicyProvider provider) {
super(provider);
}

@Override
public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
// we don't enable merges while indexing documents, we do them in the background
if (trigger == MergeTrigger.SEGMENT_FLUSH) {
return null;
}
return super.findMerges(trigger, infos);
}
}
}
@@ -20,16 +20,13 @@
package org.elasticsearch.index.merge.policy;

import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.store.Store;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

@@ -39,12 +36,13 @@
public class LogDocMergePolicyProvider extends AbstractMergePolicyProvider<LogDocMergePolicy> {

private final IndexSettingsService indexSettingsService;

public static final String MAX_MERGE_DOCS_KEY = "index.merge.policy.max_merge_docs";
public static final String MIN_MERGE_DOCS_KEY = "index.merge.policy.min_merge_docs";
public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor";
private volatile int minMergeDocs;
private volatile int maxMergeDocs;
private volatile int mergeFactor;
private final boolean calibrateSizeByDeletes;
private boolean asyncMerge;

private final Set<CustomLogDocMergePolicy> policies = new CopyOnWriteArraySet<>();

@@ -60,9 +58,8 @@ public LogDocMergePolicyProvider(Store store, IndexSettingsService indexSettings
this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS);
this.mergeFactor = componentSettings.getAsInt("merge_factor", LogDocMergePolicy.DEFAULT_MERGE_FACTOR);
this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true);
this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]",
mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes, asyncMerge);
logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes);

indexSettingsService.addListener(applySettings);
}
@@ -74,12 +71,7 @@ public void close() throws ElasticsearchException {

@Override
public LogDocMergePolicy newMergePolicy() {
CustomLogDocMergePolicy mergePolicy;
if (asyncMerge) {
mergePolicy = new EnableMergeLogDocMergePolicy(this);
} else {
mergePolicy = new CustomLogDocMergePolicy(this);
}
final CustomLogDocMergePolicy mergePolicy = new CustomLogDocMergePolicy(this);
mergePolicy.setMinMergeDocs(minMergeDocs);
mergePolicy.setMaxMergeDocs(maxMergeDocs);
mergePolicy.setMergeFactor(mergeFactor);
@@ -150,27 +142,4 @@ public void close() {
provider.policies.remove(this);
}
}

public static class EnableMergeLogDocMergePolicy extends CustomLogDocMergePolicy {

public EnableMergeLogDocMergePolicy(LogDocMergePolicyProvider provider) {
super(provider);
}

@Override
public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
// we don't enable merges while indexing documents, we do them in the background
if (trigger == MergeTrigger.SEGMENT_FLUSH) {
return null;
}
return super.findMerges(trigger, infos);
}

@Override
public MergePolicy clone() {
// Lucene IW makes a clone internally but since we hold on to this instance
// the clone will just be the identity.
return this;
}
}
}
@@ -20,7 +20,6 @@
package org.elasticsearch.index.merge.policy;

import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.TieredMergePolicy;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject;
@@ -30,7 +29,6 @@
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.store.Store;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

@@ -47,7 +45,6 @@ public class TieredMergePolicyProvider extends AbstractMergePolicyProvider<Tiere
private volatile ByteSizeValue maxMergedSegment;
private volatile double segmentsPerTier;
private volatile double reclaimDeletesWeight;
private boolean asyncMerge;

private final ApplySettings applySettings = new ApplySettings();

@@ -57,7 +54,6 @@ public class TieredMergePolicyProvider extends AbstractMergePolicyProvider<Tiere
public TieredMergePolicyProvider(Store store, IndexSettingsService indexSettingsService) {
super(store);
this.indexSettingsService = indexSettingsService;
this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
this.forceMergeDeletesPctAllowed = componentSettings.getAsDouble("expunge_deletes_allowed", 10d); // percentage
this.floorSegment = componentSettings.getAsBytesSize("floor_segment", new ByteSizeValue(2, ByteSizeUnit.MB));
this.maxMergeAtOnce = componentSettings.getAsInt("max_merge_at_once", 10);
@@ -69,8 +65,8 @@ public TieredMergePolicyProvider(Store store, IndexSettingsService indexSettings

fixSettingsIfNeeded();

logger.debug("using [tiered] merge policy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}], async_merge[{}]",
forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight, asyncMerge);
logger.debug("using [tiered] merge policy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]",
forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight);

indexSettingsService.addListener(applySettings);
}
@@ -91,12 +87,7 @@ private void fixSettingsIfNeeded() {

@Override
public TieredMergePolicy newMergePolicy() {
CustomTieredMergePolicyProvider mergePolicy;
if (asyncMerge) {
mergePolicy = new EnableMergeTieredMergePolicyProvider(this);
} else {
mergePolicy = new CustomTieredMergePolicyProvider(this);
}
final CustomTieredMergePolicyProvider mergePolicy = new CustomTieredMergePolicyProvider(this);
mergePolicy.setNoCFSRatio(noCFSRatio);
mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed);
mergePolicy.setFloorSegmentMB(floorSegment.mbFrac());
@@ -222,20 +213,4 @@ public MergePolicy clone() {
return this;
}
}

public static class EnableMergeTieredMergePolicyProvider extends CustomTieredMergePolicyProvider {

public EnableMergeTieredMergePolicyProvider(TieredMergePolicyProvider provider) {
super(provider);
}

@Override
public MergePolicy.MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
// we don't enable merges while indexing documents, we do them in the background
if (trigger == MergeTrigger.SEGMENT_FLUSH) {
return null;
}
return super.findMerges(trigger, infos);
}
}
}
@@ -152,4 +152,5 @@ private void assertTotalCompoundSegments(int i, int t, String index) {
assertThat(total, Matchers.equalTo(t));

}

}
@@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine.internal;

import com.carrotsearch.randomizedtesting.annotations.Nightly;
import com.carrotsearch.randomizedtesting.annotations.Seed;
import com.google.common.base.Predicate;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;

/**
*/
@ElasticsearchIntegrationTest.ClusterScope(numNodes = 1, scope = ElasticsearchIntegrationTest.Scope.SUITE)
public class InternalEngineMergeTests extends ElasticsearchIntegrationTest {

@Test
@LuceneTestCase.Slow
public void testMergesHappening() throws InterruptedException, IOException, ExecutionException {
final int numOfShards = 5;
// some settings to keep num segments low
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numOfShards)
.put(LogDocMergePolicyProvider.MIN_MERGE_DOCS_KEY, 10)
.put(LogDocMergePolicyProvider.MERGE_FACTORY_KEY, 5)
.put(LogByteSizeMergePolicy.DEFAULT_MIN_MERGE_MB, 0.5)
.build()));
long id = 0;
final int rounds = scaledRandomIntBetween(50, 300);
logger.info("Starting rounds [{}] ", rounds);
for (int i = 0; i < rounds; ++i) {
final int numDocs = scaledRandomIntBetween(100, 1000);
BulkRequestBuilder request = client().prepareBulk();
for (int j = 0; j < numDocs; ++j) {
request.add(Requests.indexRequest("test").type("type1").id(Long.toString(id++)).source(jsonBuilder().startObject().field("l", randomLong()).endObject()));
}
BulkResponse response = request.execute().actionGet();
refresh();
assertNoFailures(response);
IndicesStatsResponse stats = client().admin().indices().prepareStats("test").setSegments(true).setMerge(true).get();
logger.info("index round [{}] - segments {}, total merges {}, current merge {}", i, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
}
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get();
logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
long current = stats.getPrimaries().getMerge().getCurrent();
long count = stats.getPrimaries().getSegments().getCount();
return count < 50 && current == 0;
}
});
IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get();
logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
long count = stats.getPrimaries().getSegments().getCount();
assertThat(count, Matchers.lessThanOrEqualTo(50l));
}

}
