Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail force-merges on read-only engines #64756

Merged
merged 12 commits into from
Dec 17, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
Expand Down Expand Up @@ -375,6 +376,16 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
@Override
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
boolean upgrade, boolean upgradeOnlyAncientSegments, String forceMergeUUID) {
if (maxNumSegments == ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS) {
// noop
} else if (maxNumSegments < lastCommittedSegmentInfos.size()) {
throw new UnsupportedOperationException("force merge is not supported on a read-only engine, " +
"target max number of segments[" + maxNumSegments + "], " +
"current number of segments[" + lastCommittedSegmentInfos.size() + "].");
} else {
logger.debug("current number of segments[{}] is not greater than target max number of segments[{}].",
lastCommittedSegmentInfos.size(), maxNumSegments);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -185,6 +187,43 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
}
}

public void testForceMergeOnReadOnlyEngine() throws IOException {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
int numDocs = scaledRandomIntBetween(10, 100);
int numSegments;
try (InternalEngine engine = createEngine(config)) {
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
engine.flush();
globalCheckpoint.set(i);
}
engine.syncTranslog();
engine.flushAndClose();
numSegments = engine.getLastCommittedSegmentInfos().size();
assertTrue( numSegments > 1);
}

try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity(), true)) {
UnsupportedOperationException exception = expectThrows(UnsupportedOperationException.class,
() -> readOnlyEngine.forceMerge(
true, numSegments-1, false, false, false, UUIDs.randomBase64UUID()));
assertThat(exception.getMessage(), equalTo("force merge is not supported on a read-only engine, " +
"target max number of segments[" + (numSegments-1) + "], current number of segments[" + numSegments + "]."));

readOnlyEngine.forceMerge(true, ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS,
false, false, false, UUIDs.randomBase64UUID());
readOnlyEngine.forceMerge(true, numSegments, false, false, false, UUIDs.randomBase64UUID());
readOnlyEngine.forceMerge(true, numSegments+1, false, false, false, UUIDs.randomBase64UUID());
assertEquals(readOnlyEngine.getLastCommittedSegmentInfos().size(), numSegments);
}
}
}

public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Expand Down