Skip to content

Commit

Permalink
[INLONG-8695][Sort] rename topics to topic
Browse files Browse the repository at this point in the history
  • Loading branch information
EMsnap committed Sep 27, 2023
1 parent c79a7c0 commit 01f8878
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPICS;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.VALUE_FORMAT;
import static org.apache.inlong.sort.pulsar.PulsarTableValidationUtils.validatePrimaryKeyConstraints;
import static org.apache.inlong.sort.pulsar.PulsarTableValidationUtils.validateTableSourceOptions;
Expand Down Expand Up @@ -169,7 +169,7 @@ public String factoryIdentifier() {

@Override
public Set<ConfigOption<?>> requiredOptions() {
return Stream.of(TOPICS, ADMIN_URL, SERVICE_URL).collect(Collectors.toSet());
return Stream.of(TOPIC, ADMIN_URL, SERVICE_URL).collect(Collectors.toSet());
}

@Override
Expand Down Expand Up @@ -198,7 +198,7 @@ public Set<ConfigOption<?>> optionalOptions() {
@Override
public Set<ConfigOption<?>> forwardOptions() {
return Stream.of(
TOPICS,
TOPIC,
ADMIN_URL,
SERVICE_URL,
SOURCE_SUBSCRIPTION_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPICS;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.VALUE_FORMAT;

/**
Expand Down Expand Up @@ -151,7 +151,7 @@ public static int[] createValueFormatProjection(
// --------------------------------------------------------------------------------------------

public static List<String> getTopicListFromOptions(ReadableConfig tableOptions) {
return tableOptions.get(TOPICS);
return tableOptions.get(TOPIC);
}

public static Properties getPulsarProperties(ReadableConfig tableOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public final class PulsarTableOptions {
private PulsarTableOptions() {
}

public static final ConfigOption<List<String>> TOPICS =
ConfigOptions.key("topics")
public static final ConfigOption<List<String>> TOPIC =
ConfigOptions.key("topic")
.stringType()
.asList()
.noDefaultValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPICS;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
import static org.apache.pulsar.common.naming.TopicName.isValid;

/** Util class for source and sink validation rules.
Expand Down Expand Up @@ -78,11 +78,11 @@ public static void validateTableSourceOptions(ReadableConfig tableOptions) {
}

protected static void validateTopicsConfigs(ReadableConfig tableOptions) {
if (tableOptions.get(TOPICS).isEmpty()) {
if (tableOptions.get(TOPIC).isEmpty()) {
throw new ValidationException("The topics list should not be empty.");
}

for (String topic : tableOptions.get(TOPICS)) {
for (String topic : tableOptions.get(TOPIC)) {
if (!isValid(topic)) {
throw new ValidationException(
String.format("The topics name %s is not a valid topic name.", topic));
Expand Down

0 comments on commit 01f8878

Please sign in to comment.