-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhance: Add compaction task slot usage logic #34581
Conversation
@wayblink E2e jenkins job failed, comment |
/run-cpu-e2e |
287a20b
to
7abe847
Compare
rerun ut |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #34581 +/- ##
========================================
Coverage 80.76% 80.77%
========================================
Files 1161 1161
Lines 141018 141148 +130
========================================
+ Hits 113898 114017 +119
- Misses 22755 22760 +5
- Partials 4365 4371 +6
|
7abe847
to
918d293
Compare
918d293
to
9b0e8db
Compare
3e0ff3c
to
b4b9eaa
Compare
b4b9eaa
to
afc4110
Compare
internal/datacoord/compaction.go
Outdated
nodeID = id | ||
maxSlots = slots | ||
} | ||
} | ||
// update the input nodeSlots | ||
nodeSlots[nodeID] = nodeSlots[nodeID] - acquireSlot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should decrease the slots after nodeID meta saved
@@ -558,7 +566,7 @@ func (t *clusteringCompactionTask) GetLabel() string { | |||
} | |||
|
|||
func (t *clusteringCompactionTask) NeedReAssignNodeID() bool { | |||
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0 | |||
return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dup with pr: #34537
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I learn this from your pr
dropped *typeutil.ConcurrentSet[string] // vchannel dropped | ||
usingSlots *atomic.Int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually it is a double check. Executor can refuse new task even datacoord works unexpectedly.
@@ -37,7 +39,7 @@ const ( | |||
|
|||
type Executor interface { | |||
Start(ctx context.Context) | |||
Execute(task Compactor) | |||
Execute(task Compactor) (bool, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, if slots are used up. return a DataNodeSlotExhausted error
3f61843
to
27d07dd
Compare
b7c26b4
to
28b8d7c
Compare
377995e
to
a5a43c2
Compare
Signed-off-by: wayblink <[email protected]>
a5a43c2
to
eefcccb
Compare
issue: #34544 pr: #34581 --------- Signed-off-by: wayblink <[email protected]>
/approve |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: czs007, wayblink The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
#34544