-
Notifications
You must be signed in to change notification settings - Fork 4.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix TaskGroupQueue will never be wakeup due to wakeup failed at one time #15528
Fix TaskGroupQueue will never be wakeup due to wakeup failed at one time #15528
Conversation
...in/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
Fixed
Show fixed
Hide fixed
38ff493
to
212ba8d
Compare
Please retry analysis of this Pull-Request directly on SonarCloud |
2d17d56
to
98a8d8f
Compare
98a8d8f
to
3cfd45c
Compare
ProcessInstance processInstance = processInstanceDao.queryOptionalById(taskGroupQueue.getProcessId()) | ||
.orElseThrow( | ||
() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId())); |
Check notice
Code scanning / CodeQL
Unread local variable Note
59d46b2
to
82c1bce
Compare
ec952bc
to
628f690
Compare
e36844f
to
b9e452e
Compare
if (taskGroupId == null) { | ||
throw new IllegalArgumentException("taskGroupId cannot be null"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any reason we do not use @NonNull
directly for parameter taskGroupId
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DROP PROCEDURE IF EXISTS create_idx_t_ds_task_group_queue_in_queue; | ||
delimiter d// | ||
CREATE PROCEDURE create_idx_t_ds_task_group_queue_in_queue() | ||
BEGIN | ||
DECLARE index_exists INT DEFAULT 0; | ||
SELECT COUNT(*) INTO index_exists FROM information_schema.statistics WHERE table_schema = (SELECT DATABASE()) AND table_name = 't_ds_task_group_queue' AND index_name = 'idx_t_ds_task_group_queue_in_queue'; | ||
IF index_exists = 0 THEN CREATE INDEX idx_t_ds_task_group_queue_in_queue ON t_ds_task_group_queue(in_queue); | ||
END IF; | ||
END; | ||
d// | ||
delimiter ; | ||
CALL create_idx_t_ds_task_group_queue_in_queue; | ||
DROP PROCEDURE create_idx_t_ds_task_group_queue_in_queue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could directly remove the related 3.3.0 upgrade. I would merge almost all upgrade sql from 3.3.0 to 3.2.1. and we will not release 3.3.0 until we close or nearly close 3.2.x
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can remove the upgrade 3.3.0. We can use another PR to do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
b9e452e
to
3681244
Compare
Quality Gate failedFailed conditions 42.1% Coverage on New Code (required ≥ 60%) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job! Rest LGTM.
log.warn("The TaskInstance: {} state: {} finished, will release the TaskGroupQueue: {}", | ||
taskInstance.getName(), taskInstance.getState(), taskGroupQueue); | ||
releaseTaskGroupQueueSlot(taskGroupQueue); | ||
continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Useless code
taskGroupQueue.setInQueue(Flag.YES.getCode()); | ||
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); | ||
taskGroupQueue.setUpdateTime(new Date()); | ||
taskGroupQueueDao.updateById(taskGroupQueue); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some repetitive code in the state change that might be suitable for turning into a function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion, I will use another PR to move these kind of code in the whole project into function.
Purpose of the pull request
Brief change log
In DS, we use TaskGroup to restrict the Task parallelism.
e.g. We have two task TaskA, TaskB, and they belong to the same task group, if we execute these two task at the same time, only one will get the taskGroup slot and the other will wait until be wakeup.
Unfortunately, if the TaskA wakeup TaskB failed, then TaskB will never be wakeup, since we use
put
mode, the waiting task will only be wakeup by other task, it will not try to pull the slot.This PR will refactor this.
This PR add a new component
TaskGroupCoordinator
which use to deal with acquire/release task group slot and notify task.Once a task need to acquire TaskGroup slot, it should call the
TaskGroupCoordinator::acquireTaskGroupSlot
, in this method, will insert a in queue TaskGroupQueue which status is WAITING.Once a task is finished it will call
TaskGroupCoordinator::releaseTaskGroupSlot
, in this method, will move the TaskGroupQueue out queue.And TaskGroupCoordinator will loop the slot, if there exist available slot, it will try to wakeup a waiting task.
Verify this pull request
This pull request is code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(or)
If your pull request contain incompatible change, you should also add it to
docs/docs/en/guide/upgrede/incompatible.md