Skip to content

Commit

Permalink
[Improve][SeaTunnel-Web] Change JobMode and EngineType to enum type t…
Browse files Browse the repository at this point in the history
…o avoid hard coding (#210)

* [Improve][SeaTunnel-Web] Change JobMode to enum type to avoid hard coding

* [Improve][SeaTunnel-Web] Change JobMode and EngineType to enum type to avoid hard coding
  • Loading branch information
wuchunfu authored Sep 9, 2024
1 parent ee671d3 commit 3ea6cb8
Show file tree
Hide file tree
Showing 24 changed files with 93 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.service.ITaskInstanceService;
import org.apache.seatunnel.app.utils.PageInfo;
import org.apache.seatunnel.common.constants.JobMode;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
Expand All @@ -46,7 +47,7 @@ public Result<PageInfo<SeaTunnelJobInstanceDto>> getTaskInstanceList(
@RequestParam(name = "stateType", required = false) String stateType,
@RequestParam(name = "startDate", required = false) String startTime,
@RequestParam(name = "endDate", required = false) String endTime,
@RequestParam("syncTaskType") String syncTaskType,
@RequestParam("syncTaskType") JobMode jobMode,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
return taskInstanceService.getSyncTaskInstancePaging(
Expand All @@ -56,7 +57,7 @@ public Result<PageInfo<SeaTunnelJobInstanceDto>> getTaskInstanceList(
stateType,
startTime,
endTime,
syncTaskType,
jobMode,
pageNo,
pageSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.dal.mapper.JobInstanceMapper;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.common.constants.JobMode;

import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.NonNull;
Expand All @@ -44,7 +45,7 @@ IPage<SeaTunnelJobInstanceDto> queryJobInstanceListPaging(
Date startTime,
Date endTime,
String jobDefineId,
String jobMode);
JobMode jobMode);

List<JobInstance> getAllJobInstance(@NonNull List<Long> jobInstanceIdList);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.dal.mapper.JobInstanceMapper;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.common.constants.JobMode;

import org.springframework.stereotype.Repository;

Expand Down Expand Up @@ -71,7 +72,7 @@ public IPage<SeaTunnelJobInstanceDto> queryJobInstanceListPaging(
Date startTime,
Date endTime,
String jobDefineName,
String jobMode) {
JobMode jobMode) {
IPage<SeaTunnelJobInstanceDto> jobInstanceIPage =
jobInstanceMapper.queryJobInstanceListPaging(
page, startTime, endTime, jobDefineName, jobMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.app.dal.entity;

import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.engine.core.job.JobStatus;

import com.baomidou.mybatisplus.annotation.IdType;
Expand Down Expand Up @@ -50,7 +52,7 @@ public class JobInstance {
private String jobConfig;

@TableField("engine_name")
private String engineName;
private EngineType engineName;

@TableField("engine_version")
private String engineVersion;
Expand All @@ -74,7 +76,7 @@ public class JobInstance {
private Date endTime;

@TableField("job_type")
private String jobType;
private JobMode jobType;

@TableField("error_message")
private String errorMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.app.dal.entity;

import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.common.constants.JobMode;

import com.baomidou.mybatisplus.annotation.IdType;
Expand Down Expand Up @@ -47,12 +48,12 @@ public class JobVersion {

/** {@link JobMode} value */
@TableField("job_mode")
private String jobMode;
private JobMode jobMode;

@TableField private String env;

@TableField("engine_name")
private String engineName;
private EngineType engineName;

@TableField("engine_version")
private String engineVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.common.constants.JobMode;

import org.apache.ibatis.annotations.Param;

Expand All @@ -36,7 +37,7 @@ IPage<SeaTunnelJobInstanceDto> queryJobInstanceListPaging(
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("jobDefineName") String jobDefineName,
@Param("jobMode") String jobMode);
@Param("jobMode") JobMode jobMode);

JobInstance getJobExecutionStatus(@Param("jobInstanceId") Long jobInstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
*/
package org.apache.seatunnel.app.domain.response.engine;

import org.apache.seatunnel.app.common.EngineType;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Engine {
private String name;
private EngineType name;
private String version;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.app.domain.response.executor;

import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.common.constants.JobMode;

import lombok.AllArgsConstructor;
import lombok.Data;

Expand All @@ -29,13 +32,13 @@ public class JobExecutorRes {
private final String jobConfig;

/** engine name Spark/Flink/SeaTunnel */
private final String engine;
private final EngineType engine;

/** The driver run mode, only spark use now, support 'client' and 'cluster' */
private final String deployMode;

/** The engine run mode, for SeaTunnel Engine only support 'local' and null */
private final String master;

private final String jobMode;
private final JobMode jobMode;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
import org.apache.seatunnel.app.domain.response.metrics.JobSummaryMetricsRes;
import org.apache.seatunnel.common.constants.JobMode;

import org.apache.commons.lang3.tuple.ImmutablePair;

Expand Down Expand Up @@ -54,5 +55,5 @@ Map<Long, JobSummaryMetricsRes> getALLJobSummaryMetrics(
@NonNull Integer userId,
@NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
@NonNull List<Long> jobInstanceIdList,
@NonNull String syncTaskType);
@NonNull JobMode jobMode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
import org.apache.seatunnel.app.utils.PageInfo;
import org.apache.seatunnel.common.constants.JobMode;

public interface ITaskInstanceService<T> {

Expand All @@ -31,7 +32,7 @@ Result<PageInfo<T>> getSyncTaskInstancePaging(
String stateType,
String startTime,
String endTime,
String syncTaskType,
JobMode jobMode,
Integer pageNo,
Integer pageSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.seatunnel.app.service.impl;

import org.apache.seatunnel.app.bean.engine.EngineDataType;
import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.app.domain.response.engine.Engine;
import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
import org.apache.seatunnel.app.service.IEngineService;
Expand All @@ -36,7 +37,7 @@ public class EngineServiceImpl extends SeatunnelBaseServiceImpl implements IEngi
Lists.newArrayList(
// new Engine("Spark", "2.4.0"),
// new Engine("Flink", "1.13.6"),
new Engine("SeaTunnel", "2.3.6")));
new Engine(EngineType.SeaTunnel, "2.3.7")));

@Override
public List<Engine> listSupportEngines() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.seatunnel.app.service.impl;

import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.app.dal.dao.IJobDefinitionDao;
import org.apache.seatunnel.app.dal.dao.IJobVersionDao;
import org.apache.seatunnel.app.dal.entity.JobDefinition;
Expand Down Expand Up @@ -74,7 +73,7 @@ public JobConfigRes getJobConfig(long jobVersionId) throws JsonProcessingExcepti
} catch (IOException e) {
throw new RuntimeException(e);
}
jobConfigRes.setEngine(EngineType.valueOf(jobVersion.getEngineName()));
jobConfigRes.setEngine(jobVersion.getEngineName());
return jobConfigRes;
}

Expand All @@ -100,8 +99,8 @@ public void updateJobConfig(int userId, long jobVersionId, JobConfig jobConfig)
JobVersion.builder()
.jobId(version.getJobId())
.id(version.getId())
.jobMode(jobMode.name())
.engineName(jobConfig.getEngine().name())
.jobMode(jobMode)
.engineName(jobConfig.getEngine())
.updateUserId(userId)
.env(OBJECT_MAPPER.writeValueAsString(jobConfig.getEnv()))
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ public long createJob(int userId, JobReq jobReq)
.updateUserId(userId)
.name(DEFAULT_VERSION)
.id(uuid)
.engineName(EngineType.SeaTunnel.name())
.engineName(EngineType.SeaTunnel)
.engineVersion("2.3.7");
if (BusinessMode.DATA_INTEGRATION.equals(jobReq.getJobType())) {
builder.jobMode(JobMode.BATCH.name());
builder.jobMode(JobMode.BATCH);
} else if (BusinessMode.DATA_REPLICA.equals(jobReq.getJobType())) {
builder.jobMode(JobMode.STREAMING.name());
builder.jobMode(JobMode.STREAMING);
}
jobVersionDao.createVersion(builder.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@ public JobExecutorRes createExecuteResource(
jobInstance.setEngineVersion(latestVersion.getEngineVersion());
jobInstance.setJobConfig(jobConfig);
jobInstance.setCreateUserId(userId);
if (!latestVersion.getJobMode().isEmpty()) {
jobInstance.setJobType(latestVersion.getJobMode());
}
jobInstance.setJobType(latestVersion.getJobMode());

jobInstanceDao.insert(jobInstance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.seatunnel.app.service.impl;

import org.apache.seatunnel.app.common.EngineType;
import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
import org.apache.seatunnel.app.dal.dao.IJobInstanceHistoryDao;
import org.apache.seatunnel.app.dal.dao.IJobMetricsDao;
Expand All @@ -33,6 +34,7 @@
import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.app.utils.JobUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.Constants;
Expand Down Expand Up @@ -108,7 +110,7 @@ public Map<Long, JobSummaryMetricsRes> getALLJobSummaryMetrics(
@NonNull Integer userId,
@NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
@NonNull List<Long> jobInstanceIdList,
@NonNull String syncTaskType) {
@NonNull JobMode jobMode) {
log.info("jobInstanceIdAndJobEngineIdMap={}", jobInstanceIdAndJobEngineIdMap);

funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY, userId);
Expand All @@ -125,15 +127,14 @@ public Map<Long, JobSummaryMetricsRes> getALLJobSummaryMetrics(
allJobInstance.get(0).getEngineName(),
allJobInstance.get(0).getEngineVersion());

if (syncTaskType.equals("BATCH")) {

if (JobMode.BATCH == jobMode) {
result =
getMatricsListIfTaskTypeIsBatch(
allJobInstance,
userId,
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap);
} else if (syncTaskType.equals("STREAMING")) {
} else if (JobMode.STREAMING == jobMode) {
result =
getMatricsListIfTaskTypeIsStreaming(
allJobInstance,
Expand Down Expand Up @@ -425,7 +426,7 @@ private JobSummaryMetricsRes getJobSummaryMetricsResByDb(
}

private Map<Long, HashMap<Integer, JobMetrics>> getAllRunningJobMetricsFromEngine(
String engineName, String engineVersion) {
EngineType engineName, String engineVersion) {
Engine engine = new Engine(engineName, engineVersion);

IEngineMetricsExtractor engineMetricsExtractor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,23 @@ private JobReq getJobDefinition(JobConfig jobConfig) {
SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "description");
}
jobReq.setDescription(jobConfig.getDescription());
String jobMode = (String) jobConfig.getEnv().get("job.mode");
if (StringUtils.isEmpty(jobMode)) {
throw new ParamValidationException(
SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "job.mode");
}
if (JobMode.BATCH.name().equals(jobMode)) {
jobReq.setJobType(BusinessMode.DATA_INTEGRATION);
} else if (JobMode.STREAMING.name().equals(jobMode)) {
jobReq.setJobType(BusinessMode.DATA_REPLICA);
} else {
try {
JobMode jobMode = JobMode.valueOf((String) jobConfig.getEnv().get("job.mode"));
if (JobMode.BATCH == jobMode) {
jobReq.setJobType(BusinessMode.DATA_INTEGRATION);
} else if (JobMode.STREAMING == jobMode) {
jobReq.setJobType(BusinessMode.DATA_REPLICA);
} else {
throw new ParamValidationException(
SeatunnelErrorEnum.INVALID_PARAM,
"job.mode",
"job.mode should be either BATCH or STREAMING");
}
} catch (Exception e) {
throw new ParamValidationException(
SeatunnelErrorEnum.INVALID_PARAM,
"job.mode",
"job.mode should be either BATCH or STREAM");
"job.mode should be either BATCH or STREAMING");
}
return jobReq;
}
Expand Down
Loading

0 comments on commit 3ea6cb8

Please sign in to comment.