diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 3e3fba07eacc4..29710067a61d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -107,8 +107,9 @@ public int filterEntriesForConsumer(Optional optMetadataArray long totalBytes = 0; int totalChunkedMessages = 0; int totalEntries = 0; - List entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null; - List entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null; + final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters); + List entriesToFiltered = hasFilter ? new ArrayList<>() : null; + List entriesToRedeliver = hasFilter ? new ArrayList<>() : null; for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) { final Entry entry = entries.get(i); if (entry == null) { @@ -123,18 +124,24 @@ public int filterEntriesForConsumer(Optional optMetadataArray ); int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch(); - this.filterProcessedMsgs.add(entryMsgCnt); + if (hasFilter) { + this.filterProcessedMsgs.add(entryMsgCnt); + } EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer); if (filterResult == EntryFilter.FilterResult.REJECT) { entriesToFiltered.add(entry.getPosition()); entries.set(i, null); + // FilterResult will be always `ACCEPTED` when there is No Filter + // dont need to judge whether `hasFilter` is true or not. this.filterRejectedMsgs.add(entryMsgCnt); entry.release(); continue; } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) { entriesToRedeliver.add((PositionImpl) entry.getPosition()); entries.set(i, null); + // FilterResult will be always `ACCEPTED` when there is No Filter + // dont need to judge whether `hasFilter` is true or not. this.filterRescheduledMsgs.add(entryMsgCnt); entry.release(); continue; @@ -176,7 +183,9 @@ public int filterEntriesForConsumer(Optional optMetadataArray continue; } - this.filterAcceptedMsgs.add(entryMsgCnt); + if (hasFilter) { + this.filterAcceptedMsgs.add(entryMsgCnt); + } totalEntries++; int batchSize = msgMetadata.getNumMessagesInBatch(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index f2794f6890888..b1b865727a055 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -148,16 +148,21 @@ public void testNonContiguousDeletedMessagesRanges() throws Exception { @DataProvider(name = "testSubscriptionMetrics") public Object[][] topicAndSubscription() { return new Object[][]{ - {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true}, - {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true}, - {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false}, - {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false}, + {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, true}, + {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, true}, + {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, true}, + {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, true}, + + {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, false}, + {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, false}, + {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, false}, + {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, false}, }; } @Test(dataProvider = "testSubscriptionMetrics") - public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats) - throws Exception { + public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats, + boolean setFilter) throws Exception { @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) @@ -175,12 +180,15 @@ public void testSubscriptionStats(final String topic, final String subName, bool Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get() .get().getSubscription(subName).getDispatcher(); - Field field = EntryFilterSupport.class.getDeclaredField("entryFilters"); - field.setAccessible(true); - NarClassLoader narClassLoader = mock(NarClassLoader.class); - EntryFilter filter1 = new EntryFilterTest(); - EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); - field.set(dispatcher, ImmutableList.of(loader1)); + if (setFilter) { + Field field = EntryFilterSupport.class.getDeclaredField("entryFilters"); + field.setAccessible(true); + NarClassLoader narClassLoader = mock(NarClassLoader.class); + EntryFilter filter1 = new EntryFilterTest(); + EntryFilterWithClassLoader loader1 = + spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); + field.set(dispatcher, ImmutableList.of(loader1)); + } for (int i = 0; i < 100; i++) { producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send(); @@ -233,10 +241,18 @@ public void testSubscriptionStats(final String topic, final String subName, bool .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic)) .mapToDouble(m-> m.value).sum(); - Assert.assertEquals(filterAccepted, 100); - if (isPersistent) { - Assert.assertEquals(filterRejected, 100); - Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter); + if (setFilter) { + Assert.assertEquals(filterAccepted, 100); + if (isPersistent) { + Assert.assertEquals(filterRejected, 100); + // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount + Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter); + } + } else { + Assert.assertEquals(throughFilter, 0D); + Assert.assertEquals(filterAccepted, 0D); + Assert.assertEquals(filterRejected, 0D); + Assert.assertEquals(filterRescheduled, 0D); } } else { Assert.assertEquals(throughFilterMetrics.size(), 0); @@ -245,22 +261,32 @@ public void testSubscriptionStats(final String topic, final String subName, bool Assert.assertEquals(rescheduledMetrics.size(), 0); } - testSubscriptionStatsAdminApi(topic, subName); + testSubscriptionStatsAdminApi(topic, subName, setFilter); } - private void testSubscriptionStatsAdminApi(String topic, String subName) throws Exception { + private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter) throws Exception { boolean persistent = TopicName.get(topic).isPersistent(); TopicStats topicStats = admin.topics().getStats(topic); SubscriptionStats stats = topicStats.getSubscriptions().get(subName); Assert.assertNotNull(stats); - Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100); - if (persistent) { - Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100); - Assert.assertEquals(stats.getFilterProcessedMsgCount(), - stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount() - + stats.getFilterRescheduledMsgCount(), - 0.01 * stats.getFilterProcessedMsgCount()); + if (setFilter) { + Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100); + if (persistent) { + Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100); + // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount + Assert.assertEquals(stats.getFilterProcessedMsgCount(), + stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount() + + stats.getFilterRescheduledMsgCount(), + 0.01 * stats.getFilterProcessedMsgCount()); + } + } else { + Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 0L); + if (persistent) { + Assert.assertEquals(stats.getFilterRejectedMsgCount(), 0L); + Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 0L); + Assert.assertEquals(stats.getFilterRescheduledMsgCount(), 0L); + } } } }