[Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks (#13194)

* Supports task instance cache operation

* add task plugin cache

* use SHA-256 to generate key

* Update dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

Co-authored-by: Jay Chung <[email protected]>

* Update dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

Co-authored-by: Jay Chung <[email protected]>

* Optimizing database Scripts

* Optimize clear cache operation

Co-authored-by: Jay Chung <[email protected]>
jieguangzhou and zhongjiajie authored Dec 18, 2022
1 parent 042ec74 commit 66e2027
Showing 77 changed files with 1,151 additions and 24 deletions.
15 changes: 15 additions & 0 deletions docs/docs/en/faq.md
@@ -752,4 +752,19 @@ start API server. If you want disabled when Python gateway service you could cha

---

## Q: How does cache execution determine whether a task is already cached, that is, whether a task can reuse the running result of another task?

A: For a task marked as `Cache Execution`, a cache key is generated when the task starts. The key is a hash of the following fields:

- task definition: the id of the task definition corresponding to the task instance
- task version: the version of the task definition corresponding to the task instance
- task input parameters: among the parameters passed in by upstream nodes and the global parameters, those referenced by the parameter list of the task definition and those referenced in the task definition with `${}`
- environment configuration: the actual content of the environment configuration behind the environment name, that is, the configuration defined in `security` - `environment management`
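
The key composition above can be sketched as follows. This is a minimal illustration, not the project's implementation: the class `CacheKeySketch`, the method names, the field separator, and the choice to sort parameters by name are all assumptions. Only the four inputs and the use of SHA-256 come from this commit.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: hashes the four fields listed above into one cache key.
final class CacheKeySketch {

    static String cacheKey(long taskDefinitionId, int taskVersion,
                           Map<String, String> inputParams, String environmentConfig) {
        StringBuilder raw = new StringBuilder();
        raw.append(taskDefinitionId).append('_').append(taskVersion).append('_');
        // Assumption: sort parameters by name so the key does not depend on map iteration order.
        new TreeMap<>(inputParams).forEach((name, value) -> raw.append(name).append('=').append(value).append(';'));
        raw.append('_').append(environmentConfig == null ? "" : environmentConfig);
        return sha256Hex(raw.toString());
    }

    static String sha256Hex(String text) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] bytes = digest.digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(bytes.length * 2);
            for (byte b : bytes) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }
}
```

With this shape, two runs that share the task definition, version, referenced parameters, and environment content produce the same key, and changing any one of them produces a different key.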

When a task marked for caching runs, it checks the database for data with the same cache key:
- If a match exists, the cached task instance is copied and the corresponding data is updated
- If not, the task runs as usual, and the task instance data is stored in the cache when the task completes

If you no longer need the cached result, right-click the node in the workflow instance and run `Clear cache`. This clears the cache data for the current input parameters under this version.
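
The lookup, reuse, and clear steps can be sketched as below. This is a simplified model under stated assumptions: an in-memory map stands in for the database, a plain string stands in for the copied task instance data, and the class `TaskCacheFlowSketch` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the cache-hit / cache-miss flow described above.
final class TaskCacheFlowSketch {

    // Stands in for task instance rows indexed by cache key.
    private final Map<String, String> cachedResults = new HashMap<>();
    private int executions = 0;

    String run(String cacheKey, Supplier<String> task) {
        String cached = cachedResults.get(cacheKey);
        if (cached != null) {
            // Cache hit: reuse the earlier result instead of executing again.
            return cached;
        }
        // Cache miss: run as usual, then store the result when the task completes.
        executions++;
        String result = task.get();
        cachedResults.put(cacheKey, result);
        return result;
    }

    // Mirrors `Clear cache`: drops the entry for one cache key only.
    void clearCache(String cacheKey) {
        cachedResults.remove(cacheKey);
    }

    int executions() {
        return executions;
    }
}
```

Running the same key twice executes the task once; after `clearCache`, the next run executes it again.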

We will collect more FAQs later.
1 change: 1 addition & 0 deletions docs/docs/en/guide/task/appendix.md
@@ -8,6 +8,7 @@ DolphinScheduler task plugins share some common default parameters. Each type of
|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Node Name | The name of the task. Node names within the same workflow must be unique. |
| Run Flag | Indicating whether to schedule the task. If you do not need to execute the task, you can turn on the `Prohibition execution` switch. |
| Cache Execution | Indicating whether this node should be cached. If enabled, runs with the same identifier (same task definition, same task version, same input parameters) share a cached result. Once a result has been cached, the task is not executed again and the result is reused directly. |
| Description | Describing the function of this node. |
| Task Priority | When the number of the worker threads is insufficient, the worker executes task according to the priority. When two tasks have the same priority, the worker will execute them in `first come first served` fashion. |
| Worker Group | Machines which execute the tasks. If you choose `default`, scheduler will send the task to a random worker. |
15 changes: 15 additions & 0 deletions docs/docs/zh/faq.md
@@ -720,4 +720,19 @@ A: Since version 3.0.0-alpha, the Python gateway server has been integrated into the api server

---

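## Q: During cache execution, how is it determined that a cached task already exists, that is, how is it determined that a task can use the running result of another task?

A: For a task marked as `Cache Execution`, a cache key is generated when the task starts. The key is obtained by hashing the combination of the following fields:

- Task definition: the id of the task definition corresponding to the task instance
- Task version: the version of the task definition corresponding to the task instance
- Task input parameters: among the parameters passed in by upstream nodes and global parameters, those referenced by the parameter list of the task definition and those referenced in the task definition with `${}`
- Environment configuration: the actual environment configuration content under the environment name, that is, the actual configuration in environment management in the security center

When a task marked for caching runs, it checks whether the database holds data with the same cache key:
- If so, that task instance is copied and the corresponding data is updated
- If not, the task runs as usual, and the task instance data is stored in the cache when the task completes

If caching is no longer needed, right-click in the workflow instance and run clear cache. This clears the cache data for the current input parameters under this version.

We will continue to collect more FAQs.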
1 change: 1 addition & 0 deletions docs/docs/zh/guide/task/appendix.md
@@ -8,6 +8,7 @@
|----------|--------------------------------------------------------------------------------------------------------------------------------------|
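| Task Name | The name of the task. Node names within the same workflow definition must be unique. |
| Run Flag | Indicates whether this node should be scheduled for execution. If it does not need to run, turn on the prohibit execution switch. |
| Cache Execution | Indicates whether this node should be cached. If enabled, tasks with the same identifier (same task version, same task definition, same input parameters) are cached. If a cached task already exists at run time, the task is not executed again and the result is reused directly. |
| Description | A description of what the current node does. |
| Task Priority | When worker threads are insufficient, tasks are executed in descending order of priority. Tasks with the same priority are executed first come, first served. |
| Worker Group | Once a group is set, the task is assigned to the machines in that worker group. If Default is selected, a worker is chosen at random. |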
@@ -19,10 +19,12 @@

import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.REMOVE_TASK_INSTANCE_CACHE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR;

import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.Result;
@@ -34,6 +36,7 @@

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
@@ -188,4 +191,26 @@ public Result<Object> stopTask(@Parameter(hidden = true) @RequestAttribute(value
                                   @PathVariable(value = "id") Integer id) {
        return taskInstanceService.stopTask(loginUser, projectCode, id);
    }

    /**
     * remove task instance cache
     *
     * @param loginUser login user
     * @param projectCode project code
     * @param id task instance id
     * @return the result code and msg
     */
    @Operation(summary = "remove-task-instance-cache", description = "REMOVE_TASK_INSTANCE_CACHE")
    @Parameters({
            @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
    })
    @DeleteMapping(value = "/{id}/remove-cache")
    @ResponseStatus(HttpStatus.OK)
    @ApiException(REMOVE_TASK_INSTANCE_CACHE_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                                                   @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                                                                   @PathVariable(value = "id") Integer id) {
        return taskInstanceService.removeTaskInstanceCache(loginUser, projectCode, id);
    }
}
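
A client call to the new endpoint might look like the sketch below. Only the `DELETE` method and the `/{id}/remove-cache` suffix come from this diff. The base URL, the `/projects/{projectCode}/task-instances` prefix, and the `token` header are assumptions, since the controller's class-level mapping and the authentication scheme are not shown here. The class name `RemoveCacheRequestSketch` is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch: builds (but does not send) the remove-cache request.
final class RemoveCacheRequestSketch {

    static HttpRequest build(String baseUrl, long projectCode, int taskInstanceId, String token) {
        // Assumption: the controller is mounted under /projects/{projectCode}/task-instances.
        String url = String.format("%s/projects/%d/task-instances/%d/remove-cache",
                baseUrl, projectCode, taskInstanceId);
        return HttpRequest.newBuilder(URI.create(url))
                // Assumption: the API accepts an access token in a "token" header.
                .header("token", token)
                .DELETE()
                .build();
    }
}
```

The request can then be sent with `java.net.http.HttpClient`. Judging by `TaskInstanceRemoveCacheResponse`, the response body should carry `code`, `msg`, and the cleared `cacheKey`.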
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.dto.taskInstance;

import org.apache.dolphinscheduler.api.utils.Result;

import lombok.Data;

/**
 * task instance remove cache response
 */
@Data
public class TaskInstanceRemoveCacheResponse extends Result {

    private String cacheKey;

    public TaskInstanceRemoveCacheResponse(Result result) {
        super();
        this.setCode(result.getCode());
        this.setMsg(result.getMsg());
    }

    public TaskInstanceRemoveCacheResponse(Result result, String cacheKey) {
        super();
        this.setCode(result.getCode());
        this.setMsg(result.getMsg());
        this.cacheKey = cacheKey;
    }
}
@@ -278,13 +278,16 @@ public enum Status {
UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}", "udf函数绑定了资源文件[{0}]"),
RESOURCE_IS_USED(20014, "resource file is used by process definition", "资源文件被上线的流程定义使用了"),
PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist", "父资源文件不存在"),

RESOURCE_NOT_EXIST_OR_NO_PERMISSION(20016,
"resource not exist or no permission,please view the task node and remove error resource",
"请检查任务节点并移除无权限或者已删除的资源"),
RESOURCE_IS_AUTHORIZED(20017, "resource is authorized to user {0},suffix not allowed to be modified",
"资源文件已授权其他用户[{0}],后缀不允许修改"),
RESOURCE_HAS_FOLDER(20018, "There are files or folders in the current directory:{0}", "当前目录下有文件或文件夹[{0}]"),

REMOVE_TASK_INSTANCE_CACHE_ERROR(20019, "remove task instance cache error", "删除任务实例缓存错误"),

USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
USER_NO_OPERATION_PROJECT_PERM(30002, "user {0} is not has project {1} permission", "当前用户[{0}]没有[{1}]项目的操作权限"),
USER_NO_WRITE_PROJECT_PERM(30003, "user [{0}] does not have write permission for project [{1}]",
@@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -100,4 +101,13 @@ Result forceTaskSuccess(User loginUser,
* @return the result code and msg
*/
TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId);

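/**
* remove task instance cache
* @param loginUser login user
* @param projectCode project code
* @param taskInstanceId task instance id
* @return the result code and msg, plus the cache key that was cleared
*/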
TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, Integer taskInstanceId);
}
@@ -18,8 +18,10 @@
package org.apache.dolphinscheduler.api.service.impl;

import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;

import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
@@ -38,13 +40,18 @@
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -81,6 +88,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired
TaskInstanceMapper taskInstanceMapper;

@Autowired
TaskInstanceDao taskInstanceDao;

@Autowired
ProcessInstanceService processInstanceService;

@@ -319,4 +329,30 @@ public TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long
        }
        return taskInstance;
    }

    @Override
    public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode,
                                                                   Integer taskInstanceId) {
        Result result = new Result();

        Project project = projectMapper.queryByCode(projectCode);
        projectService.checkProjectAndAuthThrowException(loginUser, project, INSTANCE_UPDATE);

        TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
        if (taskInstance == null) {
            // The lookup is by task instance id, so report a missing task instance.
            logger.error("Task instance can not be found, projectCode:{}, taskInstanceId:{}.", projectCode,
                    taskInstanceId);
            putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
            return new TaskInstanceRemoveCacheResponse(result);
        }
        // The stored key is tagged; recover the plain cache key before clearing.
        String tagCacheKey = taskInstance.getCacheKey();
        Pair<Integer, String> taskIdAndCacheKey = TaskCacheUtils.revertCacheKey(tagCacheKey);
        String cacheKey = taskIdAndCacheKey.getRight();
        if (StringUtils.isNotEmpty(cacheKey)) {
            taskInstanceDao.clearCacheByCacheKey(cacheKey);
        }
        putMsg(result, Status.SUCCESS);
        return new TaskInstanceRemoveCacheResponse(result, cacheKey);
    }

}
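
The body of `TaskCacheUtils.revertCacheKey` is not part of the hunks shown here. The sketch below illustrates one way a tagged key could be split back into a task instance id and a plain cache key. The `<taskInstanceId>-<cacheKey>` layout, the `-` separator, the `-1` fallback id, and the class `TagCacheKeySketch` are all assumptions for illustration, and `Map.Entry` is used in place of `Pair` to keep the example dependency-free.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical sketch: tags a cache key with a task instance id and reverts it.
final class TagCacheKeySketch {

    // Assumption: a hex cache key never contains this separator.
    static final String SEPARATOR = "-";

    static String tag(int taskInstanceId, String cacheKey) {
        return taskInstanceId + SEPARATOR + cacheKey;
    }

    static Map.Entry<Integer, String> revert(String tagCacheKey) {
        if (tagCacheKey == null) {
            return new SimpleImmutableEntry<>(-1, "");
        }
        int idx = tagCacheKey.indexOf(SEPARATOR);
        if (idx <= 0) {
            // No tag present: treat the whole value as the cache key.
            return new SimpleImmutableEntry<>(-1, tagCacheKey);
        }
        try {
            int taskInstanceId = Integer.parseInt(tagCacheKey.substring(0, idx));
            return new SimpleImmutableEntry<>(taskInstanceId, tagCacheKey.substring(idx + SEPARATOR.length()));
        } catch (NumberFormatException e) {
            // Prefix is not an id: fall back to the untagged interpretation.
            return new SimpleImmutableEntry<>(-1, tagCacheKey);
        }
    }
}
```

Under these assumptions an untagged key passes through unchanged, and a null key yields an empty cache key, which corresponds to the case the `StringUtils.isNotEmpty(cacheKey)` check skips.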
@@ -24,6 +24,7 @@
import static org.mockito.Mockito.when;

import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.TaskInstanceServiceImpl;
@@ -40,6 +41,7 @@
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;

@@ -93,6 +95,9 @@ public class TaskInstanceServiceTest {
@Mock
TaskDefinitionMapper taskDefinitionMapper;

@Mock
TaskInstanceDao taskInstanceDao;

@Test
public void queryTaskListPaging() {
long projectCode = 1L;
@@ -341,4 +346,30 @@ public void forceTaskSuccess() {
        Assertions.assertEquals(Status.SUCCESS.getCode(), successRes.getCode().intValue());

    }

    @Test
    public void testRemoveTaskInstanceCache() {
        User user = getAdminUser();
        long projectCode = 1L;
        Project project = getProject(projectCode);
        int taskId = 1;
        TaskInstance task = getTaskInstance();
        String cacheKey = "950311f3597f9198976cd3fd69e208e5b9ba6750";
        task.setCacheKey(cacheKey);

        when(projectMapper.queryByCode(projectCode)).thenReturn(project);
        when(taskInstanceMapper.selectById(1)).thenReturn(task);
        when(taskInstanceDao.findTaskInstanceByCacheKey(cacheKey)).thenReturn(task, null);
        when(taskInstanceDao.updateTaskInstance(task)).thenReturn(true);

        TaskInstanceRemoveCacheResponse response =
                taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId);
        Assertions.assertEquals(Status.SUCCESS.getCode(), response.getCode());

        when(taskInstanceMapper.selectById(1)).thenReturn(null);
        TaskInstanceRemoveCacheResponse responseNotFoundTask =
                taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId);
        Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), responseNotFoundTask.getCode());

    }
}
@@ -26,6 +26,7 @@
* have_arr_variables
* have_map_variables
* have_alert
* is_cache
*/
public enum Flag {

@@ -22,5 +22,6 @@ public enum TaskEventType {
DELAY,
RUNNING,
RESULT,
WORKER_REJECT,
CACHE,
}
@@ -115,6 +115,11 @@ public class TaskDefinition {
*/
private Flag flag;

/**
* whether the task is cached: yes/no
*/
private Flag isCache;

/**
* task priority
*/
@@ -281,6 +286,7 @@ public boolean equals(Object o) {
&& Objects.equals(taskType, that.taskType)
&& Objects.equals(taskParams, that.taskParams)
&& flag == that.flag
&& isCache == that.isCache
&& taskPriority == that.taskPriority
&& Objects.equals(workerGroup, that.workerGroup)
&& timeoutFlag == that.timeoutFlag
@@ -68,6 +68,7 @@ public TaskDefinitionLog(TaskDefinition taskDefinition) {
this.setFailRetryInterval(taskDefinition.getFailRetryInterval());
this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
this.setFlag(taskDefinition.getFlag());
this.setIsCache(taskDefinition.getIsCache());
this.setModifyBy(taskDefinition.getModifyBy());
this.setCpuQuota(taskDefinition.getCpuQuota());
this.setMemoryMax(taskDefinition.getMemoryMax());