Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1472] Reduce CongestionController#userBufferStatuses call times. #2583

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,29 +113,6 @@ public static CongestionController instance() {
return _INSTANCE;
}

/**
 * Pairs a timestamp with the {@link BufferStatusHub} that is updated together with it.
 *
 * <p>{@link #updateInfo} is synchronized on this instance; {@link #getTimestamp()} is not, so the
 * timestamp field is volatile to keep that read safe.
 */
private static class UserBufferInfo {
  // Written under the instance monitor in updateInfo() but read without it in getTimestamp().
  // volatile makes the latest write visible to readers and guarantees the 64-bit read is atomic.
  volatile long timestamp;
  final BufferStatusHub bufferStatusHub;

  /**
   * @param timestamp initial timestamp value
   * @param bufferStatusHub hub that later {@link #updateInfo} calls add nodes to
   */
  public UserBufferInfo(long timestamp, BufferStatusHub bufferStatusHub) {
    this.timestamp = timestamp;
    this.bufferStatusHub = bufferStatusHub;
  }

  /**
   * Stores {@code timestamp} and adds {@code node} to the hub under that same timestamp. Both
   * steps run while holding this instance's monitor, so concurrent callers do not interleave.
   *
   * @param timestamp timestamp to record and to pass to the hub
   * @param node node to add to the hub
   */
  synchronized void updateInfo(long timestamp, BufferStatusHub.BufferStatusNode node) {
    this.timestamp = timestamp;
    this.bufferStatusHub.add(timestamp, node);
  }

  /** @return the timestamp set by the constructor or by the most recent {@link #updateInfo}. */
  public long getTimestamp() {
    return timestamp;
  }

  /** @return the hub passed to the constructor. */
  public BufferStatusHub getBufferStatusHub() {
    return bufferStatusHub;
  }
}

/**
* 1. If the total pending bytes are over the high watermark, congest users whose produce speed
* is higher than the potential average consume speed.
Expand Down Expand Up @@ -166,24 +143,19 @@ public boolean isUserCongested(UserIdentifier userIdentifier) {
return false;
}

public void produceBytes(UserIdentifier userIdentifier, int numBytes) {
long currentTimeMillis = System.currentTimeMillis();
UserBufferInfo userBufferInfo =
userBufferStatuses.computeIfAbsent(
userIdentifier,
user -> {
logger.info("New user {} comes, initializing its rate status", user);
BufferStatusHub bufferStatusHub = new BufferStatusHub(sampleTimeWindowSeconds);
UserBufferInfo userInfo = new UserBufferInfo(currentTimeMillis, bufferStatusHub);
workerSource.addGauge(
WorkerSource.USER_PRODUCE_SPEED(),
userIdentifier.toJMap(),
() -> getUserProduceSpeed(userInfo));
return userInfo;
});

BufferStatusHub.BufferStatusNode node = new BufferStatusHub.BufferStatusNode(numBytes);
userBufferInfo.updateInfo(currentTimeMillis, node);
/**
 * Returns the {@link UserBufferInfo} stored for {@code userIdentifier}, creating it if absent.
 *
 * <p>When a new entry is created, this logs the new user, builds a {@link BufferStatusHub} from
 * {@code sampleTimeWindowSeconds}, stamps the entry with the current wall-clock time and
 * registers a {@code USER_PRODUCE_SPEED} gauge on {@code workerSource} for that entry.
 *
 * @param userIdentifier key to look up in {@code userBufferStatuses}
 * @return the existing or newly created entry for the user
 */
public UserBufferInfo getUserBuffer(UserIdentifier userIdentifier) {
  return userBufferStatuses.computeIfAbsent(
      userIdentifier,
      newUser -> {
        logger.info("New user {} comes, initializing its rate status", newUser);
        // The hub is built before the clock is read, then both are wrapped into the entry.
        BufferStatusHub statusHub = new BufferStatusHub(sampleTimeWindowSeconds);
        UserBufferInfo createdInfo = new UserBufferInfo(System.currentTimeMillis(), statusHub);
        // The gauge closes over the created entry so it reports that entry's produce speed.
        workerSource.addGauge(
            WorkerSource.USER_PRODUCE_SPEED(),
            userIdentifier.toJMap(),
            () -> getUserProduceSpeed(createdInfo));
        return createdInfo;
      });
}

public void consumeBytes(int numBytes) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.worker.congestcontrol;

/**
 * Pairs a timestamp with the {@link BufferStatusHub} that is updated together with it.
 *
 * <p>{@link #updateInfo} is synchronized on this instance; {@link #getTimestamp()} is not, so the
 * timestamp field is volatile to keep that read safe.
 */
public class UserBufferInfo {
  // Written under the instance monitor in updateInfo() but read without it in getTimestamp().
  // volatile makes the latest write visible to readers and guarantees the 64-bit read is atomic.
  volatile long timestamp;
  final BufferStatusHub bufferStatusHub;

  /**
   * @param timestamp initial timestamp value
   * @param bufferStatusHub hub that later {@link #updateInfo} calls add nodes to
   */
  public UserBufferInfo(long timestamp, BufferStatusHub bufferStatusHub) {
    this.timestamp = timestamp;
    this.bufferStatusHub = bufferStatusHub;
  }

  /**
   * Stores {@code timestamp} and adds {@code node} to the hub under that same timestamp. Both
   * steps run while holding this instance's monitor, so concurrent callers do not interleave.
   *
   * @param timestamp timestamp to record and to pass to the hub
   * @param node node to add to the hub
   */
  public synchronized void updateInfo(long timestamp, BufferStatusHub.BufferStatusNode node) {
    this.timestamp = timestamp;
    this.bufferStatusHub.add(timestamp, node);
  }

  /** @return the timestamp set by the constructor or by the most recent {@link #updateInfo}. */
  public long getTimestamp() {
    return timestamp;
  }

  /** @return the hub passed to the constructor. */
  public BufferStatusHub getBufferStatusHub() {
    return bufferStatusHub;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -50,7 +49,9 @@
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
import org.apache.celeborn.service.deploy.worker.congestcontrol.BufferStatusHub;
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
import org.apache.celeborn.service.deploy.worker.congestcontrol.UserBufferInfo;
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;

/*
Expand Down Expand Up @@ -103,6 +104,8 @@ public abstract class PartitionDataWriter implements DeviceObserver {
private Exception exception = null;
private boolean metricsCollectCriticalEnabled;

private UserBufferInfo userBufferInfo = null;

public PartitionDataWriter(
StorageManager storageManager,
AbstractSource workerSource,
Expand Down Expand Up @@ -153,6 +156,10 @@ public PartitionDataWriter(
this.mapIdBitMap = new RoaringBitmap();
}
takeBuffer();
CongestionController congestionController = CongestionController.instance();
if (!isMemoryShuffleFile.get() && congestionController != null) {
userBufferInfo = congestionController.getUserBuffer(getDiskFileInfo().getUserIdentifier());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO the CongestionController should control all file write operations, including memory files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I keep the original code logic unchanged. In the original logic, CongestionController does not manage memory files. Maybe I can correct this logic in the next PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I keep the original code logic unchanged. In the original logic, CongestionController does not manage memory files. Maybe I can correct this logic in the next PR?

LGTM

}
}

public void initFileChannelsForDiskFile() throws IOException {
Expand Down Expand Up @@ -288,10 +295,10 @@ public void write(ByteBuf data) throws IOException {
MemoryManager.instance().increaseMemoryFileStorage(numBytes);
} else {
MemoryManager.instance().incrementDiskBuffer(numBytes);
Optional.ofNullable(CongestionController.instance())
.ifPresent(
congestionController ->
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
if (userBufferInfo != null) {
userBufferInfo.updateInfo(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

System.currentTimeMillis(), new BufferStatusHub.BufferStatusNode(numBytes));
}
AngersZhuuuu marked this conversation as resolved.
Show resolved Hide resolved
}

synchronized (flushLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testSingleUser() {

Assert.assertFalse(controller.isUserCongested(userIdentifier));

controller.produceBytes(userIdentifier, 1001);
produceBytes(userIdentifier, 1001);
pendingBytes = 1001;
controller.checkCongestion();
Assert.assertTrue(controller.isUserCongested(userIdentifier));
Expand All @@ -91,17 +91,17 @@ public void testMultipleUsers() {

// If pendingBytes exceed the high watermark, user1 produce speed > avg consume speed
// While user2 produce speed < avg consume speed
controller.produceBytes(user1, 800);
controller.produceBytes(user2, 201);
produceBytes(user1, 800);
produceBytes(user2, 201);
controller.consumeBytes(500);
pendingBytes = 1001;
controller.checkCongestion();
Assert.assertTrue(controller.isUserCongested(user1));
Assert.assertFalse(controller.isUserCongested(user2));

// If both users produce faster than the avg consume speed, both should be congested.
controller.produceBytes(user1, 800);
controller.produceBytes(user2, 800);
produceBytes(user1, 800);
produceBytes(user2, 800);
controller.consumeBytes(500);
pendingBytes = 1600;
controller.checkCongestion();
Expand All @@ -119,7 +119,7 @@ public void testMultipleUsers() {
public void testUserMetrics() throws InterruptedException {
UserIdentifier user = new UserIdentifier("test", "celeborn");
Assert.assertFalse(controller.isUserCongested(user));
controller.produceBytes(user, 800);
produceBytes(user, 800);

Assert.assertTrue(
isGaugeExist(
Expand Down Expand Up @@ -159,4 +159,10 @@ private boolean isGaugeExist(String name, Map<String, String> labels) {
.count()
== 1;
}

/**
 * Test helper: fetches the user's buffer from the controller and records {@code numBytes} on it
 * with the current wall-clock time.
 *
 * @param userIdentifier user whose buffer is updated
 * @param numBytes byte count wrapped into the {@link BufferStatusHub.BufferStatusNode}
 */
private void produceBytes(UserIdentifier userIdentifier, long numBytes) {
  // Same order as the chained form: look up the buffer, read the clock, then build the node.
  UserBufferInfo userBuffer = controller.getUserBuffer(userIdentifier);
  long now = System.currentTimeMillis();
  BufferStatusHub.BufferStatusNode producedNode = new BufferStatusHub.BufferStatusNode(numBytes);
  userBuffer.updateInfo(now, producedNode);
}
}
Loading