From 1ac2868f0a9f263905010fb32d0b8e06c08c6869 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Wed, 8 Mar 2023 21:08:27 -0800 Subject: [PATCH] fixed failure count --- .../extensions/manager/SplitManager.java | 9 ++--- .../extensions/manager/SplitManagerTest.java | 37 +++++++++++++++++-- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java index bdc93af27b109..e2a706a16b45e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java @@ -53,13 +53,9 @@ private void complete(String serviceUnit, Throwable ex) { var future = inFlightSplitRequest.future; if (!future.isDone()) { if (ex != null) { - counter.update(Failure, Unknown); future.completeExceptionally(ex); - log.error("Failed the bundle split event: {}", serviceUnit, ex); } else { - counter.update(inFlightSplitRequest.splitDecision); future.complete(null); - log.info("Completed the bundle split event: {}", serviceUnit); } } return null; @@ -87,8 +83,11 @@ public CompletableFuture waitAsync(CompletableFuture eventPubFuture, }).future) .whenComplete((__, ex) -> { if (ex != null) { - log.error("Failed to publish the bundle split event for bundle:{}. Skipping wait.", bundle); + log.error("Failed the bundle split event for bundle:{}", bundle, ex); counter.update(Failure, Unknown); + } else { + log.info("Completed the bundle split event for bundle:{}", bundle); + counter.update(decision); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java index 166fb03e968ee..af1a1aadeee32 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java @@ -20,6 +20,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -47,7 +48,8 @@ public class SplitManagerTest { @Test public void testEventPubFutureHasException() { - SplitManager manager = new SplitManager(new SplitCounter()); + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); var decision = new SplitDecision(); CompletableFuture future = manager.waitAsync(FutureUtil.failedFuture(new Exception("test")), @@ -60,11 +62,16 @@ public void testEventPubFutureHasException() { } catch (Exception ex) { assertEquals(ex.getCause().getMessage(), "test"); } + var counterExpected = new SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, Unknown); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); } @Test public void testTimeout() throws IllegalAccessException { - SplitManager manager = new SplitManager(new SplitCounter()); + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); var decision = new SplitDecision(); CompletableFuture future = manager.waitAsync(CompletableFuture.completedFuture(null), @@ -81,11 +88,17 @@ public void testTimeout() throws IllegalAccessException { } assertEquals(inFlightUnloadRequests.size(), 0); + var counterExpected = new SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, Unknown); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); } @Test public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException { - SplitManager manager = new SplitManager(new SplitCounter()); + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); + var counterExpected = new SplitCounter(); var decision = new SplitDecision(); decision.succeed(Sessions); CompletableFuture future = @@ -110,10 +123,15 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 1); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null); + counterExpected.update(SplitDecision.Label.Success, Sessions); assertEquals(inFlightUnloadRequests.size(), 0); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); // Success with Init state. future = manager.waitAsync(CompletableFuture.completedFuture(null), @@ -123,6 +141,9 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 0); + counterExpected.update(SplitDecision.Label.Success, Sessions); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); future.get(); // Success with Owned state. @@ -133,12 +154,16 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 0); + counterExpected.update(SplitDecision.Label.Success, Sessions); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); future.get(); } @Test public void testFailedStage() throws IllegalAccessException { - SplitManager manager = new SplitManager(new SplitCounter()); + var counter = new SplitCounter(); + SplitManager manager = new SplitManager(counter); var decision = new SplitDecision(); CompletableFuture future = manager.waitAsync(CompletableFuture.completedFuture(null), @@ -160,6 +185,10 @@ public void testFailedStage() throws IllegalAccessException { } assertEquals(inFlightUnloadRequests.size(), 0); + var counterExpected = new SplitCounter(); + counterExpected.update(SplitDecision.Label.Failure, Unknown); + assertEquals(counter.toMetrics(null).toString(), + counterExpected.toMetrics(null).toString()); } @Test