Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init global checkpoint after copy commit in peer recovery #40823

Merged
merged 2 commits into from
Apr 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,16 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos);
// During a peer-recovery the global checkpoint is not known and up to date when the engine
// is created, so we only check the max seq no / global checkpoint coherency when the global
// Before 8.0 the global checkpoint is not known and up to date when the engine is created after
// peer recovery, so we only check the max seq no / global checkpoint coherency when the global
// checkpoint is different from the unassigned sequence number value.
// In addition to that we only execute the check if the index the engine belongs to has been
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
// that guarantee that all operations have been flushed to Lucene.
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO
&& engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) {
final Version indexVersionCreated = engineConfig.getIndexSettings().getIndexVersionCreated();
if (indexVersionCreated.onOrAfter(Version.V_8_0_0) ||
(globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersionCreated.onOrAfter(Version.V_6_7_0))) {
if (seqNoStats.getMaxSeqNo() != globalCheckpoint) {
assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint);
throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
FilesInfoRequestHandler());
transportService.registerRequestHandler(Actions.FILE_CHUNK, RecoveryFileChunkRequest::new, ThreadPool.Names.GENERIC, new
FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, ThreadPool.Names.GENERIC,
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
Expand Down Expand Up @@ -540,7 +540,7 @@ class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanF
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.sourceMetaSnapshot());
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.transport.TransportRequest;
Expand All @@ -29,37 +31,32 @@

public class RecoveryCleanFilesRequest extends TransportRequest {

private long recoveryId;
private ShardId shardId;
private final long recoveryId;
private final ShardId shardId;
private final Store.MetadataSnapshot snapshotFiles;
private final int totalTranslogOps;
private final long globalCheckpoint;

private Store.MetadataSnapshot snapshotFiles;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;

public RecoveryCleanFilesRequest() {
}

RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles, int totalTranslogOps) {
RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
int totalTranslogOps, long globalCheckpoint) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.snapshotFiles = snapshotFiles;
this.totalTranslogOps = totalTranslogOps;
this.globalCheckpoint = globalCheckpoint;
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
RecoveryCleanFilesRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
snapshotFiles = new Store.MetadataSnapshot(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

@Override
Expand All @@ -69,13 +66,28 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
snapshotFiles.writeTo(out);
out.writeVInt(totalTranslogOps);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeZLong(globalCheckpoint);
}
}

public Store.MetadataSnapshot sourceMetaSnapshot() {
return snapshotFiles;
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

public int totalTranslogOps() {
return totalTranslogOps;
}

public long getGlobalCheckpoint() {
return globalCheckpoint;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
startingSeqNo = 0;
try {
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
sendFileResult = phase1(phase1Snapshot.getIndexCommit(), shard.getGlobalCheckpoint(), () -> estimateNumOps);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
Expand Down Expand Up @@ -332,7 +332,7 @@ static final class SendFileResult {
* segments that are missing. Only segments that have the same size and
* checksum can be reused
*/
public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
cancellableThreads.checkForCancel();
// Total size of segment files that are recovered
long totalSize = 0;
Expand Down Expand Up @@ -422,7 +422,7 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
// are deleted
try {
cancellableThreads.executeIO(() ->
recoveryTarget.cleanFiles(translogOps.get(), recoverySourceMetadata));
recoveryTarget.cleanFiles(translogOps.get(), globalCheckpoint, recoverySourceMetadata));
} catch (RemoteTransportException | IOException targetException) {
final IOException corruptIndexException;
// we realized that after the index was copied and we wanted to finalize the recovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
ActionListener.completeWith(listener, () -> {
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().openEngineAndSkipTranslogRecovery();
assert indexShard.getGlobalCheckpoint() >= indexShard.seqNoStats().getMaxSeqNo() ||
indexShard.indexSettings().getIndexVersionCreated().before(Version.V_8_0_0)
: "global checkpoint is not initialized [" + indexShard.seqNoStats() + "]";
return null;
});
}
Expand Down Expand Up @@ -382,7 +385,7 @@ public void receiveFileInfo(List<String> phase1FileNames,
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
// first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas
Expand All @@ -395,10 +398,11 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
store.ensureIndexHasHistoryUUID();
}
// TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
assert globalCheckpoint >= Long.parseLong(sourceMetaData.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO))
|| indexShard.indexSettings().getIndexVersionCreated().before(Version.V_8_0_0) :
"invalid global checkpoint[" + globalCheckpoint + "] source_meta_data [" + sourceMetaData.getCommitUserData() + "]";
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId,
indexShard.getPendingPrimaryTerm());
indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);

if (indexShard.getRetentionLeases().leases().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ void receiveFileInfo(List<String> phase1FileNames,
/**
* After all source files has been sent over, this command is sent to the target so it can clean any local
* files that are not part of the source store
*
* @param totalTranslogOps an update number of translog operations that will be replayed later on
* @param sourceMetaData meta data of the source store
* @param globalCheckpoint the global checkpoint on the primary
* @param sourceMetaData meta data of the source store
*/
void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException;
void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException;

/** writes a partial file chunk to the target store */
void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps),
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps, globalCheckpoint),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ public void run() {
Future<Void> future = shards.asyncRecoverReplica(replica,
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
super.cleanFiles(totalTranslogOps, sourceMetaData);
public void cleanFiles(int totalTranslogOps, long globalCheckpoint,
Store.MetadataSnapshot sourceMetaData) throws IOException {
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
latch.countDown();
try {
latch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,9 +853,9 @@ public void indexTranslogOperations(
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
blockIfNeeded(RecoveryState.Stage.INDEX);
super.cleanFiles(totalTranslogOps, sourceMetaData);
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
for (Thread sender : senders) {
sender.join();
}
recoveryTarget.cleanFiles(0, sourceSnapshot);
recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)), sourceSnapshot);
recoveryTarget.decRef();
Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata();
Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,9 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
between(1, 8)) {

@Override
public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
phase1Called.set(true);
return super.phase1(snapshot, translogOps);
return super.phase1(snapshot, globalCheckpoint, translogOps);
}

@Override
Expand Down Expand Up @@ -715,7 +715,7 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand All @@ -42,9 +43,11 @@
import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -323,7 +326,18 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception {
}
IndexShard replicaShard = newShard(primaryShard.shardId(), false);
updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData());
recoverReplica(replicaShard, primaryShard, true);
recoverReplica(replicaShard, primaryShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) {
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
assertThat(replicaShard.getGlobalCheckpoint(), equalTo(primaryShard.getGlobalCheckpoint()));
}
@Override
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
assertThat(globalCheckpoint, equalTo(primaryShard.getGlobalCheckpoint()));
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
}
}, true, true);
List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());
long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
target.cleanFiles(totalTranslogOps, sourceMetaData);
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
}

@Override
Expand Down