diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java index 33e70852fe8bc..7387b2ee37b58 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java @@ -91,4 +91,13 @@ public interface TableViewBuilder { * @return the {@link TableViewBuilder} builder instance */ TableViewBuilder autoUpdatePartitionsInterval(int interval, TimeUnit unit); + + + /** + * Set the subscription name of the {@link TableView}. + * + * @param subscriptionName the name of the subscription to the topic + * @return the {@link TableViewBuilder} builder instance + */ + TableViewBuilder subscriptionName(String subscriptionName); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java index 37d4ba5792000..b417083f0c12e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java @@ -75,4 +75,11 @@ public TableViewBuilder autoUpdatePartitionsInterval(int interval, TimeUnit u conf.setAutoUpdatePartitionsSeconds(unit.toSeconds(interval)); return this; } + + @Override + public TableViewBuilder subscriptionName(String subscriptionName) { + checkArgument(StringUtils.isNotBlank(subscriptionName), "subscription name cannot be blank"); + conf.setSubscriptionName(StringUtils.trim(subscriptionName)); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java index ecc8135b2ac95..31df24b60f86a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java @@ -28,6 +28,7 @@ public class TableViewConfigurationData implements Serializable, Cloneable { private static final long serialVersionUID = 1L; private String topicName = null; + private String subscriptionName = null; private long autoUpdatePartitionsSeconds = 60; @Override @@ -36,6 +37,7 @@ public TableViewConfigurationData clone() { TableViewConfigurationData clone = (TableViewConfigurationData) super.clone(); clone.setTopicName(topicName); clone.setAutoUpdatePartitionsSeconds(autoUpdatePartitionsSeconds); + clone.setSubscriptionName(subscriptionName); return clone; } catch (CloneNotSupportedException e) { throw new AssertionError(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index 53f62dc309272..9e7257f23fb04 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -67,7 +67,8 @@ public class TableViewImpl implements TableView { .startMessageId(MessageId.earliest) .autoUpdatePartitions(true) .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS) - .poolMessages(true); + .poolMessages(true) + .subscriptionName(conf.getSubscriptionName()); if (isPersistentTopic) { readerBuilder.readCompacted(true); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewConfigurationDataTest.java new file mode 100644 index 0000000000000..03d62a83d41bb --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewConfigurationDataTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; +import org.testng.annotations.Test; + +/** + * Unit test of {@link TableViewConfigurationData}. + */ +public class TableViewConfigurationDataTest { + @Test + public void testLoadConfigFromMap() { + Map configMap = new HashMap<>(); + configMap.put("topicName", "persistent://public/default/test"); + configMap.put("subscriptionName", "test-sub"); + configMap.put("autoUpdatePartitionsSeconds", "60"); + TableViewConfigurationData config = new TableViewConfigurationData(); + config = ConfigurationDataUtils.loadData( + configMap, config, TableViewConfigurationData.class); + + assertEquals(configMap.get("topicName"), config.getTopicName()); + assertEquals(configMap.get("subscriptionName"), config.getSubscriptionName()); + assertEquals(configMap.get("autoUpdatePartitionsSeconds"), String.valueOf(config.getAutoUpdatePartitionsSeconds())); + } +} diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md index 7127368cf2422..b35d7c13d910d 100644 --- a/site2/docs/client-libraries-java.md +++ b/site2/docs/client-libraries-java.md @@ -1129,6 +1129,7 @@ You can use the available parameters in the `loadConf` configuration or related |---|---|---|---|--- | `topic` | string | yes | The topic name of the TableView. | N/A | `autoUpdatePartitionInterval` | int | no | The interval to check for newly added partitions. | 60 (seconds) +| `subscriptionName` | string | no | The subscription name of the TableView. | null ### Register listeners