Skip to content

Commit

Permalink
fixed failure count
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Mar 9, 2023
1 parent 0d88959 commit 1ac2868
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,9 @@ private void complete(String serviceUnit, Throwable ex) {
var future = inFlightSplitRequest.future;
if (!future.isDone()) {
if (ex != null) {
counter.update(Failure, Unknown);
future.completeExceptionally(ex);
log.error("Failed the bundle split event: {}", serviceUnit, ex);
} else {
counter.update(inFlightSplitRequest.splitDecision);
future.complete(null);
log.info("Completed the bundle split event: {}", serviceUnit);
}
}
return null;
Expand Down Expand Up @@ -87,8 +83,11 @@ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
}).future)
.whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to publish the bundle split event for bundle:{}. Skipping wait.", bundle);
log.error("Failed the bundle split event for bundle:{}", bundle, ex);
counter.update(Failure, Unknown);
} else {
log.info("Completed the bundle split event for bundle:{}", bundle);
counter.update(decision);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -47,7 +48,8 @@ public class SplitManagerTest {

@Test
public void testEventPubFutureHasException() {
SplitManager manager = new SplitManager(new SplitCounter());
var counter = new SplitCounter();
SplitManager manager = new SplitManager(counter);
var decision = new SplitDecision();
CompletableFuture<Void> future =
manager.waitAsync(FutureUtil.failedFuture(new Exception("test")),
Expand All @@ -60,11 +62,16 @@ public void testEventPubFutureHasException() {
} catch (Exception ex) {
assertEquals(ex.getCause().getMessage(), "test");
}
var counterExpected = new SplitCounter();
counterExpected.update(SplitDecision.Label.Failure, Unknown);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
}

@Test
public void testTimeout() throws IllegalAccessException {
SplitManager manager = new SplitManager(new SplitCounter());
var counter = new SplitCounter();
SplitManager manager = new SplitManager(counter);
var decision = new SplitDecision();
CompletableFuture<Void> future =
manager.waitAsync(CompletableFuture.completedFuture(null),
Expand All @@ -81,11 +88,17 @@ public void testTimeout() throws IllegalAccessException {
}

assertEquals(inFlightUnloadRequests.size(), 0);
var counterExpected = new SplitCounter();
counterExpected.update(SplitDecision.Label.Failure, Unknown);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
}

@Test
public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException {
SplitManager manager = new SplitManager(new SplitCounter());
var counter = new SplitCounter();
SplitManager manager = new SplitManager(counter);
var counterExpected = new SplitCounter();
var decision = new SplitDecision();
decision.succeed(Sessions);
CompletableFuture<Void> future =
Expand All @@ -110,10 +123,15 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 1);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(inFlightUnloadRequests.size(), 0);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());

// Success with Init state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
Expand All @@ -123,6 +141,9 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 0);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
future.get();

// Success with Owned state.
Expand All @@ -133,12 +154,16 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 0);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
future.get();
}

@Test
public void testFailedStage() throws IllegalAccessException {
SplitManager manager = new SplitManager(new SplitCounter());
var counter = new SplitCounter();
SplitManager manager = new SplitManager(counter);
var decision = new SplitDecision();
CompletableFuture<Void> future =
manager.waitAsync(CompletableFuture.completedFuture(null),
Expand All @@ -160,6 +185,10 @@ public void testFailedStage() throws IllegalAccessException {
}

assertEquals(inFlightUnloadRequests.size(), 0);
var counterExpected = new SplitCounter();
counterExpected.update(SplitDecision.Label.Failure, Unknown);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
}

@Test
Expand Down

0 comments on commit 1ac2868

Please sign in to comment.