From 7189c57b6cbc07014b16f18942172b7d38d089ad Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 11 Mar 2020 06:30:36 +0100 Subject: [PATCH] Record Force Merges in Live Commit Data (#52694) (#53372) * Record Force Merges in live commit data Prerequisite of #52182. Record force merges in the live commit data so two shard states with the same sequence number that differ only in whether or not they have been force merged can be distinguished when creating snapshots. --- .../index/engine/EvilInternalEngineTests.java | 3 +- .../indices/forcemerge/ForceMergeRequest.java | 30 +++++++ .../elasticsearch/index/engine/Engine.java | 11 +-- .../index/engine/InternalEngine.java | 28 +++++- .../index/engine/ReadOnlyEngine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 4 +- .../index/shard/StoreRecovery.java | 3 +- .../indices/forcemerge/ForceMergeIT.java | 89 +++++++++++++++++++ .../index/engine/InternalEngineTests.java | 40 ++++----- .../index/shard/IndexShardTests.java | 18 ++++ 10 files changed, 191 insertions(+), 37 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeIT.java diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java index c473e4e010552..f7a621c4125a9 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.SegmentCommitInfo; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.index.mapper.ParsedDocument; import java.io.IOException; @@ -89,7 +90,7 @@ public synchronized MergePolicy.OneMerge getNextMerge() { StreamSupport.stream(e.getLastCommittedSegmentInfos().spliterator(), false).collect(Collectors.toList()); segmentsReference.set(segments); // trigger a background merge that will be managed by the concurrent merge scheduler - e.forceMerge(randomBoolean(), 0, false, false, false); + e.forceMerge(randomBoolean(), 0, false, false, false, UUIDs.randomBase64UUID()); /* * Merging happens in the background on a merge thread, and the maybeDie handler is invoked on yet another thread; we have * to wait for these events to finish. diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java index 20480c45f10a5..7b16cc52e724c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java @@ -19,7 +19,10 @@ package org.elasticsearch.action.admin.indices.forcemerge; +import org.elasticsearch.Version; import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -50,6 +53,15 @@ public static final class Defaults { private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES; private boolean flush = Defaults.FLUSH; + private static final Version FORCE_MERGE_UUID_VERSION = Version.V_7_7_0; + + /** + * Force merge UUID to store in the live commit data of a shard under + * {@link org.elasticsearch.index.engine.Engine#FORCE_MERGE_UUID_KEY} after force merging it. + */ + @Nullable + private final String forceMergeUUID; + /** * Constructs a merge request over one or more indices. * @@ -57,6 +69,7 @@ public static final class Defaults { */ public ForceMergeRequest(String... indices) { super(indices); + forceMergeUUID = UUIDs.randomBase64UUID(); } public ForceMergeRequest(StreamInput in) throws IOException { @@ -64,6 +77,11 @@ public ForceMergeRequest(StreamInput in) throws IOException { maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); + if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { + forceMergeUUID = in.readOptionalString(); + } else { + forceMergeUUID = null; + } } /** @@ -100,6 +118,15 @@ public ForceMergeRequest onlyExpungeDeletes(boolean onlyExpungeDeletes) { return this; } + /** + * Force merge UUID to use when force merging or {@code null} if not using one in a mixed version cluster containing nodes older than + * {@link #FORCE_MERGE_UUID_VERSION}. + */ + @Nullable + public String forceMergeUUID() { + return forceMergeUUID; + } + /** * Should flush be performed after the merge. Defaults to {@code true}. */ @@ -129,6 +156,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(maxNumSegments); out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); + if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { + out.writeOptionalString(forceMergeUUID); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 6a40d1dd578a9..7d4aa125eb542 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -109,6 +109,7 @@ public abstract class Engine implements Closeable { public static final String SYNC_COMMIT_ID = "sync_id"; public static final String HISTORY_UUID_KEY = "history_uuid"; + public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid"; public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no"; public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; @@ -1073,18 +1074,12 @@ public final CommitId flush() throws EngineException { */ public abstract void rollTranslogGeneration() throws EngineException; - /** - * Force merges to 1 segment - */ - public void forceMerge(boolean flush) throws IOException { - forceMerge(flush, 1, false, false, false); - } - /** * Triggers a forced merge on this engine */ public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, - boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException; + boolean upgrade, boolean upgradeOnlyAncientSegments, + @Nullable String forceMergeUUID) throws EngineException, IOException; /** * Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index cb7eb3d512212..95c60b6201e7e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -190,6 +190,12 @@ public class InternalEngine extends Engine { @Nullable private final String historyUUID; + /** + * UUID value that is updated every time the engine is force merged. + */ + @Nullable + private volatile String forceMergeUUID; + public InternalEngine(EngineConfig engineConfig) { this(engineConfig, LocalCheckpointTracker::new); } @@ -236,7 +242,9 @@ public InternalEngine(EngineConfig engineConfig) { this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); writer = createWriter(); bootstrapAppendOnlyInfoFromWriter(writer); - historyUUID = loadHistoryUUID(writer); + final Map commitData = commitDataAsMap(writer); + historyUUID = loadHistoryUUID(commitData); + forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); indexWriter = writer; } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); @@ -604,6 +612,12 @@ public String getHistoryUUID() { return historyUUID; } + /** returns the force merge uuid for the engine */ + @Nullable + public String getForceMergeUUID() { + return forceMergeUUID; + } + /** Returns how many bytes we are currently moving from indexing buffer to segments on disk */ @Override public long getWritingBytes() { @@ -613,8 +627,8 @@ public long getWritingBytes() { /** * Reads the current stored history ID from the IW commit data. */ - private String loadHistoryUUID(final IndexWriter writer) { - final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY); + private String loadHistoryUUID(Map commitData) { + final String uuid = commitData.get(HISTORY_UUID_KEY); if (uuid == null) { throw new IllegalStateException("commit doesn't contain history uuid"); } @@ -1945,7 +1959,8 @@ final Map getVersionMap() { @Override public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, - final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException { + final boolean upgrade, final boolean upgradeOnlyAncientSegments, + final String forceMergeUUID) throws EngineException, IOException { /* * We do NOT acquire the readlock here since we are waiting on the merges to finish * that's fine since the IW.rollback should stop all the threads and trigger an IOException @@ -1977,6 +1992,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu indexWriter.maybeMerge(); } else { indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/); + this.forceMergeUUID = forceMergeUUID; } if (flush) { if (tryRenewSyncCommit() == false) { @@ -2448,6 +2464,10 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl if (softDeleteEnabled) { commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); } + final String currentForceMergeUUID = forceMergeUUID; + if (currentForceMergeUUID != null) { + commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID); + } logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 04669f0f32c72..af282311724c4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -395,7 +395,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti @Override public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, - boolean upgrade, boolean upgradeOnlyAncientSegments) { + boolean upgrade, boolean upgradeOnlyAncientSegments, String forceMergeUUID) { } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a207653b6db93..72715222f8861 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1117,7 +1117,7 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException { } Engine engine = getEngine(); engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(), - forceMerge.onlyExpungeDeletes(), false, false); + forceMerge.onlyExpungeDeletes(), false, false, forceMerge.forceMergeUUID()); } /** @@ -1133,7 +1133,7 @@ public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) throws IOE final Engine engine = getEngine(); engine.forceMerge(true, // we need to flush at the end to make sure the upgrade is durable Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment - false, true, upgrade.upgradeOnlyAncientSegments()); + false, true, upgrade.upgradeOnlyAncientSegments(), null); org.apache.lucene.util.Version version = minimumCompatibleVersion(); if (logger.isTraceEnabled()) { logger.trace("upgraded segments for {} from version {} to version {}", shardId, previousVersion, version); diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index e82d4ac58f334..fdf551e46de6b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -139,7 +140,7 @@ void recoverFromLocalShards(BiConsumer mappingUpdateCon // just trigger a merge to do housekeeping on the // copied segments - we will also see them in stats etc. indexShard.getEngine().forceMerge(false, -1, false, - false, false); + false, false, UUIDs.randomBase64UUID()); return true; } catch (IOException ex) { throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeIT.java b/server/src/test/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeIT.java new file mode 100644 index 0000000000000..095a166f4f0b7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeIT.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.forcemerge; + +import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESIntegTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class ForceMergeIT extends ESIntegTestCase { + + public void testForceMergeUUIDConsistent() throws IOException { + internalCluster().ensureAtLeastNumDataNodes(2); + final String index = "test-index"; + createIndex(index, + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).build()); + ensureGreen(index); + final ClusterState state = clusterService().state(); + final IndexRoutingTable indexShardRoutingTables = state.routingTable().getIndicesRouting().get(index); + final IndexShardRoutingTable shardRouting = indexShardRoutingTables.getShards().get(0); + final String primaryNodeId = shardRouting.primaryShard().currentNodeId(); + final String replicaNodeId = shardRouting.replicaShards().get(0).currentNodeId(); + final Index idx = shardRouting.primaryShard().index(); + final IndicesService primaryIndicesService = + internalCluster().getInstance(IndicesService.class, state.nodes().get(primaryNodeId).getName()); + final IndicesService replicaIndicesService = internalCluster().getInstance(IndicesService.class, + state.nodes().get(replicaNodeId).getName()); + final IndexShard primary = primaryIndicesService.indexService(idx).getShard(0); + final IndexShard replica = replicaIndicesService.indexService(idx).getShard(0); + + assertThat(getForceMergeUUID(primary), nullValue()); + assertThat(getForceMergeUUID(replica), nullValue()); + + final ForceMergeResponse forceMergeResponse = + client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get(); + + assertThat(forceMergeResponse.getFailedShards(), is(0)); + assertThat(forceMergeResponse.getSuccessfulShards(), is(2)); + + // Force flush to force a new commit that contains the force flush UUID + final FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).get(); + assertThat(flushResponse.getFailedShards(), is(0)); + assertThat(flushResponse.getSuccessfulShards(), is(2)); + + final String primaryForceMergeUUID = getForceMergeUUID(primary); + assertThat(primaryForceMergeUUID, notNullValue()); + + final String replicaForceMergeUUID = getForceMergeUUID(replica); + assertThat(replicaForceMergeUUID, notNullValue()); + assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID)); + } + + private static String getForceMergeUUID(IndexShard indexShard) throws IOException { + try (Engine.IndexCommitRef indexCommitRef = indexShard.acquireLastIndexCommit(true)) { + return indexCommitRef.getIndexCommit().getUserData().get(Engine.FORCE_MERGE_UUID_KEY); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index db831aa547907..7087b43017165 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -536,7 +536,7 @@ public void testSegmentsWithMergeFlag() throws Exception { engine.flush(); final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration(); // now, optimize and wait for merges, see that we have no merge flag - engine.forceMerge(true); + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); for (Segment segment : engine.segments(false)) { assertThat(segment.getMergeId(), nullValue()); @@ -546,7 +546,7 @@ public void testSegmentsWithMergeFlag() throws Exception { final boolean flush = randomBoolean(); final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration(); - engine.forceMerge(flush); + engine.forceMerge(flush, 1, false, false, false, UUIDs.randomBase64UUID()); for (Segment segment : engine.segments(false)) { assertThat(segment.getMergeId(), nullValue()); } @@ -1239,7 +1239,7 @@ public void testRenewSyncFlush() throws Exception { assertEquals(3, engine.segments(false).size()); engine.forceMerge(forceMergeFlushes, 1, false, - false, false); + false, false, UUIDs.randomBase64UUID()); if (forceMergeFlushes == false) { engine.refresh("make all segments visible"); assertEquals(4, engine.segments(false).size()); @@ -1476,7 +1476,7 @@ public void testForceMergeWithoutSoftDeletes() throws IOException { try (Engine.Searcher test = engine.acquireSearcher("test")) { assertEquals(numDocs, test.getIndexReader().numDocs()); } - engine.forceMerge(true, 1, false, false, false); + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); engine.refresh("test"); assertEquals(engine.segments(true).size(), 1); @@ -1484,7 +1484,7 @@ public void testForceMergeWithoutSoftDeletes() throws IOException { Engine.Index index = indexForDoc(doc); engine.delete(new Engine.Delete(index.type(), index.id(), index.uid(), primaryTerm.get())); //expunge deletes - engine.forceMerge(true, 10, true, false, false); + engine.forceMerge(true, 10, true, false, false, UUIDs.randomBase64UUID()); engine.refresh("test"); assertEquals(engine.segments(true).size(), 1); @@ -1497,7 +1497,7 @@ public void testForceMergeWithoutSoftDeletes() throws IOException { index = indexForDoc(doc); engine.delete(new Engine.Delete(index.type(), index.id(), index.uid(), primaryTerm.get())); //expunge deletes - engine.forceMerge(true, 10, false, false, false); + engine.forceMerge(true, 10, false, false, false, UUIDs.randomBase64UUID()); engine.refresh("test"); assertEquals(engine.segments(true).size(), 1); try (Engine.Searcher test = engine.acquireSearcher("test")) { @@ -1605,7 +1605,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { safeCommitCheckpoint = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); } - engine.forceMerge(true, 1, false, false, false); + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); Map ops = readAllOperationsInLucene(engine, mapperService) .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); @@ -1630,7 +1630,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { globalCheckpoint.set(localCheckpoint); engine.syncTranslog(); - engine.forceMerge(true, 1, false, false, false); + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size())); } @@ -1695,7 +1695,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1); } - engine.forceMerge(true, 1, false, false, false); + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); Map ops = readAllOperationsInLucene(engine, mapperService) .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); @@ -1739,7 +1739,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.syncTranslog(); } - engine.forceMerge(true, 1, false, false, false); + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocsWithSource.size())); } @@ -1776,7 +1776,7 @@ public void run() { indexed.countDown(); try { engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), - randomBoolean()); + randomBoolean(), UUIDs.randomBase64UUID()); } catch (IOException e) { return; } @@ -1793,7 +1793,7 @@ public void run() { startGun.countDown(); int someIters = randomIntBetween(1, 10); for (int i = 0; i < someIters; i++) { - engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean()); + engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean(), UUIDs.randomBase64UUID()); } indexed.await(); IOUtils.close(engine); @@ -3192,7 +3192,7 @@ public void run() { switch (operation) { case "optimize": { engine.forceMerge(true, 1, false, false, - false); + false, UUIDs.randomBase64UUID()); break; } case "refresh": { @@ -4435,7 +4435,7 @@ public void testRandomOperations() throws Exception { engine.flush(); } if (randomBoolean()) { - engine.forceMerge(randomBoolean(), between(1, 10), randomBoolean(), false, false); + engine.forceMerge(randomBoolean(), between(1, 10), randomBoolean(), false, false, UUIDs.randomBase64UUID()); } } if (engine.engineConfig.getIndexSettings().isSoftDeleteEnabled()) { @@ -5135,7 +5135,7 @@ public void testShouldPeriodicallyFlushAfterMerge() throws Exception { engine.index(indexForDoc(doc)); assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2)); engine.refresh("test"); - engine.forceMerge(false, 1, false, false, false); + engine.forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID()); assertBusy(() -> { // the merge listner runs concurrently after the force merge returned assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); @@ -5384,7 +5384,7 @@ private void assertOperationHistoryInLucene(List operations) t engine.flush(); } if (rarely()) { - engine.forceMerge(true); + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); } } MapperService mapperService = createMapperService("test"); @@ -5465,7 +5465,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { equalTo(engine.getMinRetainedSeqNo())); } if (rarely()) { - engine.forceMerge(randomBoolean()); + engine.forceMerge(randomBoolean(), 1, false, false, false, UUIDs.randomBase64UUID()); } try (Closeable ignored = engine.acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { long minRetainSeqNos = engine.getMinRetainedSeqNo(); @@ -5955,7 +5955,7 @@ public void testPruneAwayDeletedButRetainedIds() throws Exception { for (int i = 0; i < numDocs; i++) { index(engine, i); } - engine.forceMerge(true); + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); engine.delete(new Engine.Delete("_doc", "0", newUid("0"), primaryTerm.get())); engine.refresh("test"); // now we have 2 segments since we now added a tombstone plus the old segment with the delete @@ -5974,7 +5974,7 @@ public void testPruneAwayDeletedButRetainedIds() throws Exception { } // lets force merge the tombstone and the original segment and make sure the doc is still there but the ID term is gone - engine.forceMerge(true); + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); engine.refresh("test"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { IndexReader reader = searcher.getIndexReader(); @@ -6018,7 +6018,7 @@ public void testRecoverFromLocalTranslog() throws Exception { engine.flush(); } if (randomInt(100) < 5) { - engine.forceMerge(randomBoolean(), 1, false, false, false); + engine.forceMerge(randomBoolean(), 1, false, false, false, UUIDs.randomBase64UUID()); } } if (randomBoolean()) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f34d0cbc10444..55a515ea99a30 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4183,4 +4183,22 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { recoveryThread.join(); shard.store().close(); } + + public void testRecordsForceMerges() throws IOException { + IndexShard shard = newStartedShard(true); + final String initialForceMergeUUID = ((InternalEngine) shard.getEngine()).getForceMergeUUID(); + assertThat(initialForceMergeUUID, nullValue()); + final ForceMergeRequest firstForceMergeRequest = new ForceMergeRequest().maxNumSegments(1); + shard.forceMerge(firstForceMergeRequest); + final String secondForceMergeUUID = ((InternalEngine) shard.getEngine()).getForceMergeUUID(); + assertThat(secondForceMergeUUID, notNullValue()); + assertThat(secondForceMergeUUID, equalTo(firstForceMergeRequest.forceMergeUUID())); + final ForceMergeRequest secondForceMergeRequest = new ForceMergeRequest().maxNumSegments(1); + shard.forceMerge(secondForceMergeRequest); + final String thirdForceMergeUUID = ((InternalEngine) shard.getEngine()).getForceMergeUUID(); + assertThat(thirdForceMergeUUID, notNullValue()); + assertThat(thirdForceMergeUUID, not(equalTo(secondForceMergeUUID))); + assertThat(thirdForceMergeUUID, equalTo(secondForceMergeRequest.forceMergeUUID())); + closeShards(shard); + } }