Skip to content

Commit

Permalink
[Improve] [Seatunnel-Web] Add support for Seatunnel 2.3.6 in Seatunne…
Browse files Browse the repository at this point in the history
…l-Web. (#170)
  • Loading branch information
arshadmohammad authored Aug 7, 2024
1 parent 9569686 commit 6155509
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 71 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
<guava.version>19.0</guava.version>
<checker.qual.version>3.10.0</checker.qual.version>
<awaitility.version>4.2.0</awaitility.version>
<seatunnel-framework.version>2.3.3</seatunnel-framework.version>
<seatunnel-framework.version>2.3.6</seatunnel-framework.version>
<oracle-jdbc.version>21.5.0.0</oracle-jdbc.version>
<postgresql.version>42.4.3</postgresql.version>
<sqlserver.version>9.2.1.jre8</sqlserver.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.seatunnel.app.bean.engine;

import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
Expand All @@ -29,7 +28,6 @@

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -86,21 +84,19 @@ public static class SeaTunnelDataTypeConvertor
implements DataTypeConvertor<SeaTunnelDataType<?>> {

@Override
public SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {
return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();
public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataType) {
return DATA_TYPE_MAP.get(connectorDataType).getRawType();
}

@Override
public SeaTunnelDataType<?> toSeaTunnelType(
SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
throws DataTypeConvertException {
String field, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
return seaTunnelDataType;
}

@Override
public SeaTunnelDataType<?> toConnectorType(
SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
throws DataTypeConvertException {
String field, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
return seaTunnelDataType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class EngineServiceImpl extends SeatunnelBaseServiceImpl implements IEngi
Lists.newArrayList(
// new Engine("Spark", "2.4.0"),
// new Engine("Flink", "1.13.6"),
new Engine("SeaTunnel", "2.3.1")));
new Engine("SeaTunnel", "2.3.6")));

@Override
public List<Engine> listSupportEngines() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
import org.apache.seatunnel.engine.core.job.JobStatus;

import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -104,8 +106,9 @@ public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInsta
jobConfig.setName(jobInstanceId + "_job");
SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
try {
JobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath, jobConfig);
SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public void complete(
} else if (statusList.contains("CANCELED")) {
jobStatus = JobStatus.CANCELED.name();
} else if (statusList.contains("CANCELLING")) {
jobStatus = JobStatus.CANCELLING.name();
jobStatus = JobStatus.CANCELING.name();
} else {
jobStatus = JobStatus.RUNNING.name();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
Expand Down Expand Up @@ -103,8 +103,8 @@ public TableSchemaReq derivationSQL(long jobVersionId, String inputPluginId, SQL
tableSchema.getTableName());
Map<String, Object> config = new HashMap<>();
config.put(SQLTransform.KEY_QUERY.key(), sql.getQuery());
TableFactoryContext context =
new TableFactoryContext(
TableTransformFactoryContext context =
new TableTransformFactoryContext(
Collections.singletonList(table),
ReadonlyConfig.fromMap(config),
Thread.currentThread().getContextClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -72,11 +71,17 @@ public class TableSchemaServiceImpl extends SeatunnelBaseServiceImpl

public TableSchemaServiceImpl() throws IOException {
Common.setStarter(true);
Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
if (path.toFile().exists()) {
List<URL> files = FileUtils.searchJarFiles(path);
files.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
factory = new DataTypeConvertorFactory(new URLClassLoader(files.toArray(new URL[0])));
Set<PluginIdentifier> pluginIdentifiers =
SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
pluginIdentifiersList.addAll(pluginIdentifiers);
List<URL> pluginJarPaths =
new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
if (!pluginJarPaths.isEmpty()) {
pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
factory =
new DataTypeConvertorFactory(
new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
} else {
factory = new DataTypeConvertorFactory();
}
Expand Down Expand Up @@ -105,7 +110,8 @@ public TableSchemaRes getSeaTunnelSchema(String pluginName, TableSchemaReq table
}

for (TableField field : tableSchemaReq.getFields()) {
SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
SeaTunnelDataType<?> dataType =
convertor.toSeaTunnelType(field.getName(), field.getType());
field.setType(dataType.toString());
}
TableSchemaRes res = new TableSchemaRes();
Expand Down Expand Up @@ -135,7 +141,8 @@ public void getAddSeaTunnelSchema(List<TableField> tableFields, String pluginNam
}
for (TableField field : tableFields) {
try {
SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
SeaTunnelDataType<?> dataType =
convertor.toSeaTunnelType(field.getName(), field.getType());
field.setUnSupport(false);
field.setOutputDataType(dataType.toString());
} catch (Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package org.apache.seatunnel.app.thirdparty.engine;

import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;

import com.hazelcast.client.config.ClientConfig;
Expand All @@ -31,16 +32,21 @@

@Slf4j
public class SeaTunnelEngineProxy {
ClientConfig clientConfig = null;

private SeaTunnelEngineProxy() {
clientConfig = ConfigProvider.locateAndGetClientConfig();
private ClientConfig clientConfig = null;

private static class SeaTunnelEngineProxyHolder {
private static final SeaTunnelEngineProxy INSTANCE = new SeaTunnelEngineProxy();
}

public static SeaTunnelEngineProxy getInstance() {
return SeaTunnelEngineProxyHolder.INSTANCE;
}

private SeaTunnelEngineProxy() {
clientConfig = ConfigProvider.locateAndGetClientConfig();
}

public String getMetricsContent(@NonNull String jobEngineId) {
SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
try {
Expand Down Expand Up @@ -89,10 +95,6 @@ public Map<String, String> getClusterHealthMetrics() {
}
}

private static class SeaTunnelEngineProxyHolder {
private static final SeaTunnelEngineProxy INSTANCE = new SeaTunnelEngineProxy();
}

public String getAllRunningJobMetricsContent() {

SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
Expand All @@ -103,22 +105,24 @@ public String getAllRunningJobMetricsContent() {
}
}

public void pauseJob(String jobEngineId) {
SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
JobClient jobClient = seaTunnelClient.getJobClient();
jobClient.savePointJob(Long.valueOf(jobEngineId));
public void pauseJob(@NonNull String jobEngineId) {
try (SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig)) {
seaTunnelClient.getJobClient().savePointJob(Long.valueOf(jobEngineId));
} catch (Exception e) {
log.warn("Can not pause job from engine.", e);
}
}

public void restoreJob(
@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(jobInstanceId + "_job");
try {
seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
try (SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig)) {
seaTunnelClient
.restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
.execute();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.seatunnel.app.dynamicforms.FormStructure;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
Expand All @@ -36,7 +35,6 @@
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -57,17 +55,23 @@ public static List<ConnectorInfo> getAllConnectorsFromPluginMapping(PluginType p
}

public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
PluginType pluginType) throws IOException {
PluginType pluginType) {
Common.setStarter(true);
if (!pluginType.equals(PluginType.SOURCE)) {
throw new UnsupportedOperationException("ONLY support plugin type source");
}
Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();

ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();
pluginIdentifiers.addAll(
SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());
List<URL> pluginJarPaths =
new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);

List<Factory> factories;
if (path.toFile().exists()) {
List<URL> files = FileUtils.searchJarFiles(path);
if (!pluginJarPaths.isEmpty()) {
factories =
FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
FactoryUtil.discoverFactories(
new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
} else {
factories =
FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
Expand Down
2 changes: 1 addition & 1 deletion seatunnel-ui/src/layouts/dashboard/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ const Dashboard = defineComponent({
style={'height: 100%'}
size='small'
>
<router-view key={this.$route.fullPath} class={!this.showSide && 'px-32 py-12'} />
<router-view key={this['$route'].fullPath} class={!this.showSide && 'px-32 py-12'} />
</NSpace>
</NLayoutContent>
</NLayout>
Expand Down
43 changes: 23 additions & 20 deletions tools/dependencies/known-dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ gson-2.8.6.jar
guava-19.0.jar
hibernate-validator-6.2.2.Final.jar
jackson-annotations-2.12.6.jar
jackson-core-2.12.6.jar
jackson-core-2.13.3.jar
jackson-annotations-2.13.3.jar
jackson-databind-2.12.6.jar
jackson-datatype-jdk8-2.13.3.jar
jackson-datatype-jsr310-2.13.3.jar
Expand All @@ -35,15 +36,15 @@ jjwt-api-0.10.7.jar
jjwt-impl-0.10.7.jar
jjwt-jackson-0.10.7.jar
mapstruct-1.0.0.Final.jar
jsqlparser-4.4.jar
jsqlparser-4.5.jar
mybatis-3.5.10.jar
mybatis-spring-2.0.7.jar
mybatis-plus-3.5.3.1.jar
mybatis-plus-annotation-3.5.3.1.jar
mybatis-plus-boot-starter-3.5.3.1.jar
mybatis-plus-core-3.5.3.1.jar
mybatis-plus-extension-3.5.3.1.jar
scala-library-2.11.12.jar
scala-library-2.12.15.jar



Expand Down Expand Up @@ -86,14 +87,13 @@ tomcat-embed-el-9.0.63.jar
h2-2.1.214.jar
auto-service-annotations-1.0.1.jar
jsr305-3.0.0.jar
checkpoint-storage-api-2.3.3.jar
checkpoint-storage-local-file-2.3.3.jar
checkpoint-storage-api-2.3.6.jar
checkpoint-storage-local-file-2.3.6.jar
clickhouse-cli-client-0.3.2-patch11-shaded.jar
clickhouse-grpc-client-0.3.2-patch11-shaded.jar
clickhouse-http-client-0.3.2-patch11-shaded.jar
clickhouse-jdbc-0.3.2-patch11.jar
commons-lang3-3.4.jar
hazelcast-5.1.jar
httpcore-4.4.13.jar
httpmime-4.5.13.jar
jcl-over-slf4j-1.7.25.jar
Expand All @@ -107,20 +107,22 @@ protostuff-api-1.8.0.jar
protostuff-collectionschema-1.8.0.jar
protostuff-core-1.8.0.jar
protostuff-runtime-1.8.0.jar
seatunnel-api-2.3.3.jar
seatunnel-common-2.3.3.jar
seatunnel-config-base-2.3.3.jar
seatunnel-config-shade-2.3.3.jar
seatunnel-core-starter-2.3.3.jar
seatunnel-engine-client-2.3.3.jar
seatunnel-engine-common-2.3.3.jar
seatunnel-engine-core-2.3.3.jar
seatunnel-guava-2.3.3-optional.jar
seatunnel-jackson-2.3.3-optional.jar
seatunnel-plugin-discovery-2.3.3.jar
seatunnel-transforms-v2-2.3.3.jar
serializer-api-2.3.3.jar
serializer-protobuf-2.3.3.jar
seatunnel-api-2.3.6.jar
seatunnel-common-2.3.6.jar
seatunnel-config-base-2.3.6.jar
seatunnel-config-shade-2.3.6.jar
seatunnel-core-starter-2.3.6.jar
seatunnel-engine-client-2.3.6.jar
seatunnel-engine-common-2.3.6.jar
seatunnel-engine-core-2.3.6.jar
seatunnel-guava-2.3.6-optional.jar
seatunnel-jackson-2.3.6-optional.jar
seatunnel-plugin-discovery-2.3.6.jar
seatunnel-transforms-v2-2.3.6.jar
seatunnel-config-sql-2.3.6.jar
seatunnel-hazelcast-shade-2.3.6-optional.jar
serializer-api-2.3.6.jar
serializer-protobuf-2.3.6.jar
swagger-annotations-2.2.14.jar
commons-codec-1.11.jar
commons-compress-1.20.jar
Expand All @@ -133,6 +135,7 @@ lz4-java-1.8.0.jar
security-206.jar
snappy-java-1.1.8.4.jar
zstd-jni-1.5.2-1.jar
commons-csv-1.10.0.jar



Expand Down

0 comments on commit 6155509

Please sign in to comment.