-
Notifications
You must be signed in to change notification settings - Fork 4.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks #13194
Conversation
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
Fixed
Show fixed
Hide fixed
...-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
Fixed
Show fixed
Hide fixed
Codecov Report
@@ Coverage Diff @@
## dev #13194 +/- ##
============================================
+ Coverage 39.37% 39.46% +0.09%
- Complexity 4278 4294 +16
============================================
Files 1066 1069 +3
Lines 40479 40641 +162
Branches 4657 4673 +16
============================================
+ Hits 15937 16040 +103
- Misses 22755 22812 +57
- Partials 1787 1789 +2
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
while (true) { | ||
TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it's not a loop action.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There may be one cacheKey
for multiple pieces of data. For example, two cache tasks will same cache key run almost simultaneously.
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/IsCache.java
Outdated
Show resolved
Hide resolved
...inscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
Outdated
Show resolved
Hide resolved
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
Outdated
Show resolved
Hide resolved
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
Outdated
Show resolved
Hide resolved
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
Outdated
Show resolved
Hide resolved
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
Outdated
Show resolved
Hide resolved
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
Outdated
Show resolved
Hide resolved
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
Outdated
Show resolved
Hide resolved
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
Fixed
Show fixed
Hide fixed
if (tagCacheKey.contains(MERGE_TAG)) { | ||
String[] split = tagCacheKey.split(MERGE_TAG); | ||
if (split.length == 2) { | ||
taskIdAndCacheKey = Pair.of(Integer.parseInt(split[0]), split[1]); |
Check notice
Code scanning / CodeQL
Missing catch of NumberFormatException
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
Show resolved
Hide resolved
a0646a4
to
b7819e5
Compare
@Amy0104 @songjianet please help to review the front-end code |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The front end side LGTM.
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
Outdated
Show resolved
Hide resolved
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
Outdated
Show resolved
Hide resolved
...cheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
Show resolved
Hide resolved
…ysql.sql Co-authored-by: Jay Chung <[email protected]>
…ostgresql.sql Co-authored-by: Jay Chung <[email protected]>
01e1163
to
e5f80d2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
...-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
Outdated
Show resolved
Hide resolved
SonarCloud Quality Gate failed. |
@caishunfeng @ruanwenjun please PTAL, thanks |
Purpose of the pull request
close #13133
Like this :Flyter Caching
In machine learning workflow, if some tasks will be caching, the workflows will be executed faster
How to determine whether a task has been cached when the cache is executed, that is, how to determine whether a task can use the running result of another task?
For the task identified as
Cache Execution
, when the task starts, a cache key will be generated, and the key is composed of the following fields and hashed:${}
security
-environment management
If the task with cache identification runs, it will find whether there is data with the same cache key in the database,
If you do not need to cache, you can right-click the node to run
Clear cache
in the workflow instance to clear the cache, which will clear the cache data of the current input parameters under this version.Brief change log
Front end
useCache
flag to all task plugin except the logical componenthandleRemoveTaskInstanceCache
method to task instance clear cacheBackend
removeTaskInstanceCache
API to clear task instance cacheTaskCacheUtils
to manage the cache keycheckIsCacheExecution
beforedispatcher.dispatch
task to workerTaskCacheEventHandler
to the handle cache task instead dispatch tasksaveCacheTaskInstance
after the cache task successfully runDatabase
is_cache
into tablet_ds_task_definition
、t_ds_task_definition_log
、t_ds_task_instance
cache_key
tot_ds_task_instance
Doc
Cache Execution
parameters introductionVerify this pull request
This pull request is code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(or)
If your pull request contain incompatible change, you should also add it to
docs/docs/en/guide/upgrede/incompatible.md