Skip to content

Commit

Permalink
diagnose cov failure
Browse files Browse the repository at this point in the history
  • Loading branch information
popduke committed Sep 14, 2024
1 parent 4fd27ac commit 1747747
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public CompletableFuture<Void> restoreFrom(String leader, KVRangeSnapshot rangeS
.timeout(idleTimeSec, TimeUnit.SECONDS)
.observeOn(Schedulers.from(executor))
.subscribeWith(new DisposableObserver<KVRangeMessage>() {
private final IKVReseter reseter = restorer;
@Override
public void onNext(@NonNull KVRangeMessage m) {
SaveSnapshotDataRequest request = m.getSaveSnapshotDataRequest();
Expand All @@ -92,19 +91,19 @@ public void onNext(@NonNull KVRangeMessage m) {
for (KVPair kv : request.getKvList()) {
bytes += kv.getKey().size();
bytes += kv.getValue().size();
reseter.put(kv.getKey(), kv.getValue());
restorer.put(kv.getKey(), kv.getValue());
}
metricManager.reportRestore(bytes);
log.debug("Saved {} bytes snapshot data, send reply to {}: session={}",
bytes, m.getHostStoreId(), session.id);
if (request.getFlag() == SaveSnapshotDataRequest.Flag.End) {
if (!onDone.isCancelled()) {
reseter.done();
restorer.done();
dispose();
onDone.complete(null);
log.info("Restored from snapshot: session={}", session.id);
} else {
reseter.abort();
restorer.abort();
dispose();
log.info("Snapshot restore canceled: session={}", session.id);
}
Expand Down Expand Up @@ -138,8 +137,8 @@ public void onNext(@NonNull KVRangeMessage m) {

@Override
public void onError(@NonNull Throwable e) {
if (reseter != null) {
reseter.abort();
if (restorer != null) {
restorer.abort();
} else {
log.error("restorer is null", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package com.baidu.bifromq.basekv.store.range;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -59,7 +61,8 @@ public void setUp() {

@Test
public void awaitDone() {
when(range.toReseter(snapshot)).thenReturn(reseter);
IKVReseter reseter = mock(IKVReseter.class);
when(range.toReseter(eq(snapshot))).thenReturn(reseter);
KVRangeRestorer restorer = new KVRangeRestorer(snapshot, range, messenger, metricManager, executor, 10);

assertTrue(restorer.awaitDone().isDone());
Expand Down Expand Up @@ -128,7 +131,7 @@ public void restoreFromWithError() {
@Test
public void restoreFromTimeout() {
IKVReseter reseter = mock(IKVReseter.class);
when(range.toReseter(snapshot)).thenReturn(reseter);
when(range.toReseter(eq(snapshot))).thenReturn(reseter);

KVRangeRestorer restorer = new KVRangeRestorer(snapshot, range, messenger, metricManager, executor, 1);
CompletableFuture<Void> restoreFuture = restorer.restoreFrom("leader", snapshot);
Expand All @@ -143,7 +146,7 @@ public void restoreFromTimeout() {
@Test
public void cancelPreviousSession() {
IKVReseter reseter = mock(IKVReseter.class);
when(range.toReseter(snapshot)).thenReturn(reseter);
when(range.toReseter(eq(snapshot))).thenReturn(reseter);

KVRangeRestorer restorer = new KVRangeRestorer(snapshot, range, messenger, metricManager, executor, 10);

Expand All @@ -152,6 +155,7 @@ public void cancelPreviousSession() {

// Start the second restore session, which should cancel the first
KVRangeSnapshot newSnapshot = KVRangeSnapshot.newBuilder().setId(snapshot.getId()).setVer(1).build();
when(range.toReseter(eq(newSnapshot))).thenReturn(reseter);
CompletableFuture<Void> secondRestore = restorer.restoreFrom("leader", newSnapshot);

verify(reseter, times(1)).abort();
Expand Down

0 comments on commit 1747747

Please sign in to comment.