Skip to content

Commit

Permalink
Record Force Merges in Live Commit Data (#52694)
Browse files Browse the repository at this point in the history
* Record Force Merges in live commit data

Prerequisite of #52182. Record force merges in the live commit data
so two shard states with the same sequence number that differ only in whether
or not they have been force merged can be distinguished when creating snapshots.
  • Loading branch information
original-brownbear authored Mar 10, 2020
1 parent 1073d09 commit 713e931
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.mapper.ParsedDocument;

import java.io.IOException;
Expand Down Expand Up @@ -89,7 +90,7 @@ public synchronized MergePolicy.OneMerge getNextMerge() {
StreamSupport.stream(e.getLastCommittedSegmentInfos().spliterator(), false).collect(Collectors.toList());
segmentsReference.set(segments);
// trigger a background merge that will be managed by the concurrent merge scheduler
e.forceMerge(randomBoolean(), 0, false, false, false);
e.forceMerge(randomBoolean(), 0, false, false, false, UUIDs.randomBase64UUID());
/*
* Merging happens in the background on a merge thread, and the maybeDie handler is invoked on yet another thread; we have
* to wait for these events to finish.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package org.elasticsearch.action.admin.indices.forcemerge;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand Down Expand Up @@ -53,20 +56,35 @@ public static final class Defaults {
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;

private static final Version FORCE_MERGE_UUID_VERSION = Version.V_8_0_0;

/**
* Force merge UUID to store in the live commit data of a shard under
* {@link org.elasticsearch.index.engine.Engine#FORCE_MERGE_UUID_KEY} after force merging it.
*/
@Nullable
private final String forceMergeUUID;

/**
 * Creates a force merge request targeting the given indices and assigns it a freshly generated force merge UUID.
 *
 * @param indices the indices to merge; passing no indices means all indices will be merged
 */
public ForceMergeRequest(String... indices) {
    super(indices);
    this.forceMergeUUID = UUIDs.randomBase64UUID();
}

public ForceMergeRequest(StreamInput in) throws IOException {
super(in);
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
forceMergeUUID = in.readOptionalString();
} else {
forceMergeUUID = null;
}
}

/**
Expand Down Expand Up @@ -103,6 +121,15 @@ public ForceMergeRequest onlyExpungeDeletes(boolean onlyExpungeDeletes) {
return this;
}

/**
 * Returns the force merge UUID carried by this request.
 *
 * @return the UUID to use when force merging, or {@code null} when none is used, as happens in a mixed version
 *         cluster containing nodes older than {@link #FORCE_MERGE_UUID_VERSION}
 */
@Nullable
public String forceMergeUUID() {
    return this.forceMergeUUID;
}

/**
* Should flush be performed after the merge. Defaults to {@code true}.
*/
Expand Down Expand Up @@ -132,6 +159,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
out.writeOptionalString(forceMergeUUID);
}
}

@Override
Expand Down
11 changes: 3 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public abstract class Engine implements Closeable {

public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: Remove sync_id in 9.0
// Commit data key under which the engine's history UUID is stored.
public static final String HISTORY_UUID_KEY = "history_uuid";
// Commit data key under which the UUID of the most recent force merge is stored; absent when no UUID was recorded.
public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
// Commit data key under which the soft-deletes policy's minimum retained sequence number is stored.
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
// Commit data key under which the maximum unsafe auto-id timestamp is stored.
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";

Expand Down Expand Up @@ -1035,18 +1036,12 @@ public final void flush() throws EngineException {
*/
public abstract void rollTranslogGeneration() throws EngineException;

/**
* Force merges to 1 segment
*/
public void forceMerge(boolean flush) throws IOException {
forceMerge(flush, 1, false, false, false);
}

/**
* Triggers a forced merge on this engine
*/
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;
boolean upgrade, boolean upgradeOnlyAncientSegments,
@Nullable String forceMergeUUID) throws EngineException, IOException;

/**
* Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ public class InternalEngine extends Engine {
@Nullable
private final String historyUUID;

/**
* UUID value that is updated every time the engine is force merged.
*/
@Nullable
private volatile String forceMergeUUID;

/**
 * Creates an engine from the given config, delegating to the two-argument constructor with
 * {@code LocalCheckpointTracker::new} as the local checkpoint tracker supplier.
 */
public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, LocalCheckpointTracker::new);
}
Expand Down Expand Up @@ -222,7 +228,9 @@ public InternalEngine(EngineConfig engineConfig) {
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadHistoryUUID(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
indexWriter = writer;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
Expand Down Expand Up @@ -558,6 +566,12 @@ public String getHistoryUUID() {
return historyUUID;
}

/**
 * Returns the force merge UUID currently held by this engine.
 *
 * @return the current force merge UUID, or {@code null} if the engine holds none
 */
@Nullable
public String getForceMergeUUID() {
    return this.forceMergeUUID;
}

/** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
@Override
public long getWritingBytes() {
Expand All @@ -567,8 +581,8 @@ public long getWritingBytes() {
/**
* Reads the stored history UUID from the given commit data map.
*/
private String loadHistoryUUID(final IndexWriter writer) {
final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
private String loadHistoryUUID(Map<String, String> commitData) {
final String uuid = commitData.get(HISTORY_UUID_KEY);
if (uuid == null) {
throw new IllegalStateException("commit doesn't contain history uuid");
}
Expand Down Expand Up @@ -1815,7 +1829,8 @@ final Map<BytesRef, VersionValue> getVersionMap() {

@Override
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException {
final boolean upgrade, final boolean upgradeOnlyAncientSegments,
final String forceMergeUUID) throws EngineException, IOException {
if (onlyExpungeDeletes && maxNumSegments >= 0) {
throw new IllegalArgumentException("only_expunge_deletes and max_num_segments are mutually exclusive");
}
Expand Down Expand Up @@ -1850,6 +1865,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
indexWriter.maybeMerge();
} else {
indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
this.forceMergeUUID = forceMergeUUID;
}
if (flush) {
flush(false, true);
Expand Down Expand Up @@ -2297,12 +2313,16 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
final String currentForceMergeUUID = forceMergeUUID;
if (currentForceMergeUUID != null) {
commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
}
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {

@Override
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
boolean upgrade, boolean upgradeOnlyAncientSegments) {
boolean upgrade, boolean upgradeOnlyAncientSegments, String forceMergeUUID) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
}
Engine engine = getEngine();
engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(),
forceMerge.onlyExpungeDeletes(), false, false);
forceMerge.onlyExpungeDeletes(), false, false, forceMerge.forceMergeUUID());
}

/**
Expand All @@ -1089,7 +1089,7 @@ public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) throws IOE
final Engine engine = getEngine();
engine.forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment
false, true, upgrade.upgradeOnlyAncientSegments());
false, true, upgrade.upgradeOnlyAncientSegments(), null);
org.apache.lucene.util.Version version = minimumCompatibleVersion();
if (logger.isTraceEnabled()) {
logger.trace("upgraded segments for {} from version {} to version {}", shardId, previousVersion, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -135,7 +136,7 @@ void recoverFromLocalShards(Consumer<MappingMetaData> mappingUpdateConsumer, fin
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false,
false, false);
false, false, UUIDs.randomBase64UUID());
return true;
} catch (IOException ex) {
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.forcemerge;

import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
 * Integration test checking that a force merge records the same force merge UUID in the commit user data of
 * both the primary and the replica copy of a shard.
 */
public class ForceMergeIT extends ESIntegTestCase {

    /**
     * Force merges a one-shard, one-replica index and verifies that, after a forced flush, both shard copies
     * carry a non-null and identical value under {@link Engine#FORCE_MERGE_UUID_KEY} in their last commit.
     */
    public void testForceMergeUUIDConsistent() throws IOException {
        internalCluster().ensureAtLeastNumDataNodes(2);

        final String indexName = "test-index";
        final Settings indexSettings = Settings.builder()
            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
            .build();
        createIndex(indexName, indexSettings);
        ensureGreen(indexName);

        // Resolve the IndexShard instances of both copies of shard 0 via the routing table.
        final ClusterState clusterState = clusterService().state();
        final IndexRoutingTable indexRouting = clusterState.routingTable().getIndicesRouting().get(indexName);
        final IndexShardRoutingTable shardCopies = indexRouting.getShards().get(0);
        final Index resolvedIndex = shardCopies.primaryShard().index();
        final IndexShard primary = shardOnNode(clusterState, shardCopies.primaryShard().currentNodeId(), resolvedIndex);
        final IndexShard replica = shardOnNode(clusterState, shardCopies.replicaShards().get(0).currentNodeId(), resolvedIndex);

        // Before any force merge neither copy is expected to have a force merge UUID in its commit.
        assertThat(getForceMergeUUID(primary), nullValue());
        assertThat(getForceMergeUUID(replica), nullValue());

        final ForceMergeResponse mergeResponse =
            client().admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).get();
        assertThat(mergeResponse.getFailedShards(), is(0));
        assertThat(mergeResponse.getSuccessfulShards(), is(2));

        // Force a flush so that a new commit containing the force merge UUID is written
        final FlushResponse flushed = client().admin().indices().prepareFlush(indexName).setForce(true).get();
        assertThat(flushed.getFailedShards(), is(0));
        assertThat(flushed.getSuccessfulShards(), is(2));

        final String primaryForceMergeUUID = getForceMergeUUID(primary);
        assertThat(primaryForceMergeUUID, notNullValue());

        final String replicaForceMergeUUID = getForceMergeUUID(replica);
        assertThat(replicaForceMergeUUID, notNullValue());

        // Both copies must have recorded the very same UUID
        assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
    }

    /**
     * Looks up shard 0 of the given index on the node with the given id.
     *
     * @param state  cluster state used to translate the node id into a node name
     * @param nodeId id of the node holding the shard copy
     * @param index  the index the shard belongs to
     * @return the shard instance obtained from that node's {@link IndicesService}
     */
    private IndexShard shardOnNode(ClusterState state, String nodeId, Index index) {
        final String nodeName = state.nodes().get(nodeId).getName();
        final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
        return indicesService.indexService(index).getShard(0);
    }

    /**
     * Reads the value stored under {@link Engine#FORCE_MERGE_UUID_KEY} in the user data of the shard's last index
     * commit, releasing the commit reference afterwards.
     *
     * @param indexShard the shard whose last commit is inspected
     * @return the stored force merge UUID, or {@code null} if the commit user data has no such entry
     */
    private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
        try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(true)) {
            return commitRef.getIndexCommit().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -98,7 +99,7 @@ void syncFlush(String syncId) throws IOException {
// make sure that we have committed translog; otherwise, we can flush after relaying translog in store recovery
flush(true, true);
// make sure that background merges won't happen; otherwise, IndexWriter#hasUncommittedChanges can become true again
forceMerge(false);
forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID());
assertNotNull(indexWriter);
try (ReleasableLock ignored = writeLock.acquire()) {
assertThat(getTranslogStats().getUncommittedOperations(), equalTo(0));
Expand Down
Loading

0 comments on commit 713e931

Please sign in to comment.