diff --git a/base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/range/KVRangeFSM.java b/base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/range/KVRangeFSM.java index 5d614231e..9aa9d2ab5 100644 --- a/base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/range/KVRangeFSM.java +++ b/base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/range/KVRangeFSM.java @@ -302,7 +302,7 @@ public void open(IKVRangeMessenger messenger) { if (!kvRange.hasCheckpoint(wal.latestSnapshot())) { log.debug("Latest snapshot not available, do compaction: rangeId={}, storeId={}\n{}", KVRangeIdUtil.toString(id), hostStoreId, wal.latestSnapshot()); - scheduleCompaction(); + scheduleCheckpoint(); } switch (kvRange.state().getType()) { case Purged, Merged, Removed -> quitReason.complete(kvRange.state().getType()); @@ -687,7 +687,6 @@ private CompletableFuture applyConfigChange(long term, long index, clusterConfigSubject.onNext(config); quitReason.complete(Purged); finishCommand(taskId); - scheduleCompaction(); }); } else { rangeWriter.state(State.newBuilder() @@ -697,7 +696,6 @@ private CompletableFuture applyConfigChange(long term, long index, onDone.complete(() -> { clusterConfigSubject.onNext(config); finishCommand(taskId); - scheduleCompaction(); }); } } else { @@ -735,7 +733,6 @@ private CompletableFuture applyConfigChange(long term, long index, if (remove) { quitReason.complete(Removed); } - scheduleCompaction(); }); } default -> @@ -758,8 +755,7 @@ private CompletableFuture applyCommand(long ver, String taskId = command.getTaskId(); log.trace( "Execute KVRange Command[term={}, index={}, taskId={}]: rangeId={}, storeId={}, ver={}, state={}, \n{}", - logTerm, logIndex, taskId, KVRangeIdUtil.toString(id), hostStoreId, ver, - state, command); + logTerm, logIndex, taskId, KVRangeIdUtil.toString(id), hostStoreId, ver, state, command); switch (command.getCommandTypeCase()) { // admin commands case CHANGECONFIG -> { @@ -776,99 +772,110 @@ private CompletableFuture applyCommand(long ver, "Config change abort, range is in state:" + state.getType().name()))); break; } - // notify new voters and learners host store to ensure the range exist - Set newHostingStoreIds = - difference( - difference( - union(newHashSet(request.getVotersList()), - newHashSet(request.getLearnersList())), - union(newHashSet(wal.clusterConfig().getVotersList()), - newHashSet(wal.clusterConfig().getLearnersList())) - ), - singleton(hostStoreId) - ); - newHostingStoreIds.forEach(storeId -> - log.debug( - "Send request to ensure range hosted[taskId={}]: rangeId={}, storeId={}, newHostingStoreId={}", - taskId, KVRangeIdUtil.toString(id), hostStoreId, storeId)); - List> onceFutures = newHostingStoreIds.stream() - .map(storeId -> messenger.once(m -> { - if (m.hasEnsureRangeReply()) { - EnsureRangeReply reply = m.getEnsureRangeReply(); - return reply.getResult() == EnsureRangeReply.Result.OK; - } - return false; - })).collect(Collectors.toList()); - CompletableFuture.allOf(onceFutures.toArray(new CompletableFuture[] {})) - .orTimeout(5, TimeUnit.SECONDS) - .whenCompleteAsync((_v, _e) -> { - if (_e != null) { + log.debug( + "Changing Config[term={}, index={}, taskId={}]: rangeId={}, storeId={}, ver={}, state={}, nextVoters={}, nextLearners={}", + logTerm, logIndex, taskId, KVRangeIdUtil.toString(id), hostStoreId, ver, state, + request.getVotersList(), request.getLearnersList()); + // make a checkpoint if needed + CompletableFuture checkpointFuture = wal.latestSnapshot().getLastAppliedIndex() < logIndex - 1 ? + scheduleCheckpoint() : CompletableFuture.completedFuture(null); + checkpointFuture.whenCompleteAsync((v, e) -> { + if (e != null) { + // checkpoint failed, re-apply the log again + log.warn( + "Checkpoint failed[term={}, index={}, taskId={}]: rangeId={}, storeId={}, ver={}, state={}", + logTerm, logIndex, taskId, KVRangeIdUtil.toString(id), hostStoreId, ver, state); + onDone.completeExceptionally(e); + } else { + // notify new voters and learners host store to ensure the range exist + Set newHostingStoreIds = difference( + difference( + union(newHashSet(request.getVotersList()), + newHashSet(request.getLearnersList())), + union(newHashSet(wal.clusterConfig().getVotersList()), + newHashSet(wal.clusterConfig().getLearnersList())) + ), + singleton(hostStoreId) + ); + newHostingStoreIds.forEach(storeId -> log.debug( - "Ensure range hosted failed[taskId={}]: rangeId={}, storeId={}, newHostingStoreIds={}", - taskId, KVRangeIdUtil.toString(id), hostStoreId, newHostingStoreIds, _e); - onceFutures.forEach(f -> f.cancel(true)); - } - log.debug( - "Changing Config[taskId={}]: rangeId={}, storeId={}, nextVoters={}, nextLearners={}", - taskId, KVRangeIdUtil.toString(id), hostStoreId, request.getVotersList(), - request.getLearnersList()); - wal.changeClusterConfig(taskId, - newHashSet(request.getVotersList()), - newHashSet(request.getLearnersList())) - .whenCompleteAsync((v, e) -> { - if (e != null) { - String errorMessage = - String.format("Config change aborted[taskId=%s] due to %s", taskId, - e.getMessage()); - log.debug(errorMessage); - finishCommandWithError(taskId, new KVRangeException.TryLater(errorMessage)); - wal.stepDown(); + "Send request to ensure range hosted[taskId={}]: rangeId={}, storeId={}, newHostingStoreId={}", + taskId, KVRangeIdUtil.toString(id), hostStoreId, storeId)); + List> onceFutures = newHostingStoreIds.stream() + .map(storeId -> messenger.once(m -> { + if (m.hasEnsureRangeReply()) { + EnsureRangeReply reply = m.getEnsureRangeReply(); + return reply.getResult() == EnsureRangeReply.Result.OK; } - // postpone finishing command when config entry is applied - }, fsmExecutor); + return false; + })).collect(Collectors.toList()); + CompletableFuture.allOf(onceFutures.toArray(new CompletableFuture[] {})) + .orTimeout(5, TimeUnit.SECONDS) + .whenCompleteAsync((v1, e1) -> { + if (e1 != null) { + log.debug( + "Ensure range hosted failed[taskId={}]: rangeId={}, storeId={}, newHostingStoreIds={}", + taskId, KVRangeIdUtil.toString(id), hostStoreId, newHostingStoreIds, e1); + onceFutures.forEach(f -> f.cancel(true)); + } + wal.changeClusterConfig(taskId, + newHashSet(request.getVotersList()), + newHashSet(request.getLearnersList())) + .whenCompleteAsync((v2, e2) -> { + if (e2 != null) { + String errorMessage = + String.format("Config change aborted[taskId=%s] due to %s", taskId, + e2.getMessage()); + log.debug(errorMessage); + finishCommandWithError(taskId, new KVRangeException.TryLater(errorMessage)); + wal.stepDown(); + } + // postpone finishing command when config entry is applied + }, fsmExecutor); - }, fsmExecutor); - newHostingStoreIds.forEach(storeId -> { - log.debug( - "Send range ensure request to store[taskId={}]: rangeId={}, storeId={}, targetStoreId={}", - taskId, hostStoreId, KVRangeIdUtil.toString(id), storeId); - messenger.send(KVRangeMessage.newBuilder() - .setRangeId(id) - .setHostStoreId(storeId) - .setEnsureRange(EnsureRange.newBuilder() - .setVer(ver) // ensure the new kvrange is compatible in target store - .setBoundary(boundary) - .setInitSnapshot(Snapshot.newBuilder() - .setTerm(0) - .setIndex(0) - .setClusterConfig(ClusterConfig.getDefaultInstance()) // empty voter set - .setData(KVRangeSnapshot.newBuilder() - .setVer(ver) - .setId(id) - // no checkpoint specified - .setLastAppliedIndex(0) + }, fsmExecutor); + newHostingStoreIds.forEach(storeId -> { + log.debug( + "Send range ensure request to store[taskId={}]: rangeId={}, storeId={}, targetStoreId={}", + taskId, hostStoreId, KVRangeIdUtil.toString(id), storeId); + messenger.send(KVRangeMessage.newBuilder() + .setRangeId(id) + .setHostStoreId(storeId) + .setEnsureRange(EnsureRange.newBuilder() + .setVer(ver) // ensure the new kvrange is compatible in target store .setBoundary(boundary) - .setState(state) - .build().toByteString()) - .build()) - .build()) - .build()); - }); - - if (state.getType() == Normal) { - // only transit to ConfigChanging from Normal - rangeWriter.state(State.newBuilder() - .setType(ConfigChanging) - .setTaskId(taskId) - .build()); - } else if (state.getType() == Merged) { - rangeWriter.state(State.newBuilder() - .setType(MergedQuiting) - .setTaskId(taskId) - .build()); - } - rangeWriter.bumpVer(false); - onDone.complete(NOOP); + .setInitSnapshot(Snapshot.newBuilder() + .setTerm(0) + .setIndex(0) + .setClusterConfig(ClusterConfig.getDefaultInstance()) // empty voter set + .setData(KVRangeSnapshot.newBuilder() + .setVer(ver) + .setId(id) + // no checkpoint specified + .setLastAppliedIndex(0) + .setBoundary(boundary) + .setState(state) + .build().toByteString()) + .build()) + .build()) + .build()); + }); + if (state.getType() == Normal) { + // only transit to ConfigChanging from Normal + rangeWriter.state(State.newBuilder() + .setType(ConfigChanging) + .setTaskId(taskId) + .build()); + } else if (state.getType() == Merged) { + rangeWriter.state(State.newBuilder() + .setType(MergedQuiting) + .setTaskId(taskId) + .build()); + } + rangeWriter.bumpVer(false); + onDone.complete(NOOP); + } + }, fsmExecutor); } case TRANSFERLEADERSHIP -> { TransferLeadership request = command.getTransferLeadership(); @@ -996,7 +1003,6 @@ private CompletableFuture applyCommand(long ver, .setData(rhsSS.toByteString()) .build()) .build()).build()); - scheduleCompaction(); }); } }, fsmExecutor); @@ -1372,7 +1378,7 @@ private CompletableFuture install(KVRangeSnapshot snapshot, Str if (isNotOpening()) { return CompletableFuture.completedFuture(null); } - log.debug("Restore from snapshot: rangeId={}, storeId={}\n{}", + log.debug("Restoring from snapshot: rangeId={}, storeId={}\n{}", KVRangeIdUtil.toString(id), hostStoreId, snapshot); CompletableFuture onInstalled = new CompletableFuture<>(); // the restore future is cancelable @@ -1454,6 +1460,21 @@ private void scheduleCompaction() { } } + private CompletableFuture scheduleCheckpoint() { + return mgmtTaskRunner.add(() -> metricManager.recordCompact(() -> { + KVRangeSnapshot snapshot = kvRange.checkpoint(); + log.debug("Checkpointing using snapshot: rangeId={}, storeId={}\n{}", + KVRangeIdUtil.toString(id), hostStoreId, snapshot); + return wal.compact(snapshot) + .thenAccept(v -> dumpSessions.forEach((sessionId, session) -> { + if (!session.checkpointId().equals(snapshot.getCheckpointId())) { + session.cancel(); + dumpSessions.remove(sessionId, session); + } + })); + })); + } + private boolean isNotOpening() { Lifecycle state = lifecycle.get(); return state != Open;