Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][client] Introduce Refresh API in the TableView #21417

Merged
merged 11 commits into from
Mar 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,29 @@
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
Expand Down Expand Up @@ -101,10 +106,11 @@ public static Object[] topicDomain() {
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
return publishMessages(topic, count, enableBatch, false);
return publishMessages(topic, 0, count, enableBatch, false);
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch, boolean enableEncryption) throws Exception {
private Set<String> publishMessages(String topic, int keyStartPosition, int count, boolean enableBatch,
boolean enableEncryption) throws Exception {
Set<String> keys = new HashSet<>();
ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
Expand All @@ -124,7 +130,7 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
}
try (Producer<byte[]> producer = builder.create()) {
CompletableFuture<?> lastFuture = null;
for (int i = 0; i < count; i++) {
for (int i = keyStartPosition; i < keyStartPosition + count; i++) {
String key = "key"+ i;
byte[] data = ("my-message-" + i).getBytes();
lastFuture = producer.newMessage().key(key).value(data).sendAsync();
Expand All @@ -136,6 +142,134 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
return keys;
}

@DataProvider(name = "partition")
public static Object[][] partition () {
return new Object[][] {
{ 3 }, { 0 }
};
}

/**
* Case1:
* 1. Slow down the rate of reading messages.
* 2. Send some messages
* 3. Call new `refresh` API, it will wait for reading all the messages completed.
* Case2:
* 1. No new messages.
* 2. Call new `refresh` API, it will be completed immediately.
* Case3:
* 1. multi-partition topic, p1, p2 has new message, p3 has no new messages.
* 2. Call new `refresh` API, it will be completed after read new messages.
*/
@Test(dataProvider = "partition")
public void testRefreshAPI(int partition) throws Exception {
// 1. Prepare resource.
String topic = "persistent://public/default/testRefreshAPI" + RandomUtils.nextLong();
if (partition == 0) {
admin.topics().createNonPartitionedTopic(topic);
} else {
admin.topics().createPartitionedTopic(topic, partition);
}
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("my-sub")
.subscribe();
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
@Cleanup
TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
.topic(topic)
.create();
// 2. Add a listen action to provider the test environment.
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
// The listen action will be triggered when there are incoming messages every time.
// This is a sync operation, so sleep in the listen action can slow down the reading rate of messages.
tv.listen((k, v) -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 3. Send 20 messages. After refresh, all the messages should be received.
int count = 20;
Set<String> keys = this.publishMessages(topic, count, false);
// After message sending completely, the table view will take at least 2 seconds to receive all the messages.
// If there is not the refresh operation, all messages will not be received.
tv.refresh();
// The key of each message is different.
assertEquals(tv.size(), count);
assertEquals(tv.keySet(), keys);
// 4. Test refresh operation can be completed when there is a partition with on new messages
// or no new message for no partition topic.
if (partition > 0) {
publishMessages(topic, count, partition - 1, false, false);
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
tv.refreshAsync().get(5, TimeUnit.SECONDS);
assertEquals(tv.size(), count + partition - 1);
} else {
tv.refreshAsync().get(5, TimeUnit.SECONDS);
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Case1:
* 1. Slow down the read of reading messages.
* 2. Send some messages.
* 3. Call new `refresh` API.
* 4. Close the reader of the tableview.
* 5. The refresh operation will be failed with a `AlreadyClosedException`.
* Case2:
* 1. Close the reader of the tableview.
* 2. Call new `refresh` API.
* 3. The refresh operation will be fail with a `AlreadyClosedException`.
*/
@Test
public void testRefreshTaskCanBeCompletedWhenReaderClosed() throws Exception {
// 1. Prepare resource.
String topic1 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-1";
admin.topics().createNonPartitionedTopic(topic1);
String topic2 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-2";
admin.topics().createNonPartitionedTopic(topic2);
@Cleanup
TableView<byte[]> tv1 = pulsarClient.newTableView(Schema.BYTES)
.topic(topic1)
.create();
@Cleanup
TableView<byte[]> tv2 = pulsarClient.newTableView(Schema.BYTES)
.topic(topic1)
.create();
// 2. Slow down the rate of reading messages.
tv1.listen((k, v) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
publishMessages(topic1, 20, false);
Field field = TableViewImpl.class.getDeclaredField("reader");
field.setAccessible(true);
CompletableFuture<Reader<byte[]>> readerFuture1 = (CompletableFuture<Reader<byte[]>>) field.get(tv1);
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
AtomicBoolean completedExceptionally = new AtomicBoolean(false);
// 3. Test failing `refresh` in the reading process.
tv1.refreshAsync().exceptionally(ex -> {
if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
completedExceptionally.set(true);
}
return null;
});
readerFuture1.get().close();

// 4. Test failing `refresh` when get last message IDs. The topic2 has no available messages.
CompletableFuture<Reader<byte[]>> readerFuture2 = (CompletableFuture<Reader<byte[]>>) field.get(tv2);
readerFuture2.get().close();
try {
tv2.refresh();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof PulsarClientException.AlreadyClosedException);
}
Awaitility.await().untilAsserted(() -> assertTrue(completedExceptionally.get()));
}

@Test(timeOut = 30 * 1000)
public void testTableView() throws Exception {
String topic = "persistent://public/default/tableview-test";
Expand Down Expand Up @@ -391,7 +525,7 @@ public void testTableViewWithEncryptedMessages() throws Exception {

// publish encrypted messages
int count = 20;
Set<String> keys = this.publishMessages(topic, count, false, true);
Set<String> keys = this.publishMessages(topic, 0, count, false, true);

// TableView can read them using the private key
@Cleanup
Expand Down Expand Up @@ -437,7 +571,7 @@ public void testTableViewTailMessageReadRetry() throws Exception {
FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);

int msgCnt = 2;
this.publishMessages(topic, msgCnt, false, false);
this.publishMessages(topic, 0, msgCnt, false, false);
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,38 @@ public interface TableView<T> extends Closeable {
* @return a future that can used to track when the table view has been closed.
*/
CompletableFuture<Void> closeAsync();

/**
* Refresh the table view with the latest data in the topic, ensuring that all subsequent reads are based on
* the refreshed data.
*
* Example usage:
*
* table.refreshAsync().thenApply(__ -> table.get(key));
*
* This function retrieves the last written message in the topic and refreshes the table view accordingly.
* Once the refresh is complete, all subsequent reads will be performed on the refreshed data or a combination of
* the refreshed data and newly published data. The table view remains synchronized with any newly published data
* after the refresh.
*
* |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2|
*
* If a read occurs after the refresh (at the last published message |y:2|), it ensures that outdated data like x=1
* is not obtained. However, it does not guarantee that the values will always be x=2, y=2, z=1,
* as the table view may receive updates with newly published data.
*
* |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2| -> |y:3|
*
* Both y=2 or y=3 are possible. Therefore, different readers may receive different values,
* but all values will be equal to or newer than the data refreshed from the last call to the refresh method.
*/
CompletableFuture<Void> refreshAsync();

/**
* Refresh the table view with the latest data in the topic, ensuring that all subsequent reads are based on
* the refreshed data.
*
* @throws PulsarClientException if there is any error refreshing the table view.
*/
void refresh() throws PulsarClientException;
}
Loading
Loading