Skip to content

Commit

Permalink
diagnose cov failure
Browse files Browse the repository at this point in the history
  • Loading branch information
popduke committed Sep 14, 2024
1 parent 35894c7 commit 4fd27ac
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@ public CompletableFuture<Void> restoreFrom(String leader, KVRangeSnapshot rangeS
CompletableFuture<Void> onDone = session.doneFuture;
try {
IKVReseter restorer = range.toReseter(rangeSnapshot);
log.info("Restoring from snapshot: session={}, leader={} \n{}", session.id, leader, rangeSnapshot);
log.info("Restoring from snapshot: session={}, leader={}, restorer={} \n{}",
session.id, leader, restorer, rangeSnapshot);
DisposableObserver<KVRangeMessage> observer = messenger.receive()
.filter(m -> m.hasSaveSnapshotDataRequest()
&& m.getSaveSnapshotDataRequest().getSessionId().equals(session.id))
.timeout(idleTimeSec, TimeUnit.SECONDS)
.observeOn(Schedulers.from(executor))
.subscribeWith(new DisposableObserver<KVRangeMessage>() {
private final IKVReseter reseter = restorer;
@Override
public void onNext(@NonNull KVRangeMessage m) {
SaveSnapshotDataRequest request = m.getSaveSnapshotDataRequest();
Expand All @@ -90,19 +92,19 @@ public void onNext(@NonNull KVRangeMessage m) {
for (KVPair kv : request.getKvList()) {
bytes += kv.getKey().size();
bytes += kv.getValue().size();
restorer.put(kv.getKey(), kv.getValue());
reseter.put(kv.getKey(), kv.getValue());
}
metricManager.reportRestore(bytes);
log.debug("Saved {} bytes snapshot data, send reply to {}: session={}",
bytes, m.getHostStoreId(), session.id);
if (request.getFlag() == SaveSnapshotDataRequest.Flag.End) {
if (!onDone.isCancelled()) {
restorer.done();
reseter.done();
dispose();
onDone.complete(null);
log.info("Restored from snapshot: session={}", session.id);
} else {
restorer.abort();
reseter.abort();
dispose();
log.info("Snapshot restore canceled: session={}", session.id);
}
Expand Down Expand Up @@ -136,8 +138,8 @@ public void onNext(@NonNull KVRangeMessage m) {

@Override
public void onError(@NonNull Throwable e) {
if (restorer != null) {
restorer.abort();
if (reseter != null) {
reseter.abort();
} else {
log.error("restorer is null", e);
}
Expand Down

0 comments on commit 4fd27ac

Please sign in to comment.