diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index eea3507e5..201cee7e2 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -49,8 +49,14 @@ jobs:
- uses: actions/checkout@v3
with:
submodules: true
+ - name: Set up JDK ${{ matrix.java }}
+ uses: actions/setup-java@v3
+ with:
+ java-version: 8
+ distribution: 'temurin'
+ cache: 'maven'
- name: Check code style
- run: ./mvnw --batch-mode --quiet --no-snapshot-updates clean checkstyle:check
+ run: ./mvnw --batch-mode --quiet --no-snapshot-updates clean spotless:check
dead-link:
if: github.repository == 'apache/incubator-seatunnel-web'
diff --git a/pom.xml b/pom.xml
index c015a38de..ef0fc19a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,8 +13,7 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-
4.0.0
@@ -26,55 +25,17 @@
org.apache.seatunnel
seatunnel-web
- pom
${revision}
+ pom
SeaTunnel
-
- Production ready big data processing product based on Apache Spark and Apache Flink.
-
-
- https://github.com/apache/incubator-seatunnel
-
-
-
- The Apache License, Version 2.0
- https://www.apache.org/licenses/LICENSE-2.0.txt
-
-
-
-
- scm:git:https://github.com/apache/incubator-seatunnel.git
- scm:git:https://github.com/apache/incubator-seatunnel.git
- https://github.com/apache/incubator-seatunnel
- HEAD
-
-
-
- GitHub
- https://github.com/apache/incubator-seatunnel/issues
-
-
-
-
- SeaTunnel Developer List
- dev@seatunnel.apache.org
- dev-subscribe@seatunnel.apache.org
- dev-unsubscribe@seatunnel.apache.org
-
-
- SeaTunnel Commits List
- commits@seatunnel.apache.org
- commits-subscribe@seatunnel.apache.org
- commits-unsubscribe@seatunnel.apache.org
-
-
+ Production ready big data processing product based on Apache Spark and Apache Flink.
- seatunnel-server
- seatunnel-datasource
- seatunnel-web-dist
+ seatunnel-server
+ seatunnel-datasource
+ seatunnel-web-dist
@@ -143,6 +104,7 @@
2.3.1
3.1.4
1.11.271
+ 2.29.0
@@ -154,24 +116,24 @@
${seatunnel-framework.version}
- jcl-over-slf4j
org.slf4j
+ jcl-over-slf4j
- log4j-1.2-api
org.apache.logging.log4j
+ log4j-1.2-api
- log4j-api
org.apache.logging.log4j
+ log4j-api
- log4j-core
org.apache.logging.log4j
+ log4j-core
- log4j-slf4j-impl
org.apache.logging.log4j
+ log4j-slf4j-impl
@@ -187,28 +149,28 @@
jackson-dataformat-properties
- slf4j-log4j12
org.slf4j
+ slf4j-log4j12
- log4j-1.2-api
org.apache.logging.log4j
+ log4j-1.2-api
- jcl-over-slf4j
org.slf4j
+ jcl-over-slf4j
- log4j-api
org.apache.logging.log4j
+ log4j-api
- log4j-core
org.apache.logging.log4j
+ log4j-core
- log4j-slf4j-impl
org.apache.logging.log4j
+ log4j-slf4j-impl
@@ -219,24 +181,24 @@
${seatunnel-framework.version}
- jcl-over-slf4j
org.slf4j
+ jcl-over-slf4j
- log4j-1.2-api
org.apache.logging.log4j
+ log4j-1.2-api
- log4j-api
org.apache.logging.log4j
+ log4j-api
- log4j-core
org.apache.logging.log4j
+ log4j-core
- log4j-slf4j-impl
org.apache.logging.log4j
+ log4j-slf4j-impl
@@ -260,24 +222,24 @@
${seatunnel-framework.version}
- jcl-over-slf4j
org.slf4j
+ jcl-over-slf4j
- log4j-1.2-api
org.apache.logging.log4j
+ log4j-1.2-api
- log4j-api
org.apache.logging.log4j
+ log4j-api
- log4j-core
org.apache.logging.log4j
+ log4j-core
- log4j-slf4j-impl
org.apache.logging.log4j
+ log4j-slf4j-impl
@@ -344,8 +306,8 @@
spring-boot-starter-jdbc
- spring-boot-autoconfigure
org.springframework.boot
+ spring-boot-autoconfigure
@@ -502,8 +464,8 @@
org.apache.seatunnel
seatunnel-hadoop3-3.1.4-uber
- provided
${hadoop-uber.version}
+ provided
org.apache.avro
@@ -535,8 +497,8 @@
${cron-utils.version}
- javassist
org.javassist
+ javassist
@@ -593,8 +555,7 @@
${skipUT}
- ${project.build.directory}/jacoco.exec
-
+ ${project.build.directory}/jacoco.exec
**/*IT.java
@@ -602,7 +563,6 @@
-
org.apache.maven.plugins
@@ -664,8 +624,7 @@
${maven-checkstyle-plugin.version}
- ${maven.multiModuleProjectDirectory}/tools/checkstyle/checkStyle.xml
-
+ ${maven.multiModuleProjectDirectory}/tools/checkstyle/checkStyle.xml
UTF-8
true
true
@@ -674,28 +633,23 @@
${project.build.sourceDirectory}
${project.build.testSourceDirectory}
-
- **/*.properties,
+ **/*.properties,
**/*.sh,
**/*.bat,
**/*.yml,
**/*.yaml,
- **/*.xml
-
-
- **/.asf.yaml,
- **/.github/**
-
-
-
+ **/*.xml
+ **/.asf.yaml,
+ **/.github/**
+
validate
- process-sources
check
+ process-sources
@@ -737,7 +691,6 @@
-
org.codehaus.mojo
build-helper-maven-plugin
@@ -791,6 +744,81 @@
maven-dependency-plugin
${maven-dependency-plugin.version}
+
+
+ com.diffplug.spotless
+ spotless-maven-plugin
+ ${spotless.version}
+
+
+
+ 1.7
+
+
+
+
+
+ org.apache.seatunnel.shade,org.apache.seatunnel,org.apache,org,,javax,java,\#
+
+
+ Remove wildcard imports
+ import\s+(static)*\s*[^\*\s]+\*;(\r\n|\r|\n)
+ $1
+
+
+ Block powermock
+ import\s+org\.powermock\.[^\*\s]*(|\*);(\r\n|\r|\n)
+ $1
+
+
+ Block jUnit4 imports
+ import\s+org\.junit\.[^jupiter][^\*\s]*(|\*);(\r\n|\r|\n)
+ $1
+
+
+
+
+ UTF-8
+ 4
+ true
+ false
+ true
+ true
+ false
+ false
+ custom_1
+ false
+ false
+
+
+ Leading blank line
+ project
+ project
+
+
+
+
+ docs/**/*.md
+
+
+ **/.github/**/*.md
+
+
+
+
+ true
+
+
+
+
+
+ check
+
+ compile
+
+
+
+
@@ -837,7 +865,48 @@
org.codehaus.mojo
license-maven-plugin
+
+
+ com.diffplug.spotless
+ spotless-maven-plugin
+
+ https://github.com/apache/incubator-seatunnel
+
+
+
+ The Apache License, Version 2.0
+ https://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+
+ SeaTunnel Developer List
+ dev-subscribe@seatunnel.apache.org
+ dev-unsubscribe@seatunnel.apache.org
+ dev@seatunnel.apache.org
+
+
+ SeaTunnel Commits List
+ commits-subscribe@seatunnel.apache.org
+ commits-unsubscribe@seatunnel.apache.org
+ commits@seatunnel.apache.org
+
+
+
+
+ scm:git:https://github.com/apache/incubator-seatunnel.git
+ scm:git:https://github.com/apache/incubator-seatunnel.git
+ https://github.com/apache/incubator-seatunnel
+ HEAD
+
+
+
+ GitHub
+ https://github.com/apache/incubator-seatunnel/issues
+
+
diff --git a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java
index 7cc2783ed..b9e3b4f33 100644
--- a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java
+++ b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.datasource;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.exception.DataSourceSDKException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
@@ -34,6 +32,8 @@
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public abstract class AbstractDataSourceClient implements DataSourceService {
private Map supportedDataSourceInfo = new HashMap<>();
@@ -47,23 +47,23 @@ public abstract class AbstractDataSourceClient implements DataSourceService {
protected AbstractDataSourceClient() {
AtomicInteger dataSourceIndex = new AtomicInteger();
ServiceLoader.load(DataSourceFactory.class)
- .forEach(
- seaTunnelDataSourceFactory -> {
- seaTunnelDataSourceFactory
- .supportedDataSources()
- .forEach(
- dataSourceInfo -> {
- supportedDataSourceInfo.put(
- dataSourceInfo.getName().toUpperCase(),
- dataSourceInfo);
- supportedDataSourceIndex.put(
- dataSourceInfo.getName().toUpperCase(),
- dataSourceIndex.get());
- supportedDataSources.add(dataSourceInfo);
- });
- dataSourceChannels.add(seaTunnelDataSourceFactory.createChannel());
- dataSourceIndex.getAndIncrement();
- });
+ .forEach(
+ seaTunnelDataSourceFactory -> {
+ seaTunnelDataSourceFactory
+ .supportedDataSources()
+ .forEach(
+ dataSourceInfo -> {
+ supportedDataSourceInfo.put(
+ dataSourceInfo.getName().toUpperCase(),
+ dataSourceInfo);
+ supportedDataSourceIndex.put(
+ dataSourceInfo.getName().toUpperCase(),
+ dataSourceIndex.get());
+ supportedDataSources.add(dataSourceInfo);
+ });
+ dataSourceChannels.add(seaTunnelDataSourceFactory.createChannel());
+ dataSourceIndex.getAndIncrement();
+ });
if (supportedDataSourceInfo.isEmpty()) {
throw new DataSourceSDKException("No supported data source found");
}
@@ -71,9 +71,9 @@ protected AbstractDataSourceClient() {
@Override
public Boolean checkDataSourceConnectivity(
- String pluginName, Map dataSourceParams) {
+ String pluginName, Map dataSourceParams) {
return getDataSourceChannel(pluginName)
- .checkDataSourceConnectivity(pluginName, dataSourceParams);
+ .checkDataSourceConnectivity(pluginName, dataSourceParams);
}
@Override
@@ -86,7 +86,7 @@ protected DataSourceChannel getDataSourceChannel(String pluginName) {
Integer index = supportedDataSourceIndex.get(pluginName.toUpperCase());
if (index == null) {
throw new DataSourceSDKException(
- "The %s plugin is not supported or plugin not exist.", pluginName);
+ "The %s plugin is not supported or plugin not exist.", pluginName);
}
return dataSourceChannels.get(index);
}
@@ -99,12 +99,12 @@ public OptionRule queryDataSourceFieldByName(String pluginName) {
@Override
public OptionRule queryMetadataFieldByName(String pluginName) {
return getDataSourceChannel(pluginName)
- .getDatasourceMetadataFieldsByDataSourceName(pluginName);
+ .getDatasourceMetadataFieldsByDataSourceName(pluginName);
}
@Override
public List getTables(
- String pluginName, String databaseName, Map requestParams) {
+ String pluginName, String databaseName, Map requestParams) {
return getDataSourceChannel(pluginName).getTables(pluginName, requestParams, databaseName);
}
@@ -115,21 +115,21 @@ public List getDatabases(String pluginName, Map requestP
@Override
public List getTableFields(
- String pluginName,
- Map requestParams,
- String databaseName,
- String tableName) {
+ String pluginName,
+ Map requestParams,
+ String databaseName,
+ String tableName) {
return getDataSourceChannel(pluginName)
- .getTableFields(pluginName, requestParams, databaseName, tableName);
+ .getTableFields(pluginName, requestParams, databaseName, tableName);
}
@Override
public Map> getTableFields(
- String pluginName,
- Map requestParams,
- String databaseName,
- List tableNames) {
+ String pluginName,
+ Map requestParams,
+ String databaseName,
+ List tableNames) {
return getDataSourceChannel(pluginName)
- .getTableFields(pluginName, requestParams, databaseName, tableNames);
+ .getTableFields(pluginName, requestParams, databaseName, tableNames);
}
}
diff --git a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/service/DataSourceService.java b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/service/DataSourceService.java
index 9ab18325d..5e52c0777 100644
--- a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/service/DataSourceService.java
+++ b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/service/DataSourceService.java
@@ -67,18 +67,18 @@ public interface DataSourceService {
/**
* get data source table names by database name
*
- * @param pluginName plugin name
- * @param databaseName database name
+ * @param pluginName plugin name
+ * @param databaseName database name
* @param requestParams connection params
* @return table names
*/
List getTables(
- String pluginName, String databaseName, Map requestParams);
+ String pluginName, String databaseName, Map requestParams);
/**
* get data source database names
*
- * @param pluginName plugin name
+ * @param pluginName plugin name
* @param requestParams connection params
* @return database names
*/
@@ -87,30 +87,30 @@ List getTables(
/**
* get data source table fields
*
- * @param pluginName plugin name
+ * @param pluginName plugin name
* @param requestParams connection params
- * @param databaseName database name
- * @param tableName table name
+ * @param databaseName database name
+ * @param tableName table name
* @return table fields
*/
List getTableFields(
- String pluginName,
- Map requestParams,
- String databaseName,
- String tableName);
+ String pluginName,
+ Map requestParams,
+ String databaseName,
+ String tableName);
/**
* get data source table fields
*
- * @param pluginName plugin name
+ * @param pluginName plugin name
* @param requestParams connection params
- * @param databaseName database name
- * @param tableNames table names
+ * @param databaseName database name
+ * @param tableNames table names
* @return table fields
*/
Map> getTableFields(
- String pluginName,
- Map requestParams,
- String databaseName,
- List tableNames);
+ String pluginName,
+ Map requestParams,
+ String databaseName,
+ List tableNames);
}
diff --git a/seatunnel-datasource/seatunnel-datasource-client/src/test/java/com/whaleops/datasource/s3/S3DatasourceChannelTest.java b/seatunnel-datasource/seatunnel-datasource-client/src/test/java/com/whaleops/datasource/s3/S3DatasourceChannelTest.java
index ece18206a..50c3d2d5b 100644
--- a/seatunnel-datasource/seatunnel-datasource-client/src/test/java/com/whaleops/datasource/s3/S3DatasourceChannelTest.java
+++ b/seatunnel-datasource/seatunnel-datasource-client/src/test/java/com/whaleops/datasource/s3/S3DatasourceChannelTest.java
@@ -19,11 +19,12 @@
import org.apache.seatunnel.datasource.plugin.s3.S3DatasourceChannel;
-import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import com.google.common.collect.ImmutableMap;
+
import java.util.Map;
@Disabled
@@ -33,23 +34,23 @@ class S3DatasourceChannelTest {
@Test
void checkDataSourceConnectivity() {
Assertions.assertDoesNotThrow(
- () -> {
- S3_DATASOURCE_CHANNEL.checkDataSourceConnectivity("S3", createRequestParams());
- });
+ () -> {
+ S3_DATASOURCE_CHANNEL.checkDataSourceConnectivity("S3", createRequestParams());
+ });
}
private Map createRequestParams() {
Map requestParams =
- new ImmutableMap.Builder()
- .put("bucket", "s3a://poc-kuke")
- .put("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")
- .put(
- "fs.s3a.aws.credentials.provider",
- "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
- .put("access_key", "AKIAYYUV5DMXADXRBGTA")
- .put("secret_key", "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")
- .put("hadoop_s3_properties", "")
- .build();
+ new ImmutableMap.Builder()
+ .put("bucket", "s3a://poc-kuke")
+ .put("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")
+ .put(
+ "fs.s3a.aws.credentials.provider",
+ "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
+ .put("access_key", "AKIAYYUV5DMXADXRBGTA")
+ .put("secret_key", "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")
+ .put("hadoop_s3_properties", "")
+ .build();
return requestParams;
}
}
diff --git a/seatunnel-datasource/seatunnel-datasource-client/src/test/java/org/apache/seatunnel/datasource/DataSourceClientTest.java b/seatunnel-datasource/seatunnel-datasource-client/src/test/java/org/apache/seatunnel/datasource/DataSourceClientTest.java
index e2877e88b..1c45fe7e1 100644
--- a/seatunnel-datasource/seatunnel-datasource-client/src/test/java/org/apache/seatunnel/datasource/DataSourceClientTest.java
+++ b/seatunnel-datasource/seatunnel-datasource-client/src/test/java/org/apache/seatunnel/datasource/DataSourceClientTest.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.datasource;
import org.apache.commons.lang3.StringUtils;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -27,22 +28,22 @@ class DataSourceClientTest {
@Test
public void listAllDataSources() {
Assertions.assertTrue(
- DATA_SOURCE_CLIENT.listAllDataSources().stream()
- .anyMatch(
- dataSourcePluginInfo ->
- StringUtils.equalsAnyIgnoreCase(
- dataSourcePluginInfo.getName(), "jdbc-mysql")));
+ DATA_SOURCE_CLIENT.listAllDataSources().stream()
+ .anyMatch(
+ dataSourcePluginInfo ->
+ StringUtils.equalsAnyIgnoreCase(
+ dataSourcePluginInfo.getName(), "jdbc-mysql")));
Assertions.assertTrue(
- DATA_SOURCE_CLIENT.listAllDataSources().stream()
- .anyMatch(
- dataSourcePluginInfo ->
- StringUtils.equalsAnyIgnoreCase(
- dataSourcePluginInfo.getName(), "kafka")));
+ DATA_SOURCE_CLIENT.listAllDataSources().stream()
+ .anyMatch(
+ dataSourcePluginInfo ->
+ StringUtils.equalsAnyIgnoreCase(
+ dataSourcePluginInfo.getName(), "kafka")));
Assertions.assertTrue(
- DATA_SOURCE_CLIENT.listAllDataSources().stream()
- .anyMatch(
- dataSourcePluginInfo ->
- StringUtils.equalsAnyIgnoreCase(
- dataSourcePluginInfo.getName(), "elasticsearch")));
+ DATA_SOURCE_CLIENT.listAllDataSources().stream()
+ .anyMatch(
+ dataSourcePluginInfo ->
+ StringUtils.equalsAnyIgnoreCase(
+ dataSourcePluginInfo.getName(), "elasticsearch")));
}
}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
index 17495487f..6a4d92436 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
@@ -17,16 +17,17 @@
package org.apache.seatunnel.datasource.plugin.elasticsearch;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.elasticsearch.client.EsRestClient;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
-import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -54,47 +55,47 @@ public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pl
@Override
public List getTables(
- @NonNull String pluginName, Map requestParams, String database) {
+ @NonNull String pluginName, Map requestParams, String database) {
databaseCheck(database);
try (EsRestClient client =
- EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
+ EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
return client.listIndex();
}
}
@Override
public List getDatabases(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
return DEFAULT_DATABASES;
}
@Override
public boolean checkDataSourceConnectivity(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
try (EsRestClient client =
- EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
+ EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
client.getClusterInfo();
return true;
} catch (Throwable e) {
throw new DataSourcePluginException(
- "check ElasticSearch connectivity failed, " + e.getMessage(), e);
+ "check ElasticSearch connectivity failed, " + e.getMessage(), e);
}
}
@Override
public List getTableFields(
- @NonNull String pluginName,
- @NonNull Map requestParams,
- @NonNull String database,
- @NonNull String table) {
+ @NonNull String pluginName,
+ @NonNull Map requestParams,
+ @NonNull String database,
+ @NonNull String table) {
databaseCheck(database);
try (EsRestClient client =
- EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
+ EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
Map fieldTypeMapping = client.getFieldTypeMapping(table);
List fields = new ArrayList<>();
fieldTypeMapping.forEach(
- (fieldName, fieldType) ->
- fields.add(convertToTableField(fieldName, fieldType)));
+ (fieldName, fieldType) ->
+ fields.add(convertToTableField(fieldName, fieldType)));
return fields;
} catch (Exception ex) {
throw new DataSourcePluginException("Get table fields failed", ex);
@@ -103,16 +104,16 @@ public List getTableFields(
@Override
public Map> getTableFields(
- @NonNull String pluginName,
- @NonNull Map requestParams,
- @NonNull String database,
- @NonNull List tables) {
+ @NonNull String pluginName,
+ @NonNull Map requestParams,
+ @NonNull String database,
+ @NonNull List tables) {
databaseCheck(database);
Map> tableFields = new HashMap<>();
tables.forEach(
- table ->
- tableFields.put(
- table, getTableFields(pluginName, requestParams, database, table)));
+ table ->
+ tableFields.put(
+ table, getTableFields(pluginName, requestParams, database, table)));
return tableFields;
}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
index 1ef2e7050..4de06bf4a 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
@@ -17,15 +17,14 @@
package org.apache.seatunnel.datasource.plugin.elasticsearch.client;
-import org.apache.seatunnel.common.utils.JsonUtils;
-import org.apache.seatunnel.datasource.plugin.elasticsearch.ElasticSearchOptionRule;
-
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.datasource.plugin.elasticsearch.ElasticSearchOptionRule;
+
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
@@ -36,11 +35,14 @@
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
+
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
+import lombok.extern.slf4j.Slf4j;
+
import javax.net.ssl.SSLContext;
import java.io.IOException;
@@ -69,18 +71,18 @@ private EsRestClient(RestClient restClient) {
public static EsRestClient createInstance(Config pluginConfig) {
try {
List hosts =
- OBJECT_MAPPER.readValue(
- pluginConfig.getString(ElasticSearchOptionRule.HOSTS.key()),
- List.class);
+ OBJECT_MAPPER.readValue(
+ pluginConfig.getString(ElasticSearchOptionRule.HOSTS.key()),
+ List.class);
Optional username = Optional.empty();
Optional password = Optional.empty();
if (pluginConfig.hasPath(ElasticSearchOptionRule.USERNAME.key())) {
username =
- Optional.of(pluginConfig.getString(ElasticSearchOptionRule.USERNAME.key()));
+ Optional.of(pluginConfig.getString(ElasticSearchOptionRule.USERNAME.key()));
if (pluginConfig.hasPath(ElasticSearchOptionRule.PASSWORD.key())) {
password =
- Optional.of(
- pluginConfig.getString(ElasticSearchOptionRule.PASSWORD.key()));
+ Optional.of(
+ pluginConfig.getString(ElasticSearchOptionRule.PASSWORD.key()));
}
}
Optional keystorePath = Optional.empty();
@@ -88,141 +90,141 @@ public static EsRestClient createInstance(Config pluginConfig) {
Optional truststorePath = Optional.empty();
Optional truststorePassword = Optional.empty();
boolean tlsVerifyCertificate =
- ElasticSearchOptionRule.TLS_VERIFY_CERTIFICATE.defaultValue();
+ ElasticSearchOptionRule.TLS_VERIFY_CERTIFICATE.defaultValue();
if (pluginConfig.hasPath(ElasticSearchOptionRule.TLS_VERIFY_CERTIFICATE.key())) {
tlsVerifyCertificate =
- pluginConfig.getBoolean(
- ElasticSearchOptionRule.TLS_VERIFY_CERTIFICATE.key());
+ pluginConfig.getBoolean(
+ ElasticSearchOptionRule.TLS_VERIFY_CERTIFICATE.key());
}
if (tlsVerifyCertificate) {
if (pluginConfig.hasPath(ElasticSearchOptionRule.TLS_KEY_STORE_PATH.key())) {
keystorePath =
- Optional.of(
- pluginConfig.getString(
- ElasticSearchOptionRule.TLS_KEY_STORE_PATH.key()));
+ Optional.of(
+ pluginConfig.getString(
+ ElasticSearchOptionRule.TLS_KEY_STORE_PATH.key()));
}
if (pluginConfig.hasPath(ElasticSearchOptionRule.TLS_KEY_STORE_PASSWORD.key())) {
keystorePassword =
- Optional.of(
- pluginConfig.getString(
- ElasticSearchOptionRule.TLS_KEY_STORE_PASSWORD.key()));
+ Optional.of(
+ pluginConfig.getString(
+ ElasticSearchOptionRule.TLS_KEY_STORE_PASSWORD.key()));
}
if (pluginConfig.hasPath(ElasticSearchOptionRule.TLS_TRUST_STORE_PATH.key())) {
truststorePath =
- Optional.of(
- pluginConfig.getString(
- ElasticSearchOptionRule.TLS_TRUST_STORE_PATH.key()));
+ Optional.of(
+ pluginConfig.getString(
+ ElasticSearchOptionRule.TLS_TRUST_STORE_PATH.key()));
}
if (pluginConfig.hasPath(ElasticSearchOptionRule.TLS_TRUST_STORE_PASSWORD.key())) {
truststorePassword =
- Optional.of(
- pluginConfig.getString(
- ElasticSearchOptionRule.TLS_TRUST_STORE_PASSWORD
- .key()));
+ Optional.of(
+ pluginConfig.getString(
+ ElasticSearchOptionRule.TLS_TRUST_STORE_PASSWORD
+ .key()));
}
}
boolean tlsVerifyHostnames = ElasticSearchOptionRule.TLS_VERIFY_HOSTNAME.defaultValue();
if (pluginConfig.hasPath(ElasticSearchOptionRule.TLS_VERIFY_HOSTNAME.key())) {
tlsVerifyHostnames =
- pluginConfig.getBoolean(ElasticSearchOptionRule.TLS_VERIFY_HOSTNAME.key());
+ pluginConfig.getBoolean(ElasticSearchOptionRule.TLS_VERIFY_HOSTNAME.key());
}
return createInstance(
- hosts,
- username,
- password,
- tlsVerifyCertificate,
- tlsVerifyHostnames,
- keystorePath,
- keystorePassword,
- truststorePath,
- truststorePassword);
+ hosts,
+ username,
+ password,
+ tlsVerifyCertificate,
+ tlsVerifyHostnames,
+ keystorePath,
+ keystorePassword,
+ truststorePath,
+ truststorePassword);
} catch (Exception e) {
throw new RuntimeException("Create EsRestClient failed", e);
}
}
public static EsRestClient createInstance(
- List hosts,
- Optional username,
- Optional password,
- boolean tlsVerifyCertificate,
- boolean tlsVerifyHostnames,
- Optional keystorePath,
- Optional keystorePassword,
- Optional truststorePath,
- Optional truststorePassword) {
+ List hosts,
+ Optional username,
+ Optional password,
+ boolean tlsVerifyCertificate,
+ boolean tlsVerifyHostnames,
+ Optional keystorePath,
+ Optional keystorePassword,
+ Optional truststorePath,
+ Optional truststorePassword) {
RestClientBuilder restClientBuilder =
- getRestClientBuilder(
- hosts,
- username,
- password,
- tlsVerifyCertificate,
- tlsVerifyHostnames,
- keystorePath,
- keystorePassword,
- truststorePath,
- truststorePassword);
+ getRestClientBuilder(
+ hosts,
+ username,
+ password,
+ tlsVerifyCertificate,
+ tlsVerifyHostnames,
+ keystorePath,
+ keystorePassword,
+ truststorePath,
+ truststorePassword);
return new EsRestClient(restClientBuilder.build());
}
private static RestClientBuilder getRestClientBuilder(
- List hosts,
- Optional username,
- Optional password,
- boolean tlsVerifyCertificate,
- boolean tlsVerifyHostnames,
- Optional keystorePath,
- Optional keystorePassword,
- Optional truststorePath,
- Optional truststorePassword) {
+ List hosts,
+ Optional username,
+ Optional password,
+ boolean tlsVerifyCertificate,
+ boolean tlsVerifyHostnames,
+ Optional keystorePath,
+ Optional keystorePassword,
+ Optional truststorePath,
+ Optional truststorePassword) {
HttpHost[] httpHosts = new HttpHost[hosts.size()];
for (int i = 0; i < hosts.size(); i++) {
httpHosts[i] = HttpHost.create(hosts.get(i));
}
RestClientBuilder restClientBuilder =
- RestClient.builder(httpHosts)
- .setRequestConfigCallback(
- requestConfigBuilder ->
- requestConfigBuilder
- .setConnectionRequestTimeout(
- CONNECTION_REQUEST_TIMEOUT)
- .setSocketTimeout(SOCKET_TIMEOUT));
+ RestClient.builder(httpHosts)
+ .setRequestConfigCallback(
+ requestConfigBuilder ->
+ requestConfigBuilder
+ .setConnectionRequestTimeout(
+ CONNECTION_REQUEST_TIMEOUT)
+ .setSocketTimeout(SOCKET_TIMEOUT));
restClientBuilder.setHttpClientConfigCallback(
- httpClientBuilder -> {
- if (username.isPresent()) {
- CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials(username.get(), password.get()));
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
- }
-
- try {
- if (tlsVerifyCertificate) {
- Optional sslContext =
- SSLUtils.buildSSLContext(
- keystorePath,
- keystorePassword,
- truststorePath,
- truststorePassword);
- sslContext.ifPresent(e -> httpClientBuilder.setSSLContext(e));
- } else {
- SSLContext sslContext =
- SSLContexts.custom()
- .loadTrustMaterial(new TrustAllStrategy())
- .build();
- httpClientBuilder.setSSLContext(sslContext);
+ httpClientBuilder -> {
+ if (username.isPresent()) {
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(username.get(), password.get()));
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
- if (!tlsVerifyHostnames) {
- httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
+
+ try {
+ if (tlsVerifyCertificate) {
+ Optional sslContext =
+ SSLUtils.buildSSLContext(
+ keystorePath,
+ keystorePassword,
+ truststorePath,
+ truststorePassword);
+ sslContext.ifPresent(e -> httpClientBuilder.setSSLContext(e));
+ } else {
+ SSLContext sslContext =
+ SSLContexts.custom()
+ .loadTrustMaterial(new TrustAllStrategy())
+ .build();
+ httpClientBuilder.setSSLContext(sslContext);
+ }
+ if (!tlsVerifyHostnames) {
+ httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return httpClientBuilder;
- });
+ return httpClientBuilder;
+ });
return restClientBuilder;
}
@@ -235,12 +237,12 @@ public ElasticsearchClusterInfo getClusterInfo() {
JsonNode jsonNode = objectMapper.readTree(result);
JsonNode versionNode = jsonNode.get("version");
return ElasticsearchClusterInfo.builder()
- .clusterVersion(versionNode.get("number").asText())
- .distribution(
- Optional.ofNullable(versionNode.get("distribution"))
- .map(JsonNode::asText)
- .orElse(null))
- .build();
+ .clusterVersion(versionNode.get("number").asText())
+ .distribution(
+ Optional.ofNullable(versionNode.get("distribution"))
+ .map(JsonNode::asText)
+ .orElse(null))
+ .build();
} catch (IOException e) {
throw new ResponseException("fail to get elasticsearch version.", e);
}
@@ -265,13 +267,13 @@ public List listIndex() {
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
return JsonUtils.toList(entity, Map.class).stream()
- .map(map -> map.get("index").toString())
- .collect(Collectors.toList());
+ .map(map -> map.get("index").toString())
+ .collect(Collectors.toList());
} else {
throw new ResponseException(
- String.format(
- "GET %s response status code=%d",
- endpoint, response.getStatusLine().getStatusCode()));
+ String.format(
+ "GET %s response status code=%d",
+ endpoint, response.getStatusLine().getStatusCode()));
}
} catch (IOException ex) {
throw new ResponseException(ex);
@@ -289,9 +291,9 @@ public void dropIndex(String tableName) {
// todo: if the index doesn't exist, the response status code is 200?
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
throw new ResponseException(
- String.format(
- "DELETE %s response status code=%d",
- endpoint, response.getStatusLine().getStatusCode()));
+ String.format(
+ "DELETE %s response status code=%d",
+ endpoint, response.getStatusLine().getStatusCode()));
}
} catch (IOException ex) {
throw new ResponseException(ex);
@@ -315,9 +317,9 @@ public Map getFieldTypeMapping(String index) {
}
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
throw new ResponseException(
- String.format(
- "GET %s response status code=%d",
- endpoint, response.getStatusLine().getStatusCode()));
+ String.format(
+ "GET %s response status code=%d",
+ endpoint, response.getStatusLine().getStatusCode()));
}
String entity = EntityUtils.toString(response.getEntity());
log.info(String.format("GET %s response=%s", endpoint, entity));
@@ -359,9 +361,9 @@ private static Map getFieldTypeMappingFromProperties(JsonNode pr
mapping.put(field, type);
} else {
log.warn(
- String.format(
- "fail to get elasticsearch field %s mapping type,so give a default type text",
- field));
+ String.format(
+ "fail to get elasticsearch field %s mapping type,so give a default type text",
+ field));
mapping.put(field, "text");
}
}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/SSLUtils.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/SSLUtils.java
index 9c10cd3b8..8f3caa204 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/SSLUtils.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/SSLUtils.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.datasource.plugin.elasticsearch.client;
-import static java.util.Collections.list;
-
import io.airlift.security.pem.PemReader;
import javax.net.ssl.KeyManager;
@@ -43,29 +41,31 @@
import java.util.List;
import java.util.Optional;
+import static java.util.Collections.list;
+
@SuppressWarnings("MagicNumber")
public final class SSLUtils {
public static Optional buildSSLContext(
- Optional keyStorePath,
- Optional keyStorePassword,
- Optional trustStorePath,
- Optional trustStorePassword)
- throws GeneralSecurityException, IOException {
+ Optional keyStorePath,
+ Optional keyStorePassword,
+ Optional trustStorePath,
+ Optional trustStorePassword)
+ throws GeneralSecurityException, IOException {
if (!keyStorePath.isPresent() && !trustStorePath.isPresent()) {
return Optional.empty();
}
return Optional.of(
- createSSLContext(
- keyStorePath, keyStorePassword, trustStorePath, trustStorePassword));
+ createSSLContext(
+ keyStorePath, keyStorePassword, trustStorePath, trustStorePassword));
}
private static SSLContext createSSLContext(
- Optional keyStorePath,
- Optional keyStorePassword,
- Optional trustStorePath,
- Optional trustStorePassword)
- throws GeneralSecurityException, IOException {
+ Optional keyStorePath,
+ Optional keyStorePassword,
+ Optional trustStorePath,
+ Optional trustStorePassword)
+ throws GeneralSecurityException, IOException {
// load KeyStore if configured and get KeyManagers
KeyStore keyStore = null;
KeyManager[] keyManagers = null;
@@ -88,7 +88,7 @@ private static SSLContext createSSLContext(
}
validateCertificates(keyStore);
KeyManagerFactory keyManagerFactory =
- KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keyStore, keyManagerPassword);
keyManagers = keyManagerFactory.getKeyManagers();
}
@@ -102,14 +102,14 @@ private static SSLContext createSSLContext(
// create TrustManagerFactory
TrustManagerFactory trustManagerFactory =
- TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
// get X509TrustManager
TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
if (trustManagers.length != 1 || !(trustManagers[0] instanceof X509TrustManager)) {
throw new RuntimeException(
- "Unexpected default trust managers:" + Arrays.toString(trustManagers));
+ "Unexpected default trust managers:" + Arrays.toString(trustManagers));
}
// create SSLContext
SSLContext result = SSLContext.getInstance("SSL");
@@ -118,7 +118,7 @@ private static SSLContext createSSLContext(
}
private static KeyStore loadTrustStore(File trustStorePath, Optional trustStorePassword)
- throws IOException, GeneralSecurityException {
+ throws IOException, GeneralSecurityException {
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
try {
// attempt to read the trust store as a PEM file
@@ -155,10 +155,10 @@ private static void validateCertificates(KeyStore keyStore) throws GeneralSecuri
((X509Certificate) certificate).checkValidity();
} catch (CertificateExpiredException e) {
throw new CertificateExpiredException(
- "KeyStore certificate is expired: " + e.getMessage());
+ "KeyStore certificate is expired: " + e.getMessage());
} catch (CertificateNotYetValidException e) {
throw new CertificateNotYetValidException(
- "KeyStore certificate is not yet valid: " + e.getMessage());
+ "KeyStore certificate is not yet valid: " + e.getMessage());
}
}
}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/test/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannelTest.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/test/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannelTest.java
index b8b25d841..b888a1ff7 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/test/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannelTest.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/test/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannelTest.java
@@ -19,14 +19,15 @@
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
import java.util.List;
import java.util.Map;
@@ -34,19 +35,19 @@
@Disabled
class ElasticSearchDataSourceChannelTest {
private static final Logger LOGGER =
- LoggerFactory.getLogger(ElasticSearchDataSourceChannelTest.class);
+ LoggerFactory.getLogger(ElasticSearchDataSourceChannelTest.class);
private static final ElasticSearchDataSourceChannel ELASTIC_SEARCH_DATA_SOURCE_CHANNEL =
- new ElasticSearchDataSourceChannel();
+ new ElasticSearchDataSourceChannel();
private static final String PLUGIN_NAME = "ElasticSearch";
private static final String DATABASE = "Default";
private static final Map REQUEST_MAP =
- new ImmutableMap.Builder()
- .put(ElasticSearchOptionRule.HOSTS.key(), "[\"http://localhost:9200\"]")
- .build();
+ new ImmutableMap.Builder()
+ .put(ElasticSearchOptionRule.HOSTS.key(), "[\"http://localhost:9200\"]")
+ .build();
@Test
void canAbleGetSchema() {
@@ -55,60 +56,61 @@ void canAbleGetSchema() {
@Test
void getDataSourceOptions() {
- Assertions.assertNotNull(ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getDataSourceOptions(PLUGIN_NAME));
+ Assertions.assertNotNull(
+ ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getDataSourceOptions(PLUGIN_NAME));
}
@Test
void getDatasourceMetadataFieldsByDataSourceName() {
Assertions.assertNotNull(
- ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getDatasourceMetadataFieldsByDataSourceName(
- PLUGIN_NAME));
+ ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getDatasourceMetadataFieldsByDataSourceName(
+ PLUGIN_NAME));
}
@Test
void getTables() {
Assertions.assertDoesNotThrow(
- () -> {
- List tables =
- ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getTables(
- PLUGIN_NAME, REQUEST_MAP, DATABASE);
- LOGGER.info("{}", tables);
- });
+ () -> {
+ List tables =
+ ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getTables(
+ PLUGIN_NAME, REQUEST_MAP, DATABASE);
+ LOGGER.info("{}", tables);
+ });
}
@Test
void getDatabases() {
Assertions.assertLinesMatch(
- Lists.newArrayList("default"),
- ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getDatabases(PLUGIN_NAME, REQUEST_MAP));
+ Lists.newArrayList("default"),
+ ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getDatabases(PLUGIN_NAME, REQUEST_MAP));
}
@Test
void checkDataSourceConnectivity() {
Assertions.assertTrue(
- ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.checkDataSourceConnectivity(
- PLUGIN_NAME, REQUEST_MAP));
+ ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.checkDataSourceConnectivity(
+ PLUGIN_NAME, REQUEST_MAP));
}
@Test
void getTableFields() {
Assertions.assertDoesNotThrow(
- () -> {
- List tableFields =
- ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getTableFields(
- PLUGIN_NAME, REQUEST_MAP, DATABASE, "");
- LOGGER.info("{}", tableFields);
- });
+ () -> {
+ List tableFields =
+ ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getTableFields(
+ PLUGIN_NAME, REQUEST_MAP, DATABASE, "");
+ LOGGER.info("{}", tableFields);
+ });
}
@Test
void testGetTableFields() {
Assertions.assertDoesNotThrow(
- () -> {
- Map> tableFields =
- ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getTableFields(
- PLUGIN_NAME, REQUEST_MAP, DATABASE, Lists.newArrayList(""));
- LOGGER.info("{}", tableFields);
- });
+ () -> {
+ Map> tableFields =
+ ELASTIC_SEARCH_DATA_SOURCE_CHANNEL.getTableFields(
+ PLUGIN_NAME, REQUEST_MAP, DATABASE, Lists.newArrayList(""));
+ LOGGER.info("{}", tableFields);
+ });
}
}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/test/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceFactoryTest.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/test/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceFactoryTest.java
index 85a49e77b..0441d7a12 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/test/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceFactoryTest.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/test/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceFactoryTest.java
@@ -23,12 +23,12 @@
class ElasticSearchDataSourceFactoryTest {
private static final ElasticSearchDataSourceFactory ELASTIC_SEARCH_DATA_SOURCE_FACTORY =
- new ElasticSearchDataSourceFactory();
+ new ElasticSearchDataSourceFactory();
@Test
void factoryIdentifier() {
Assertions.assertEquals(
- "ElasticSearch", ELASTIC_SEARCH_DATA_SOURCE_FACTORY.factoryIdentifier());
+ "ElasticSearch", ELASTIC_SEARCH_DATA_SOURCE_FACTORY.factoryIdentifier());
}
@Test
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseDataSourceConfig.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseDataSourceConfig.java
index 4862d41d8..4c5b3b2e7 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseDataSourceConfig.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseDataSourceConfig.java
@@ -29,31 +29,31 @@ public class ClickhouseDataSourceConfig {
public static final String PLUGIN_NAME = "JDBC-ClickHouse";
public static final DataSourcePluginInfo CLICKHOUSE_DATASOURCE_PLUGIN_INFO =
- DataSourcePluginInfo.builder()
- .name(PLUGIN_NAME)
- .icon(PLUGIN_NAME)
- .version("1.0.0")
- .type(DatasourcePluginTypeEnum.DATABASE.getCode())
- .supportVirtualTables(false)
- .build();
+ DataSourcePluginInfo.builder()
+ .name(PLUGIN_NAME)
+ .icon(PLUGIN_NAME)
+ .version("1.0.0")
+ .type(DatasourcePluginTypeEnum.DATABASE.getCode())
+ .supportVirtualTables(false)
+ .build();
public static final Set CLICKHOUSE_SYSTEM_DATABASES =
- Sets.newHashSet(
- "system",
- "default",
- "information_schema",
- "mysql",
- "performance_schema",
- "sys");
+ Sets.newHashSet(
+ "system",
+ "default",
+ "information_schema",
+ "mysql",
+ "performance_schema",
+ "sys");
public static final OptionRule OPTION_RULE =
- OptionRule.builder()
- .required(ClickhouseOptionRule.URL, ClickhouseOptionRule.DRIVER)
- .optional(ClickhouseOptionRule.USER, ClickhouseOptionRule.PASSWORD)
- .build();
+ OptionRule.builder()
+ .required(ClickhouseOptionRule.URL, ClickhouseOptionRule.DRIVER)
+ .optional(ClickhouseOptionRule.USER, ClickhouseOptionRule.PASSWORD)
+ .build();
public static final OptionRule METADATA_RULE =
- OptionRule.builder()
- .required(ClickhouseOptionRule.DATABASE, ClickhouseOptionRule.TABLE)
- .build();
+ OptionRule.builder()
+ .required(ClickhouseOptionRule.DATABASE, ClickhouseOptionRule.TABLE)
+ .build();
}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseJdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseJdbcDataSourceChannel.java
index 371411741..ef160afbc 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseJdbcDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseJdbcDataSourceChannel.java
@@ -17,17 +17,16 @@
package org.apache.seatunnel.datasource.plugin.clickhouse.jdbc;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
+import org.apache.commons.lang3.StringUtils;
+
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -39,6 +38,8 @@
import java.util.List;
import java.util.Map;
+import static com.google.common.base.Preconditions.checkNotNull;
+
@Slf4j
public class ClickhouseJdbcDataSourceChannel implements DataSourceChannel {
@@ -54,13 +55,13 @@ public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pl
@Override
public List getTables(
- @NonNull String pluginName, Map requestParams, String database) {
+ @NonNull String pluginName, Map requestParams, String database) {
List tableNames = new ArrayList<>();
- try (Connection connection = getConnection(requestParams);) {
+ try (Connection connection = getConnection(requestParams); ) {
ResultSet resultSet =
- connection
- .getMetaData()
- .getTables(database, null, null, new String[]{"TABLE"});
+ connection
+ .getMetaData()
+ .getTables(database, null, null, new String[] {"TABLE"});
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
@@ -75,17 +76,17 @@ public List getTables(
@Override
public List getDatabases(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
List dbNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams);
- Statement statement = connection.createStatement();
- ResultSet re = statement.executeQuery("SHOW DATABASES;")) {
+ Statement statement = connection.createStatement();
+ ResultSet re = statement.executeQuery("SHOW DATABASES;")) {
// filter system databases
while (re.next()) {
String dbName = re.getString("name");
if (StringUtils.isNotBlank(dbName)
- && !ClickhouseDataSourceConfig.CLICKHOUSE_SYSTEM_DATABASES.contains(
- dbName)) {
+ && !ClickhouseDataSourceConfig.CLICKHOUSE_SYSTEM_DATABASES.contains(
+ dbName)) {
dbNames.add(dbName);
}
}
@@ -97,7 +98,7 @@ public List getDatabases(
@Override
public boolean checkDataSourceConnectivity(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
try (Connection ignored = getConnection(requestParams)) {
return true;
} catch (Exception e) {
@@ -107,10 +108,10 @@ public boolean checkDataSourceConnectivity(
@Override
public List getTableFields(
- @NonNull String pluginName,
- @NonNull Map requestParams,
- @NonNull String database,
- @NonNull String table) {
+ @NonNull String pluginName,
+ @NonNull Map requestParams,
+ @NonNull String database,
+ @NonNull String table) {
List tableFields = new ArrayList<>();
try (Connection connection = getConnection(requestParams, database)) {
DatabaseMetaData metaData = connection.getMetaData();
@@ -139,15 +140,15 @@ public List getTableFields(
@Override
public Map> getTableFields(
- @NonNull String pluginName,
- @NonNull Map requestParams,
- @NonNull String database,
- @NonNull List tables) {
+ @NonNull String pluginName,
+ @NonNull Map requestParams,
+ @NonNull String database,
+ @NonNull List tables) {
return null;
}
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
- throws SQLException {
+ throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName);
while (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("COLUMN_NAME");
@@ -156,17 +157,17 @@ private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String ta
}
private Connection getConnection(Map requestParams)
- throws SQLException, ClassNotFoundException {
+ throws SQLException, ClassNotFoundException {
return getConnection(requestParams, null);
}
private Connection getConnection(Map requestParams, String databaseName)
- throws SQLException, ClassNotFoundException {
+ throws SQLException, ClassNotFoundException {
checkNotNull(requestParams.get(ClickhouseOptionRule.DRIVER.key()));
checkNotNull(requestParams.get(ClickhouseOptionRule.URL.key()), "Jdbc url cannot be null");
String url =
- JdbcUtils.replaceDatabase(
- requestParams.get(ClickhouseOptionRule.URL.key()), databaseName);
+ JdbcUtils.replaceDatabase(
+ requestParams.get(ClickhouseOptionRule.URL.key()), databaseName);
if (requestParams.containsKey(ClickhouseOptionRule.USER.key())) {
String username = requestParams.get(ClickhouseOptionRule.USER.key());
String password = requestParams.get(ClickhouseOptionRule.PASSWORD.key());
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseOptionRule.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseOptionRule.java
index 3beff6058..f02e6030d 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseOptionRule.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-clickhouse/src/main/java/org/apache/seatunnel/datasource/plugin/clickhouse/jdbc/ClickhouseOptionRule.java
@@ -23,30 +23,30 @@
public class ClickhouseOptionRule {
public static final Option URL =
- Options.key("url")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "jdbc url, eg:"
- + "jdbc:clickhouse://localhost:8123/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8");
+ Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "jdbc url, eg:"
+ + "jdbc:clickhouse://localhost:8123/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8");
public static final Option USER =
- Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
+ Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
public static final Option PASSWORD =
- Options.key("password").stringType().noDefaultValue().withDescription("jdbc password");
+ Options.key("password").stringType().noDefaultValue().withDescription("jdbc password");
public static final Option DATABASE =
- Options.key("database").stringType().noDefaultValue().withDescription("jdbc database");
+ Options.key("database").stringType().noDefaultValue().withDescription("jdbc database");
public static final Option TABLE =
- Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
+ Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
public static final Option DRIVER =
- Options.key("driver")
- .enumType(DriverType.class)
- .noDefaultValue()
- .withDescription("driver");
+ Options.key("driver")
+ .enumType(DriverType.class)
+ .noDefaultValue()
+ .withDescription("driver");
public enum DriverType {
ClickHouse("ru.yandex.clickhouse.ClickHouseDriver");
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-hive/src/main/java/org/apache/seatunnel/datasource/plugin/hive/jdbc/HiveJdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-hive/src/main/java/org/apache/seatunnel/datasource/plugin/hive/jdbc/HiveJdbcDataSourceChannel.java
index 32543c423..62559037d 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-hive/src/main/java/org/apache/seatunnel/datasource/plugin/hive/jdbc/HiveJdbcDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-hive/src/main/java/org/apache/seatunnel/datasource/plugin/hive/jdbc/HiveJdbcDataSourceChannel.java
@@ -22,11 +22,12 @@
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -56,13 +57,13 @@ public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pl
@Override
public List getTables(
- @NonNull String pluginName, Map requestParams, String database) {
+ @NonNull String pluginName, Map requestParams, String database) {
return getTables(pluginName, requestParams, database);
}
@Override
public List getDatabases(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
try {
return getDataBaseNames(pluginName, requestParams);
} catch (SQLException e) {
@@ -73,25 +74,25 @@ public List getDatabases(
@Override
public boolean checkDataSourceConnectivity(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
return checkJdbcConnectivity(requestParams);
}
@Override
public List getTableFields(
- @NonNull String pluginName,
- @NonNull Map requestParams,
- @NonNull String database,
- @NonNull String table) {
+ @NonNull String pluginName,
+ @NonNull Map requestParams,
+ @NonNull String database,
+ @NonNull String table) {
return getTableFields(requestParams, database, table);
}
@Override
public Map> getTableFields(
- @NonNull String pluginName,
- @NonNull Map requestParams,
- @NonNull String database,
- @NonNull List tables) {
+ @NonNull String pluginName,
+ @NonNull Map requestParams,
+ @NonNull String database,
+ @NonNull List tables) {
Map> tableFields = new HashMap<>(tables.size());
for (String table : tables) {
tableFields.put(table, getTableFields(requestParams, database, table));
@@ -104,24 +105,24 @@ protected boolean checkJdbcConnectivity(Map requestParams) {
return true;
} catch (Exception e) {
throw new DataSourcePluginException(
- "check jdbc connectivity failed, " + e.getMessage(), e);
+ "check jdbc connectivity failed, " + e.getMessage(), e);
}
}
protected Connection init(Map requestParams) throws SQLException {
if (MapUtils.isEmpty(requestParams)) {
throw new DataSourcePluginException(
- "Hive jdbc request params is null, please check your config");
+ "Hive jdbc request params is null, please check your config");
}
String url = requestParams.get(HiveJdbcOptionRule.URL.key());
return DriverManager.getConnection(url);
}
protected List getDataBaseNames(String pluginName, Map requestParams)
- throws SQLException {
+ throws SQLException {
List dbNames = new ArrayList<>();
try (Connection connection = init(requestParams);
- Statement statement = connection.createStatement();) {
+ Statement statement = connection.createStatement(); ) {
ResultSet re = statement.executeQuery("SHOW DATABASES;");
// filter system databases
while (re.next()) {
@@ -136,9 +137,9 @@ protected List getDataBaseNames(String pluginName, Map r
protected List getTableNames(Map requestParams, String dbName) {
List tableNames = new ArrayList<>();
- try (Connection connection = init(requestParams);) {
+ try (Connection connection = init(requestParams); ) {
ResultSet resultSet =
- connection.getMetaData().getTables(dbName, null, null, new String[]{"TABLE"});
+ connection.getMetaData().getTables(dbName, null, null, new String[] {"TABLE"});
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
@@ -152,9 +153,9 @@ protected List getTableNames(Map requestParams, String d
}
protected List getTableFields(
- Map requestParams, String dbName, String tableName) {
+ Map requestParams, String dbName, String tableName) {
List tableFields = new ArrayList<>();
- try (Connection connection = init(requestParams);) {
+ try (Connection connection = init(requestParams); ) {
DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, dbName, tableName);
ResultSet resultSet = metaData.getColumns(dbName, null, tableName, null);
@@ -180,7 +181,7 @@ protected List getTableFields(
}
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
- throws SQLException {
+ throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName);
while (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("COLUMN_NAME");
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlDataSourceConfig.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlDataSourceConfig.java
index 2e2de3e45..03d6d0cf3 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlDataSourceConfig.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlDataSourceConfig.java
@@ -30,22 +30,22 @@ public class MysqlDataSourceConfig {
public static final String PLUGIN_NAME = "JDBC-Mysql";
public static final DataSourcePluginInfo MYSQL_DATASOURCE_PLUGIN_INFO =
- DataSourcePluginInfo.builder()
- .name(PLUGIN_NAME)
- .icon(PLUGIN_NAME)
- .version("1.0.0")
- .type(DatasourcePluginTypeEnum.DATABASE.getCode())
- .build();
+ DataSourcePluginInfo.builder()
+ .name(PLUGIN_NAME)
+ .icon(PLUGIN_NAME)
+ .version("1.0.0")
+ .type(DatasourcePluginTypeEnum.DATABASE.getCode())
+ .build();
public static final Set MYSQL_SYSTEM_DATABASES =
- Sets.newHashSet("information_schema", "mysql", "performance_schema", "sys");
+ Sets.newHashSet("information_schema", "mysql", "performance_schema", "sys");
public static final OptionRule OPTION_RULE =
- OptionRule.builder()
- .required(MysqlOptionRule.URL, MysqlOptionRule.DRIVER)
- .optional(MysqlOptionRule.USER, MysqlOptionRule.PASSWORD)
- .build();
+ OptionRule.builder()
+ .required(MysqlOptionRule.URL, MysqlOptionRule.DRIVER)
+ .optional(MysqlOptionRule.USER, MysqlOptionRule.PASSWORD)
+ .build();
public static final OptionRule METADATA_RULE =
- OptionRule.builder().required(MysqlOptionRule.DATABASE, MysqlOptionRule.TABLE).build();
+ OptionRule.builder().required(MysqlOptionRule.DATABASE, MysqlOptionRule.TABLE).build();
}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
index 1fe420d53..99ab042b9 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
@@ -17,17 +17,16 @@
package org.apache.seatunnel.datasource.plugin.mysql.jdbc;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
-import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
+import lombok.NonNull;
+
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
@@ -40,6 +39,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public class MysqlJdbcDataSourceChannel implements DataSourceChannel {
@Override
@@ -54,13 +55,13 @@ public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pl
@Override
public List getTables(
- @NonNull String pluginName, Map requestParams, String database) {
+ @NonNull String pluginName, Map requestParams, String database) {
List tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams);
- ResultSet resultSet =
- connection
- .getMetaData()
- .getTables(database, null, null, new String[]{"TABLE"})) {
+ ResultSet resultSet =
+ connection
+ .getMetaData()
+ .getTables(database, null, null, new String[] {"TABLE"})) {
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
@@ -75,16 +76,16 @@ public List getTables(
@Override
public List getDatabases(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
List dbNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams);
- PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;");
- ResultSet re = statement.executeQuery()) {
+ PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;");
+ ResultSet re = statement.executeQuery()) {
// filter system databases
while (re.next()) {
String dbName = re.getString("database");
if (StringUtils.isNotBlank(dbName)
- && !MysqlDataSourceConfig.MYSQL_SYSTEM_DATABASES.contains(dbName)) {
+ && !MysqlDataSourceConfig.MYSQL_SYSTEM_DATABASES.contains(dbName)) {
dbNames.add(dbName);
}
}
@@ -96,7 +97,7 @@ public List getDatabases(
@Override
public boolean checkDataSourceConnectivity(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
try (Connection ignored = getConnection(requestParams)) {
return true;
} catch (Exception e) {
@@ -106,10 +107,10 @@ public boolean checkDataSourceConnectivity(
@Override
public List getTableFields(
- @NonNull String pluginName,
- @NonNull Map requestParams,
- @NonNull String database,
- @NonNull String table) {
+ @NonNull String pluginName,
+ @NonNull Map requestParams,
+ @NonNull String database,
+ @NonNull String table) {
List tableFields = new ArrayList<>();
try (Connection connection = getConnection(requestParams, database)) {
DatabaseMetaData metaData = connection.getMetaData();
@@ -138,21 +139,21 @@ public List getTableFields(
@Override
public Map> getTableFields(
- @NonNull String pluginName,
- @NonNull Map requestParams,
- @NonNull String database,
- @NonNull List tables) {
+ @NonNull String pluginName,
+ @NonNull Map requestParams,
+ @NonNull String database,
+ @NonNull List tables) {
return tables.parallelStream()
- .collect(
- Collectors.toMap(
- Function.identity(),
- table ->
- getTableFields(
- pluginName, requestParams, database, table)));
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+ table ->
+ getTableFields(
+ pluginName, requestParams, database, table)));
}
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
- throws SQLException {
+ throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName);
while (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("COLUMN_NAME");
@@ -161,17 +162,17 @@ private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String ta
}
private Connection getConnection(Map requestParams)
- throws SQLException, ClassNotFoundException {
+ throws SQLException, ClassNotFoundException {
return getConnection(requestParams, null);
}
private Connection getConnection(Map requestParams, String databaseName)
- throws SQLException, ClassNotFoundException {
+ throws SQLException, ClassNotFoundException {
checkNotNull(requestParams.get(MysqlOptionRule.DRIVER.key()));
checkNotNull(requestParams.get(MysqlOptionRule.URL.key()), "Jdbc url cannot be null");
String url =
- JdbcUtils.replaceDatabase(
- requestParams.get(MysqlOptionRule.URL.key()), databaseName);
+ JdbcUtils.replaceDatabase(
+ requestParams.get(MysqlOptionRule.URL.key()), databaseName);
if (requestParams.containsKey(MysqlOptionRule.USER.key())) {
String username = requestParams.get(MysqlOptionRule.USER.key());
String password = requestParams.get(MysqlOptionRule.PASSWORD.key());
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlOptionRule.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlOptionRule.java
index f666251fc..e34696410 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlOptionRule.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlOptionRule.java
@@ -23,30 +23,30 @@
public class MysqlOptionRule {
public static final Option URL =
- Options.key("url")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "jdbc url, eg:"
- + " jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8");
+ Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "jdbc url, eg:"
+ + " jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8");
public static final Option USER =
- Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
+ Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
public static final Option PASSWORD =
- Options.key("password").stringType().noDefaultValue().withDescription("jdbc password");
+ Options.key("password").stringType().noDefaultValue().withDescription("jdbc password");
public static final Option DATABASE =
- Options.key("database").stringType().noDefaultValue().withDescription("jdbc database");
+ Options.key("database").stringType().noDefaultValue().withDescription("jdbc database");
public static final Option TABLE =
- Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
+ Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
public static final Option DRIVER =
- Options.key("driver")
- .enumType(DriverType.class)
- .defaultValue(DriverType.MYSQL)
- .withDescription("driver");
+ Options.key("driver")
+ .enumType(DriverType.class)
+ .defaultValue(DriverType.MYSQL)
+ .withDescription("driver");
public enum DriverType {
MYSQL("com.mysql.cj.jdbc.Driver"),
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
index 91ccc6615..87a04fcf6 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
@@ -17,17 +17,16 @@
package org.apache.seatunnel.datasource.plugin.oracle.jdbc;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
-import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
+import lombok.NonNull;
+
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
@@ -38,6 +37,8 @@
import java.util.List;
import java.util.Map;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public class OracleDataSourceChannel implements DataSourceChannel {
@Override
@@ -52,13 +53,13 @@ public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pl
@Override
public List getTables(
- @NonNull String pluginName, Map requestParams, String database) {
+ @NonNull String pluginName, Map requestParams, String database) {
List tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams);
- ResultSet resultSet =
- connection
- .getMetaData()
- .getTables(database, null, null, new String[]{"TABLE"});) {
+ ResultSet resultSet =
+ connection
+ .getMetaData()
+ .getTables(database, null, null, new String[] {"TABLE"}); ) {
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
@@ -73,16 +74,16 @@ public List getTables(
@Override
public List getDatabases(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
List dbNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams);
- PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;");
- ResultSet re = statement.executeQuery()) {
+ PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;");
+ ResultSet re = statement.executeQuery()) {
// filter system databases
while (re.next()) {
String dbName = re.getString("database");
if (StringUtils.isNotBlank(dbName)
- && !OracleDataSourceConfig.ORACLE_SYSTEM_DATABASES.contains(dbName)) {
+ && !OracleDataSourceConfig.ORACLE_SYSTEM_DATABASES.contains(dbName)) {
dbNames.add(dbName);
}
}
@@ -94,7 +95,7 @@ public List getDatabases(
@Override
public boolean checkDataSourceConnectivity(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
try (Connection ignored = getConnection(requestParams)) {
return true;
} catch (Exception e) {
@@ -104,10 +105,10 @@ public boolean checkDataSourceConnectivity(
@Override
public List getTableFields(
- @NonNull String pluginName,
- @NonNull Map requestParams,
- @NonNull String database,
- @NonNull String table) {
+ @NonNull String pluginName,
+ @NonNull Map requestParams,
+ @NonNull String database,
+ @NonNull String table) {
List tableFields = new ArrayList<>();
try (Connection connection = getConnection(requestParams, database)) {
DatabaseMetaData metaData = connection.getMetaData();
@@ -136,15 +137,15 @@ public List getTableFields(
@Override
public Map> getTableFields(
- @NonNull String pluginName,
- @NonNull Map requestParams,
- @NonNull String database,
- @NonNull List tables) {
+ @NonNull String pluginName,
+ @NonNull Map requestParams,
+ @NonNull String database,
+ @NonNull List tables) {
return null;
}
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
- throws SQLException {
+ throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName);
while (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("COLUMN_NAME");
@@ -153,17 +154,17 @@ private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String ta
}
private Connection getConnection(Map requestParams)
- throws SQLException, ClassNotFoundException {
+ throws SQLException, ClassNotFoundException {
return getConnection(requestParams, null);
}
private Connection getConnection(Map requestParams, String databaseName)
- throws SQLException, ClassNotFoundException {
+ throws SQLException, ClassNotFoundException {
checkNotNull(requestParams.get(OracleOptionRule.DRIVER.key()));
checkNotNull(requestParams.get(OracleOptionRule.URL.key()), "Jdbc url cannot be null");
String url =
- JdbcUtils.replaceDatabase(
- requestParams.get(OracleOptionRule.URL.key()), databaseName);
+ JdbcUtils.replaceDatabase(
+ requestParams.get(OracleOptionRule.URL.key()), databaseName);
if (requestParams.containsKey(OracleOptionRule.USER.key())) {
String username = requestParams.get(OracleOptionRule.USER.key());
String password = requestParams.get(OracleOptionRule.PASSWORD.key());
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceConfig.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceConfig.java
index 96f09fbef..83455b6d7 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceConfig.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceConfig.java
@@ -30,24 +30,24 @@ public class OracleDataSourceConfig {
public static final String PLUGIN_NAME = "JDBC-Oracle";
public static final DataSourcePluginInfo ORACLE_DATASOURCE_PLUGIN_INFO =
- DataSourcePluginInfo.builder()
- .name(PLUGIN_NAME)
- .icon(PLUGIN_NAME)
- .version("1.0.0")
- .type(DatasourcePluginTypeEnum.DATABASE.getCode())
- .build();
+ DataSourcePluginInfo.builder()
+ .name(PLUGIN_NAME)
+ .icon(PLUGIN_NAME)
+ .version("1.0.0")
+ .type(DatasourcePluginTypeEnum.DATABASE.getCode())
+ .build();
public static final Set ORACLE_SYSTEM_DATABASES =
- Sets.newHashSet("SYS", "SYSTEM", "SYSDBA", "SYSOPER", "HR", "SCOTT");
+ Sets.newHashSet("SYS", "SYSTEM", "SYSDBA", "SYSOPER", "HR", "SCOTT");
public static final OptionRule OPTION_RULE =
- OptionRule.builder()
- .required(OracleOptionRule.URL, OracleOptionRule.DRIVER)
- .optional(OracleOptionRule.USER, OracleOptionRule.PASSWORD)
- .build();
+ OptionRule.builder()
+ .required(OracleOptionRule.URL, OracleOptionRule.DRIVER)
+ .optional(OracleOptionRule.USER, OracleOptionRule.PASSWORD)
+ .build();
public static final OptionRule METADATA_RULE =
- OptionRule.builder()
- .required(OracleOptionRule.DATABASE, OracleOptionRule.TABLE)
- .build();
+ OptionRule.builder()
+ .required(OracleOptionRule.DATABASE, OracleOptionRule.TABLE)
+ .build();
}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleOptionRule.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleOptionRule.java
index 517a8c32e..f3ec40e33 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleOptionRule.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleOptionRule.java
@@ -23,28 +23,28 @@
public class OracleOptionRule {
public static final Option URL =
- Options.key("url")
- .stringType()
- .noDefaultValue()
- .withDescription("jdbc url, eg:" + "jdbc:oracle:thin:@localhost:1521:XE");
+ Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("jdbc url, eg:" + "jdbc:oracle:thin:@localhost:1521:XE");
public static final Option USER =
- Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
+ Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
public static final Option PASSWORD =
- Options.key("password").stringType().noDefaultValue().withDescription("jdbc password");
+ Options.key("password").stringType().noDefaultValue().withDescription("jdbc password");
public static final Option DATABASE =
- Options.key("database").stringType().noDefaultValue().withDescription("jdbc database");
+ Options.key("database").stringType().noDefaultValue().withDescription("jdbc database");
public static final Option TABLE =
- Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
+ Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
public static final Option DRIVER =
- Options.key("driver")
- .enumType(DriverType.class)
- .defaultValue(DriverType.ORACLE)
- .withDescription("driver");
+ Options.key("driver")
+ .enumType(DriverType.class)
+ .defaultValue(DriverType.ORACLE)
+ .withDescription("driver");
public enum DriverType {
ORACLE("oracle.jdbc.driver.OracleDriver"),
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
index b331d61d0..e43939536 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
@@ -17,17 +17,16 @@
package org.apache.seatunnel.datasource.plugin.postgresql.jdbc;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
-import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
+import lombok.NonNull;
+
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
@@ -39,6 +38,8 @@
import java.util.List;
import java.util.Map;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public class PostgresqlDataSourceChannel implements DataSourceChannel {
@Override
@@ -53,18 +54,18 @@ public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pl
@Override
public List getTables(
- @NonNull String pluginName, Map requestParams, String database) {
+ @NonNull String pluginName, Map requestParams, String database) {
List tableNames = new ArrayList<>();
String query = "SELECT table_schema, table_name FROM information_schema.tables";
try (Connection connection = getConnection(requestParams, database)) {
try (Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(query)) {
+ ResultSet resultSet = statement.executeQuery(query)) {
while (resultSet.next()) {
String schemaName = resultSet.getString("table_schema");
String tableName = resultSet.getString("table_name");
if (StringUtils.isNotBlank(schemaName)
- && !PostgresqlDataSourceConfig.POSTGRESQL_SYSTEM_DATABASES.contains(
- schemaName)) {
+ && !PostgresqlDataSourceConfig.POSTGRESQL_SYSTEM_DATABASES.contains(
+ schemaName)) {
tableNames.add(schemaName + "." + tableName);
}
}
@@ -77,17 +78,17 @@ public List getTables(
@Override
public List getDatabases(
- @NonNull String pluginName, @NonNull Map requestParams) {
+ @NonNull String pluginName, @NonNull Map requestParams) {
List