diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java
index c473e4e010552..f7a621c4125a9 100644
--- a/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java
+++ b/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java
@@ -22,6 +22,7 @@
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.SegmentCommitInfo;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.index.mapper.ParsedDocument;
 
 import java.io.IOException;
@@ -89,7 +90,7 @@ public synchronized MergePolicy.OneMerge getNextMerge() {
                         StreamSupport.stream(e.getLastCommittedSegmentInfos().spliterator(), false).collect(Collectors.toList());
                     segmentsReference.set(segments);
                     // trigger a background merge that will be managed by the concurrent merge scheduler
-                    e.forceMerge(randomBoolean(), 0, false, false, false);
+                    e.forceMerge(randomBoolean(), 0, false, false, false, UUIDs.randomBase64UUID());
                     /*
                      * Merging happens in the background on a merge thread, and the maybeDie handler is invoked on yet another thread; we have
                      * to wait for these events to finish.
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java
index 20480c45f10a5..7b16cc52e724c 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java
@@ -19,7 +19,10 @@
 
 package org.elasticsearch.action.admin.indices.forcemerge;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.support.broadcast.BroadcastRequest;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 
@@ -50,6 +53,15 @@ public static final class Defaults {
     private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
     private boolean flush = Defaults.FLUSH;
 
+    private static final Version FORCE_MERGE_UUID_VERSION = Version.V_7_7_0;
+
+    /**
+     * Force merge UUID to store in the live commit data of a shard under
+     * {@link org.elasticsearch.index.engine.Engine#FORCE_MERGE_UUID_KEY} after force merging it.
+     */
+    @Nullable
+    private final String forceMergeUUID;
+
     /**
      * Constructs a merge request over one or more indices.
      *
@@ -57,6 +69,7 @@ public static final class Defaults {
      */
    public ForceMergeRequest(String... indices) {
        super(indices);
+        forceMergeUUID = UUIDs.randomBase64UUID();
    }
 
    public ForceMergeRequest(StreamInput in) throws IOException {
@@ -64,6 +77,11 @@ public ForceMergeRequest(StreamInput in) throws IOException {
         maxNumSegments = in.readInt();
         onlyExpungeDeletes = in.readBoolean();
         flush = in.readBoolean();
+        if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
+            forceMergeUUID = in.readOptionalString();
+        } else {
+            forceMergeUUID = null;
+        }
     }
 
     /**
@@ -100,6 +118,15 @@ public ForceMergeRequest onlyExpungeDeletes(boolean onlyExpungeDeletes) {
         return this;
     }
 
+    /**
+     * Force merge UUID to use when force merging or {@code null} if not using one in a mixed version cluster containing nodes older than
+     * {@link #FORCE_MERGE_UUID_VERSION}.
+     */
+    @Nullable
+    public String forceMergeUUID() {
+        return forceMergeUUID;
+    }
+
     /**
      * Should flush be performed after the merge. Defaults to {@code true}.
      */
@@ -129,6 +156,9 @@ public void writeTo(StreamOutput out) throws IOException {
         out.writeInt(maxNumSegments);
         out.writeBoolean(onlyExpungeDeletes);
         out.writeBoolean(flush);
+        if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
+            out.writeOptionalString(forceMergeUUID);
+        }
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index 6a40d1dd578a9..7d4aa125eb542 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -109,6 +109,7 @@ public abstract class Engine implements Closeable {
 
     public static final String SYNC_COMMIT_ID = "sync_id";
     public static final String HISTORY_UUID_KEY = "history_uuid";
+    public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
     public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
     public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
 
@@ -1073,18 +1074,12 @@ public final CommitId flush() throws EngineException {
      */
     public abstract void rollTranslogGeneration() throws EngineException;
 
-    /**
-     * Force merges to 1 segment
-     */
-    public void forceMerge(boolean flush) throws IOException {
-        forceMerge(flush, 1, false, false, false);
-    }
-
     /**
      * Triggers a forced merge on this engine
      */
     public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
-                                    boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;
+                                    boolean upgrade, boolean upgradeOnlyAncientSegments,
+                                    @Nullable String forceMergeUUID) throws EngineException, IOException;
 
     /**
      * Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index cb7eb3d512212..95c60b6201e7e 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -190,6 +190,12 @@ public class InternalEngine extends Engine {
     @Nullable
     private final String historyUUID;
 
+    /**
+     * UUID value that is updated every time the engine is force merged.
+     */
+    @Nullable
+    private volatile String forceMergeUUID;
+
     public InternalEngine(EngineConfig engineConfig) {
         this(engineConfig, LocalCheckpointTracker::new);
     }
@@ -236,7 +242,9 @@ public InternalEngine(EngineConfig engineConfig) {
                 this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
                 writer = createWriter();
                 bootstrapAppendOnlyInfoFromWriter(writer);
-                historyUUID = loadHistoryUUID(writer);
+                final Map<String, String> commitData = commitDataAsMap(writer);
+                historyUUID = loadHistoryUUID(commitData);
+                forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
                 indexWriter = writer;
             } catch (IOException | TranslogCorruptedException e) {
                 throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -604,6 +612,12 @@ public String getHistoryUUID() {
         return historyUUID;
     }
 
+    /** returns the force merge uuid for the engine */
+    @Nullable
+    public String getForceMergeUUID() {
+        return forceMergeUUID;
+    }
+
     /** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
     @Override
     public long getWritingBytes() {
@@ -613,8 +627,8 @@ public long getWritingBytes() {
     /**
      * Reads the current stored history ID from the IW commit data.
      */
-    private String loadHistoryUUID(final IndexWriter writer) {
-        final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
+    private String loadHistoryUUID(Map<String, String> commitData) {
+        final String uuid = commitData.get(HISTORY_UUID_KEY);
         if (uuid == null) {
             throw new IllegalStateException("commit doesn't contain history uuid");
         }
@@ -1945,7 +1959,8 @@ final Map<BytesRef, VersionValue> getVersionMap() {
 
     @Override
     public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
-                           final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException {
+                           final boolean upgrade, final boolean upgradeOnlyAncientSegments,
+                           final String forceMergeUUID) throws EngineException, IOException {
         /*
          * We do NOT acquire the readlock here since we are waiting on the merges to finish
          * that's fine since the IW.rollback should stop all the threads and trigger an IOException
@@ -1977,6 +1992,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
                 indexWriter.maybeMerge();
             } else {
                 indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
+                this.forceMergeUUID = forceMergeUUID;
             }
             if (flush) {
                 if (tryRenewSyncCommit() == false) {
@@ -2448,6 +2464,10 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
                 if (softDeleteEnabled) {
                     commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
                 }
+                final String currentForceMergeUUID = forceMergeUUID;
+                if (currentForceMergeUUID != null) {
+                    commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
+                }
                 logger.trace("committing writer with commit data [{}]", commitData);
                 return commitData.entrySet().iterator();
             });
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
index 04669f0f32c72..af282311724c4 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@@ -395,7 +395,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
 
     @Override
     public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
-                           boolean upgrade, boolean upgradeOnlyAncientSegments) {
+                           boolean upgrade, boolean upgradeOnlyAncientSegments, String forceMergeUUID) {
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index a207653b6db93..72715222f8861 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1117,7 +1117,7 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
         }
         Engine engine = getEngine();
         engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(),
-            forceMerge.onlyExpungeDeletes(), false, false);
+            forceMerge.onlyExpungeDeletes(), false, false, forceMerge.forceMergeUUID());
     }
 
     /**
@@ -1133,7 +1133,7 @@ public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) throws IOE
         final Engine engine = getEngine();
         engine.forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
             Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment
-            false, true, upgrade.upgradeOnlyAncientSegments());
+            false, true, upgrade.upgradeOnlyAncientSegments(), null);
         org.apache.lucene.util.Version version = minimumCompatibleVersion();
         if (logger.isTraceEnabled()) {
             logger.trace("upgraded segments for {} from version {} to version {}", shardId, previousVersion, version);
diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
index e82d4ac58f334..fdf551e46de6b 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -38,6 +38,7 @@
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -139,7 +140,7 @@ void recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateCon
                 // just trigger a merge to do housekeeping on the
                 // copied segments - we will also see them in stats etc.
                 indexShard.getEngine().forceMerge(false, -1, false,
-                    false, false);
+                    false, false, UUIDs.randomBase64UUID());
                 return true;
             } catch (IOException ex) {
                 throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeIT.java b/server/src/test/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeIT.java
new file mode 100644
index 0000000000000..095a166f4f0b7
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.indices.forcemerge;
+
+import org.elasticsearch.action.admin.indices.flush.FlushResponse;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class ForceMergeIT extends ESIntegTestCase {
+
+    public void testForceMergeUUIDConsistent() throws IOException {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        final String index = "test-index";
+        createIndex(index,
+            Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).build());
+        ensureGreen(index);
+        final ClusterState state = clusterService().state();
+        final IndexRoutingTable indexShardRoutingTables = state.routingTable().getIndicesRouting().get(index);
+        final IndexShardRoutingTable shardRouting = indexShardRoutingTables.getShards().get(0);
+        final String primaryNodeId = shardRouting.primaryShard().currentNodeId();
+        final String replicaNodeId = shardRouting.replicaShards().get(0).currentNodeId();
+        final Index idx = shardRouting.primaryShard().index();
+        final IndicesService primaryIndicesService =
+            internalCluster().getInstance(IndicesService.class, state.nodes().get(primaryNodeId).getName());
+        final IndicesService replicaIndicesService = internalCluster().getInstance(IndicesService.class,
+            state.nodes().get(replicaNodeId).getName());
+        final IndexShard primary = primaryIndicesService.indexService(idx).getShard(0);
+        final IndexShard replica = replicaIndicesService.indexService(idx).getShard(0);
+
+        assertThat(getForceMergeUUID(primary), nullValue());
+        assertThat(getForceMergeUUID(replica), nullValue());
+
+        final ForceMergeResponse forceMergeResponse =
+            client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();
+
+        assertThat(forceMergeResponse.getFailedShards(), is(0));
+        assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
+
+        // Force flush to force a new commit that contains the force merge UUID
+        final FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).get();
+        assertThat(flushResponse.getFailedShards(), is(0));
+        assertThat(flushResponse.getSuccessfulShards(), is(2));
+
+        final String primaryForceMergeUUID = getForceMergeUUID(primary);
+        assertThat(primaryForceMergeUUID, notNullValue());
+
+        final String replicaForceMergeUUID = getForceMergeUUID(replica);
+        assertThat(replicaForceMergeUUID, notNullValue());
+        assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
+    }
+
+    private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
+        try (Engine.IndexCommitRef indexCommitRef = indexShard.acquireLastIndexCommit(true)) {
+            return indexCommitRef.getIndexCommit().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
+        }
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index db831aa547907..7087b43017165 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -536,7 +536,7 @@ public void testSegmentsWithMergeFlag() throws Exception {
         engine.flush();
         final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
         // now, optimize and wait for merges, see that we have no merge flag
-        engine.forceMerge(true);
+        engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
 
         for (Segment segment : engine.segments(false)) {
             assertThat(segment.getMergeId(), nullValue());
@@ -546,7 +546,7 @@ public void testSegmentsWithMergeFlag() throws Exception {
 
         final boolean flush = randomBoolean();
         final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
-        engine.forceMerge(flush);
+        engine.forceMerge(flush, 1, false, false, false, UUIDs.randomBase64UUID());
         for (Segment segment : engine.segments(false)) {
             assertThat(segment.getMergeId(), nullValue());
         }
@@ -1239,7 +1239,7 @@ public void testRenewSyncFlush() throws Exception {
 
                     assertEquals(3, engine.segments(false).size());
                     engine.forceMerge(forceMergeFlushes, 1, false,
-                        false, false);
+                        false, false, UUIDs.randomBase64UUID());
                     if (forceMergeFlushes == false) {
                         engine.refresh("make all segments visible");
                         assertEquals(4, engine.segments(false).size());
@@ -1476,7 +1476,7 @@ public void testForceMergeWithoutSoftDeletes() throws IOException {
         try (Engine.Searcher test = engine.acquireSearcher("test")) {
             assertEquals(numDocs, test.getIndexReader().numDocs());
         }
-        engine.forceMerge(true, 1, false, false, false);
+        engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
         engine.refresh("test");
         assertEquals(engine.segments(true).size(), 1);
 
@@ -1484,7 +1484,7 @@ public void testForceMergeWithoutSoftDeletes() throws IOException {
         Engine.Index index = indexForDoc(doc);
         engine.delete(new Engine.Delete(index.type(), index.id(), index.uid(), primaryTerm.get()));
         //expunge deletes
-        engine.forceMerge(true, 10, true, false, false);
+        engine.forceMerge(true, 10, true, false, false, UUIDs.randomBase64UUID());
         engine.refresh("test");
         assertEquals(engine.segments(true).size(), 1);
 
@@ -1497,7 +1497,7 @@ public void testForceMergeWithoutSoftDeletes() throws IOException {
         index = indexForDoc(doc);
         engine.delete(new Engine.Delete(index.type(), index.id(), index.uid(), primaryTerm.get()));
         //expunge deletes
-        engine.forceMerge(true, 10, false, false, false);
+        engine.forceMerge(true, 10, false, false, false, UUIDs.randomBase64UUID());
         engine.refresh("test");
         assertEquals(engine.segments(true).size(), 1);
         try (Engine.Searcher test = engine.acquireSearcher("test")) {
@@ -1605,7 +1605,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception {
         try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
             safeCommitCheckpoint = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
         }
-        engine.forceMerge(true, 1, false, false, false);
+        engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
         assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
         Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
             .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
@@ -1630,7 +1630,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception {
 
         globalCheckpoint.set(localCheckpoint);
         engine.syncTranslog();
-        engine.forceMerge(true, 1, false, false, false);
+        engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
         assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
         assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size()));
     }
@@ -1695,7 +1695,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc
                 safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
             minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1);
         }
-        engine.forceMerge(true, 1, false, false, false);
+        engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
         assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
         Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
             .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
@@ -1739,7 +1739,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc
             globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
             engine.syncTranslog();
         }
-        engine.forceMerge(true, 1, false, false, false);
+        engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
         assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
         assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocsWithSource.size()));
     }
@@ -1776,7 +1776,7 @@ public void run() {
                         indexed.countDown();
                         try {
                             engine.forceMerge(randomBoolean(), 1, false, randomBoolean(),
-                                randomBoolean());
+                                randomBoolean(), UUIDs.randomBase64UUID());
                         } catch (IOException e) {
                             return;
                         }
@@ -1793,7 +1793,7 @@ public void run() {
         startGun.countDown();
         int someIters = randomIntBetween(1, 10);
         for (int i = 0; i < someIters; i++) {
-            engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean());
+            engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean(), UUIDs.randomBase64UUID());
         }
         indexed.await();
         IOUtils.close(engine);
@@ -3192,7 +3192,7 @@ public void run() {
                     switch (operation) {
                         case "optimize": {
                             engine.forceMerge(true, 1, false, false,
-                                false);
+                                false, UUIDs.randomBase64UUID());
                             break;
                         }
                         case "refresh": {
@@ -4435,7 +4435,7 @@ public void testRandomOperations() throws Exception {
                 engine.flush();
             }
             if (randomBoolean()) {
-                engine.forceMerge(randomBoolean(), between(1, 10), randomBoolean(), false, false);
+                engine.forceMerge(randomBoolean(), between(1, 10), randomBoolean(), false, false, UUIDs.randomBase64UUID());
            }
        }
        if (engine.engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
@@ -5135,7 +5135,7 @@ public void testShouldPeriodicallyFlushAfterMerge() throws Exception {
         engine.index(indexForDoc(doc));
         assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2));
         engine.refresh("test");
-        engine.forceMerge(false, 1, false, false, false);
+        engine.forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID());
         assertBusy(() -> { // the merge listener runs concurrently after the force merge returned
             assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
         });
@@ -5384,7 +5384,7 @@ private void assertOperationHistoryInLucene(List<Engine.Operation> operations) t
                 engine.flush();
             }
             if (rarely()) {
-                engine.forceMerge(true);
+                engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
             }
         }
         MapperService mapperService = createMapperService("test");
@@ -5465,7 +5465,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
                     equalTo(engine.getMinRetainedSeqNo()));
             }
             if (rarely()) {
-                engine.forceMerge(randomBoolean());
+                engine.forceMerge(randomBoolean(), 1, false, false, false, UUIDs.randomBase64UUID());
             }
             try (Closeable ignored = engine.acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) {
                 long minRetainSeqNos = engine.getMinRetainedSeqNo();
@@ -5955,7 +5955,7 @@ public void testPruneAwayDeletedButRetainedIds() throws Exception {
         for (int i = 0; i < numDocs; i++) {
             index(engine, i);
         }
-        engine.forceMerge(true);
+        engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
         engine.delete(new Engine.Delete("_doc", "0", newUid("0"), primaryTerm.get()));
         engine.refresh("test");
         // now we have 2 segments since we now added a tombstone plus the old segment with the delete
@@ -5974,7 +5974,7 @@ public void testPruneAwayDeletedButRetainedIds() throws Exception {
         }
 
         // lets force merge the tombstone and the original segment and make sure the doc is still there but the ID term is gone
-        engine.forceMerge(true);
+        engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
         engine.refresh("test");
         try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
             IndexReader reader = searcher.getIndexReader();
@@ -6018,7 +6018,7 @@ public void testRecoverFromLocalTranslog() throws Exception {
                     engine.flush();
                 }
                 if (randomInt(100) < 5) {
-                    engine.forceMerge(randomBoolean(), 1, false, false, false);
+                    engine.forceMerge(randomBoolean(), 1, false, false, false, UUIDs.randomBase64UUID());
                 }
             }
             if (randomBoolean()) {
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index f34d0cbc10444..55a515ea99a30 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -4183,4 +4183,22 @@ public void testCloseShardWhileEngineIsWarming() throws Exception {
         recoveryThread.join();
         shard.store().close();
     }
+
+    public void testRecordsForceMerges() throws IOException {
+        IndexShard shard = newStartedShard(true);
+        final String initialForceMergeUUID = ((InternalEngine) shard.getEngine()).getForceMergeUUID();
+        assertThat(initialForceMergeUUID, nullValue());
+        final ForceMergeRequest firstForceMergeRequest = new ForceMergeRequest().maxNumSegments(1);
+        shard.forceMerge(firstForceMergeRequest);
+        final String secondForceMergeUUID = ((InternalEngine) shard.getEngine()).getForceMergeUUID();
+        assertThat(secondForceMergeUUID, notNullValue());
+        assertThat(secondForceMergeUUID, equalTo(firstForceMergeRequest.forceMergeUUID()));
+        final ForceMergeRequest secondForceMergeRequest = new ForceMergeRequest().maxNumSegments(1);
+        shard.forceMerge(secondForceMergeRequest);
+        final String thirdForceMergeUUID = ((InternalEngine) shard.getEngine()).getForceMergeUUID();
+        assertThat(thirdForceMergeUUID, notNullValue());
+        assertThat(thirdForceMergeUUID, not(equalTo(secondForceMergeUUID)));
+        assertThat(thirdForceMergeUUID, equalTo(secondForceMergeRequest.forceMergeUUID()));
+        closeShards(shard);
+    }
 }
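
A minimal usage sketch to accompany the patch (not part of the diff itself): the UUID is generated in the ForceMergeRequest constructor, passed through IndexShard#forceMerge to the engine, and written to the commit user data under Engine.FORCE_MERGE_UUID_KEY on the next commit. The class name and method below are hypothetical, and a started IndexShard handle is assumed (for example obtained from IndicesService as in ForceMergeIT).

// Illustrative sketch only; assumes a started IndexShard and a node new enough to carry the UUID.
import java.io.IOException;

import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;

final class ForceMergeUuidSketch {

    static void forceMergeAndVerify(IndexShard indexShard) throws IOException {
        // The force merge UUID is generated when the request is constructed.
        ForceMergeRequest request = new ForceMergeRequest().maxNumSegments(1);

        // IndexShard#forceMerge forwards request.forceMergeUUID() to Engine#forceMerge.
        indexShard.forceMerge(request);

        // After the next commit (flushFirst = true forces one), the UUID is recorded in the
        // commit user data under Engine.FORCE_MERGE_UUID_KEY.
        try (Engine.IndexCommitRef commit = indexShard.acquireLastIndexCommit(true)) {
            String committed = commit.getIndexCommit().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
            assert request.forceMergeUUID().equals(committed);
        }
    }
}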