Skip to content

Commit

Permalink
[feat][client] Introduce Refresh API in the TableView (apache#21417)
Browse files Browse the repository at this point in the history
Master apache#21271
### Motivation
The proposal will introduce a new API to refresh the table view with the latest written data on the topic, ensuring that all subsequent reads are based on the refreshed data.
  • Loading branch information
liangyepianzhou authored Mar 15, 2024
1 parent ac263c0 commit 95d24ac
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,20 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
Expand Down Expand Up @@ -101,10 +104,11 @@ public static Object[] topicDomain() {
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
return publishMessages(topic, count, enableBatch, false);
return publishMessages(topic, 0, count, enableBatch, false);
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch, boolean enableEncryption) throws Exception {
private Set<String> publishMessages(String topic, int keyStartPosition, int count, boolean enableBatch,
boolean enableEncryption) throws Exception {
Set<String> keys = new HashSet<>();
ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
Expand All @@ -124,7 +128,7 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
}
try (Producer<byte[]> producer = builder.create()) {
CompletableFuture<?> lastFuture = null;
for (int i = 0; i < count; i++) {
for (int i = keyStartPosition; i < keyStartPosition + count; i++) {
String key = "key"+ i;
byte[] data = ("my-message-" + i).getBytes();
lastFuture = producer.newMessage().key(key).value(data).sendAsync();
Expand All @@ -136,6 +140,126 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
return keys;
}

/**
 * Supplies the partition counts the refresh tests run against:
 * {@code 3} for a partitioned topic and {@code 0} for a non-partitioned topic.
 */
@DataProvider(name = "partition")
public static Object[][] partition() {
    Object[] partitionedTopic = { 3 };
    Object[] nonPartitionedTopic = { 0 };
    return new Object[][] { partitionedTopic, nonPartitionedTopic };
}

/**
 * Verifies that {@code refresh}/{@code refreshAsync} only complete once the table view has caught up.
 *
 * Case1:
 * 1. Slow down the rate of reading messages.
 * 2. Send some messages.
 * 3. Call the new `refresh` API; it waits until all the messages have been read.
 * Case2:
 * 1. No new messages.
 * 2. Call the new `refresh` API; it completes immediately.
 * Case3:
 * 1. Multi-partition topic: p1 and p2 have new messages, p3 has no new messages.
 * 2. Call the new `refresh` API; it completes after the new messages have been read.
 *
 * @param partition number of partitions of the test topic, {@code 0} meaning a non-partitioned topic
 */
@Test(dataProvider = "partition")
public void testRefreshAPI(int partition) throws Exception {
    // 1. Prepare resource.
    String topic = "persistent://public/default/testRefreshAPI" + RandomUtils.nextLong();
    if (partition == 0) {
        admin.topics().createNonPartitionedTopic(topic);
    } else {
        admin.topics().createPartitionedTopic(topic, partition);
    }

    @Cleanup
    TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
            .topic(topic)
            .create();
    // 2. Add a listen action to provide the test environment.
    // The listen action will be triggered every time there is an incoming message.
    // This is a sync operation, so sleeping in the listen action slows down the reading rate of messages.
    tv.listen((k, v) -> {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    });
    // 3. Send 20 messages. After refresh, all the messages should be received.
    int count = 20;
    Set<String> keys = this.publishMessages(topic, count, false);
    // After the messages have been sent, the table view needs at least 2 seconds (20 * 100 ms) to receive them.
    // Without the refresh operation, not all messages would have been received at this point.
    tv.refresh();
    // The key of each message is different.
    assertEquals(tv.size(), count);
    assertEquals(tv.keySet(), keys);
    // 4. Test that the refresh operation completes when there is a partition with no new messages,
    // or when there are no new messages at all for a non-partitioned topic.
    if (partition > 0) {
        // Publish only (partition - 1) messages with fresh keys (starting right after the existing ones),
        // so that at most (partition - 1) partitions receive new data and at least one receives none.
        publishMessages(topic, count, partition - 1, false, false);
        tv.refreshAsync().get(5, TimeUnit.SECONDS);
        assertEquals(tv.size(), count + partition - 1);
    } else {
        tv.refreshAsync().get(5, TimeUnit.SECONDS);
    }
}

/**
 * Verifies that a refresh fails with {@code AlreadyClosedException} when the table view is closed.
 *
 * Case1:
 * 1. Slow down the rate of reading messages.
 * 2. Send some messages.
 * 3. Call the new `refresh` API.
 * 4. Close the reader of the table view.
 * 5. The refresh operation fails with an `AlreadyClosedException`.
 * Case2:
 * 1. Close the reader of the table view.
 * 2. Call the new `refresh` API.
 * 3. The refresh operation fails with an `AlreadyClosedException`.
 */
@Test
public void testRefreshTaskCanBeCompletedWhenReaderClosed() throws Exception {
    // 1. Prepare resource.
    String topic1 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-1";
    admin.topics().createNonPartitionedTopic(topic1);
    String topic2 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-2";
    admin.topics().createNonPartitionedTopic(topic2);
    @Cleanup
    TableView<byte[]> tv1 = pulsarClient.newTableView(Schema.BYTES)
            .topic(topic1)
            .create();
    // tv2 must read from topic2, which never receives a message, so that case 2 exercises the
    // "no available messages" path instead of sharing topic1 with tv1.
    @Cleanup
    TableView<byte[]> tv2 = pulsarClient.newTableView(Schema.BYTES)
            .topic(topic2)
            .create();
    // 2. Slow down the rate of reading messages.
    tv1.listen((k, v) -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    });
    publishMessages(topic1, 20, false);
    AtomicBoolean completedExceptionally = new AtomicBoolean(false);
    // 3. Test failing `refresh` in the reading process.
    tv1.refreshAsync().exceptionally(ex -> {
        // The failure may be delivered either directly or wrapped (e.g. in a CompletionException),
        // so accept the expected exception at both levels.
        if (ex instanceof PulsarClientException.AlreadyClosedException
                || ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
            completedExceptionally.set(true);
        }
        return null;
    });
    tv1.close();

    // 4. Test failing `refresh` when getting the last message IDs. The topic2 has no available messages.
    tv2.close();
    try {
        tv2.refresh();
        fail();
    } catch (Exception e) {
        assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
    }
    Awaitility.await().untilAsserted(() -> assertTrue(completedExceptionally.get()));
}

@Test(timeOut = 30 * 1000)
public void testTableView() throws Exception {
String topic = "persistent://public/default/tableview-test";
Expand Down Expand Up @@ -391,7 +515,7 @@ public void testTableViewWithEncryptedMessages() throws Exception {

// publish encrypted messages
int count = 20;
Set<String> keys = this.publishMessages(topic, count, false, true);
Set<String> keys = this.publishMessages(topic, 0, count, false, true);

// TableView can read them using the private key
@Cleanup
Expand Down Expand Up @@ -437,7 +561,7 @@ public void testTableViewTailMessageReadRetry() throws Exception {
FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);

int msgCnt = 2;
this.publishMessages(topic, msgCnt, false, false);
this.publishMessages(topic, 0, msgCnt, false, false);
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,38 @@ public interface TableView<T> extends Closeable {
* @return a future that can used to track when the table view has been closed.
*/
CompletableFuture<Void> closeAsync();

/**
 * Refresh the table view with the latest data in the topic, ensuring that all subsequent reads are based on
 * the refreshed data.
 *
 * <p>Example usage:
 *
 * <pre>{@code
 * table.refreshAsync().thenApply(__ -> table.get(key));
 * }</pre>
 *
 * <p>This function retrieves the last written message in the topic and refreshes the table view accordingly.
 * Once the refresh is complete, all subsequent reads will be performed on the refreshed data or a combination of
 * the refreshed data and newly published data. The table view remains synchronized with any newly published data
 * after the refresh.
 *
 * <pre>{@code
 * |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2|
 * }</pre>
 *
 * <p>If a read occurs after the refresh (at the last published message |y:2|), it ensures that outdated data
 * like x=1 is not obtained. However, it does not guarantee that the values will always be x=2, y=2, z=1,
 * as the table view may receive updates with newly published data.
 *
 * <pre>{@code
 * |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2| -> |y:3|
 * }</pre>
 *
 * <p>Both y=2 and y=3 are possible. Therefore, different readers may receive different values,
 * but all values will be equal to or newer than the data refreshed from the last call to the refresh method.
 *
 * @return a future that can be used to track when the table view has been refreshed.
 */
CompletableFuture<Void> refreshAsync();

/**
 * Refresh the table view with the latest data in the topic, ensuring that all subsequent reads are based on
 * the refreshed data.
 *
 * <p>This variant reports failures by throwing instead of through a future; see {@link #refreshAsync()}
 * for the future-based variant.
 *
 * @throws PulsarClientException if there is any error refreshing the table view.
 */
void refresh() throws PulsarClientException;
}
Loading

0 comments on commit 95d24ac

Please sign in to comment.