diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index 523360884c1bf..61ab4de8a3294 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -38,10 +38,12 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Message; @@ -49,6 +51,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; @@ -101,10 +104,11 @@ public static Object[] topicDomain() { } private Set publishMessages(String topic, int count, boolean enableBatch) throws Exception { - return publishMessages(topic, count, enableBatch, false); + return publishMessages(topic, 0, count, enableBatch, false); } - private Set publishMessages(String topic, int count, boolean enableBatch, boolean enableEncryption) throws Exception { + private Set publishMessages(String topic, int keyStartPosition, int count, boolean enableBatch, + boolean enableEncryption) throws Exception { Set keys = new HashSet<>(); ProducerBuilder builder = pulsarClient.newProducer(); builder.messageRoutingMode(MessageRoutingMode.SinglePartition); @@ -124,7 +128,7 @@ private Set publishMessages(String topic, int count, boolean enableBatch } try (Producer producer = builder.create()) { CompletableFuture lastFuture = null; - for (int i = 0; i < count; i++) { + for (int i = keyStartPosition; i < keyStartPosition + count; i++) { String key = "key"+ i; byte[] data = ("my-message-" + i).getBytes(); lastFuture = producer.newMessage().key(key).value(data).sendAsync(); @@ -136,6 +140,126 @@ private Set publishMessages(String topic, int count, boolean enableBatch return keys; } + @DataProvider(name = "partition") + public static Object[][] partition () { + return new Object[][] { + { 3 }, { 0 } + }; + } + + /** + * Case1: + * 1. Slow down the rate of reading messages. + * 2. Send some messages + * 3. Call new `refresh` API, it will wait for reading all the messages completed. + * Case2: + * 1. No new messages. + * 2. Call new `refresh` API, it will be completed immediately. + * Case3: + * 1. multi-partition topic, p1, p2 has new message, p3 has no new messages. + * 2. Call new `refresh` API, it will be completed after read new messages. + */ + @Test(dataProvider = "partition") + public void testRefreshAPI(int partition) throws Exception { + // 1. Prepare resource. + String topic = "persistent://public/default/testRefreshAPI" + RandomUtils.nextLong(); + if (partition == 0) { + admin.topics().createNonPartitionedTopic(topic); + } else { + admin.topics().createPartitionedTopic(topic, partition); + } + + @Cleanup + TableView tv = pulsarClient.newTableView(Schema.BYTES) + .topic(topic) + .create(); + // 2. Add a listen action to provide the test environment. + // The listen action will be triggered when there are incoming messages every time. + // This is a sync operation, so sleep in the listen action can slow down the reading rate of messages. + tv.listen((k, v) -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + // 3. Send 20 messages. After refresh, all the messages should be received. + int count = 20; + Set keys = this.publishMessages(topic, count, false); + // After message sending completely, the table view will take at least 2 seconds to receive all the messages. + // If there is not the refresh operation, all messages will not be received. + tv.refresh(); + // The key of each message is different. + assertEquals(tv.size(), count); + assertEquals(tv.keySet(), keys); + // 4. Test refresh operation can be completed when there is a partition with on new messages + // or no new message for no partition topic. + if (partition > 0) { + publishMessages(topic, partition - 1, count, false, false); + tv.refreshAsync().get(5, TimeUnit.SECONDS); + assertEquals(tv.size(), count + partition - 1); + } else { + tv.refreshAsync().get(5, TimeUnit.SECONDS); + } + } + + /** + * Case1: + * 1. Slow down the read of reading messages. + * 2. Send some messages. + * 3. Call new `refresh` API. + * 4. Close the reader of the tableview. + * 5. The refresh operation will be failed with a `AlreadyClosedException`. + * Case2: + * 1. Close the reader of the tableview. + * 2. Call new `refresh` API. + * 3. The refresh operation will be fail with a `AlreadyClosedException`. + */ + @Test + public void testRefreshTaskCanBeCompletedWhenReaderClosed() throws Exception { + // 1. Prepare resource. + String topic1 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-1"; + admin.topics().createNonPartitionedTopic(topic1); + String topic2 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-2"; + admin.topics().createNonPartitionedTopic(topic2); + @Cleanup + TableView tv1 = pulsarClient.newTableView(Schema.BYTES) + .topic(topic1) + .create(); + @Cleanup + TableView tv2 = pulsarClient.newTableView(Schema.BYTES) + .topic(topic1) + .create(); + // 2. Slow down the rate of reading messages. + tv1.listen((k, v) -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + publishMessages(topic1, 20, false); + AtomicBoolean completedExceptionally = new AtomicBoolean(false); + // 3. Test failing `refresh` in the reading process. + tv1.refreshAsync().exceptionally(ex -> { + if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { + completedExceptionally.set(true); + } + return null; + }); + tv1.close(); + + // 4. Test failing `refresh` when get last message IDs. The topic2 has no available messages. + tv2.close(); + try { + tv2.refresh(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof PulsarClientException.AlreadyClosedException); + } + Awaitility.await().untilAsserted(() -> assertTrue(completedExceptionally.get())); + } + @Test(timeOut = 30 * 1000) public void testTableView() throws Exception { String topic = "persistent://public/default/tableview-test"; @@ -391,7 +515,7 @@ public void testTableViewWithEncryptedMessages() throws Exception { // publish encrypted messages int count = 20; - Set keys = this.publishMessages(topic, count, false, true); + Set keys = this.publishMessages(topic, 0, count, false, true); // TableView can read them using the private key @Cleanup @@ -437,7 +561,7 @@ public void testTableViewTailMessageReadRetry() throws Exception { FieldUtils.writeDeclaredField(reader, "consumer", consumer, true); int msgCnt = 2; - this.publishMessages(topic, msgCnt, false, false); + this.publishMessages(topic, 0, msgCnt, false, false); Awaitility.await() .atMost(5, TimeUnit.SECONDS) .untilAsserted(() -> { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java index 9e5008c8bd0c8..767b8e1103fa6 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java @@ -110,4 +110,38 @@ public interface TableView extends Closeable { * @return a future that can used to track when the table view has been closed. */ CompletableFuture closeAsync(); + + /** + * Refresh the table view with the latest data in the topic, ensuring that all subsequent reads are based on + * the refreshed data. + * + * Example usage: + * + * table.refreshAsync().thenApply(__ -> table.get(key)); + * + * This function retrieves the last written message in the topic and refreshes the table view accordingly. + * Once the refresh is complete, all subsequent reads will be performed on the refreshed data or a combination of + * the refreshed data and newly published data. The table view remains synchronized with any newly published data + * after the refresh. + * + * |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2| + * + * If a read occurs after the refresh (at the last published message |y:2|), it ensures that outdated data like x=1 + * is not obtained. However, it does not guarantee that the values will always be x=2, y=2, z=1, + * as the table view may receive updates with newly published data. + * + * |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2| -> |y:3| + * + * Both y=2 or y=3 are possible. Therefore, different readers may receive different values, + * but all values will be equal to or newer than the data refreshed from the last call to the refresh method. + */ + CompletableFuture refreshAsync(); + + /** + * Refresh the table view with the latest data in the topic, ensuring that all subsequent reads are based on + * the refreshed data. + * + * @throws PulsarClientException if there is any error refreshing the table view. + */ + void refresh() throws PulsarClientException; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index 64abd6d811b8e..d5d4174ee10a9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -60,6 +60,26 @@ public class TableViewImpl implements TableView { private final boolean isPersistentTopic; private TopicCompactionStrategy compactionStrategy; + /** + * Store the refresh tasks. When read to the position recording in the right map, + * then remove the position in the right map. If the right map is empty, complete the future in the left. + * There should be no timeout exception here, because the caller can only retry for TimeoutException. + * It will only be completed exceptionally when no more messages can be read. + */ + private final ConcurrentHashMap, Map> pendingRefreshRequests; + + /** + * This map stored the read position of each partition. It is used for the following case: + *

+ * 1. Get last message ID. + * 2. Receive message p1-1:1, p2-1:1, p2-1:2, p3-1:1 + * 3. Receive response of step1 {|p1-1:1|p2-2:2|p3-3:6|} + * 4. No more messages are written to this topic. + * As a result, the refresh operation will never be completed. + *

+ */ + private final ConcurrentHashMap lastReadPositions; + TableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { this.conf = conf; this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString()); @@ -69,6 +89,8 @@ public class TableViewImpl implements TableView { this.listenersMutex = new ReentrantLock(); this.compactionStrategy = TopicCompactionStrategy.load(TABLE_VIEW_TAG, conf.getTopicCompactionStrategyClassName()); + this.pendingRefreshRequests = new ConcurrentHashMap<>(); + this.lastReadPositions = new ConcurrentHashMap<>(); ReaderBuilder readerBuilder = client.newReader(schema) .topic(conf.getTopicName()) .startMessageId(MessageId.earliest) @@ -94,9 +116,10 @@ CompletableFuture> start() { return reader.thenCompose((reader) -> { if (!isPersistentTopic) { readTailMessages(reader); - return CompletableFuture.completedFuture(reader); + return CompletableFuture.completedFuture(null); } - return this.readAllExistingMessages(reader); + return this.readAllExistingMessages(reader) + .thenRun(() -> readTailMessages(reader)); }).thenApply(__ -> this); } @@ -180,6 +203,7 @@ public void close() throws PulsarClientException { } private void handleMessage(Message msg) { + lastReadPositions.put(msg.getTopicName(), msg.getMessageId()); try { if (msg.hasKey()) { String key = msg.getKey(); @@ -226,31 +250,104 @@ private void handleMessage(Message msg) { } } } + checkAllFreshTask(msg); } finally { msg.release(); } } - private CompletableFuture> readAllExistingMessages(Reader reader) { + @Override + public CompletableFuture refreshAsync() { + CompletableFuture completableFuture = new CompletableFuture<>(); + reader.thenCompose(reader -> getLastMessageIds(reader).thenAccept(lastMessageIds -> { + // After get the response of lastMessageIds, put the future and result into `refreshMap` + // and then filter out partitions that has been read to the lastMessageID. + pendingRefreshRequests.put(completableFuture, lastMessageIds); + filterReceivedMessages(lastMessageIds); + // If there is no new messages, the refresh operation could be completed right now. + if (lastMessageIds.isEmpty()) { + pendingRefreshRequests.remove(completableFuture); + completableFuture.complete(null); + } + })).exceptionally(throwable -> { + completableFuture.completeExceptionally(throwable); + pendingRefreshRequests.remove(completableFuture); + return null; + }); + return completableFuture; + } + + @Override + public void refresh() throws PulsarClientException { + try { + refreshAsync().get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + + private CompletableFuture readAllExistingMessages(Reader reader) { long startTime = System.nanoTime(); AtomicLong messagesRead = new AtomicLong(); - CompletableFuture> future = new CompletableFuture<>(); - reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> { - Map maxMessageIds = new ConcurrentHashMap<>(); - lastMessageIds.forEach(topicMessageId -> { - maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId); - }); + CompletableFuture future = new CompletableFuture<>(); + getLastMessageIds(reader).thenAccept(maxMessageIds -> { readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds); }).exceptionally(ex -> { future.completeExceptionally(ex); return null; }); - future.thenAccept(__ -> readTailMessages(reader)); return future; } - private void readAllExistingMessages(Reader reader, CompletableFuture> future, long startTime, + private CompletableFuture> getLastMessageIds(Reader reader) { + return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> { + Map maxMessageIds = new ConcurrentHashMap<>(); + lastMessageIds.forEach(topicMessageId -> { + maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId); + }); + return maxMessageIds; + }); + } + + private void filterReceivedMessages(Map lastMessageIds) { + // The `lastMessageIds` and `readPositions` is concurrency-safe data types. + lastMessageIds.forEach((partition, lastMessageId) -> { + MessageId messageId = lastReadPositions.get(partition); + if (messageId != null && lastMessageId.compareTo(messageId) <= 0) { + lastMessageIds.remove(partition); + } + }); + } + + private boolean checkFreshTask(Map maxMessageIds, CompletableFuture future, + MessageId messageId, String topicName) { + // The message received from multi-consumer/multi-reader is processed to TopicMessageImpl. + TopicMessageId maxMessageId = maxMessageIds.get(topicName); + // We need remove the partition from the maxMessageIds map + // once the partition has been read completely. + if (maxMessageId != null && messageId.compareTo(maxMessageId) >= 0) { + maxMessageIds.remove(topicName); + } + if (maxMessageIds.isEmpty()) { + future.complete(null); + return true; + } else { + return false; + } + } + + private void checkAllFreshTask(Message msg) { + pendingRefreshRequests.forEach((future, maxMessageIds) -> { + String topicName = msg.getTopicName(); + MessageId messageId = msg.getMessageId(); + if (checkFreshTask(maxMessageIds, future, messageId, topicName)) { + pendingRefreshRequests.remove(future); + } + }); + } + + private void readAllExistingMessages(Reader reader, CompletableFuture future, long startTime, AtomicLong messagesRead, Map maxMessageIds) { reader.hasMessageAvailableAsync() .thenAccept(hasMessage -> { @@ -258,17 +355,12 @@ private void readAllExistingMessages(Reader reader, CompletableFuture { messagesRead.incrementAndGet(); - // We need remove the partition from the maxMessageIds map - // once the partition has been read completely. - TopicMessageId maxMessageId = maxMessageIds.get(msg.getTopicName()); - if (maxMessageId != null && msg.getMessageId().compareTo(maxMessageId) >= 0) { - maxMessageIds.remove(msg.getTopicName()); - } + String topicName = msg.getTopicName(); + MessageId messageId = msg.getMessageId(); handleMessage(msg); - if (maxMessageIds.isEmpty()) { - future.complete(reader); - } else { - readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds); + if (!checkFreshTask(maxMessageIds, future, messageId, topicName)) { + readAllExistingMessages(reader, future, startTime, + messagesRead, maxMessageIds); } }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { @@ -289,7 +381,7 @@ private void readAllExistingMessages(Reader reader, CompletableFuture reader) { if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { log.error("Reader {} was closed while reading tail messages.", reader.getTopic(), ex); + // Fail all refresh request when no more messages can be read. + pendingRefreshRequests.keySet().forEach(future -> { + pendingRefreshRequests.remove(future); + future.completeExceptionally(ex); + }); } else { // Retrying on the other exceptions such as NotConnectedException try {