Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush engine after big merge #46066

Merged
merged 3 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
Expand Down
20 changes: 20 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ public final class IndexSettings {
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
Property.Dynamic, Property.IndexScope);

/**
* The minimum size of a merge that triggers a flush in order to free resources
*/
public static final Setting<ByteSizeValue> INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING =
Setting.byteSizeSetting("index.flush_after_merge", new ByteSizeValue(512, ByteSizeUnit.MB),
new ByteSizeValue(0, ByteSizeUnit.BYTES), // always flush after merge
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), // never flush after merge
Property.Dynamic, Property.IndexScope);
/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
Expand Down Expand Up @@ -338,6 +346,7 @@ public final class IndexSettings {
private volatile TimeValue translogRetentionAge;
private volatile ByteSizeValue translogRetentionSize;
private volatile ByteSizeValue generationThresholdSize;
private volatile ByteSizeValue flushAfterMergeThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
private final IndexSortConfig indexSortConfig;
Expand Down Expand Up @@ -470,6 +479,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
flushAfterMergeThresholdSize = scopedSettings.get(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
softDeleteEnabled = scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
Expand Down Expand Up @@ -530,6 +540,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING, this::setFlushAfterMergeThresholdSize);
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
Expand All @@ -555,6 +566,10 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}

private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) {
this.flushAfterMergeThresholdSize = byteSizeValue;
}

private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) {
// ignore the translog retention settings if soft-deletes enabled
Expand Down Expand Up @@ -744,6 +759,11 @@ public TimeValue getRefreshInterval() {
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }

/**
* Returns the merge threshold size when to forcefully flush the index and free resources.
*/
public ByteSizeValue getFlushAfterMergeThresholdSize() { return flushAfterMergeThresholdSize; }

/**
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public class InternalEngine extends Engine {

private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);

@Nullable
private final String historyUUID;
Expand Down Expand Up @@ -1678,6 +1679,9 @@ final boolean tryRenewSyncCommit() {
@Override
public boolean shouldPeriodicallyFlush() {
ensureOpen();
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}
final long translogGenerationOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
Expand Down Expand Up @@ -2327,7 +2331,7 @@ public void onFailure(Exception e) {
}

@Override
protected void doRun() throws Exception {
protected void doRun() {
// if we have no pending merges and we are supposed to flush once merges have finished
// we try to renew a sync commit which is the case when we are having a big merge after we
// are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work
Expand All @@ -2339,7 +2343,11 @@ protected void doRun() throws Exception {
}
}
});

} else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) {
// we hit a significant merge which would allow us to free up memory if we'd commit it hence on the next change
// we should execute a flush on the next operation if that's a flush after inactive or indexing a document.
// we could fork a thread and do it right away but we try to minimize forking and piggyback on outside events.
shouldPeriodicallyFlushAfterBigMerge.set(true);
}
}

Expand Down Expand Up @@ -2407,7 +2415,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});

shouldPeriodicallyFlushAfterBigMerge.set(false);
writer.commit();
} catch (final Exception ex) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
public class OnGoingMerge {

private final String id;
private final List<SegmentCommitInfo> mergedSegments;
private final MergePolicy.OneMerge oneMerge;

public OnGoingMerge(MergePolicy.OneMerge merge) {
this.id = Integer.toString(System.identityHashCode(merge));
this.mergedSegments = merge.segments;
this.oneMerge = merge;

}

/**
Expand All @@ -44,10 +45,20 @@ public String getId() {
return id;
}


/**
* Returns the total size in bytes of this merge. Note that this does not
* indicate the size of the merged segment, but the
* input total size.
*/
public long getTotalBytesSize() {
return oneMerge.totalBytesSize();
}

/**
* The list of segments that are being merged.
*/
public List<SegmentCommitInfo> getMergedSegments() {
return mergedSegments;
return oneMerge.segments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5061,6 +5061,34 @@ public void testShouldPeriodicallyFlush() throws Exception {
}
}

public void testShouldPeriodicallyFlushAfterMerge() throws Exception {
assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
ParsedDocument doc =
testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
engine.refresh("test");
assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false));
final IndexSettings indexSettings = engine.config().getIndexSettings();
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
.settings(Settings.builder().put(indexSettings.getSettings())
.put(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING.getKey(), "0b")).build();
indexSettings.updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2));
engine.refresh("test");
engine.forceMerge(false, 1, false, false, false);
assertBusy(() -> {
// the merge listner runs concurrently after the force merge returned
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
});
engine.flush();
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
}

public void testStressShouldPeriodicallyFlush() throws Exception {
final long flushThreshold = randomLongBetween(120, 5000);
final long generationThreshold = randomLongBetween(1000, 5000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ static String[] extractLeaderShardHistoryUUIDs(Map<String, String> ccrIndexMetaD
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING,
IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING,
IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_GC_DELETES_SETTING,
IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD,
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
Expand Down