Enable merge on refresh and merge on commit in OpenSearch (#2535)
Enables merge on refresh and merge on commit in OpenSearch by
way of two new index settings:
index.merge_on_flush.max_full_flush_merge_wait_time and
index.merge_on_flush.enabled.
By default, merge_on_flush is disabled and the wait time is 10s.

Signed-off-by: Andriy Redko <[email protected]>
reta authored Mar 25, 2022
1 parent cc0e66b commit 908682d
Showing 4 changed files with 274 additions and 0 deletions.
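
To see how the new options are meant to be used, here is a minimal sketch of index settings that enable merge on flush at index-creation time. It assumes the setting constants land exactly as shown in the diff below; the helper class name and the 5s wait value are illustrative and not part of this commit.

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;

// Illustrative helper (not part of this commit): builds index settings that
// turn on merge-on-flush and shorten the merge wait from the 10s default.
public final class MergeOnFlushSettingsExample {
    public static Settings mergeOnFlushSettings() {
        return Settings.builder()
            // disabled by default, so it has to be enabled explicitly
            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true)
            // how long a refresh/commit waits for merges selected by
            // MergePolicy.findFullFlushMerges(...) before proceeding
            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueSeconds(5))
            .build();
    }
}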
@@ -187,6 +187,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.FINAL_PIPELINE,
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
50 changes: 50 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -503,6 +503,27 @@ public final class IndexSettings {
Setting.Property.IndexScope
);

/**
* Expert: sets the amount of time to wait for merges (during {@link org.apache.lucene.index.IndexWriter#commit}
* or {@link org.apache.lucene.index.IndexWriter#getReader(boolean, boolean)}) returned by MergePolicy.findFullFlushMerges(...).
* If this time is reached, we proceed with the commit based on segments merged up to that point. The merges are not
* aborted, and will still run to completion independent of the commit or getReader call, like natural segment merges.
*/
public static final Setting<TimeValue> INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME = Setting.timeSetting(
"index.merge_on_flush.max_full_flush_merge_wait_time",
new TimeValue(10, TimeUnit.SECONDS),
new TimeValue(0, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope
);

public static final Setting<Boolean> INDEX_MERGE_ON_FLUSH_ENABLED = Setting.boolSetting(
"index.merge_on_flush.enabled",
false,
Property.IndexScope,
Property.Dynamic
);

private final Index index;
private final Version version;
private final Logger logger;
@@ -584,6 +605,15 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
*/
private volatile int maxRegexLength;

/**
* The max amount of time to wait for merges
*/
private volatile TimeValue maxFullFlushMergeWaitTime;
/**
* Whether merge on flush is enabled or not
*/
private volatile boolean mergeOnFlushEnabled;

/**
* Returns the default search fields for this index.
*/
@@ -696,6 +726,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
mappingTotalFieldsLimit = scopedSettings.get(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING);
mappingDepthLimit = scopedSettings.get(INDEX_MAPPING_DEPTH_LIMIT_SETTING);
mappingFieldNameLengthLimit = scopedSettings.get(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING);
maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME);
mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(
@@ -765,6 +797,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING, this::setMappingTotalFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DEPTH_LIMIT_SETTING, this::setMappingDepthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
@@ -1328,4 +1362,20 @@ public long getMappingFieldNameLengthLimit() {
private void setMappingFieldNameLengthLimit(long value) {
this.mappingFieldNameLengthLimit = value;
}

private void setMaxFullFlushMergeWaitTime(TimeValue timeValue) {
this.maxFullFlushMergeWaitTime = timeValue;
}

private void setMergeOnFlushEnabled(boolean enabled) {
this.mergeOnFlushEnabled = enabled;
}

public TimeValue getMaxFullFlushMergeWaitTime() {
return this.maxFullFlushMergeWaitTime;
}

public boolean isMergeOnFlushEnabled() {
return mergeOnFlushEnabled;
}
}
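
Since both new settings above are registered with Property.Dynamic and wired through addSettingsUpdateConsumer, they can also be changed on a live index. A hypothetical sketch using the transport Client follows; the class name, method, and client plumbing are illustrative only and not part of this change.

import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;

// Hypothetical sketch: flip merge-on-flush on for an existing index at runtime.
public final class ToggleMergeOnFlush {
    public static void enable(Client client, String index) {
        Settings update = Settings.builder()
            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true)
            .build();
        // dynamic settings update; acknowledged once applied on the index
        client.admin().indices().updateSettings(new UpdateSettingsRequest(index).settings(update)).actionGet();
    }
}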
@@ -50,6 +50,7 @@
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
@@ -2425,6 +2426,21 @@ private IndexWriterConfig getIndexWriterConfig() {
// to enable it.
mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
}

if (config().getIndexSettings().isMergeOnFlushEnabled()) {
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
if (maxFullFlushMergeWaitMillis > 0) {
iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
mergePolicy = new MergeOnFlushMergePolicy(mergePolicy);
} else {
logger.warn(
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
);
}
}

iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
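
The hunk above is where the engine wires the two settings into Lucene: when merge on flush is enabled and the wait time is positive, it sets IndexWriterConfig#setMaxFullFlushMergeWaitMillis and wraps the engine's merge policy in the sandbox MergeOnFlushMergePolicy. A standalone Lucene sketch of the same mechanism, independent of OpenSearch, is below; the directory, analyzer, field names, and the 5s wait are illustrative only.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class MergeOnFlushSketch {
    public static void main(String[] args) throws Exception {
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        // Wait up to 5s during commit/getReader for merges selected by
        // MergePolicy.findFullFlushMerges(...) before publishing segments.
        iwc.setMaxFullFlushMergeWaitMillis(5_000);
        // Sandbox policy that lets merges run as part of a full flush (commit/refresh),
        // delegating regular merging to the wrapped TieredMergePolicy.
        iwc.setMergePolicy(new MergeOnFlushMergePolicy(new TieredMergePolicy()));

        try (Directory dir = new ByteBuffersDirectory(); IndexWriter writer = new IndexWriter(dir, iwc)) {
            Document doc = new Document();
            doc.add(new TextField("field", "merge on flush example", Field.Store.NO));
            writer.addDocument(doc);
            writer.commit(); // may block up to the configured wait time while merging
        }
    }
}

As the javadoc added above notes, merges that exceed the wait are not aborted: the commit proceeds with whatever has merged so far, and the remaining merges finish in the background like natural segment merges.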
@@ -494,6 +494,212 @@ public void testSegments() throws Exception {
}
}

public void testMergeSegmentsOnCommitIsDisabled() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(0))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
)
) {
assertThat(engine.segments(false), empty());
int numDocsFirstSegment = randomIntBetween(5, 50);
Set<String> liveDocsFirstSegment = new HashSet<>();
for (int i = 0; i < numDocsFirstSegment; i++) {
String id = Integer.toString(i);
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
liveDocsFirstSegment.add(id);
}
engine.refresh("test");
List<Segment> segments = engine.segments(randomBoolean());
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertFalse(segments.get(0).committed);
int deletes = 0;
int updates = 0;
int appends = 0;
int iterations = scaledRandomIntBetween(1, 50);
for (int i = 0; i < iterations && liveDocsFirstSegment.isEmpty() == false; i++) {
String idToUpdate = randomFrom(liveDocsFirstSegment);
liveDocsFirstSegment.remove(idToUpdate);
ParsedDocument doc = testParsedDocument(idToUpdate, null, testDocument(), B_1, null);
if (randomBoolean()) {
engine.delete(new Engine.Delete(doc.id(), newUid(doc), primaryTerm.get()));
deletes++;
} else {
engine.index(indexForDoc(doc));
updates++;
}
if (randomBoolean()) {
engine.index(indexForDoc(testParsedDocument(UUIDs.randomBase64UUID(), null, testDocument(), B_1, null)));
appends++;
}
}

boolean committed = randomBoolean();
if (committed) {
engine.flush();
}

engine.refresh("test");
segments = engine.segments(randomBoolean());

assertThat(segments, hasSize(2));
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
assertThat(segments.get(0).getDeletedDocs(), equalTo(updates + deletes));
assertThat(segments.get(0).committed, equalTo(committed));

assertThat(segments.get(1).getNumDocs(), equalTo(updates + appends));
assertThat(segments.get(1).getDeletedDocs(), equalTo(deletes)); // delete tombstones
assertThat(segments.get(1).committed, equalTo(committed));
}
}

public void testMergeSegmentsOnCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
)
) {
assertThat(engine.segments(false), empty());
int numDocsFirstSegment = randomIntBetween(5, 50);
Set<String> liveDocsFirstSegment = new HashSet<>();
for (int i = 0; i < numDocsFirstSegment; i++) {
String id = Integer.toString(i);
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
liveDocsFirstSegment.add(id);
}
engine.refresh("test");
List<Segment> segments = engine.segments(randomBoolean());
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertFalse(segments.get(0).committed);
int deletes = 0;
int updates = 0;
int appends = 0;
int iterations = scaledRandomIntBetween(1, 50);
for (int i = 0; i < iterations && liveDocsFirstSegment.isEmpty() == false; i++) {
String idToUpdate = randomFrom(liveDocsFirstSegment);
liveDocsFirstSegment.remove(idToUpdate);
ParsedDocument doc = testParsedDocument(idToUpdate, null, testDocument(), B_1, null);
if (randomBoolean()) {
engine.delete(new Engine.Delete(doc.id(), newUid(doc), primaryTerm.get()));
deletes++;
} else {
engine.index(indexForDoc(doc));
updates++;
}
if (randomBoolean()) {
engine.index(indexForDoc(testParsedDocument(UUIDs.randomBase64UUID(), null, testDocument(), B_1, null)));
appends++;
}
}

boolean committed = randomBoolean();
if (committed) {
engine.flush();
}

engine.refresh("test");
segments = engine.segments(randomBoolean());

// All segments have to be merged into one
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(numDocsFirstSegment + appends - deletes));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).committed, equalTo(committed));
}
}

// this test writes documents to the engine while concurrently flushing/committing
public void testConcurrentMergeSegmentsOnCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
)
) {
final int numIndexingThreads = scaledRandomIntBetween(3, 8);
final int numDocsPerThread = randomIntBetween(500, 1000);
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
final List<Thread> indexingThreads = new ArrayList<>();
final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads);
// create N indexing threads to index documents simultaneously
for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
final int threadIdx = threadNum;
Thread indexingThread = new Thread(() -> {
try {
barrier.await(); // wait for all threads to start at the same time
// index random number of docs
for (int i = 0; i < numDocsPerThread; i++) {
final String id = "thread" + threadIdx + "#" + i;
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
doneLatch.countDown();
}

});
indexingThreads.add(indexingThread);
}

// start the indexing threads
for (Thread thread : indexingThreads) {
thread.start();
}
barrier.await(); // wait for indexing threads to all be ready to start
assertThat(doneLatch.await(10, TimeUnit.SECONDS), is(true));

boolean committed = randomBoolean();
if (committed) {
engine.flush();
}

engine.refresh("test");
List<Segment> segments = engine.segments(randomBoolean());

// All segments have to be merged into one
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(numIndexingThreads * numDocsPerThread));
assertThat(segments.get(0).committed, equalTo(committed));
}
}

public void testCommitStats() throws IOException {
final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
