Skip to content

Commit

Permalink
[fix][monitor] Fix the partitioned publisher topic stat aggregation b…
Browse files Browse the repository at this point in the history
…ug (#18807)

(cherry picked from commit 8790ed1)
  • Loading branch information
heesung-sn authored and poorbarcode committed May 7, 2023
1 parent f88a49e commit b5b2de6
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

/**
* Statistics for a non-persistent partitioned topic.
* This class is not thread-safe.
*/
@SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
public class NonPersistentPartitionedTopicStatsImpl extends NonPersistentTopicStatsImpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

/**
* Statistics for a non-persistent topic.
* This class is not thread-safe.
*/
@SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPersistentTopicStats {
Expand Down Expand Up @@ -148,14 +149,14 @@ public void reset() {
}

// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
// stats.
// stats. This stat addition is not thread-safe.
public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) {
NonPersistentTopicStatsImpl stats = (NonPersistentTopicStatsImpl) ts;
Objects.requireNonNull(stats);
super.add(stats);
this.msgDropRate += stats.msgDropRate;

stats.getNonPersistentPublishers().forEach(s -> {
for (int index = 0; index < stats.getNonPersistentPublishers().size(); index++) {
NonPersistentPublisherStats s = stats.getNonPersistentPublishers().get(index);
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
((NonPersistentPublisherStatsImpl) this.nonPersistentPublishersMap
.computeIfAbsent(s.getProducerName(), key -> {
Expand All @@ -165,20 +166,20 @@ public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) {
return newStats;
})).add((NonPersistentPublisherStatsImpl) s);
} else {
if (this.nonPersistentPublishers.size() != stats.getNonPersistentPublishers().size()) {
for (int i = 0; i < stats.getNonPersistentPublishers().size(); i++) {
NonPersistentPublisherStatsImpl newStats = new NonPersistentPublisherStatsImpl();
newStats.setSupportsPartialProducer(false);
this.nonPersistentPublishers.add(newStats.add((NonPersistentPublisherStatsImpl) s));
}
} else {
for (int i = 0; i < stats.getNonPersistentPublishers().size(); i++) {
((NonPersistentPublisherStatsImpl) this.nonPersistentPublishers.get(i))
.add((NonPersistentPublisherStatsImpl) s);
}
// Add a non-persistent publisher stat entry to this.nonPersistentPublishers
// if this.nonPersistentPublishers.size() is smaller than
// the input stats.nonPersistentPublishers.size().
// Here, index == this.nonPersistentPublishers.size() means
// this.nonPersistentPublishers.size() is smaller than the input stats.nonPersistentPublishers.size()
if (index == this.nonPersistentPublishers.size()) {
NonPersistentPublisherStatsImpl newStats = new NonPersistentPublisherStatsImpl();
newStats.setSupportsPartialProducer(false);
this.nonPersistentPublishers.add(newStats);
}
((NonPersistentPublisherStatsImpl) this.nonPersistentPublishers.get(index))
.add((NonPersistentPublisherStatsImpl) s);
}
});
}

if (this.getNonPersistentSubscriptions().size() != stats.getNonPersistentSubscriptions().size()) {
for (String subscription : stats.getNonPersistentSubscriptions().keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

/**
* Statistics for a partitioned topic.
* This class is not thread-safe.
*/
@SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
public class PartitionedTopicStatsImpl extends TopicStatsImpl implements PartitionedTopicStats {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

/**
* Statistics for a Pulsar topic.
* This class is not thread-safe.
*/
@Data
public class TopicStatsImpl implements TopicStats {
Expand Down Expand Up @@ -204,7 +205,7 @@ public void reset() {
}

// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
// stats.
// stats. This stat addition is not thread-safe.
public TopicStatsImpl add(TopicStats ts) {
TopicStatsImpl stats = (TopicStatsImpl) ts;

Expand All @@ -227,7 +228,8 @@ public TopicStatsImpl add(TopicStats ts) {
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;

stats.getPublishers().forEach(s -> {
for (int index = 0; index < stats.getPublishers().size(); index++) {
PublisherStats s = stats.getPublishers().get(index);
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
this.publishersMap.computeIfAbsent(s.getProducerName(), key -> {
final PublisherStatsImpl newStats = new PublisherStatsImpl();
Expand All @@ -236,19 +238,20 @@ public TopicStatsImpl add(TopicStats ts) {
return newStats;
}).add((PublisherStatsImpl) s);
} else {
if (this.publishers.size() != stats.publishers.size()) {
for (int i = 0; i < stats.publishers.size(); i++) {
PublisherStatsImpl newStats = new PublisherStatsImpl();
newStats.setSupportsPartialProducer(false);
this.publishers.add(newStats.add(stats.publishers.get(i)));
}
} else {
for (int i = 0; i < stats.publishers.size(); i++) {
this.publishers.get(i).add(stats.publishers.get(i));
}
// Add a publisher stat entry to this.publishers
// if this.publishers.size() is smaller than
// the input stats.publishers.size().
// Here, index == this.publishers.size() means
// this.publishers.size() is smaller than the input stats.publishers.size()
if (index == this.publishers.size()) {
PublisherStatsImpl newStats = new PublisherStatsImpl();
newStats.setSupportsPartialProducer(false);
this.publishers.add(newStats);
}
this.publishers.get(index)
.add((PublisherStatsImpl) s);
}
});
}

if (this.subscriptions.size() != stats.subscriptions.size()) {
for (String subscription : stats.subscriptions.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ public void testPartitionedTopicStats() {
public void testPartitionedTopicStatsByNullProducerName() {
final NonPersistentTopicStatsImpl topicStats1 = new NonPersistentTopicStatsImpl();
final NonPersistentPublisherStatsImpl publisherStats1 = new NonPersistentPublisherStatsImpl();
publisherStats1.setMsgRateIn(1);
publisherStats1.setSupportsPartialProducer(false);
publisherStats1.setProducerName(null);
final NonPersistentPublisherStatsImpl publisherStats2 = new NonPersistentPublisherStatsImpl();
publisherStats2.setMsgRateIn(2);
publisherStats2.setSupportsPartialProducer(false);
publisherStats2.setProducerName(null);
topicStats1.addPublisher(publisherStats1);
Expand All @@ -76,15 +78,22 @@ public void testPartitionedTopicStatsByNullProducerName() {

final NonPersistentTopicStatsImpl topicStats2 = new NonPersistentTopicStatsImpl();
final NonPersistentPublisherStatsImpl publisherStats3 = new NonPersistentPublisherStatsImpl();
publisherStats3.setMsgRateIn(3);
publisherStats3.setSupportsPartialProducer(true);
publisherStats3.setProducerName(null);
final NonPersistentPublisherStatsImpl publisherStats4 = new NonPersistentPublisherStatsImpl();
publisherStats4.setMsgRateIn(4);
publisherStats4.setSupportsPartialProducer(true);
publisherStats4.setProducerName(null);
final NonPersistentPublisherStatsImpl publisherStats5 = new NonPersistentPublisherStatsImpl();
publisherStats5.setMsgRateIn(5);
publisherStats5.setSupportsPartialProducer(true);
publisherStats5.setProducerName(null);
topicStats2.addPublisher(publisherStats3);
topicStats2.addPublisher(publisherStats4);
topicStats2.addPublisher(publisherStats5);

assertEquals(topicStats2.getPublishers().size(), 2);
assertEquals(topicStats2.getPublishers().size(), 3);
// when the producerName is null, fall back to false
assertFalse(topicStats2.getPublishers().get(0).isSupportsPartialProducer());
assertFalse(topicStats2.getPublishers().get(1).isSupportsPartialProducer());
Expand All @@ -93,6 +102,9 @@ public void testPartitionedTopicStatsByNullProducerName() {
target.add(topicStats1);
target.add(topicStats2);

assertEquals(target.getPublishers().size(), 2);
assertEquals(target.getPublishers().size(), 3);
assertEquals(target.getPublishers().get(0).getMsgRateIn(), 4);
assertEquals(target.getPublishers().get(1).getMsgRateIn(), 6);
assertEquals(target.getPublishers().get(2).getMsgRateIn(), 5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,15 @@ public void testPersistentTopicStatsAggregationPartialProducerIsNotSupported() {
topicStats1.averageMsgSize = 1;
topicStats1.storageSize = 1;
final PublisherStatsImpl publisherStats1 = new PublisherStatsImpl();
publisherStats1.setMsgRateIn(1);
publisherStats1.setSupportsPartialProducer(false);
publisherStats1.setProducerName("name1");
final PublisherStatsImpl publisherStats2 = new PublisherStatsImpl();
publisherStats2.setMsgRateIn(2);
publisherStats2.setSupportsPartialProducer(false);
publisherStats2.setProducerName("name2");
topicStats1.addPublisher(publisherStats1);
topicStats1.addPublisher(publisherStats2);
topicStats1.subscriptions.put("test_ns", new SubscriptionStatsImpl());
topicStats1.replication.put("test_ns", new ReplicatorStatsImpl());

Expand All @@ -100,10 +106,21 @@ public void testPersistentTopicStatsAggregationPartialProducerIsNotSupported() {
topicStats2.msgThroughputOut = 4;
topicStats2.averageMsgSize = 5;
topicStats2.storageSize = 6;
final PublisherStatsImpl publisherStats2 = new PublisherStatsImpl();
publisherStats2.setSupportsPartialProducer(false);
publisherStats2.setProducerName("name1");
topicStats2.addPublisher(publisherStats2);
final PublisherStatsImpl publisherStats3 = new PublisherStatsImpl();
publisherStats3.setMsgRateIn(3);
publisherStats3.setSupportsPartialProducer(false);
publisherStats3.setProducerName("name3");
final PublisherStatsImpl publisherStats4 = new PublisherStatsImpl();
publisherStats4.setMsgRateIn(4);
publisherStats4.setSupportsPartialProducer(false);
publisherStats4.setProducerName("name4");
final PublisherStatsImpl publisherStats5 = new PublisherStatsImpl();
publisherStats5.setMsgRateIn(5);
publisherStats5.setSupportsPartialProducer(false);
publisherStats5.setProducerName("name5");
topicStats2.addPublisher(publisherStats3);
topicStats2.addPublisher(publisherStats4);
topicStats2.addPublisher(publisherStats5);
topicStats2.subscriptions.put("test_ns", new SubscriptionStatsImpl());
topicStats2.replication.put("test_ns", new ReplicatorStatsImpl());

Expand All @@ -117,7 +134,10 @@ public void testPersistentTopicStatsAggregationPartialProducerIsNotSupported() {
assertEquals(target.msgThroughputOut, 5.0);
assertEquals(target.averageMsgSize, 3.0);
assertEquals(target.storageSize, 7);
assertEquals(target.getPublishers().size(), 1);
assertEquals(target.getPublishers().size(), 3);
assertEquals(target.getPublishers().get(0).getMsgRateIn(), 4);
assertEquals(target.getPublishers().get(1).getMsgRateIn(), 6);
assertEquals(target.getPublishers().get(2).getMsgRateIn(), 5);
assertEquals(target.subscriptions.size(), 1);
assertEquals(target.replication.size(), 1);
}
Expand Down

0 comments on commit b5b2de6

Please sign in to comment.