checkpoint if needed before doing cluster config change to prevent replica from being purged wrongly
popduke committed Apr 15, 2024
1 parent 5077699 commit b2c296f
Showing 1 changed file with 118 additions and 97 deletions.
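The gist of the change: before a cluster config change is handed to the WAL, the range now takes a checkpoint whenever the latest snapshot does not already cover the log up to the entry preceding the CHANGECONFIG entry, and aborts the apply if that checkpoint fails; per the commit message, this prevents a replica from being purged wrongly. A minimal sketch of the guard, using hypothetical stand-ins (CheckpointGuard, Wal, fsmExecutor) rather than the real KVRange members:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Minimal sketch only; CheckpointGuard and Wal are hypothetical stand-ins, not the real KVRange API.
final class CheckpointGuard {
    interface Wal {
        long lastSnapshotIndex();             // stands in for wal.latestSnapshot().getLastAppliedIndex()
        CompletableFuture<Void> checkpoint(); // stands in for scheduleCheckpoint()
    }

    private final Wal wal;
    private final Executor fsmExecutor;

    CheckpointGuard(Wal wal, Executor fsmExecutor) {
        this.wal = wal;
        this.fsmExecutor = fsmExecutor;
    }

    // Checkpoint first when the latest snapshot lags behind the entry right before the
    // config-change entry; otherwise apply immediately. A failed checkpoint aborts the apply.
    CompletableFuture<Void> applyConfigChange(long logIndex, Runnable applyNewConfig) {
        CompletableFuture<Void> checkpointFuture = wal.lastSnapshotIndex() < logIndex - 1
            ? wal.checkpoint()
            : CompletableFuture.completedFuture(null);
        CompletableFuture<Void> done = new CompletableFuture<>();
        checkpointFuture.whenCompleteAsync((v, e) -> {
            if (e != null) {
                done.completeExceptionally(e); // re-apply the log entry later
            } else {
                applyNewConfig.run();          // safe to notify new voters/learners and change config
                done.complete(null);
            }
        }, fsmExecutor);
        return done;
    }
}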
@@ -302,7 +302,7 @@ public void open(IKVRangeMessenger messenger) {
if (!kvRange.hasCheckpoint(wal.latestSnapshot())) {
log.debug("Latest snapshot not available, do compaction: rangeId={}, storeId={}\n{}",
KVRangeIdUtil.toString(id), hostStoreId, wal.latestSnapshot());
- scheduleCompaction();
+ scheduleCheckpoint();
}
switch (kvRange.state().getType()) {
case Purged, Merged, Removed -> quitReason.complete(kvRange.state().getType());
@@ -687,7 +687,6 @@ private CompletableFuture<Runnable> applyConfigChange(long term, long index,
clusterConfigSubject.onNext(config);
quitReason.complete(Purged);
finishCommand(taskId);
- scheduleCompaction();
});
} else {
rangeWriter.state(State.newBuilder()
@@ -697,7 +696,6 @@ private CompletableFuture<Runnable> applyConfigChange(long term, long index,
onDone.complete(() -> {
clusterConfigSubject.onNext(config);
finishCommand(taskId);
- scheduleCompaction();
});
}
} else {
@@ -735,7 +733,6 @@ private CompletableFuture<Runnable> applyConfigChange(long term, long index,
if (remove) {
quitReason.complete(Removed);
}
- scheduleCompaction();
});
}
default ->
@@ -758,8 +755,7 @@ private CompletableFuture<Runnable> applyCommand(long ver,
String taskId = command.getTaskId();
log.trace(
"Execute KVRange Command[term={}, index={}, taskId={}]: rangeId={}, storeId={}, ver={}, state={}, \n{}",
- logTerm, logIndex, taskId, KVRangeIdUtil.toString(id), hostStoreId, ver,
- state, command);
+ logTerm, logIndex, taskId, KVRangeIdUtil.toString(id), hostStoreId, ver, state, command);
switch (command.getCommandTypeCase()) {
// admin commands
case CHANGECONFIG -> {
@@ -776,99 +772,110 @@ private CompletableFuture<Runnable> applyCommand(long ver,
"Config change abort, range is in state:" + state.getType().name())));
break;
}
- // notify new voters and learners host store to ensure the range exist
- Set<String> newHostingStoreIds =
- difference(
- difference(
- union(newHashSet(request.getVotersList()),
- newHashSet(request.getLearnersList())),
- union(newHashSet(wal.clusterConfig().getVotersList()),
- newHashSet(wal.clusterConfig().getLearnersList()))
- ),
- singleton(hostStoreId)
- );
- newHostingStoreIds.forEach(storeId ->
- log.debug(
- "Send request to ensure range hosted[taskId={}]: rangeId={}, storeId={}, newHostingStoreId={}",
- taskId, KVRangeIdUtil.toString(id), hostStoreId, storeId));
- List<CompletableFuture<?>> onceFutures = newHostingStoreIds.stream()
- .map(storeId -> messenger.once(m -> {
- if (m.hasEnsureRangeReply()) {
- EnsureRangeReply reply = m.getEnsureRangeReply();
- return reply.getResult() == EnsureRangeReply.Result.OK;
- }
- return false;
- })).collect(Collectors.toList());
- CompletableFuture.allOf(onceFutures.toArray(new CompletableFuture[] {}))
- .orTimeout(5, TimeUnit.SECONDS)
- .whenCompleteAsync((_v, _e) -> {
- if (_e != null) {
- log.debug(
- "Ensure range hosted failed[taskId={}]: rangeId={}, storeId={}, newHostingStoreIds={}",
- taskId, KVRangeIdUtil.toString(id), hostStoreId, newHostingStoreIds, _e);
- onceFutures.forEach(f -> f.cancel(true));
- }
- log.debug(
- "Changing Config[taskId={}]: rangeId={}, storeId={}, nextVoters={}, nextLearners={}",
- taskId, KVRangeIdUtil.toString(id), hostStoreId, request.getVotersList(),
- request.getLearnersList());
- wal.changeClusterConfig(taskId,
- newHashSet(request.getVotersList()),
- newHashSet(request.getLearnersList()))
- .whenCompleteAsync((v, e) -> {
- if (e != null) {
- String errorMessage =
- String.format("Config change aborted[taskId=%s] due to %s", taskId,
- e.getMessage());
- log.debug(errorMessage);
- finishCommandWithError(taskId, new KVRangeException.TryLater(errorMessage));
- wal.stepDown();
- }
- // postpone finishing command when config entry is applied
- }, fsmExecutor);
- }, fsmExecutor);
- newHostingStoreIds.forEach(storeId -> {
- log.debug(
- "Send range ensure request to store[taskId={}]: rangeId={}, storeId={}, targetStoreId={}",
- taskId, hostStoreId, KVRangeIdUtil.toString(id), storeId);
- messenger.send(KVRangeMessage.newBuilder()
- .setRangeId(id)
- .setHostStoreId(storeId)
- .setEnsureRange(EnsureRange.newBuilder()
- .setVer(ver) // ensure the new kvrange is compatible in target store
- .setBoundary(boundary)
- .setInitSnapshot(Snapshot.newBuilder()
- .setTerm(0)
- .setIndex(0)
- .setClusterConfig(ClusterConfig.getDefaultInstance()) // empty voter set
- .setData(KVRangeSnapshot.newBuilder()
- .setVer(ver)
- .setId(id)
- // no checkpoint specified
- .setLastAppliedIndex(0)
- .setBoundary(boundary)
- .setState(state)
- .build().toByteString())
- .build())
- .build())
- .build());
- });
- if (state.getType() == Normal) {
- // only transit to ConfigChanging from Normal
- rangeWriter.state(State.newBuilder()
- .setType(ConfigChanging)
- .setTaskId(taskId)
- .build());
- } else if (state.getType() == Merged) {
- rangeWriter.state(State.newBuilder()
- .setType(MergedQuiting)
- .setTaskId(taskId)
- .build());
- }
- rangeWriter.bumpVer(false);
- onDone.complete(NOOP);
+ log.debug(
+ "Changing Config[term={}, index={}, taskId={}]: rangeId={}, storeId={}, ver={}, state={}, nextVoters={}, nextLearners={}",
+ logTerm, logIndex, taskId, KVRangeIdUtil.toString(id), hostStoreId, ver, state,
+ request.getVotersList(), request.getLearnersList());
+ // make a checkpoint if needed
+ CompletableFuture<Void> checkpointFuture = wal.latestSnapshot().getLastAppliedIndex() < logIndex - 1 ?
+ scheduleCheckpoint() : CompletableFuture.completedFuture(null);
+ checkpointFuture.whenCompleteAsync((v, e) -> {
+ if (e != null) {
+ // checkpoint failed, re-apply the log again
+ log.warn(
+ "Checkpoint failed[term={}, index={}, taskId={}]: rangeId={}, storeId={}, ver={}, state={}",
+ logTerm, logIndex, taskId, KVRangeIdUtil.toString(id), hostStoreId, ver, state);
+ onDone.completeExceptionally(e);
+ } else {
+ // notify new voters and learners host store to ensure the range exist
+ Set<String> newHostingStoreIds = difference(
+ difference(
+ union(newHashSet(request.getVotersList()),
+ newHashSet(request.getLearnersList())),
+ union(newHashSet(wal.clusterConfig().getVotersList()),
+ newHashSet(wal.clusterConfig().getLearnersList()))
+ ),
+ singleton(hostStoreId)
+ );
+ newHostingStoreIds.forEach(storeId ->
+ log.debug(
+ "Send request to ensure range hosted[taskId={}]: rangeId={}, storeId={}, newHostingStoreId={}",
+ taskId, KVRangeIdUtil.toString(id), hostStoreId, storeId));
+ List<CompletableFuture<?>> onceFutures = newHostingStoreIds.stream()
+ .map(storeId -> messenger.once(m -> {
+ if (m.hasEnsureRangeReply()) {
+ EnsureRangeReply reply = m.getEnsureRangeReply();
+ return reply.getResult() == EnsureRangeReply.Result.OK;
+ }
+ return false;
+ })).collect(Collectors.toList());
+ CompletableFuture.allOf(onceFutures.toArray(new CompletableFuture[] {}))
+ .orTimeout(5, TimeUnit.SECONDS)
+ .whenCompleteAsync((v1, e1) -> {
+ if (e1 != null) {
+ log.debug(
+ "Ensure range hosted failed[taskId={}]: rangeId={}, storeId={}, newHostingStoreIds={}",
+ taskId, KVRangeIdUtil.toString(id), hostStoreId, newHostingStoreIds, e1);
+ onceFutures.forEach(f -> f.cancel(true));
+ }
+ wal.changeClusterConfig(taskId,
+ newHashSet(request.getVotersList()),
+ newHashSet(request.getLearnersList()))
+ .whenCompleteAsync((v2, e2) -> {
+ if (e2 != null) {
+ String errorMessage =
+ String.format("Config change aborted[taskId=%s] due to %s", taskId,
+ e2.getMessage());
+ log.debug(errorMessage);
+ finishCommandWithError(taskId, new KVRangeException.TryLater(errorMessage));
+ wal.stepDown();
+ }
+ // postpone finishing command when config entry is applied
+ }, fsmExecutor);
+ }, fsmExecutor);
+ newHostingStoreIds.forEach(storeId -> {
+ log.debug(
+ "Send range ensure request to store[taskId={}]: rangeId={}, storeId={}, targetStoreId={}",
+ taskId, hostStoreId, KVRangeIdUtil.toString(id), storeId);
+ messenger.send(KVRangeMessage.newBuilder()
+ .setRangeId(id)
+ .setHostStoreId(storeId)
+ .setEnsureRange(EnsureRange.newBuilder()
+ .setVer(ver) // ensure the new kvrange is compatible in target store
+ .setBoundary(boundary)
+ .setInitSnapshot(Snapshot.newBuilder()
+ .setTerm(0)
+ .setIndex(0)
+ .setClusterConfig(ClusterConfig.getDefaultInstance()) // empty voter set
+ .setData(KVRangeSnapshot.newBuilder()
+ .setVer(ver)
+ .setId(id)
+ // no checkpoint specified
+ .setLastAppliedIndex(0)
+ .setBoundary(boundary)
+ .setState(state)
+ .build().toByteString())
+ .build())
+ .build())
+ .build());
+ });
+ if (state.getType() == Normal) {
+ // only transit to ConfigChanging from Normal
+ rangeWriter.state(State.newBuilder()
+ .setType(ConfigChanging)
+ .setTaskId(taskId)
+ .build());
+ } else if (state.getType() == Merged) {
+ rangeWriter.state(State.newBuilder()
+ .setType(MergedQuiting)
+ .setTaskId(taskId)
+ .build());
+ }
+ rangeWriter.bumpVer(false);
+ onDone.complete(NOOP);
+ }
+ }, fsmExecutor);
}
case TRANSFERLEADERSHIP -> {
TransferLeadership request = command.getTransferLeadership();
@@ -996,7 +1003,6 @@ private CompletableFuture<Runnable> applyCommand(long ver,
.setData(rhsSS.toByteString())
.build())
.build()).build());
- scheduleCompaction();
});
}
}, fsmExecutor);
@@ -1372,7 +1378,7 @@ private CompletableFuture<KVRangeSnapshot> install(KVRangeSnapshot snapshot, Str
if (isNotOpening()) {
return CompletableFuture.completedFuture(null);
}
- log.debug("Restore from snapshot: rangeId={}, storeId={}\n{}",
+ log.debug("Restoring from snapshot: rangeId={}, storeId={}\n{}",
KVRangeIdUtil.toString(id), hostStoreId, snapshot);
CompletableFuture<KVRangeSnapshot> onInstalled = new CompletableFuture<>();
// the restore future is cancelable
@@ -1454,6 +1460,21 @@ private void scheduleCompaction() {
}
}

+ private CompletableFuture<Void> scheduleCheckpoint() {
+ return mgmtTaskRunner.add(() -> metricManager.recordCompact(() -> {
+ KVRangeSnapshot snapshot = kvRange.checkpoint();
+ log.debug("Checkpointing using snapshot: rangeId={}, storeId={}\n{}",
+ KVRangeIdUtil.toString(id), hostStoreId, snapshot);
+ return wal.compact(snapshot)
+ .thenAccept(v -> dumpSessions.forEach((sessionId, session) -> {
+ if (!session.checkpointId().equals(snapshot.getCheckpointId())) {
+ session.cancel();
+ dumpSessions.remove(sessionId, session);
+ }
+ }));
+ }));
+ }

private boolean isNotOpening() {
Lifecycle state = lifecycle.get();
return state != Open;
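The new scheduleCheckpoint() also cancels any snapshot dump session that was opened against an older checkpoint once the WAL has been compacted. A small self-contained sketch of that invalidation step, with DumpSessions and Session as hypothetical stand-ins for the real types:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: DumpSessions and Session are hypothetical stand-ins for the real dump-session types.
final class DumpSessions {
    interface Session {
        String checkpointId();
        void cancel();
    }

    private final Map<String, Session> sessions = new ConcurrentHashMap<>();

    void register(String sessionId, Session session) {
        sessions.put(sessionId, session);
    }

    // After compacting the WAL against a fresh checkpoint, cancel and remove every
    // session still serving an older checkpoint, mirroring scheduleCheckpoint() above.
    void invalidateStale(String latestCheckpointId) {
        sessions.forEach((sessionId, session) -> {
            if (!session.checkpointId().equals(latestCheckpointId)) {
                session.cancel();
                sessions.remove(sessionId, session);
            }
        });
    }
}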
