
Merge branch 'master' into compile-with-jdk-9
* master:
  TEST: init unassigned gcp in testAcquireIndexCommit
  Replica start peer recovery with safe commit (elastic#28181)
  Truncate tlog cli should assign global checkpoint (elastic#28192)
jasontedor committed Jan 13, 2018
2 parents 1269506 + 82722eb commit f3a76a7
Showing 18 changed files with 228 additions and 138 deletions.
@@ -1512,6 +1512,11 @@ public interface Warmer {
*/
public abstract Engine recoverFromTranslog() throws IOException;

/**
* Do not replay translog operations, but make the engine ready.
*/
public abstract void skipTranslogRecovery();

/**
* Returns <code>true</code> iff this engine is currently recovering from translog.
*/
@@ -401,6 +401,15 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

@Override
public void skipTranslogRecovery() {
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't skip translog recovery with open mode: " + openMode);
}
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private IndexCommit getStartingCommitPoint() throws IOException {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
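For readers skimming the diff: a minimal, self-contained sketch (plain Java, not the InternalEngine code above) of what the new skipTranslogRecovery() contract amounts to. Skipping is only legal when the engine was opened on an existing index and translog, and it consumes the same one-shot pending-recovery state a real replay would, so the engine can start accepting writes without replaying anything.

import java.util.concurrent.atomic.AtomicBoolean;

public class SkipRecoverySketch {
    enum OpenMode { CREATE_INDEX_AND_TRANSLOG, OPEN_INDEX_CREATE_TRANSLOG, OPEN_INDEX_AND_TRANSLOG }

    private final OpenMode openMode;
    // true when the engine is opened on an existing index and translog; cleared exactly once
    private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(true);

    SkipRecoverySketch(OpenMode openMode) {
        this.openMode = openMode;
    }

    void skipTranslogRecovery() {
        if (openMode != OpenMode.OPEN_INDEX_AND_TRANSLOG) {
            throw new IllegalStateException("can't skip translog recovery with open mode: " + openMode);
        }
        if (pendingTranslogRecovery.compareAndSet(true, false) == false) {
            throw new IllegalStateException("translog recovery already performed or skipped");
        }
        // from here on the engine accepts writes and flushes without replaying local translog ops
    }

    public static void main(String[] args) {
        new SkipRecoverySketch(OpenMode.OPEN_INDEX_AND_TRANSLOG).skipTranslogRecovery();
    }
}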
16 changes: 13 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1304,9 +1304,20 @@ public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalC
* opens the engine on top of the existing lucene index and translog.
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openIndexAndTranslog() throws IOException {
public void openIndexAndRecoveryFromTranslog() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
getEngine().recoverFromTranslog();
}

/**
* Opens the engine on top of the existing lucene index and translog.
* The translog is kept but its operations won't be replayed.
*/
public void openIndexAndSkipTranslogRecovery() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
getEngine().skipTranslogRecovery();
}

private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException {
@@ -1339,13 +1350,12 @@ private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, fi
globalCheckpointTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()),
"read from translog checkpoint");
}
Engine newEngine = createNewEngine(config);
createNewEngine(config);
verifyNotClosed();
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
newEngine.recoverFromTranslog();
}
assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
@@ -401,7 +401,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
logger.debug("failed to list file details", e);
}
if (indexShouldExists) {
indexShard.openIndexAndTranslog();
indexShard.openIndexAndRecoveryFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
} else {
indexShard.createIndexAndTranslog();
@@ -132,9 +132,19 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
}

// Retrieve the generation and UUID from the existing data
commitData = commits.get(commits.size() - 1).getUserData();
commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData());
String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint;
// In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit.
// We can only safely do it because we will generate a new history uuid for this shard.
if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) {
globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO));
// Also advances the local checkpoint of the last commit to its max_seqno.
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(globalCheckpoint));
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (translogGeneration == null || translogUUID == null) {
throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
translogGeneration, translogUUID);
@@ -153,7 +163,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
// Write empty checkpoint and translog to empty files
long gen = Long.parseLong(translogGeneration);
int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint);

terminal.println("Removing existing translog files");
IOUtils.rm(translogFiles.toArray(new Path[]{}));
@@ -190,9 +200,9 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
}

/** Write a checkpoint file to the given location with the given generation */
public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
SequenceNumbers.UNASSIGNED_SEQ_NO, translogGeneration);
globalCheckpoint, translogGeneration);
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
// fsync with metadata here to make sure.
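An aside for context, not part of this commit: a self-contained sketch of the checkpoint assignment the truncate-translog tool now performs. The string keys mirror SequenceNumbers.MAX_SEQ_NO ("max_seq_no") and LOCAL_CHECKPOINT_KEY ("local_checkpoint"), and the -2 sentinel stands in for SequenceNumbers.UNASSIGNED_SEQ_NO; treat those constants as assumptions of the sketch.

import java.util.HashMap;
import java.util.Map;

public class TruncateCheckpointSketch {
    static final long UNASSIGNED_SEQ_NO = -2L; // sentinel when the commit carries no sequence numbers

    // Derive the global checkpoint to write into the empty translog's checkpoint file:
    // use the last commit's max_seqno when available and advance its recorded local
    // checkpoint to match, so the remaining commit satisfies the safe-commit invariant.
    static long globalCheckpointFrom(Map<String, String> commitUserData) {
        String maxSeqNo = commitUserData.get("max_seq_no");
        if (maxSeqNo == null) {
            return UNASSIGNED_SEQ_NO;
        }
        long globalCheckpoint = Long.parseLong(maxSeqNo);
        commitUserData.put("local_checkpoint", Long.toString(globalCheckpoint));
        return globalCheckpoint;
    }

    public static void main(String[] args) {
        Map<String, String> userData = new HashMap<>();
        userData.put("max_seq_no", "17");
        System.out.println(globalCheckpointFrom(userData));     // 17
        System.out.println(userData.get("local_checkpoint"));   // 17
    }
}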
@@ -21,6 +21,8 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
@@ -39,6 +41,7 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -60,6 +63,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

@@ -108,8 +112,8 @@ public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, Trans
FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool
.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
@@ -353,7 +357,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory());
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit);
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
assert seqNoStats.localCheckpoint <= globalCheckpoint;
/*
@@ -387,7 +393,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps());
recoveryRef.target().prepareForTranslogOperations(request.deleteLocalTranslog(), request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
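For orientation, a self-contained sketch (not CombinedDeletionPolicy itself) of the safe-commit selection that getStartingSeqNo now leans on: among the commits on disk, ordered oldest to newest, pick the newest one whose max_seqno does not exceed the global checkpoint, falling back to the oldest commit when none qualifies.

import java.util.Arrays;
import java.util.List;

public class SafeCommitSketch {
    // maxSeqNoPerCommit holds the max_seqno recorded in each commit, oldest first
    static int findSafeCommitIndex(List<Long> maxSeqNoPerCommit, long globalCheckpoint) {
        int safeIndex = 0; // fall back to the oldest commit if none is at or below the checkpoint
        for (int i = 0; i < maxSeqNoPerCommit.size(); i++) {
            if (maxSeqNoPerCommit.get(i) <= globalCheckpoint) {
                safeIndex = i; // keep the newest qualifying commit
            }
        }
        return safeIndex;
    }

    public static void main(String[] args) {
        List<Long> commits = Arrays.asList(5L, 12L, 20L);
        // prints 1: the commit whose max_seqno (12) is the newest at or below the checkpoint (15)
        System.out.println(findSafeCommitIndex(commits, 15L));
    }
}

Recovery can then read the local checkpoint and max_seqno from that commit and, when max_seqno is at or below the global checkpoint, attempt an operations-based recovery instead of copying files.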
@@ -28,19 +28,33 @@

import java.io.IOException;

public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {

private long recoveryId;
private ShardId shardId;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
private final long recoveryId;
private final ShardId shardId;
private final int totalTranslogOps;
private final boolean deleteLocalTranslog;

public RecoveryPrepareForTranslogOperationsRequest() {
}

RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean deleteLocalTranslog) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
this.deleteLocalTranslog = deleteLocalTranslog;
}

RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
deleteLocalTranslog = in.readBoolean();
} else {
deleteLocalTranslog = true;
}
}

public long recoveryId() {
@@ -55,15 +69,11 @@ public int totalTranslogOps() {
return totalTranslogOps;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
/**
* Whether or not the recovery target should delete its local translog
*/
boolean deleteLocalTranslog() {
return deleteLocalTranslog;
}

@Override
Expand All @@ -75,5 +85,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(deleteLocalTranslog);
}
}
}
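The serialization change above follows the usual wire-compatibility pattern. As a rough, self-contained illustration using plain java.io rather than Elasticsearch's StreamInput/StreamOutput (the version constant below is a stand-in, not a real Version id): the new flag is written only when the peer is new enough to expect it, and readers default it to true when the sender is older, matching the pre-existing behaviour of always creating a fresh translog on the target.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionGatedFlagSketch {
    static final int FLAG_ADDED_IN = 7_000_099; // illustrative wire version id

    static void write(DataOutputStream out, int wireVersion, boolean deleteLocalTranslog) throws IOException {
        out.writeLong(1L);   // recoveryId and other pre-existing fields would go here
        if (wireVersion >= FLAG_ADDED_IN) {
            out.writeBoolean(deleteLocalTranslog); // only peers on the newer version expect this byte
        }
    }

    static boolean read(DataInputStream in, int wireVersion) throws IOException {
        in.readLong();       // matching pre-existing fields
        if (wireVersion >= FLAG_ADDED_IN) {
            return in.readBoolean();
        }
        return true;         // older senders always delete the local translog, so keep that default
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int oldWireVersion = 6_000_099;                 // talking to an older node
        write(new DataOutputStream(buffer), oldWireVersion, false);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println(read(in, oldWireVersion));   // true: the backward-compatible default
    }
}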
@@ -150,9 +150,9 @@ public RecoveryResponse recoverToTarget() throws IOException {

final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
if (isSequenceNumberBasedRecoveryPossible) {
if (isSequenceNumberBasedRecovery) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
requiredSeqNoRangeStart = startingSeqNo;
@@ -188,7 +188,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));

try {
prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
@@ -421,13 +422,13 @@ public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogO
}
}

void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
void prepareTargetForTranslog(final boolean createNewTranslog, final int totalTranslogOps) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase1]: prepare remote engine for translog");
final long startEngineStart = stopWatch.totalTime().millis();
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(createNewTranslog, totalTranslogOps));
stopWatch.stop();

response.startTime = stopWatch.totalTime().millis() - startEngineStart;
@@ -362,10 +362,14 @@ private void ensureRefCount() {
/*** Implementation of {@link RecoveryTargetHandler } */

@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
// TODO: take the local checkpoint from store as global checkpoint, once we know it's safe
indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
if (createNewTranslog) {
// TODO: Assigns the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
} else {
indexShard().openIndexAndSkipTranslogRecovery();
}
}

@Override
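Tying the source- and target-side changes together: a small, self-contained sketch (the Shard interface here is a stand-in for illustration, not IndexShard) of the branch the recovery target now takes. The source sets createNewTranslog to false exactly when the recovery is sequence-number based, so the target keeps its local translog and only opens the engine; a file-based recovery still wipes the local translog and creates a fresh one.

public class PrepareTargetSketch {
    interface Shard {
        void openIndexAndCreateTranslog();        // file-based recovery: fresh, empty translog
        void openIndexAndSkipTranslogRecovery();  // ops-based recovery: keep the translog, no local replay
    }

    static void prepareForTranslogOperations(Shard shard, boolean createNewTranslog) {
        if (createNewTranslog) {
            shard.openIndexAndCreateTranslog();
        } else {
            shard.openIndexAndSkipTranslogRecovery();
        }
    }

    public static void main(String[] args) {
        boolean isSequenceNumberBasedRecovery = true;                 // decided on the source side
        boolean createNewTranslog = isSequenceNumberBasedRecovery == false;
        Shard shard = new Shard() {
            public void openIndexAndCreateTranslog() { System.out.println("new empty translog"); }
            public void openIndexAndSkipTranslogRecovery() { System.out.println("translog kept"); }
        };
        prepareForTranslogOperations(shard, createNewTranslog);       // prints "translog kept"
    }
}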
@@ -32,10 +32,10 @@ public interface RecoveryTargetHandler {

/**
* Prepares the target to receive translog operations, after all files have been copied
*
* @param totalTranslogOps total translog operations expected to be sent
* @param createNewTranslog whether or not to delete the local translog on the target
* @param totalTranslogOps total translog operations expected to be sent
*/
void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException;

/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
@@ -76,9 +76,9 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
}

@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, createNewTranslog),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@@ -4324,7 +4324,7 @@ public void testConcurrentAppendUpdateAndRefresh() throws InterruptedException,
public void testAcquireIndexCommit() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
int numDocs = between(1, 20);
for (int i = 0; i < numDocs; i++) {