Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun committed Dec 25, 2024
1 parent ece3163 commit 850df7b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
Expand Down Expand Up @@ -67,7 +68,9 @@ public static Optional<TopicPolicies> getTopicPoliciesBypassCache(TopicPoliciesS
.newReader();
PulsarEvent event = null;
while (reader.hasMoreEvents()) {
event = reader.readNext().getValue();
@Cleanup("release")
Message<PulsarEvent> message = reader.readNext();
event = message.getValue();
}
return Optional.ofNullable(event).map(e -> e.getTopicPoliciesEvent().getPolicies());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public void testSendAndReceiveNamespaceEvents() throws Exception {
.build();
systemTopicClientForNamespace1.newWriter().write(getEventKey(event), event);
SystemTopicClient.Reader reader = systemTopicClientForNamespace1.newReader();
@Cleanup("release")
Message<PulsarEvent> received = reader.readNext();
log.info("Receive pulsar event from system topic : {}", received.getValue());

Expand All @@ -139,6 +140,7 @@ public void testSendAndReceiveNamespaceEvents() throws Exception {

// test new reader read
SystemTopicClient.Reader reader1 = systemTopicClientForNamespace1.newReader();
@Cleanup("release")
Message<PulsarEvent> received1 = reader1.readNext();
log.info("Receive pulsar event from system topic : {}", received1.getValue());
Assert.assertEquals(received1.getValue(), event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,13 @@ public void testSystemTopicNotCheckExceed() throws Exception {

FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join();
Assert.assertTrue(reader1.hasMoreEvents());
Assert.assertNotNull(reader1.readNext());
Message<?> message = reader1.readNext();
Assert.assertNotNull(message);
message.release();
Assert.assertTrue(reader2.hasMoreEvents());
message = reader2.readNext();
Assert.assertNotNull(reader2.readNext());
message.release();
reader1.close();
reader2.close();
writer1.get().close();
Expand Down

0 comments on commit 850df7b

Please sign in to comment.