Skip to content

Commit

Permalink
[improve][client] allow customize subscription name for TableView (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
freeznet authored and lifepuzzlefun committed Dec 9, 2022
1 parent d022f14 commit 1e1e51a
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,13 @@ public interface TableViewBuilder<T> {
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);


/**
* Set the subscription name of the {@link TableView}.
*
* @param subscriptionName the name of the subscription to the topic
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> subscriptionName(String subscriptionName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,11 @@ public TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit u
conf.setAutoUpdatePartitionsSeconds(unit.toSeconds(interval));
return this;
}

@Override
public TableViewBuilder<T> subscriptionName(String subscriptionName) {
checkArgument(StringUtils.isNotBlank(subscriptionName), "subscription name cannot be blank");
conf.setSubscriptionName(StringUtils.trim(subscriptionName));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class TableViewConfigurationData implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;

private String topicName = null;
private String subscriptionName = null;
private long autoUpdatePartitionsSeconds = 60;

@Override
Expand All @@ -36,6 +37,7 @@ public TableViewConfigurationData clone() {
TableViewConfigurationData clone = (TableViewConfigurationData) super.clone();
clone.setTopicName(topicName);
clone.setAutoUpdatePartitionsSeconds(autoUpdatePartitionsSeconds);
clone.setSubscriptionName(subscriptionName);
return clone;
} catch (CloneNotSupportedException e) {
throw new AssertionError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public class TableViewImpl<T> implements TableView<T> {
.startMessageId(MessageId.earliest)
.autoUpdatePartitions(true)
.autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS)
.poolMessages(true);
.poolMessages(true)
.subscriptionName(conf.getSubscriptionName());
if (isPersistentTopic) {
readerBuilder.readCompacted(true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.testng.annotations.Test;

/**
* Unit test of {@link TableViewConfigurationData}.
*/
public class TableViewConfigurationDataTest {
@Test
public void testLoadConfigFromMap() {
Map<String, Object> configMap = new HashMap<>();
configMap.put("topicName", "persistent://public/default/test");
configMap.put("subscriptionName", "test-sub");
configMap.put("autoUpdatePartitionsSeconds", "60");
TableViewConfigurationData config = new TableViewConfigurationData();
config = ConfigurationDataUtils.loadData(
configMap, config, TableViewConfigurationData.class);

assertEquals(configMap.get("topicName"), config.getTopicName());
assertEquals(configMap.get("subscriptionName"), config.getSubscriptionName());
assertEquals(configMap.get("autoUpdatePartitionsSeconds"), String.valueOf(config.getAutoUpdatePartitionsSeconds()));
}
}
1 change: 1 addition & 0 deletions site2/docs/client-libraries-java.md
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,7 @@ You can use the available parameters in the `loadConf` configuration or related
|---|---|---|---|---
| `topic` | string | yes | The topic name of the TableView. | N/A
| `autoUpdatePartitionInterval` | int | no | The interval to check for newly added partitions. | 60 (seconds)
| `subscriptionName` | string | no | The subscription name of the TableView. | null
### Register listeners
Expand Down

0 comments on commit 1e1e51a

Please sign in to comment.