Flush engine after big merge (elastic#46066) (elastic#46111)
Today we might carry a big merge uncommitted and therefore
occupy a significant amount of disk space for quite a long time,
for instance if indexing load goes down and we are not quickly
reaching the translog size threshold. This change causes a
flush once we hit a significant merge (512MB by default), which
frees disk space sooner.
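
As a rough, hypothetical sketch (not part of this commit), the new dynamic index.flush_after_merge setting could be lowered on an existing index via the Java client so that even smaller merges trigger a flush; the index name and the client instance below are placeholders:

    // Hypothetical usage sketch; the index name and client instance are assumptions for illustration.
    client.admin().indices().prepareUpdateSettings("my-index")
        .setSettings(Settings.builder()
            .put("index.flush_after_merge", "256mb") // default is 512mb; "0b" flushes after every merge
            .build())
        .get();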
s1monw authored Aug 29, 2019
1 parent f7a9123 commit 9b2ea07
Showing 6 changed files with 75 additions and 6 deletions.
@@ -139,6 +139,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
20 changes: 20 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -195,6 +195,14 @@ public final class IndexSettings {
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
Property.Dynamic, Property.IndexScope);

/**
* The minimum size of a merge that triggers a flush in order to free resources
*/
public static final Setting<ByteSizeValue> INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING =
Setting.byteSizeSetting("index.flush_after_merge", new ByteSizeValue(512, ByteSizeUnit.MB),
new ByteSizeValue(0, ByteSizeUnit.BYTES), // always flush after merge
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), // never flush after merge
Property.Dynamic, Property.IndexScope);
/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
@@ -338,6 +346,7 @@ public final class IndexSettings {
private volatile TimeValue translogRetentionAge;
private volatile ByteSizeValue translogRetentionSize;
private volatile ByteSizeValue generationThresholdSize;
private volatile ByteSizeValue flushAfterMergeThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
private final IndexSortConfig indexSortConfig;
@@ -470,6 +479,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
flushAfterMergeThresholdSize = scopedSettings.get(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
softDeleteEnabled = version.onOrAfter(Version.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
@@ -530,6 +540,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING, this::setFlushAfterMergeThresholdSize);
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
@@ -555,6 +566,10 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}

private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) {
this.flushAfterMergeThresholdSize = byteSizeValue;
}

private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) {
// ignore the translog retention settings if soft-deletes enabled
@@ -744,6 +759,11 @@ public TimeValue getRefreshInterval() {
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }

/**
* Returns the merge size threshold above which the index is forcefully flushed to free resources.
*/
public ByteSizeValue getFlushAfterMergeThresholdSize() { return flushAfterMergeThresholdSize; }

/**
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/
@@ -175,6 +175,7 @@ public class InternalEngine extends Engine {

private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);

@Nullable
private final String historyUUID;
@@ -1698,6 +1699,9 @@ final boolean tryRenewSyncCommit() {
@Override
public boolean shouldPeriodicallyFlush() {
ensureOpen();
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}
final long translogGenerationOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
@@ -2345,7 +2349,7 @@ public void onFailure(Exception e) {
}

@Override
protected void doRun() throws Exception {
protected void doRun() {
// if we have no pending merges and we are supposed to flush once merges have finished
// we try to renew a sync commit which is the case when we are having a big merge after we
// are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work
@@ -2357,7 +2361,11 @@ protected void doRun() throws Exception {
}
}
});

} else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) {
// we hit a significant merge which would allow us to free up disk space if we committed it, hence we should
// execute a flush on the next operation, be that a flush after inactivity or indexing a document.
// we could fork a thread and do it right away, but we try to minimize forking and piggyback on outside events.
shouldPeriodicallyFlushAfterBigMerge.set(true);
}
}

@@ -2425,7 +2433,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});

shouldPeriodicallyFlushAfterBigMerge.set(false);
writer.commit();
} catch (final Exception ex) {
try {
@@ -30,11 +30,12 @@
public class OnGoingMerge {

private final String id;
private final List<SegmentCommitInfo> mergedSegments;
private final MergePolicy.OneMerge oneMerge;

public OnGoingMerge(MergePolicy.OneMerge merge) {
this.id = Integer.toString(System.identityHashCode(merge));
this.mergedSegments = merge.segments;
this.oneMerge = merge;

}

/**
@@ -44,10 +45,20 @@ public String getId() {
return id;
}


/**
* Returns the total size in bytes of this merge. Note that this does not
* indicate the size of the merged segment, but the
* input total size.
*/
public long getTotalBytesSize() {
return oneMerge.totalBytesSize();
}

/**
* The list of segments that are being merged.
*/
public List<SegmentCommitInfo> getMergedSegments() {
return mergedSegments;
return oneMerge.segments;
}
}
@@ -5054,6 +5054,34 @@ public void testShouldPeriodicallyFlush() throws Exception {
}
}

public void testShouldPeriodicallyFlushAfterMerge() throws Exception {
assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
ParsedDocument doc =
testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
engine.refresh("test");
assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false));
final IndexSettings indexSettings = engine.config().getIndexSettings();
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
.settings(Settings.builder().put(indexSettings.getSettings())
.put(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING.getKey(), "0b")).build();
indexSettings.updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2));
engine.refresh("test");
engine.forceMerge(false, 1, false, false, false);
assertBusy(() -> {
// the merge listener runs concurrently after the force merge has returned
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
});
engine.flush();
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
}

public void testStressShouldPeriodicallyFlush() throws Exception {
final long flushThreshold = randomLongBetween(120, 5000);
final long generationThreshold = randomLongBetween(1000, 5000);
@@ -398,6 +398,7 @@ static String[] extractLeaderShardHistoryUUIDs(Map<String, String> ccrIndexMetaD
nonReplicatedSettings.add(IndexSettings.MAX_ADJACENCY_MATRIX_FILTERS_SETTING);
nonReplicatedSettings.add(IndexSettings.DEFAULT_PIPELINE);
nonReplicatedSettings.add(IndexSettings.INDEX_SEARCH_THROTTLED);
nonReplicatedSettings.add(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
