
Commit

[Improvement] [Seatunnel-web] Add support to provide reason for job failure (#196)
arshadmohammad authored Aug 28, 2024
1 parent ee45dca commit 0a1e24d
Showing 15 changed files with 277 additions and 47 deletions.
6 changes: 6 additions & 0 deletions README.md
@@ -206,3 +206,9 @@ Now, let me show you how to use it.

#### Virtual Tables manage
![img.png](docs/images/VirtualImage.png)

### Upgrades
#### 1. Upgrade from 1.0.1 or earlier to 1.0.2 or later
Execute the following SQL to upgrade the database:

```sql
ALTER TABLE `t_st_job_instance` ADD COLUMN `error_message` varchar(4096) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL;
```
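
If you prefer to apply the upgrade programmatically rather than from a SQL client, the following is a minimal JDBC sketch (not part of this commit). The connection URL, credentials, and class name are placeholders, it assumes the MySQL JDBC driver is on the classpath, and the column-existence check simply makes the step safe to re-run.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

public class AddErrorMessageColumn {

    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; point these at the SeaTunnel-web MySQL database.
        String url = "jdbc:mysql://localhost:3306/seatunnel?useSSL=false";
        try (Connection conn = DriverManager.getConnection(url, "root", "password")) {
            // Skip the ALTER if the column already exists, so the upgrade can be re-run safely.
            String check =
                    "SELECT COUNT(*) FROM information_schema.COLUMNS"
                            + " WHERE TABLE_SCHEMA = DATABASE()"
                            + " AND TABLE_NAME = 't_st_job_instance'"
                            + " AND COLUMN_NAME = 'error_message'";
            try (PreparedStatement ps = conn.prepareStatement(check);
                    ResultSet rs = ps.executeQuery()) {
                rs.next();
                if (rs.getInt(1) > 0) {
                    System.out.println("error_message column already exists, nothing to do.");
                    return;
                }
            }
            try (Statement stmt = conn.createStatement()) {
                stmt.executeUpdate(
                        "ALTER TABLE `t_st_job_instance` ADD COLUMN `error_message`"
                                + " varchar(4096) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin"
                                + " NULL DEFAULT NULL");
            }
            System.out.println("error_message column added.");
        }
    }
}
```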
@@ -73,4 +73,7 @@ public class JobInstance {

@TableField("job_type")
private String jobType;

@TableField("error_message")
private String errorMessage;
}
@@ -21,6 +21,7 @@
import org.apache.seatunnel.app.dal.entity.JobTask;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import org.apache.seatunnel.engine.core.job.JobResult;

import lombok.NonNull;

@@ -40,5 +41,8 @@ String generateJobConfig(
JobExecutorRes getExecuteResource(@NonNull Long jobEngineId);

void complete(
@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId);
@NonNull Integer userId,
@NonNull Long jobInstanceId,
@NonNull String jobEngineId,
JobResult jobResult);
}
@@ -28,6 +28,7 @@
import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy;
import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.app.utils.JobExecParamUtil;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
@@ -37,6 +38,7 @@
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;

@@ -128,6 +130,9 @@ private void executeJobBySeaTunnel(Integer userId, String filePath, Long jobInst
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobStatus(JobStatus.FAILED.name());
jobInstance.setEndTime(new Date());
String jobInstanceErrorMessage =
JobExecParamUtil.getJobInstanceErrorMessage(e.getMessage());
jobInstance.setErrorMessage(jobInstanceErrorMessage);
jobInstanceDao.update(jobInstance);
throw new RuntimeException(e.getMessage(), e);
}
@@ -152,21 +157,22 @@ public void waitJobFinish(
String jobEngineId,
SeaTunnelClient seaTunnelClient) {
ExecutorService executor = Executors.newFixedThreadPool(1);
CompletableFuture<JobStatus> future =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete, executor);
CompletableFuture<JobResult> future =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobCompleteV2, executor);
JobResult jobResult = new JobResult(JobStatus.FAILED, "");
try {
log.info("future.get before");
JobStatus jobStatus = future.get();

jobResult = future.get();
executor.shutdown();
} catch (InterruptedException e) {
jobResult.setError(e.getMessage());
throw new RuntimeException(e);
} catch (ExecutionException e) {
jobResult.setError(e.getMessage());
throw new RuntimeException(e);
} finally {
seaTunnelClient.close();
log.info("and jobInstanceService.complete begin");
jobInstanceService.complete(userId, jobInstanceId, jobEngineId);
jobInstanceService.complete(userId, jobInstanceId, jobEngineId, jobResult);
}
}

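For readers unfamiliar with the pattern the new `waitJobFinish` follows, the standalone sketch below illustrates it in isolation: run the blocking wait on a single-thread executor, capture any failure message into the result object, and always report completion in `finally`. `SimpleJobResult`, `waitAndComplete`, and the printed completion call are hypothetical stand-ins, not SeaTunnel types or APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class WaitJobFinishSketch {

    /** Hypothetical stand-in for org.apache.seatunnel.engine.core.job.JobResult. */
    static class SimpleJobResult {
        private final String status;
        private String error;

        SimpleJobResult(String status, String error) {
            this.status = status;
            this.error = error;
        }

        void setError(String error) {
            this.error = error;
        }

        String getStatus() {
            return status;
        }

        String getError() {
            return error;
        }
    }

    /** Waits for the job to finish and always reports a result, even when the wait fails. */
    static SimpleJobResult waitAndComplete(Supplier<SimpleJobResult> waitForJobComplete) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        // Default to FAILED so an exception thrown before completion still yields a result.
        SimpleJobResult jobResult = new SimpleJobResult("FAILED", "");
        try {
            CompletableFuture<SimpleJobResult> future =
                    CompletableFuture.supplyAsync(waitForJobComplete, executor);
            jobResult = future.get();
            return jobResult;
        } catch (InterruptedException | ExecutionException e) {
            // Capture the failure reason so it can be persisted with the job instance.
            jobResult.setError(e.getMessage());
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
            // The real service closes the SeaTunnelClient here and then calls
            // jobInstanceService.complete(userId, jobInstanceId, jobEngineId, jobResult).
            System.out.println("complete: " + jobResult.getStatus() + " / " + jobResult.getError());
        }
    }

    public static void main(String[] args) {
        SimpleJobResult result = waitAndComplete(() -> new SimpleJobResult("FINISHED", null));
        System.out.println("returned status: " + result.getStatus());
    }
}
```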
@@ -52,7 +52,6 @@
import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions;
import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
import org.apache.seatunnel.app.service.IDatasourceService;
import org.apache.seatunnel.app.service.IJobInstanceService;
@@ -64,7 +63,7 @@
import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;
@@ -361,34 +360,18 @@ public JobExecutorRes getExecuteResource(@NonNull Long jobEngineId) {

@Override
public void complete(
@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId) {
@NonNull Integer userId,
@NonNull Long jobInstanceId,
@NonNull String jobEngineId,
JobResult jobResult) {
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE, userId);
JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId);
jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId);

List<JobPipelineSummaryMetricsRes> status =
jobMetricsService.getJobPipelineSummaryMetrics(userId, jobInstanceId);

String jobStatus;
Set<String> statusList =
status.stream()
.map(JobPipelineSummaryMetricsRes::getStatus)
.map(String::toUpperCase)
.collect(Collectors.toSet());
if (statusList.size() == 1 && statusList.contains("FINISHED")) {
jobStatus = JobStatus.FINISHED.name();
} else if (statusList.contains("FAILED")) {
jobStatus = JobStatus.FAILED.name();
} else if (statusList.contains("CANCELED")) {
jobStatus = JobStatus.CANCELED.name();
} else if (statusList.contains("CANCELLING")) {
jobStatus = JobStatus.CANCELING.name();
} else {
jobStatus = JobStatus.RUNNING.name();
}
jobInstance.setJobStatus(jobStatus);
jobInstance.setJobStatus(jobResult.getStatus().name());
jobInstance.setJobEngineId(jobEngineId);
jobInstance.setUpdateUserId(userId);
jobInstance.setErrorMessage(
JobExecParamUtil.getJobInstanceErrorMessage(jobResult.getError()));
jobInstanceDao.update(jobInstance);
}

@@ -27,6 +27,18 @@

public class JobExecParamUtil {

// Maximum length of the job execution error message (4096 characters), matching the error_message column size
private static final int ERROR_MESSAGE_MAX_LENGTH = 4096;

public static String getJobInstanceErrorMessage(String message) {
if (message == null) {
return null;
}
return message.length() > ERROR_MESSAGE_MAX_LENGTH
? message.substring(0, ERROR_MESSAGE_MAX_LENGTH)
: message;
}

public static Config updateEnvConfig(JobExecParam jobExecParam, Config envConfig) {
if (jobExecParam == null || jobExecParam.getEnv() == null) {
return envConfig;
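A minimal JUnit-style sketch (not part of the commit) showing how the new truncation helper behaves; it assumes `JobExecParamUtil` from the diff above is on the test classpath, and the test class name is only illustrative.

```java
package org.apache.seatunnel.app.utils;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

class JobInstanceErrorMessageSketchTest {

    // Mirrors ERROR_MESSAGE_MAX_LENGTH in JobExecParamUtil (and the VARCHAR(4096) column).
    private static final int MAX_LENGTH = 4096;

    @Test
    void truncatesOnlyWhenMessageExceedsColumnSize() {
        // A null error (e.g. a successful job) is stored as NULL.
        assertNull(JobExecParamUtil.getJobInstanceErrorMessage(null));

        // Short messages are stored unchanged.
        assertEquals("boom", JobExecParamUtil.getJobInstanceErrorMessage("boom"));

        // Messages longer than 4096 characters are cut to fit the error_message column.
        String longMessage = new String(new char[MAX_LENGTH + 100]).replace('\0', 'x');
        assertEquals(
                MAX_LENGTH, JobExecParamUtil.getJobInstanceErrorMessage(longMessage).length());
    }
}
```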
@@ -24,10 +24,11 @@
<result column="engine_name" jdbcType="VARCHAR" property="engineName"/>
<result column="engine_version" jdbcType="VARCHAR" property="engineVersion"/>
<result column="job_engine_id" jdbcType="VARCHAR" property="jobEngineId"/>
<result column="error_message" jdbcType="VARCHAR" property="errorMessage"/>
</resultMap>
<sql id="Base_Column_List">
id
, `job_define_id`, `job_status`, `job_config`, `engine_name`, `engine_version`, `job_engine_id`
, `job_define_id`, `job_status`, `job_config`, `engine_name`, `engine_version`, `job_engine_id`,`error_message`
</sql>

<select id="queryJobInstanceListPaging" resultType="org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto">
@@ -99,6 +99,7 @@ CREATE TABLE t_st_job_instance (
update_time TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
end_time TIMESTAMP(3) DEFAULT NULL,
job_type VARCHAR(50) NOT NULL,
error_message VARCHAR(4096) DEFAULT NULL,
PRIMARY KEY (id)
);

@@ -107,6 +107,7 @@ CREATE TABLE `t_st_job_instance` (
`update_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
`end_time` timestamp(3) NULL DEFAULT NULL,
`job_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
`error_message` varchar(4096) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

@@ -71,7 +71,7 @@ public Result<Void> deleteSingleTask(long jobVersionId, String pluginId) {
return JSONTestUtils.parseObject(response, Result.class);
}

public String createFakeSourcePlugin(String datasourceId, long jobVersionId) {
public String createFakeSourcePlugin(String datasourceId, long jobVersionId, String rows) {
DataSourceOption tableOption = new DataSourceOption();
tableOption.setDatabases(Arrays.asList("fake_database"));
tableOption.setTables(Arrays.asList("fake_table"));
@@ -88,14 +88,26 @@ public String createFakeSourcePlugin(String datasourceId, long jobVersionId) {
.dataSourceId(Long.parseLong(datasourceId))
.sceneMode(SceneMode.SINGLE_TABLE)
.config(
"{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n }\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"double.fake.mode\":\"RANGE\",\"double.template\":\"\",\"rows\":\"\",\"row.num\":5,\"split.num\":1,\"split.read-interval\":1,\"map.size\":5,\"array.size\":5,\"bytes.length\":5,\"date.year.template\":\"\",\"date.month.template\":\"\",\"date.day.template\":\"\",\"time.hour.template\":\"\",\"time.minute.template\":\"\",\"time.second.template\":\"\",\"parallelism\":1}")
"{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n }\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"double.fake.mode\":\"RANGE\",\"double.template\":\"\",\"rows\":\""
+ rows
+ "\",\"row.num\":5,\"split.num\":1,\"split.read-interval\":1,\"map.size\":5,\"array.size\":5,\"bytes.length\":5,\"date.year.template\":\"\",\"date.month.template\":\"\",\"date.day.template\":\"\",\"time.hour.template\":\"\",\"time.minute.template\":\"\",\"time.second.template\":\"\",\"parallelism\":1}")
.build();

Result<Void> srcResult = saveSingleTask(jobVersionId, sourcePluginConfig);
assertTrue(srcResult.isSuccess());
return sourcePluginId;
}

public String createFakeSourcePlugin(String datasourceId, long jobVersionId) {
return createFakeSourcePlugin(datasourceId, jobVersionId, "");
}

public String createFakeSourcePluginThatFails(String datasourceId, long jobVersionId) {
String rows =
"[{kind=INSERT, fields=[\"org\", 100]}, {kind=INSERT, fields=[\"apache\", 50]}, {kind=INSERT, fields=[\"seatunnel\", 25]}, {kind=INSERT, fields=[\"seatunnel-web\", 12]}, {kind=INSERT, fields=[\"etl\", 6_age_invalid_number]}]";
return createFakeSourcePlugin(datasourceId, jobVersionId, rows);
}

public String createConsoleSinkPlugin(String datasourceId, long jobVersionId) {
DataSourceOption sinkTableOption = new DataSourceOption();
sinkTableOption.setDatabases(Arrays.asList("console_fake_database"));
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.app.controller;

import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.utils.JSONTestUtils;
import org.apache.seatunnel.app.utils.PageInfo;

import com.fasterxml.jackson.core.type.TypeReference;

import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TaskInstanceControllerWrapper extends SeatunnelWebTestingBase {

private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public Result<PageInfo<SeaTunnelJobInstanceDto>> getTaskInstanceList(
String jobDefineName,
String executorName,
String stateType,
String startTime,
String endTime,
String syncTaskType,
Integer pageNo,
Integer pageSize) {
String response =
sendRequest(
urlWithParam(
"task/jobMetrics?jobDefineName="
+ jobDefineName
+ "&executorName="
+ executorName
+ "&stateType="
+ stateType
+ "&startDate="
+ startTime
+ "&endDate="
+ endTime
+ "&syncTaskType="
+ syncTaskType
+ "&pageNo="
+ pageNo
+ "&pageSize="
+ pageSize));
return JSONTestUtils.parseObject(
response, new TypeReference<Result<PageInfo<SeaTunnelJobInstanceDto>>>() {});
}

public SeaTunnelJobInstanceDto getTaskInstanceList(String jobDefineName) {
String startTime =
URLEncoder.encode(
dateFormat.format(
new Date(System.currentTimeMillis() - 1000 * 60 * 60 * 24)));
String endTime =
URLEncoder.encode(
dateFormat.format(
new Date(System.currentTimeMillis() + 1000 * 60 * 60 * 24)));
String syncTaskType = "BATCH";
Integer pageNo = 1;
Integer pageSize = 10;
Result<PageInfo<SeaTunnelJobInstanceDto>> result =
getTaskInstanceList(
jobDefineName,
null,
null,
startTime,
endTime,
syncTaskType,
pageNo,
pageSize);
assertTrue(result.isSuccess());
if (result.getData().getTotalList().isEmpty()) {
return null;
}
assertEquals(1, result.getData().getTotalList().size());
return result.getData().getTotalList().get(0);
}
}
@@ -21,6 +21,8 @@
import org.apache.seatunnel.app.controller.JobControllerWrapper;
import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
@@ -51,13 +53,15 @@ public class JobExecutorControllerTest {
private static final String uniqueId = "_" + System.currentTimeMillis();
private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper;
private static JobControllerWrapper jobControllerWrapper;
private static TaskInstanceControllerWrapper taskInstanceControllerWrapper;

@BeforeAll
public static void setUp() {
seaTunnelWebCluster.start();
jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper();
jobControllerWrapper = new JobControllerWrapper();
taskInstanceControllerWrapper = new TaskInstanceControllerWrapper();
}

@Test
@@ -274,6 +278,25 @@ public void executeJob_JobStatusUpdate_WhenSubmissionFailed() {
assertFalse(result.isSuccess());
// Even though the job failed, the job instance is still created in the database.
assertTrue(result.getData() > 0);
SeaTunnelJobInstanceDto taskInstanceList =
taskInstanceControllerWrapper.getTaskInstanceList(jobName);
assertNotNull(taskInstanceList.getErrorMessage());
}

@Test
public void storeErrorMessageWhenJobFailed() throws InterruptedException {
String jobName = "failureCause" + uniqueId;
long jobVersionId = JobUtils.createJob(jobName, true);
Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId);
// The job is submitted successfully but will fail during execution.
assertTrue(result.isSuccess());
assertTrue(result.getData() > 0);
JobUtils.waitForJobCompletion(result.getData());
// Allow a couple of seconds for the data to be updated in the database.
Thread.sleep(2000);
SeaTunnelJobInstanceDto taskInstanceList =
taskInstanceControllerWrapper.getTaskInstanceList(jobName);
assertNotNull(taskInstanceList.getErrorMessage());
}

@AfterAll