Skip to content

Commit

Permalink
test: add new test testRefreshTaskCanBeCompletedWhenReaderClosed an…
Browse files Browse the repository at this point in the history
…d `testRefreshAPI`.
  • Loading branch information
liangyepianzhou committed Nov 15, 2023
1 parent e83cd8a commit 89e5c76
Showing 1 changed file with 139 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,29 @@
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
Expand Down Expand Up @@ -101,10 +106,11 @@ public static Object[] topicDomain() {
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
return publishMessages(topic, count, enableBatch, false);
return publishMessages(topic, 0, count, enableBatch, false);
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch, boolean enableEncryption) throws Exception {
private Set<String> publishMessages(String topic, int keyStartPosition, int count, boolean enableBatch,
boolean enableEncryption) throws Exception {
Set<String> keys = new HashSet<>();
ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
Expand All @@ -124,7 +130,7 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
}
try (Producer<byte[]> producer = builder.create()) {
CompletableFuture<?> lastFuture = null;
for (int i = 0; i < count; i++) {
for (int i = keyStartPosition; i < keyStartPosition + count; i++) {
String key = "key"+ i;
byte[] data = ("my-message-" + i).getBytes();
lastFuture = producer.newMessage().key(key).value(data).sendAsync();
Expand All @@ -136,6 +142,134 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
return keys;
}

@DataProvider(name = "partition")
public static Object[][] partition () {
return new Object[][] {
{ 3 }, { 0 }
};
}

/**
* Case1:
* 1. Slow down the rate of reading messages.
* 2. Send some messages
* 3. Call new `refresh` API, it will wait for reading all the messages completed.
* Case2:
* 1. No new messages.
* 2. Call new `refresh` API, it will be completed immediately.
* Case3:
* 1. multi-partition topic, p1, p2 has new message, p3 has no new messages.
* 2. Call new `refresh` API, it will be completed after read new messages.
*/
@Test(dataProvider = "partition")
public void testRefreshAPI(int partition) throws Exception {
// 1. Prepare resource.
String topic = "persistent://public/default/testRefreshAPI" + RandomUtils.nextLong();
if (partition == 0) {
admin.topics().createNonPartitionedTopic(topic);
} else {
admin.topics().createPartitionedTopic(topic, partition);
}
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("my-sub")
.subscribe();
@Cleanup
TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
.topic(topic)
.create();
// 2. Add a listen action to provider the test environment.
// The listen action will be triggered when there are incoming messages every time.
// This is a sync operation, so sleep in the listen action can slow down the reading rate of messages.
tv.listen((k, v) -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 3. Send 20 messages. After refresh, all the messages should be received.
int count = 20;
Set<String> keys = this.publishMessages(topic, count, false);
// After message sending completely, the table view will take at least 2 seconds to receive all the messages.
// If there is not the refresh operation, all messages will not be received.
tv.refresh();
// The key of each message is different.
assertEquals(tv.size(), count);
assertEquals(tv.keySet(), keys);
// 4. Test refresh operation can be completed when there is a partition with on new messages
// or no new message for no partition topic.
if (partition > 0) {
publishMessages(topic, count, partition - 1, false, false);
tv.refreshAsync().get(5, TimeUnit.SECONDS);
assertEquals(tv.size(), count + partition - 1);
} else {
tv.refreshAsync().get(5, TimeUnit.SECONDS);
}
}

/**
* Case1:
* 1. Slow down the read of reading messages.
* 2. Send some messages.
* 3. Call new `refresh` API.
* 4. Close the reader of the tableview.
* 5. The refresh operation will be failed with a `AlreadyClosedException`.
* Case2:
* 1. Close the reader of the tableview.
* 2. Call new `refresh` API.
* 3. The refresh operation will be fail with a `AlreadyClosedException`.
*/
@Test
public void testRefreshTaskCanBeCompletedWhenReaderClosed() throws Exception {
// 1. Prepare resource.
String topic1 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-1";
admin.topics().createNonPartitionedTopic(topic1);
String topic2 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-2";
admin.topics().createNonPartitionedTopic(topic2);
@Cleanup
TableView<byte[]> tv1 = pulsarClient.newTableView(Schema.BYTES)
.topic(topic1)
.create();
@Cleanup
TableView<byte[]> tv2 = pulsarClient.newTableView(Schema.BYTES)
.topic(topic1)
.create();
// 2. Slow down the rate of reading messages.
tv1.listen((k, v) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
publishMessages(topic1, 20, false);
Field field = TableViewImpl.class.getDeclaredField("reader");
field.setAccessible(true);
CompletableFuture<Reader<byte[]>> readerFuture1 = (CompletableFuture<Reader<byte[]>>) field.get(tv1);
AtomicBoolean completedExceptionally = new AtomicBoolean(false);
// 3. Test failing `refresh` in the reading process.
tv1.refreshAsync().exceptionally(ex -> {
if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
completedExceptionally.set(true);
}
return null;
});
readerFuture1.get().close();

// 4. Test failing `refresh` when get last message IDs. The topic2 has no available messages.
CompletableFuture<Reader<byte[]>> readerFuture2 = (CompletableFuture<Reader<byte[]>>) field.get(tv2);
readerFuture2.get().close();
try {
tv2.refresh();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof PulsarClientException.AlreadyClosedException);
}
Awaitility.await().untilAsserted(() -> assertTrue(completedExceptionally.get()));
}

@Test(timeOut = 30 * 1000)
public void testTableView() throws Exception {
String topic = "persistent://public/default/tableview-test";
Expand Down Expand Up @@ -391,7 +525,7 @@ public void testTableViewWithEncryptedMessages() throws Exception {

// publish encrypted messages
int count = 20;
Set<String> keys = this.publishMessages(topic, count, false, true);
Set<String> keys = this.publishMessages(topic, 0, count, false, true);

// TableView can read them using the private key
@Cleanup
Expand Down Expand Up @@ -437,7 +571,7 @@ public void testTableViewTailMessageReadRetry() throws Exception {
FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);

int msgCnt = 2;
this.publishMessages(topic, msgCnt, false, false);
this.publishMessages(topic, 0, msgCnt, false, false);
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
Expand Down

0 comments on commit 89e5c76

Please sign in to comment.