From 76dee8fb67bebb4750db89cf34c9fa528f109bb7 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 13 Sep 2022 12:46:57 +0800 Subject: [PATCH 1/4] fix entryFilter stats --- .../service/AbstractBaseDispatcher.java | 13 ++++- .../broker/stats/SubscriptionStatsTest.java | 47 ++++++++++++------- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 22a5df2931eb2..c20860d5fb863 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -107,6 +107,7 @@ public int filterEntriesForConsumer(Optional optMetadataArray long totalBytes = 0; int totalChunkedMessages = 0; int totalEntries = 0; + final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters); List entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null; List entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null; for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) { @@ -123,18 +124,24 @@ public int filterEntriesForConsumer(Optional optMetadataArray ); int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch(); - this.filterProcessedMsgs.add(entryMsgCnt); + if (hasFilter) { + this.filterProcessedMsgs.add(entryMsgCnt); + } EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer); if (filterResult == EntryFilter.FilterResult.REJECT) { entriesToFiltered.add(entry.getPosition()); entries.set(i, null); + // FilterResult will be always `ACCEPTED` when there is No Filter + // dont need to judge whether `hasFilter` is true or not. this.filterRejectedMsgs.add(entryMsgCnt); entry.release(); continue; } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) { entriesToRedeliver.add((PositionImpl) entry.getPosition()); entries.set(i, null); + // FilterResult will be always `ACCEPTED` when there is No Filter + // dont need to judge whether `hasFilter` is true or not. this.filterRescheduledMsgs.add(entryMsgCnt); entry.release(); continue; @@ -174,7 +181,9 @@ public int filterEntriesForConsumer(Optional optMetadataArray continue; } - this.filterAcceptedMsgs.add(entryMsgCnt); + if (hasFilter) { + this.filterAcceptedMsgs.add(entryMsgCnt); + } totalEntries++; int batchSize = msgMetadata.getNumMessagesInBatch(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index f2794f6890888..4b958336d38e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -148,16 +148,21 @@ public void testNonContiguousDeletedMessagesRanges() throws Exception { @DataProvider(name = "testSubscriptionMetrics") public Object[][] topicAndSubscription() { return new Object[][]{ - {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true}, - {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true}, - {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false}, - {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false}, + {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, true}, + {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, true}, + {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, true}, + {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, true}, + + {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, false}, + {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, false}, + {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, false}, + {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, false}, }; } @Test(dataProvider = "testSubscriptionMetrics") - public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats) - throws Exception { + public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats, + boolean setFilter) throws Exception { @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) @@ -175,12 +180,15 @@ public void testSubscriptionStats(final String topic, final String subName, bool Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get() .get().getSubscription(subName).getDispatcher(); - Field field = EntryFilterSupport.class.getDeclaredField("entryFilters"); - field.setAccessible(true); - NarClassLoader narClassLoader = mock(NarClassLoader.class); - EntryFilter filter1 = new EntryFilterTest(); - EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); - field.set(dispatcher, ImmutableList.of(loader1)); + if (setFilter) { + Field field = EntryFilterSupport.class.getDeclaredField("entryFilters"); + field.setAccessible(true); + NarClassLoader narClassLoader = mock(NarClassLoader.class); + EntryFilter filter1 = new EntryFilterTest(); + EntryFilterWithClassLoader loader1 = + spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); + field.set(dispatcher, ImmutableList.of(loader1)); + } for (int i = 0; i < 100; i++) { producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send(); @@ -233,10 +241,17 @@ public void testSubscriptionStats(final String topic, final String subName, bool .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic)) .mapToDouble(m-> m.value).sum(); - Assert.assertEquals(filterAccepted, 100); - if (isPersistent) { - Assert.assertEquals(filterRejected, 100); - Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter); + if (setFilter) { + Assert.assertEquals(filterAccepted, 100); + if (isPersistent) { + Assert.assertEquals(filterRejected, 100); + Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter); + } + } else { + Assert.assertEquals(throughFilter, 0D); + Assert.assertEquals(filterAccepted, 0D); + Assert.assertEquals(filterRejected, 0D); + Assert.assertEquals(filterRescheduled, 0D); } } else { Assert.assertEquals(throughFilterMetrics.size(), 0); From 32c7ff1ea636a3149e044518bba105a8626dacb8 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 13 Sep 2022 16:19:21 +0800 Subject: [PATCH 2/4] fix test --- .../broker/stats/SubscriptionStatsTest.java | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index 4b958336d38e8..4e9507f3e479d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -260,22 +260,31 @@ public void testSubscriptionStats(final String topic, final String subName, bool Assert.assertEquals(rescheduledMetrics.size(), 0); } - testSubscriptionStatsAdminApi(topic, subName); + testSubscriptionStatsAdminApi(topic, subName, setFilter); } - private void testSubscriptionStatsAdminApi(String topic, String subName) throws Exception { + private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter) throws Exception { boolean persistent = TopicName.get(topic).isPersistent(); TopicStats topicStats = admin.topics().getStats(topic); SubscriptionStats stats = topicStats.getSubscriptions().get(subName); Assert.assertNotNull(stats); - Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100); - if (persistent) { - Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100); - Assert.assertEquals(stats.getFilterProcessedMsgCount(), - stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount() - + stats.getFilterRescheduledMsgCount(), - 0.01 * stats.getFilterProcessedMsgCount()); + if (setFilter) { + Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100); + if (persistent) { + Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100); + Assert.assertEquals(stats.getFilterProcessedMsgCount(), + stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount() + + stats.getFilterRescheduledMsgCount(), + 0.01 * stats.getFilterProcessedMsgCount()); + } + } else { + Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 0L); + if (persistent) { + Assert.assertEquals(stats.getFilterRejectedMsgCount(), 0L); + Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 0L); + Assert.assertEquals(stats.getFilterRescheduledMsgCount(), 0L); + } } } } From 8823e6229864c443e33a6936a9d9dd0d83ba8465 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 13 Sep 2022 16:35:53 +0800 Subject: [PATCH 3/4] add test comment --- .../org/apache/pulsar/broker/stats/SubscriptionStatsTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index 4e9507f3e479d..b1b865727a055 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -245,6 +245,7 @@ public void testSubscriptionStats(final String topic, final String subName, bool Assert.assertEquals(filterAccepted, 100); if (isPersistent) { Assert.assertEquals(filterRejected, 100); + // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter); } } else { @@ -273,6 +274,7 @@ private void testSubscriptionStatsAdminApi(String topic, String subName, boolean Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100); if (persistent) { Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100); + // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount Assert.assertEquals(stats.getFilterProcessedMsgCount(), stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount() + stats.getFilterRescheduledMsgCount(), From ef121056a44dd9cad2dd4dd45ac1591e334cf3c7 Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 14 Sep 2022 00:10:05 +0800 Subject: [PATCH 4/4] review fix --- .../apache/pulsar/broker/service/AbstractBaseDispatcher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index c20860d5fb863..1649516def19b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -108,8 +108,8 @@ public int filterEntriesForConsumer(Optional optMetadataArray int totalChunkedMessages = 0; int totalEntries = 0; final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters); - List entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null; - List entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null; + List entriesToFiltered = hasFilter ? new ArrayList<>() : null; + List entriesToRedeliver = hasFilter ? new ArrayList<>() : null; for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) { final Entry entry = entries.get(i); if (entry == null) {