[CELEBORN-1472] Reduce CongestionController#userBufferStatuses call times. #2583
@@ -1282,6 +1282,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
fileWriter.decrementPendingWrites()
}
}

updateBytesProduced(fileWriters.head, body.readableBytes())
This seems to change the behavior: for handlePushMergedData, the PartitionDataWriters in fileWriters are different objects, and body contains the data for all of them. IIUC we need to update each object with its own part of the body, instead of updating only the first one with the whole body.
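The difference the reviewer describes can be illustrated with a minimal sketch. Everything here is hypothetical and is not Celeborn code: SketchWriter stands in for a per-partition writer, and the sketch assumes the merged body is described by an array of start offsets, one per writer. Whether the real handler exposes its slices this way is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a per-partition writer (not Celeborn's PartitionDataWriter).
final class SketchWriter {
    private long bytesProduced;

    void addProduced(long numBytes) {
        bytesProduced += numBytes;
    }

    long bytesProduced() {
        return bytesProduced;
    }
}

final class MergedPushAccounting {
    // Credits each writer with the size of its own slice of the merged body.
    // Assumption: offsets[i] is where writer i's slice starts, and the slice
    // ends at offsets[i + 1], or at totalBytes for the last writer.
    static void updatePerWriter(List<SketchWriter> writers, int[] offsets, int totalBytes) {
        if (writers.size() != offsets.length) {
            throw new IllegalArgumentException("one offset per writer is required");
        }
        for (int i = 0; i < writers.size(); i++) {
            int end = (i == writers.size() - 1) ? totalBytes : offsets[i + 1];
            writers.get(i).addProduced(end - offsets[i]);
        }
    }

    // The behavior flagged in the review: the whole body is credited to the first writer.
    static void updateFirstOnly(List<SketchWriter> writers, int totalBytes) {
        writers.get(0).addProduced(totalBytes);
    }

    public static void main(String[] args) {
        List<SketchWriter> writers =
            Arrays.asList(new SketchWriter(), new SketchWriter(), new SketchWriter());
        updatePerWriter(writers, new int[] {0, 10, 25}, 40);
        for (SketchWriter w : writers) {
            System.out.println(w.bytesProduced());
        }
    }
}
```

With updateFirstOnly, the first writer would be credited with bytes that belong to the other writers, which matters whenever the accounting is kept per writer rather than per request.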
You are right. I will modify another place instead; it seems there is no need to call userBufferStatuses.computeIfAbsent every time.
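The idea of avoiding a map lookup on every write can be sketched as follows. This is only an illustration under stated assumptions: ControllerSketch, UserBufferSketch and WriterSketch are hypothetical names, the lookups counter exists purely for the demonstration, and the real CongestionController may differ in structure and behavior.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical per-user accounting record.
final class UserBufferSketch {
    private final LongAdder produced = new LongAdder();

    void add(long numBytes) {
        produced.add(numBytes);
    }

    long total() {
        return produced.sum();
    }
}

// Hypothetical controller that keeps one record per user in a concurrent map.
final class ControllerSketch {
    private final ConcurrentHashMap<String, UserBufferSketch> userBuffers =
        new ConcurrentHashMap<>();
    // Demo-only counter of how many times the map was consulted.
    final AtomicLong lookups = new AtomicLong();

    UserBufferSketch getUserBuffer(String user) {
        lookups.incrementAndGet();
        return userBuffers.computeIfAbsent(user, k -> new UserBufferSketch());
    }
}

// Hypothetical writer that resolves its user's record once, at construction.
final class WriterSketch {
    private final UserBufferSketch userBuffer; // null when no controller is configured

    WriterSketch(ControllerSketch controller, String user) {
        this.userBuffer = (controller == null) ? null : controller.getUserBuffer(user);
    }

    // Hot path: no map lookup, only an update on the cached record.
    void onBytesWritten(long numBytes) {
        if (userBuffer != null) {
            userBuffer.add(numBytes);
        }
    }
}

final class CachedLookupDemo {
    public static void main(String[] args) {
        ControllerSketch controller = new ControllerSketch();
        WriterSketch writer = new WriterSketch(controller, "tenant/user");
        for (int i = 0; i < 1000; i++) {
            writer.onBytesWritten(64);
        }
        System.out.println("lookups during writes: " + controller.lookups.get());
    }
}
```

One trade-off of this pattern: if the controller ever evicts idle entries from its map, a writer could keep updating a record that is no longer registered. Whether that applies depends on the actual implementation.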
Please update the PR title and description.
@AngersZhuuuu @waitinfuture PTAL.
...main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
LGTM
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2583      +/-   ##
==========================================
- Coverage   40.49%   39.98%   -0.50%
==========================================
  Files         222      233      +11
  Lines       14289    14695     +406
  Branches     1291     1338      +47
==========================================
+ Hits         5785     5875      +90
- Misses       8173     8487     +314
- Partials      331      333       +2
@@ -153,6 +156,10 @@ public PartitionDataWriter(
this.mapIdBitMap = new RoaringBitmap();
}
takeBuffer();
CongestionController congestionController = CongestionController.instance();
if (!isMemoryShuffleFile.get() && congestionController != null) {
userBufferInfo = congestionController.getUserBuffer(getDiskFileInfo().getUserIdentifier());
IMO the CongestionController should control all file write operations, including memory files?
I kept the original code logic unchanged. In the original logic, CongestionController does not manage memory files. Maybe I can correct this in the next PR?
LGTM
congestionController ->
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
if (userBufferInfo != null) {
userBufferInfo.updateInfo(
ditto
…imes

### What changes were proposed in this pull request?
Reduce CongestionController#userBufferStatuses call times.

### Why are the changes needed?
With the sort-based shuffle writer, the number of PushMergedData requests increases, which makes CongestionController#produceBytes take up a lot of CPU time.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #2583 from leixm/issue_1472.

Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit d362d9f)
Signed-off-by: Shuang <[email protected]>
Merged to main (v0.6.0) and branch-0.5 (v0.5.1).
Thank you for your review. @AngersZhuuuu @RexXiong
What changes were proposed in this pull request?
Reduce CongestionController#userBufferStatuses call times.
Why are the changes needed?
With the sort-based shuffle writer, the number of PushMergedData requests increases, which makes CongestionController#produceBytes take up a lot of CPU time.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.