Skip to content

Commit

Permalink
[INLONG-10560][Sort] Support bounded pulsar source (#10569)
Browse files Browse the repository at this point in the history
Co-authored-by: Aloys Zhang <[email protected]>
  • Loading branch information
aloyszhang and Aloys Zhang committed Jul 9, 2024
1 parent b9f1bcf commit 1a69810
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.bounded;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
* Value holder for the boundaries of a bounded source: a lower bound, an upper bound and
* the type that tells how both bounds are to be interpreted.
* Lombok's {@code @Data} generates the accessors, and {@code @AllArgsConstructor} generates a
* constructor taking the fields in declaration order (lowerBound, upperBound, boundaryType).
* NOTE(review): the fields are declared public although accessors are generated - confirm
* whether direct field access is intended.
*/
@Data
@AllArgsConstructor
public class Boundaries {

// Lower bound, kept as a string; presumably parsed according to boundaryType - TODO confirm format
public String lowerBound;
// Upper bound, kept as a string; presumably parsed according to boundaryType - TODO confirm format
public String upperBound;
// Kind of boundary that lowerBound and upperBound describe
public BoundaryType boundaryType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.bounded;

import lombok.Getter;

/**
* Source boundary types.
* TIME is the common boundary type should be supported in every bounded source.
* OFFSET is the boundary for MQ type bounded source, like offset in kafka or messageId in pulsar.
* */
@Getter
public enum BoundaryType {

TIME("time"),
OFFSET("offset");

private final String type;

BoundaryType(String boundaryType) {
this.type = boundaryType;
}

public static BoundaryType getInstance(String boundaryType) {
for (BoundaryType type : values()) {
if (type.getType().equalsIgnoreCase(boundaryType)) {
return type;
}
}
return null;
}

public static boolean isSupportBoundaryType(String boundaryType) {
for (BoundaryType source : values()) {
if (source.getType().equalsIgnoreCase(boundaryType)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,21 @@ public class Constants {
.defaultValue("stream")
.withDescription("The runtime execution mode of Flink, including stream and batch, default is stream");

// ------------------------------------------------------------------------
// Source boundary related
// ------------------------------------------------------------------------
/**
* Type of the source boundary; the default value is "time".
* NOTE(review): presumably has to be one of the names known to BoundaryType - confirm against the code reading this option.
*/
public static final ConfigOption<String> SOURCE_BOUNDARY_TYPE = key("source.boundary.type")
.defaultValue("time")
.withDescription("The type of source boundary");

/**
* Lower bound of the source, held as a string; the default value is "0".
* NOTE(review): the expected format presumably depends on the boundary type - TODO confirm.
*/
public static final ConfigOption<String> SOURCE_LOWER_BOUNDARY = key("source.lower.boundary")
.defaultValue("0")
.withDescription("The lower bound of source");

/**
* Upper bound of the source, held as a string; the default value is "0".
* NOTE(review): the expected format presumably depends on the boundary type - TODO confirm.
*/
public static final ConfigOption<String> SOURCE_UPPER_BOUNDARY = key("source.upper.boundary")
.defaultValue("0")
.withDescription("The upper bound of source");

// ------------------------------------------------------------------------
// Metrics related
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.protocol.node;

import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
Expand Down Expand Up @@ -106,4 +107,10 @@ public ExtractNode(@JsonProperty("id") String id,
this.watermarkField = watermarkField;
this.properties = properties;
}

/**
* Hook through which source boundaries are handed to an extract node.
* This base implementation only checks that {@code boundaries} is not null and stores nothing;
* concrete extract nodes are expected to override it.
*
* @param boundaries the boundaries to apply; must not be null (the check presumably throws a
*        NullPointerException - confirm against the Preconditions class in use)
*/
public void fillInBoundaries(Boundaries boundaries) {
Preconditions.checkNotNull(boundaries, "boundaries is null");
// every single kind of extract node should provide the way to fill in boundaries individually
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.protocol.node.extract;

import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.common.bounded.BoundaryType;
import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.InlongMetric;
Expand All @@ -33,20 +35,25 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

@EqualsAndHashCode(callSuper = true)
@JsonTypeName("pulsarExtract")
@Data
public class PulsarExtractNode extends ExtractNode implements InlongMetric, Metadata {

private static final Logger log = LoggerFactory.getLogger(PulsarExtractNode.class);
private static final long serialVersionUID = 1L;

@Nonnull
Expand Down Expand Up @@ -89,6 +96,8 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta
@JsonProperty("clientAuthParams")
private String clientAuthParams;

// Table options derived from source boundaries: filled by fillInBoundaries and merged into the
// result of tableOptions() when not empty.
// NOTE(review): declared without an access modifier and without @JsonProperty, unlike the
// neighbouring fields - confirm this is intended.
Map<String, String> sourceBoundaryOptions = new HashMap<>();

@JsonCreator
public PulsarExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
Expand Down Expand Up @@ -146,11 +155,17 @@ public Map<String, String> tableOptions() {
options.put("scan.startup.sub-name", scanStartupSubName);
options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset);
}

if (StringUtils.isNotBlank(clientAuthPluginClassName)
&& StringUtils.isNotBlank(clientAuthParams)) {
options.put("pulsar.client.authPluginClassName", clientAuthPluginClassName);
options.put("pulsar.client.authParams", clientAuthParams);
}

// add boundary options
if (!sourceBoundaryOptions.isEmpty()) {
options.putAll(sourceBoundaryOptions);
}
return options;
}

Expand Down Expand Up @@ -197,4 +212,18 @@ public Set<MetaField> supportedMetaFields() {
return EnumSet.of(MetaField.AUDIT_DATA_TIME);
}

/**
 * Converts the given boundaries into Pulsar source options kept in {@code sourceBoundaryOptions}.
 * Only {@link BoundaryType#TIME} is handled: the lower bound is stored under
 * "source.start.publish-time" and the upper bound under "source.stop.at-publish-time".
 * Any other boundary type is logged as unsupported and leaves the options untouched.
 *
 * @param boundaries the boundaries to apply; validated by the parent implementation
 * @throws NullPointerException if the boundary type carried by {@code boundaries} is null
 */
@Override
public void fillInBoundaries(Boundaries boundaries) {
    super.fillInBoundaries(boundaries);
    BoundaryType kind = Objects.requireNonNull(boundaries.getBoundaryType());
    if (kind != BoundaryType.TIME) {
        log.warn("Not supported boundary type: {}", kind);
        return;
    }
    sourceBoundaryOptions.put("source.start.publish-time", boundaries.getLowerBound());
    sourceBoundaryOptions.put("source.stop.at-publish-time", boundaries.getUpperBound());
    log.info("Filled in source boundaries options");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.inlong.sort;

import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.common.bounded.BoundaryType;
import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.parser.Parser;
import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
import org.apache.inlong.sort.parser.impl.NativeFlinkSqlParser;
import org.apache.inlong.sort.parser.result.ParseResult;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.util.ParameterTool;

import com.google.common.base.Preconditions;
Expand All @@ -33,13 +36,20 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.apache.inlong.sort.configuration.Constants.SOURCE_BOUNDARY_TYPE;
import static org.apache.inlong.sort.configuration.Constants.SOURCE_LOWER_BOUNDARY;
import static org.apache.inlong.sort.configuration.Constants.SOURCE_UPPER_BOUNDARY;

public class Entrance {

private static final Logger log = LoggerFactory.getLogger(Entrance.class);
public static final String BATCH_MODE = "batch";

public static void main(String[] args) throws Exception {
Expand Down Expand Up @@ -75,6 +85,10 @@ public static void main(String[] args) throws Exception {
groupInfo.getProperties().putIfAbsent(Constants.METRICS_AUDIT_PROXY_HOSTS.key(),
config.getString(Constants.METRICS_AUDIT_PROXY_HOSTS));
}

// fill in boundaries if needed
fillInSourceBoundariesIfNeeded(runtimeExecutionMode, groupInfo, config);

parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
} else {
String statements = getStatementSetFromFile(sqlFile);
Expand All @@ -85,6 +99,34 @@ public static void main(String[] args) throws Exception {
parseResult.execute();
}

/**
 * Hands the configured source boundaries to every extract node of the group when the job runs
 * in batch mode; does nothing for any other execution mode.
 * <p>
 * The boundary type, lower bound and upper bound are read from the configuration options
 * {@code SOURCE_BOUNDARY_TYPE}, {@code SOURCE_LOWER_BOUNDARY} and {@code SOURCE_UPPER_BOUNDARY}.
 *
 * @param runtimeExecutionMode the runtime execution mode, compared case-insensitively with {@link #BATCH_MODE}
 * @param groupInfo the group whose extract nodes receive the boundaries
 * @param configuration the configuration the boundary options are read from
 * @throws IllegalArgumentException if the configured boundary type does not map to a {@link BoundaryType}
 */
private static void fillInSourceBoundariesIfNeeded(String runtimeExecutionMode, GroupInfo groupInfo,
        Configuration configuration) {
    if (!BATCH_MODE.equalsIgnoreCase(runtimeExecutionMode)) {
        return;
    }
    String type = configuration.getString(SOURCE_BOUNDARY_TYPE);
    String lowerBoundary = configuration.getString(SOURCE_LOWER_BOUNDARY);
    String upperBoundary = configuration.getString(SOURCE_UPPER_BOUNDARY);

    log.info("Filling in source boundaries for group: {}, with execution mode: {}, boundaryType: {}, "
            + "lowerBoundary: {}, upperBoundary: {}",
            groupInfo.getGroupId(), runtimeExecutionMode, type, lowerBoundary, upperBoundary);

    BoundaryType boundaryType = BoundaryType.getInstance(type);
    if (boundaryType == null) {
        // an unknown type is an invalid configuration value, so report it as an illegal argument
        throw new IllegalArgumentException("Unknown boundary type: " + type);
    }
    Boundaries boundaries = new Boundaries(lowerBoundary, upperBoundary, boundaryType);
    // add source boundaries for bounded source; nodes that are not extract nodes are left untouched
    groupInfo.getStreams().forEach(streamInfo -> streamInfo.getNodes().forEach(node -> {
        if (node instanceof ExtractNode) {
            ((ExtractNode) node).fillInBoundaries(boundaries);
        }
    }));
}

/**
 * Reads the complete content of the given file as UTF-8 text.
 *
 * @param fileName path of the file to read
 * @return the content of the file
 * @throws IOException if the file cannot be read
 */
private static String getStatementSetFromFile(String fileName) throws IOException {
    File statementFile = new File(fileName);
    return Files.asCharSource(statementFile, StandardCharsets.UTF_8).read();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,13 @@
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID_DEPRECATED;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME_DEPRECATED;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.STARTUP_MODE;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
Expand Down Expand Up @@ -271,8 +273,10 @@ public Set<ConfigOption<?>> optionalOptions() {
ADMIN_URL,
STARTUP_MODE,
SOURCE_SUBSCRIPTION_NAME,
SOURCE_SUBSCRIPTION_NAME_DEPRECATED,
SOURCE_SUBSCRIPTION_TYPE,
SOURCE_START_FROM_MESSAGE_ID,
SOURCE_START_FROM_MESSAGE_ID_DEPRECATED,
SOURCE_START_FROM_PUBLISH_TIME,
SOURCE_STOP_AT_MESSAGE_ID,
SOURCE_STOP_AFTER_MESSAGE_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ private PulsarTableOptions() {
* Copied because we want to have a default value for it.
*/
public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
// String option without a default value, registered under the key "source.subscription-name".
// The key "scan.startup.sub-name" is declared separately as SOURCE_SUBSCRIPTION_NAME_DEPRECATED.
ConfigOptions.key("source.subscription-name")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"The subscription name of the consumer that is used by the runtime [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source). This argument is required for constructing the consumer.")
.build());

public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME_DEPRECATED =
ConfigOptions.key("scan.startup.sub-name")
.stringType()
.noDefaultValue()
Expand All @@ -89,6 +99,24 @@ private PulsarTableOptions() {
.build());

/**
* Message id from which the source starts consuming, registered under the key
* "source.start.message-id". String option without a default value.
* The key "scan.startup.sub-start-offset" is declared separately as
* SOURCE_START_FROM_MESSAGE_ID_DEPRECATED.
*/
public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
ConfigOptions.key("source.start.message-id")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"(Optional) Message id that is used to specify a consuming starting "
+ "point for source. Use %s, %s or pass in a message id "
+ "representation in %s, "
+ "such as %s. This option takes precedence over "
+ "source.start.publish-time.",
code("earliest"),
code("latest"),
code("ledgerId:entryId:partitionId"),
code("12:2:-1"))
.build());

public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID_DEPRECATED =
ConfigOptions.key("scan.startup.sub-start-offset")
.stringType()
.noDefaultValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
import org.apache.flink.table.connector.ChangelogMode;
Expand Down Expand Up @@ -114,15 +116,18 @@ public ChangelogMode getChangelogMode() {
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    // Builds the Pulsar source for this table from the configured topics, start cursor,
    // properties and the deserialization schema created for the given scan context.
    PulsarDeserializationSchema<RowData> deserializationSchema =
            deserializationSchemaFactory.createPulsarDeserialization(context);
    PulsarSourceBuilder<RowData> sourceBuilder = PulsarSource.builder();
    sourceBuilder
            .setTopics(topics)
            .setStartCursor(startCursor)
            .setDeserializationSchema(deserializationSchema)
            .setProperties(properties);
    // A NeverStopCursor is registered as the unbounded stop cursor; any other stop cursor
    // is registered as a bounded one so that the source can finish.
    if (stopCursor instanceof NeverStopCursor) {
        sourceBuilder.setUnboundedStopCursor(stopCursor);
    } else {
        sourceBuilder.setBoundedStopCursor(stopCursor);
    }
    return SourceProvider.of(sourceBuilder.build());
}

/**
Expand Down

0 comments on commit 1a69810

Please sign in to comment.