Skip to content

Commit

Permalink
[Feature][API] Add Job Defile API (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoqigh authored Jun 14, 2023
1 parent 153e7b4 commit 374c400
Show file tree
Hide file tree
Showing 84 changed files with 4,873 additions and 175 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,12 @@
<artifactId>jsr305</artifactId>
<version>${jsr305.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-engine-client</artifactId>
<version>${seatunnel-framework.version}</version>
</dependency>
</dependencies>

</dependencyManagement>
Expand Down
5 changes: 5 additions & 0 deletions seatunnel-server/seatunnel-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -256,5 +256,10 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-engine-client</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@EnableConfigurationProperties
@EnableScheduling
@EnableAsync(proxyTargetClass = true)
@MapperScan({"org.apache.seatunnel.app.dal"})
@MapperScan({"org.apache.seatunnel.app.dal.mapper"})
public class SeatunnelApplication {
public static void main(String[] args) {
SpringApplication.run(SeatunnelApplication.class, args);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.bean.engine;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

import lombok.AllArgsConstructor;
import lombok.Data;

public class EngineDataType {

public static DataType T_STRING = new DataType("string", BasicType.STRING_TYPE);
public static DataType T_BOOLEAN = new DataType("boolean", BasicType.BOOLEAN_TYPE);
public static DataType T_BYTE = new DataType("tinyint", BasicType.BYTE_TYPE);
public static DataType T_SHORT = new DataType("smallint", BasicType.SHORT_TYPE);
public static DataType T_INT = new DataType("int", BasicType.INT_TYPE);
public static DataType T_LONG = new DataType("bigint", BasicType.LONG_TYPE);
public static DataType T_FLOAT = new DataType("float", BasicType.FLOAT_TYPE);
public static DataType T_DOUBLE = new DataType("double", BasicType.DOUBLE_TYPE);
public static DataType T_VOID = new DataType("null", BasicType.VOID_TYPE);

public static DataType T_DECIMAL = new DataType("decimal(38, 18)", new DecimalType(38, 18));

public static DataType T_LOCAL_DATE = new DataType("date", LocalTimeType.LOCAL_DATE_TYPE);
public static DataType T_LOCAL_TIME = new DataType("time", LocalTimeType.LOCAL_TIME_TYPE);
public static DataType T_LOCAL_DATE_TIME =
new DataType("timestamp", LocalTimeType.LOCAL_DATE_TIME_TYPE);

public static DataType T_PRIMITIVE_BYTE_ARRAY =
new DataType("bytes", PrimitiveByteArrayType.INSTANCE);

public static DataType T_STRING_ARRAY =
new DataType("array<string>", ArrayType.STRING_ARRAY_TYPE);
public static DataType T_BOOLEAN_ARRAY =
new DataType("array<boolean>", ArrayType.BOOLEAN_ARRAY_TYPE);
public static DataType T_BYTE_ARRAY = new DataType("array<tinyint>", ArrayType.BYTE_ARRAY_TYPE);
public static DataType T_SHORT_ARRAY =
new DataType("array<smallint>", ArrayType.SHORT_ARRAY_TYPE);
public static DataType T_INT_ARRAY = new DataType("array<int>", ArrayType.INT_ARRAY_TYPE);
public static DataType T_LONG_ARRAY = new DataType("array<bigint>", ArrayType.LONG_ARRAY_TYPE);
public static DataType T_FLOAT_ARRAY = new DataType("array<float>", ArrayType.FLOAT_ARRAY_TYPE);
public static DataType T_DOUBLE_ARRAY =
new DataType("array<double>", ArrayType.DOUBLE_ARRAY_TYPE);

@Data
@AllArgsConstructor
public static class DataType {
String name;
SeaTunnelDataType<?> RawType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.bean.env;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.env.EnvOptionRule;
import org.apache.seatunnel.app.dynamicforms.AbstractFormOption;
import org.apache.seatunnel.app.dynamicforms.FormStructure;
import org.apache.seatunnel.app.thirdparty.framework.SeaTunnelOptionRuleWrapper;

import org.springframework.stereotype.Component;

import lombok.Data;
import lombok.Getter;

import java.util.List;
import java.util.stream.Collectors;

@Component
@Data
public class JobEnvCache {

@Getter private final FormStructure envFormStructure;

public JobEnvCache() {
OptionRule envOptionRules = EnvOptionRule.getEnvOptionRules();
envFormStructure =
SeaTunnelOptionRuleWrapper.wrapper(
envOptionRules.getOptionalOptions(),
envOptionRules.getRequiredOptions(),
"Env");
List<AbstractFormOption> collect =
envFormStructure.getForms().stream()
.filter(form -> !"parallelism".equalsIgnoreCase(form.getField()))
.collect(Collectors.toList());
envFormStructure.setForms(collect);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.app.controller;

import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.domain.request.connector.ConnectorStatus;
import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo;
import org.apache.seatunnel.app.service.IConnectorService;
import org.apache.seatunnel.common.utils.JsonUtils;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;

import javax.annotation.Resource;

import java.io.IOException;
import java.util.List;

@RequestMapping("/whaletunnel/api/v1/connector")
@RestController
public class ConnectorController {

@Resource private IConnectorService connectorService;

@GetMapping("/sources")
@ApiOperation(value = "list all source connector", httpMethod = "GET")
public Result<List<ConnectorInfo>> listSource(
@RequestParam(defaultValue = "ALL") ConnectorStatus status) {
return Result.success(connectorService.listSources(status));
}

@GetMapping("/transforms")
@ApiOperation(value = "list all transforms", httpMethod = "GET")
public Result<List<ConnectorInfo>> listAllTransform() {
return Result.success(connectorService.listTransforms());
}

@GetMapping("/sinks")
@ApiOperation(value = "list all sink connector", httpMethod = "GET")
public Result<List<ConnectorInfo>> listSink(
@RequestParam(defaultValue = "ALL") ConnectorStatus status) {
return Result.success(connectorService.listSinks(status));
}

@GetMapping("/sync")
@ApiOperation(value = "sync all connector from disk", httpMethod = "GET")
public Result<List<ConnectorInfo>> sync() throws IOException {
connectorService.sync();
return Result.success();
}

@GetMapping("/form")
@ApiOperation(value = "get source connector form structure", httpMethod = "GET")
public Result<String> getConnectorFormStructure(
@ApiParam(value = "connector type", required = true) @RequestParam String connectorType,
@ApiParam(value = "connector name", required = true) @RequestParam
String connectorName) {
return Result.success(
JsonUtils.toJsonString(
connectorService.getConnectorFormStructure(connectorType, connectorName)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.controller;

import org.apache.seatunnel.app.bean.engine.EngineDataType;
import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.domain.response.engine.Engine;
import org.apache.seatunnel.app.service.IEngineService;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import io.swagger.annotations.ApiOperation;

import javax.annotation.Resource;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@RequestMapping("/seatunnel/api/v1/engine")
@RestController
public class EngineController {

@Resource private IEngineService engineService;

@GetMapping("/list")
@ApiOperation(value = "list all supported engines", httpMethod = "GET")
public Result<List<Engine>> listSupportEngines() {
return Result.success(engineService.listSupportEngines());
}

@GetMapping("/type")
@ApiOperation(value = "list all supported Data Type", httpMethod = "GET")
public Result<List<String>> listSupportDataTypes() {
return Result.success(
Arrays.stream(engineService.listSupportDataTypes())
.map(EngineDataType.DataType::getName)
.collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.controller;

import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.service.IJobEnvService;
import org.apache.seatunnel.common.utils.JsonUtils;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import io.swagger.annotations.ApiOperation;

import javax.annotation.Resource;

@RequestMapping("/seatunnel/api/v1/job/env")
@RestController
public class EnvController {

@Resource private IJobEnvService jobEnvService;

@GetMapping("")
@ApiOperation(value = "get job env config parameters", httpMethod = "GET")
public Result<String> getJobEnvFormStructure() {
return Result.success(JsonUtils.toJsonString(jobEnvService.getJobEnvFormStructure()));
}
}
Loading

0 comments on commit 374c400

Please sign in to comment.