Skip to content

Commit

Permalink
[api-draft][connector] apache pulsar source (#1984)
Browse files Browse the repository at this point in the history
* [api-draft][connector] apache pulsar source

# Conflicts:
#	seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml

* [api-draft][connector] fix type not found

* [api-draft][connector] add pulsar dependencies & licenses

# Conflicts:
#	seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml

* [api-draft][connector] fix deserialization schema

* [api-draft][connector] pulsar source boundedness
  • Loading branch information
ashulin authored Jun 17, 2022
1 parent 1113cb9 commit a8d47c9
Show file tree
Hide file tree
Showing 36 changed files with 3,287 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.commons.lang3.StringUtils;

import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;

public final class PropertiesUtil {

Expand All @@ -39,4 +43,37 @@ public static void setProperties(Config config, Properties properties, String pr
}
});
}

public static <E extends Enum<E>> E getEnum(final Config conf, final String key, final Class<E> enumClass, final E defaultEnum) {
if (!conf.hasPath(key)) {
return defaultEnum;
}
final String value = conf.getString(key);
if (StringUtils.isBlank(value)) {
return defaultEnum;
}
return Enum.valueOf(enumClass, value.toUpperCase());
}

public static <T> void setOption(Config config, String optionName, T defaultValue, Function<String, T> getter, Consumer<T> setter) {
T value;
if (config.hasPath(optionName)) {
value = getter.apply(optionName);
} else {
value = defaultValue;
}
if (value != null) {
setter.accept(value);
}
}

public static <T> void setOption(Config config, String optionName, Function<String, T> getter, Consumer<T> setter) {
T value = null;
if (config.hasPath(optionName)) {
value = getter.apply(optionName);
}
if (value != null) {
setter.accept(value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
<artifactId>seatunnel-connector-seatunnel-clickhouse</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connector-seatunnel-pulsar</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand All @@ -94,4 +99,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
3 changes: 2 additions & 1 deletion seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@
<module>seatunnel-connector-seatunnel-jdbc</module>
<module>seatunnel-connector-seatunnel-socket</module>
<module>seatunnel-connector-seatunnel-clickhouse</module>
<module>seatunnel-connector-seatunnel-pulsar</module>
</modules>
</project>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>seatunnel-connectors-seatunnel</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>seatunnel-connector-seatunnel-pulsar</artifactId>

<properties>
<pulsar.version>2.8.0</pulsar.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Pulsar testing environment -->

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>

<!-- Pulsar bundles the latest bookkeeper & zookeeper, -->
<!-- we don't override the version here. -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>testmocks</artifactId>
<version>${pulsar.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-testng</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${pulsar.version}</version>
<scope>test</scope>
</dependency>
<!-- Pulsar use a newer commons-lang3 in broker. -->
<!-- Bump the version only for testing. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
<scope>test</scope>
</dependency>

<!-- Add Pulsar 2.x as a dependency. -->
<!-- Move this to button for avoiding class conflicts with pulsar-broker. -->

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
<version>${pulsar.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-package-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.pulsar.config;

import java.io.Serializable;

public abstract class BasePulsarConfig implements Serializable {
private final String authPluginClassName;
private final String authParams;

public BasePulsarConfig(String authPluginClassName, String authParams) {
this.authPluginClassName = authPluginClassName;
this.authParams = authParams;
}

public String getAuthPluginClassName() {
return authPluginClassName;
}

public String getAuthParams() {
return authParams;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.pulsar.config;

import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;

// TODO: more field

public class PulsarAdminConfig extends BasePulsarConfig {
private static final long serialVersionUID = 1L;
private final String adminUrl;

private PulsarAdminConfig(String authPluginClassName, String authParams, String adminUrl) {
super(authPluginClassName, authParams);
this.adminUrl = adminUrl;
}

public String getAdminUrl() {
return adminUrl;
}

public static Builder builder() {
return new Builder();
}

public static final class Builder {
/**
* Name of the authentication plugin.
*/
private String authPluginClassName = "";
/**
* Parameters for the authentication plugin.
*/
private String authParams = "";
private String adminUrl;

private Builder() {
}

public Builder authPluginClassName(String authPluginClassName) {
this.authPluginClassName = authPluginClassName;
return this;
}

public Builder authParams(String authParams) {
this.authParams = authParams;
return this;
}

public Builder adminUrl(String adminUrl) {
this.adminUrl = adminUrl;
return this;
}

public PulsarAdminConfig build() {
Preconditions.checkArgument(StringUtils.isNotBlank(adminUrl), "Pulsar admin URL is required.");
return new PulsarAdminConfig(authPluginClassName, authParams, adminUrl);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.pulsar.config;

import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;

// TODO: more field

public class PulsarClientConfig extends BasePulsarConfig {
private static final long serialVersionUID = 1L;

private final String serviceUrl;

private PulsarClientConfig(String authPluginClassName, String authParams, String serviceUrl) {
super(authPluginClassName, authParams);
this.serviceUrl = serviceUrl;
}

public String getServiceUrl() {
return serviceUrl;
}

public static Builder builder() {
return new Builder();
}

public static final class Builder {
/**
* Name of the authentication plugin.
*/
private String authPluginClassName = "";
/**
* Parameters for the authentication plugin.
*/
private String authParams = "";
/**
* Service URL provider for Pulsar service.
*/
private String serviceUrl;

private Builder() {
}

public Builder authPluginClassName(String authPluginClassName) {
this.authPluginClassName = authPluginClassName;
return this;
}

public Builder authParams(String authParams) {
this.authParams = authParams;
return this;
}

public Builder serviceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
return this;
}

public PulsarClientConfig build() {
Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "Pulsar service URL is required.");
return new PulsarClientConfig(authPluginClassName, authParams, serviceUrl);
}
}
}
Loading

0 comments on commit a8d47c9

Please sign in to comment.