Ensure pending merges are updated on segment flushes #5780

Merged: 2 commits, Apr 12, 2014
File: src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java

@@ -674,7 +674,15 @@ protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {

     @Override
     public boolean refreshNeeded() {
-        return dirty;
+        try {
+            // we are either dirty due to a document added or due to a
+            // finished merge - either way we should refresh
+            return dirty || !searcherManager.isSearcherCurrent();
+        } catch (IOException e) {
+            logger.error("failed to access searcher manager", e);
+            failEngine(e);
+            throw new EngineException(shardId, "failed to access searcher manager", e);
+        }
     }

     @Override
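The new check leans on Lucene's SearcherManager.isSearcherCurrent(), which reports whether the current searcher still matches the writer's latest state; a finished background merge makes it return false even though nothing set the engine's dirty flag. A minimal standalone sketch of that behavior against the Lucene 4.x API of this era (class and variable names here are illustrative, not from the PR):

    import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.TextField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.search.SearcherFactory;
    import org.apache.lucene.search.SearcherManager;
    import org.apache.lucene.store.RAMDirectory;
    import org.apache.lucene.util.Version;

    public class SearcherCurrentDemo {
        public static void main(String[] args) throws Exception {
            RAMDirectory dir = new RAMDirectory();
            IndexWriter writer = new IndexWriter(dir,
                    new IndexWriterConfig(Version.LUCENE_47, new WhitespaceAnalyzer(Version.LUCENE_47)));
            SearcherManager manager = new SearcherManager(writer, true, new SearcherFactory());

            // freshly opened: the searcher matches the writer's state
            System.out.println(manager.isSearcherCurrent()); // true

            Document doc = new Document();
            doc.add(new TextField("f", "hello", Field.Store.NO));
            writer.addDocument(doc);

            // the writer now has changes the searcher has not seen - this is
            // what refreshNeeded() detects, even when no one set the dirty flag
            System.out.println(manager.isSearcherCurrent()); // false

            manager.maybeRefresh();
            System.out.println(manager.isSearcherCurrent()); // true again

            manager.close();
            writer.close();
            dir.close();
        }
    }

This is exactly the case the PR title describes: segment changes produced by merges rather than by indexing now still lead to a refresh.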
@@ -706,7 +714,7 @@ public void refresh(Refresh refresh) throws EngineException {
         // maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
         // but, we want to make sure not to lose any refresh calls, if one is taking time
         synchronized (refreshMutex) {
-            if (dirty || refresh.force()) {
+            if (refreshNeeded() || refresh.force()) {
                 // we set dirty to false, even though the refresh hasn't happened yet
                 // as the refresh only holds for data indexed before it. Any data indexed during
                 // the refresh will not be part of it and will set the dirty flag back to true
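For background on the comment above: SearcherManager.maybeRefresh() lets exactly one caller perform the reopen while concurrent callers return without waiting, so the engine serializes callers on refreshMutex and relies on the dirty flag to catch writes that race with an in-flight refresh. A stripped-down sketch of that pattern (field names mirror the engine; everything else is illustrative):

    import java.io.IOException;
    import org.apache.lucene.search.SearcherManager;

    class RefreshLoopSketch {
        private final Object refreshMutex = new Object();
        private volatile boolean dirty;
        private final SearcherManager searcherManager;

        RefreshLoopSketch(SearcherManager searcherManager) {
            this.searcherManager = searcherManager;
        }

        void onIndexOperation() {
            dirty = true; // set by indexing; checked by the next refresh
        }

        void refresh(boolean force) throws IOException {
            synchronized (refreshMutex) {
                if (dirty || force) {
                    // clear before reopening: a write racing with the reopen
                    // flips dirty back to true, so the next call refreshes again
                    dirty = false;
                    searcherManager.maybeRefresh();
                }
            }
        }
    }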
@@ -926,7 +934,7 @@ private void refreshVersioningTable(long time) {

     @Override
     public void maybeMerge() throws EngineException {
-        if (!possibleMergeNeeded) {
+        if (!possibleMergeNeeded()) {
             return;
         }
         possibleMergeNeeded = false;
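The new possibleMergeNeeded() method itself falls outside the captured hunk. A hedged guess at its shape, assuming it consults Lucene's IndexWriter.hasPendingMerges() in addition to the old flag; this reconstruction is ours, not the PR's verbatim code:

    // hypothetical reconstruction - the method body is not shown in this capture
    private boolean possibleMergeNeeded() {
        IndexWriter writer = this.indexWriter;
        if (writer == null) {
            return false;
        }
        // the volatile flag covers merges the scheduler told us about;
        // hasPendingMerges() also catches merges registered during a segment flush
        return possibleMergeNeeded || writer.hasPendingMerges();
    }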
File: src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java
@@ -21,7 +21,6 @@

 import org.apache.lucene.index.LogByteSizeMergePolicy;
 import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.Preconditions;
 import org.elasticsearch.common.inject.Inject;
@@ -31,7 +30,6 @@
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.store.Store;

-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -41,13 +39,14 @@
 public class LogByteSizeMergePolicyProvider extends AbstractMergePolicyProvider<LogByteSizeMergePolicy> {

     private final IndexSettingsService indexSettingsService;

+    public static final String MAX_MERGE_BYTE_SIZE_KEY = "index.merge.policy.max_merge_sizes";
+    public static final String MIN_MERGE_BYTE_SIZE_KEY = "index.merge.policy.min_merge_size";
+    public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor";
     private volatile ByteSizeValue minMergeSize;
     private volatile ByteSizeValue maxMergeSize;
     private volatile int mergeFactor;
     private volatile int maxMergeDocs;
     private final boolean calibrateSizeByDeletes;
-    private boolean asyncMerge;

     private final Set<CustomLogByteSizeMergePolicy> policies = new CopyOnWriteArraySet<>();
@@ -63,21 +62,15 @@ public LogByteSizeMergePolicyProvider(Store store, IndexSettingsService indexSettingsService) {
         this.mergeFactor = componentSettings.getAsInt("merge_factor", LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR);
         this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS);
         this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true);
-        this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
-        logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]",
-                mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes, asyncMerge);
+        logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
+                mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes);

         indexSettingsService.addListener(applySettings);
     }

     @Override
     public LogByteSizeMergePolicy newMergePolicy() {
-        CustomLogByteSizeMergePolicy mergePolicy;
-        if (asyncMerge) {
-            mergePolicy = new EnableMergeLogByteSizeMergePolicy(this);
-        } else {
-            mergePolicy = new CustomLogByteSizeMergePolicy(this);
-        }
+        final CustomLogByteSizeMergePolicy mergePolicy = new CustomLogByteSizeMergePolicy(this);
         mergePolicy.setMinMergeMB(minMergeSize.mbFrac());
         mergePolicy.setMaxMergeMB(maxMergeSize.mbFrac());
         mergePolicy.setMergeFactor(mergeFactor);
@@ -173,19 +166,4 @@ public MergePolicy clone() {
         }
     }

-    public static class EnableMergeLogByteSizeMergePolicy extends CustomLogByteSizeMergePolicy {
-
-        public EnableMergeLogByteSizeMergePolicy(LogByteSizeMergePolicyProvider provider) {
-            super(provider);
-        }
-
-        @Override
-        public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
-            // we don't enable merges while indexing documents, we do them in the background
-            if (trigger == MergeTrigger.SEGMENT_FLUSH) {
-                return null;
-            }
-            return super.findMerges(trigger, infos);
-        }
-    }
 }
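All three providers drop the same wrapper, so it is worth spelling the pattern out once: a delegating merge policy that answers "nothing to merge" whenever the trigger is a segment flush, leaving those merges to a later background findMerges call. A minimal standalone sketch of the now-removed behavior (the class name is ours, the shape mirrors the deleted EnableMerge* classes):

    import java.io.IOException;
    import org.apache.lucene.index.MergePolicy;
    import org.apache.lucene.index.MergeTrigger;
    import org.apache.lucene.index.SegmentInfos;
    import org.apache.lucene.index.TieredMergePolicy;

    // illustrative only: the wrapper pattern this PR deletes
    public class NoMergeOnFlushPolicy extends TieredMergePolicy {
        @Override
        public MergePolicy.MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
            if (trigger == MergeTrigger.SEGMENT_FLUSH) {
                // report no merges on flush; a later call with another trigger
                // (e.g. MERGE_FINISHED or EXPLICIT) picks them up instead
                return null;
            }
            return super.findMerges(trigger, infos);
        }
    }

With the wrapper gone, findMerges also runs for SEGMENT_FLUSH, so pending merges are updated as part of a flush, which is what the PR title promises.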
File: src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java
@@ -20,16 +20,13 @@
 package org.elasticsearch.index.merge.policy;

 import org.apache.lucene.index.LogDocMergePolicy;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.Preconditions;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.store.Store;

-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -39,12 +36,13 @@
 public class LogDocMergePolicyProvider extends AbstractMergePolicyProvider<LogDocMergePolicy> {

     private final IndexSettingsService indexSettingsService;

+    public static final String MAX_MERGE_DOCS_KEY = "index.merge.policy.max_merge_docs";
+    public static final String MIN_MERGE_DOCS_KEY = "index.merge.policy.min_merge_docs";
+    public static final String MERGE_FACTORY_KEY = "index.merge.policy.merge_factor";
     private volatile int minMergeDocs;
     private volatile int maxMergeDocs;
     private volatile int mergeFactor;
     private final boolean calibrateSizeByDeletes;
-    private boolean asyncMerge;

     private final Set<CustomLogDocMergePolicy> policies = new CopyOnWriteArraySet<>();
@@ -60,9 +58,8 @@ public LogDocMergePolicyProvider(Store store, IndexSettingsService indexSettingsService) {
         this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS);
         this.mergeFactor = componentSettings.getAsInt("merge_factor", LogDocMergePolicy.DEFAULT_MERGE_FACTOR);
         this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true);
-        this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
-        logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]",
-                mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes, asyncMerge);
+        logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
+                mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes);

         indexSettingsService.addListener(applySettings);
     }
@@ -74,12 +71,7 @@ public void close() throws ElasticsearchException {

     @Override
     public LogDocMergePolicy newMergePolicy() {
-        CustomLogDocMergePolicy mergePolicy;
-        if (asyncMerge) {
-            mergePolicy = new EnableMergeLogDocMergePolicy(this);
-        } else {
-            mergePolicy = new CustomLogDocMergePolicy(this);
-        }
+        final CustomLogDocMergePolicy mergePolicy = new CustomLogDocMergePolicy(this);
         mergePolicy.setMinMergeDocs(minMergeDocs);
         mergePolicy.setMaxMergeDocs(maxMergeDocs);
         mergePolicy.setMergeFactor(mergeFactor);
@@ -150,27 +142,4 @@ public void close() {
             provider.policies.remove(this);
         }
     }

-    public static class EnableMergeLogDocMergePolicy extends CustomLogDocMergePolicy {
-
-        public EnableMergeLogDocMergePolicy(LogDocMergePolicyProvider provider) {
-            super(provider);
-        }
-
-        @Override
-        public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
-            // we don't enable merges while indexing documents, we do them in the background
-            if (trigger == MergeTrigger.SEGMENT_FLUSH) {
-                return null;
-            }
-            return super.findMerges(trigger, infos);
-        }
-
-        @Override
-        public MergePolicy clone() {
-            // Lucene IW makes a clone internally but since we hold on to this instance
-            // the clone will just be the identity.
-            return this;
-        }
-    }
 }
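The new *_KEY constants give tests (and clients) a stable handle on these settings, and since each provider registers an ApplySettings listener, the values can be changed on a live index. A hedged sketch using the 1.x Java client (the index name and value are illustrative):

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;

    public class UpdateMergeSettingsSketch {
        // assumes an already-connected 1.x Client; "test" is an illustrative index name
        static void lowerMergeFactor(Client client) {
            client.admin().indices().prepareUpdateSettings("test")
                    .setSettings(ImmutableSettings.builder()
                            .put(LogDocMergePolicyProvider.MERGE_FACTORY_KEY, 5)
                            .build())
                    .get();
        }
    }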
File: src/main/java/org/elasticsearch/index/merge/policy/TieredMergePolicyProvider.java
@@ -20,7 +20,6 @@
 package org.elasticsearch.index.merge.policy;

 import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.inject.Inject;
@@ -30,7 +29,6 @@
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.store.Store;

-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -47,7 +45,6 @@ public class TieredMergePolicyProvider extends AbstractMergePolicyProvider<TieredMergePolicy> {
     private volatile ByteSizeValue maxMergedSegment;
     private volatile double segmentsPerTier;
     private volatile double reclaimDeletesWeight;
-    private boolean asyncMerge;

     private final ApplySettings applySettings = new ApplySettings();
@@ -57,7 +54,6 @@ public TieredMergePolicyProvider(Store store, IndexSettingsService indexSettingsService) {
         super(store);
         this.indexSettingsService = indexSettingsService;
-        this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
         this.forceMergeDeletesPctAllowed = componentSettings.getAsDouble("expunge_deletes_allowed", 10d); // percentage
         this.floorSegment = componentSettings.getAsBytesSize("floor_segment", new ByteSizeValue(2, ByteSizeUnit.MB));
         this.maxMergeAtOnce = componentSettings.getAsInt("max_merge_at_once", 10);
@@ -69,8 +65,8 @@

         fixSettingsIfNeeded();

-        logger.debug("using [tiered] merge policy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}], async_merge[{}]",
-                forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight, asyncMerge);
+        logger.debug("using [tiered] merge policy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]",
+                forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight);

         indexSettingsService.addListener(applySettings);
     }
@@ -91,12 +87,7 @@ private void fixSettingsIfNeeded() {

     @Override
     public TieredMergePolicy newMergePolicy() {
-        CustomTieredMergePolicyProvider mergePolicy;
-        if (asyncMerge) {
-            mergePolicy = new EnableMergeTieredMergePolicyProvider(this);
-        } else {
-            mergePolicy = new CustomTieredMergePolicyProvider(this);
-        }
+        final CustomTieredMergePolicyProvider mergePolicy = new CustomTieredMergePolicyProvider(this);
         mergePolicy.setNoCFSRatio(noCFSRatio);
         mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed);
         mergePolicy.setFloorSegmentMB(floorSegment.mbFrac());
@@ -222,20 +213,4 @@ public MergePolicy clone() {
             return this;
         }
     }

-    public static class EnableMergeTieredMergePolicyProvider extends CustomTieredMergePolicyProvider {
-
-        public EnableMergeTieredMergePolicyProvider(TieredMergePolicyProvider provider) {
-            super(provider);
-        }
-
-        @Override
-        public MergePolicy.MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
-            // we don't enable merges while indexing documents, we do them in the background
-            if (trigger == MergeTrigger.SEGMENT_FLUSH) {
-                return null;
-            }
-            return super.findMerges(trigger, infos);
-        }
-    }
 }
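These provider knobs map one-to-one onto Lucene's TieredMergePolicy setters. A small sketch mirroring just the defaults visible in the constructor hunk above (a hedged illustration, not engine code):

    import org.apache.lucene.index.TieredMergePolicy;

    public class TieredDefaultsSketch {
        static TieredMergePolicy defaultsFromAbove() {
            TieredMergePolicy mp = new TieredMergePolicy();
            // mirrors the componentSettings defaults read in the constructor
            mp.setForceMergeDeletesPctAllowed(10d); // expunge_deletes_allowed (percent)
            mp.setFloorSegmentMB(2.0);              // floor_segment = 2mb
            mp.setMaxMergeAtOnce(10);               // max_merge_at_once
            return mp;
        }
    }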
File: (test file; name not preserved in this capture)

@@ -152,4 +152,5 @@ private void assertTotalCompoundSegments(int i, int t, String index) {
         assertThat(total, Matchers.equalTo(t));

     }
+
 }
File: src/test/java/org/elasticsearch/index/engine/internal/InternalEngineMergeTests.java (new file)

@@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine.internal;

import com.carrotsearch.randomizedtesting.annotations.Nightly;
import com.carrotsearch.randomizedtesting.annotations.Seed;
import com.google.common.base.Predicate;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;

/**
 */
@ElasticsearchIntegrationTest.ClusterScope(numNodes = 1, scope = ElasticsearchIntegrationTest.Scope.SUITE)
public class InternalEngineMergeTests extends ElasticsearchIntegrationTest {

    @Test
    @LuceneTestCase.Slow
    public void testMergesHappening() throws InterruptedException, IOException, ExecutionException {
        final int numOfShards = 5;
        // some settings to keep num segments low
        assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numOfShards)
                .put(LogDocMergePolicyProvider.MIN_MERGE_DOCS_KEY, 10)
                .put(LogDocMergePolicyProvider.MERGE_FACTORY_KEY, 5)
                .put(LogByteSizeMergePolicy.DEFAULT_MIN_MERGE_MB, 0.5)
                .build()));
        long id = 0;
        final int rounds = scaledRandomIntBetween(50, 300);
        logger.info("Starting rounds [{}] ", rounds);
        for (int i = 0; i < rounds; ++i) {
            final int numDocs = scaledRandomIntBetween(100, 1000);
            BulkRequestBuilder request = client().prepareBulk();
            for (int j = 0; j < numDocs; ++j) {
                request.add(Requests.indexRequest("test").type("type1").id(Long.toString(id++)).source(jsonBuilder().startObject().field("l", randomLong()).endObject()));
            }
            BulkResponse response = request.execute().actionGet();
            refresh();
            assertNoFailures(response);
            IndicesStatsResponse stats = client().admin().indices().prepareStats("test").setSegments(true).setMerge(true).get();
            logger.info("index round [{}] - segments {}, total merges {}, current merge {}", i, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
        }
        awaitBusy(new Predicate<Object>() {
            @Override
            public boolean apply(Object input) {
                IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get();
                logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
                long current = stats.getPrimaries().getMerge().getCurrent();
                long count = stats.getPrimaries().getSegments().getCount();
                return count < 50 && current == 0;
            }
        });
        IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get();
        logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent());
        long count = stats.getPrimaries().getSegments().getCount();
        assertThat(count, Matchers.lessThanOrEqualTo(50l));
    }

}
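The test's assertions lean on two merge counters from the stats API: total (merges completed since the shard started) and current (merges executing right now). A small helper showing the same quiescence check outside the test harness (client wiring assumed, method name ours):

    import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
    import org.elasticsearch.client.Client;

    public class MergeQuiescenceCheck {
        // true once merging has caught up: nothing running and few segments left
        static boolean mergesQuiesced(Client client, int maxSegments) {
            IndicesStatsResponse stats = client.admin().indices().prepareStats()
                    .setSegments(true).setMerge(true).get();
            long current = stats.getPrimaries().getMerge().getCurrent();
            long segments = stats.getPrimaries().getSegments().getCount();
            return current == 0 && segments <= maxSegments;
        }
    }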