Skip to content

Commit

Permalink
[CELEBORN-1660] Only count the available workers device free capacity
Browse files Browse the repository at this point in the history
save

check

ut

doc

handle app heart beat

UT

UT

master metrics description
  • Loading branch information
turboFei committed Nov 6, 2024
1 parent 7dcd259 commit 469b027
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 29 deletions.
12 changes: 6 additions & 6 deletions assets/grafana/celeborn-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The count of active workers.",
"description": "The count of workers in available list.",
"fieldConfig": {
"defaults": {
"color": {
Expand Down Expand Up @@ -243,12 +243,12 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_WorkerCount_Value{instance=~\"${instance}\"}",
"expr": "metrics_AvailableWorkerCount_Value{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
"title": "metrics_WorkerCount_Value",
"title": "metrics_AvailableWorkerCount_Value",
"type": "timeseries"
},
{
Expand Down Expand Up @@ -1287,7 +1287,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The count of workers in available list.",
"description": "The count of active workers.",
"fieldConfig": {
"defaults": {
"color": {
Expand Down Expand Up @@ -1366,13 +1366,13 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "metrics_AvailableWorkerCount_Value{instance=~\"${instance}\"}",
"expr": "metrics_WorkerCount_Value{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_AvailableWorkerCount_Value",
"title": "metrics_WorkerCount_Value",
"type": "timeseries"
},
{
Expand Down
2 changes: 1 addition & 1 deletion docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ These metrics are exposed by Celeborn master.
| Metric Name | Description |
|--------------------------|---------------------------------------------------------------------------------|
| RegisteredShuffleCount | The count of registered shuffle. |
| DeviceCelebornFreeBytes | The actual usable space of Celeborn for device. |
| DeviceCelebornFreeBytes | The actual usable space of Celeborn available workers for device. |
| DeviceCelebornTotalBytes | The total space of Celeborn for device. |
| RunningApplicationCount | The count of running applications. |
| ActiveShuffleSize | The active shuffle size of workers. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import scala.Option;
import scala.Tuple2;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.slf4j.Logger;
Expand All @@ -57,6 +58,13 @@
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.util.WorkerStatusUtils;

/**
* Note: Do not update the worker collections directly from outside the metadata manager, especially
* {@link #workersMap}, {@link #workerEventInfos}, {@link #shutdownWorkers}, {@link
* #excludedWorkers}, {@link #manuallyExcludedWorkers}, {@link #availableWorkers}.
*
* <p>All updates should be done through the provided methods to ensure consistency.
*/
public abstract class AbstractMetaManager implements IMetadataHandler {
private static final Logger LOG = LoggerFactory.getLogger(AbstractMetaManager.class);

Expand All @@ -65,6 +73,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
JavaUtils.newConcurrentHashMap();
public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
public final Map<String, WorkerInfo> workersMap = JavaUtils.newConcurrentHashMap();
public final Set<WorkerInfo> availableWorkers = ConcurrentHashMap.newKeySet();

public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers = JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<WorkerInfo, WorkerEventInfo> workerEventInfos =
Expand Down Expand Up @@ -151,10 +160,33 @@ public void updateAppLostMeta(String appId) {
applicationMetas.remove(appId);
}

public void updateWorkerExcludeMeta(
@VisibleForTesting
public void updateExcludedWorkersMeta(
List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove) {
workersToAdd.forEach(
worker -> {
excludedWorkers.add(worker);
availableWorkers.remove(worker);
});
workersToRemove.forEach(
worker -> {
excludedWorkers.remove(worker);
updateAvailableWorkers(worker);
});
}

public void updateManuallyExcludedWorkersMeta(
List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove) {
manuallyExcludedWorkers.addAll(workersToAdd);
workersToRemove.forEach(manuallyExcludedWorkers::remove);
workersToAdd.forEach(
worker -> {
manuallyExcludedWorkers.add(worker);
availableWorkers.remove(worker);
});
workersToRemove.forEach(
worker -> {
manuallyExcludedWorkers.remove(worker);
updateAvailableWorkers(worker);
});
}

public void reviseLostShuffles(String appId, List<Integer> lostShuffles) {
Expand All @@ -173,6 +205,7 @@ public void updateWorkerLostMeta(
synchronized (workersMap) {
workersMap.remove(worker.toUniqueId());
lostWorkers.put(worker, System.currentTimeMillis());
availableWorkers.remove(worker);
}
excludedWorkers.remove(worker);
workerLostEvents.remove(worker);
Expand All @@ -185,6 +218,7 @@ public void updateWorkerRemoveMeta(
synchronized (workersMap) {
workersMap.remove(worker.toUniqueId());
lostWorkers.put(worker, System.currentTimeMillis());
availableWorkers.remove(worker);
}
excludedWorkers.remove(worker);
}
Expand All @@ -197,6 +231,7 @@ public void removeWorkersUnavailableInfoMeta(List<WorkerInfo> unavailableWorkers
shutdownWorkers.remove(workerInfo);
workerEventInfos.remove(workerInfo);
decommissionWorkers.remove(workerInfo);
updateAvailableWorkers(workerInfo);
}
}
}
Expand Down Expand Up @@ -256,6 +291,11 @@ public void updateWorkerHeartbeatMeta(
// only unblack if numSlots larger than 0
excludedWorkers.remove(worker);
}

// try to update the available workers if the worker status is Normal
if (workerStatus.getState() == PbWorkerStatus.State.Normal) {
updateAvailableWorkers(worker);
}
}

public void updateRegisterWorkerMeta(
Expand Down Expand Up @@ -294,6 +334,7 @@ public void updateRegisterWorkerMeta(
excludedWorkers.remove(workerInfo);
workerEventInfos.remove(workerInfo);
decommissionWorkers.remove(workerInfo);
updateAvailableWorkers(workerInfo);
}
}

Expand Down Expand Up @@ -429,6 +470,11 @@ public void restoreMetaFromFile(File file) throws IOException {
.getApplicationMetasMap()
.forEach(
(key, value) -> applicationMetas.put(key, PbSerDeUtils.fromPbApplicationMeta(value)));

availableWorkers.addAll(
workersMap.values().stream()
.filter(worker -> isWorkerAvailable(worker))
.collect(Collectors.toSet()));
} catch (Exception e) {
throw new IOException(e);
}
Expand All @@ -448,6 +494,7 @@ private void cleanUpState() {
registeredAppAndShuffles.clear();
hostnameSet.clear();
workersMap.clear();
availableWorkers.clear();
lostWorkers.clear();
appHeartbeatTime.clear();
excludedWorkers.clear();
Expand All @@ -464,6 +511,7 @@ private void cleanUpState() {
public void updateMetaByReportWorkerUnavailable(List<WorkerInfo> failedWorkers) {
synchronized (this.workersMap) {
shutdownWorkers.addAll(failedWorkers);
availableWorkers.removeAll(failedWorkers);
}
}

Expand All @@ -478,8 +526,10 @@ public void updateWorkerEventMeta(int workerEventTypeValue, List<WorkerInfo> wor
if (workerEventInfo == null || !workerEventInfo.isSameEvent(eventType.getNumber())) {
if (eventType == ResourceProtos.WorkerEventType.None) {
workerEventInfos.remove(workerInfo);
updateAvailableWorkers(workerInfo);
} else {
workerEventInfos.put(workerInfo, new WorkerEventInfo(eventType.getNumber(), eventTime));
availableWorkers.remove(workerInfo);
}
}
}
Expand All @@ -489,6 +539,7 @@ public void updateWorkerEventMeta(int workerEventTypeValue, List<WorkerInfo> wor
public void updateMetaByReportWorkerDecommission(List<WorkerInfo> workers) {
synchronized (this.workersMap) {
decommissionWorkers.addAll(workers);
availableWorkers.removeAll(workers);
}
}

Expand Down Expand Up @@ -525,14 +576,25 @@ public void updatePartitionSize() {
workers.forEach(workerInfo -> workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
}

public boolean isWorkerAvailable(WorkerInfo workerInfo) {
private boolean isWorkerAvailable(WorkerInfo workerInfo) {
return (workerInfo.getWorkerStatus().getState() == PbWorkerStatus.State.Normal
&& !workerEventInfos.containsKey(workerInfo))
&& !excludedWorkers.contains(workerInfo)
&& !shutdownWorkers.contains(workerInfo)
&& !manuallyExcludedWorkers.contains(workerInfo);
}

private void updateAvailableWorkers(WorkerInfo worker) {
synchronized (workersMap) {
Optional<WorkerInfo> workerInfo = Optional.ofNullable(workersMap.get(worker.toUniqueId()));
if (workerInfo.map(this::isWorkerAvailable).orElse(false)) {
availableWorkers.add(workerInfo.get());
} else {
availableWorkers.remove(worker);
}
}
}

public void updateApplicationMeta(ApplicationMeta applicationMeta) {
applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void handleAppLost(String appId, String requestId) {
@Override
public void handleWorkerExclude(
List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String requestId) {
updateWorkerExcludeMeta(workersToAdd, workersToRemove);
updateManuallyExcludedWorkersMeta(workersToAdd, workersToRemove);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques
addAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
List<WorkerInfo> workersToRemove =
removeAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
metaSystem.updateWorkerExcludeMeta(workersToAdd, workersToRemove);
metaSystem.updateManuallyExcludedWorkersMeta(workersToAdd, workersToRemove);
break;

case WorkerLost:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ private[celeborn] class Master(
statusSystem.excludedWorkers.size + statusSystem.manuallyExcludedWorkers.size
}
masterSource.addGauge(MasterSource.AVAILABLE_WORKER_COUNT) { () =>
statusSystem.workersMap.values().asScala.count { w =>
statusSystem.isWorkerAvailable(w)
}
statusSystem.availableWorkers.size()
}
masterSource.addGauge(MasterSource.SHUTDOWN_WORKER_COUNT) { () =>
statusSystem.shutdownWorkers.size
Expand Down Expand Up @@ -266,7 +264,7 @@ private[celeborn] class Master(
}

masterSource.addGauge(MasterSource.DEVICE_CELEBORN_FREE_CAPACITY) { () =>
statusSystem.workersMap.values().asScala.toList.map(_.totalActualUsableSpace()).sum
statusSystem.availableWorkers.asScala.toList.map(_.totalActualUsableSpace()).sum
}

masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive }
Expand Down Expand Up @@ -942,9 +940,10 @@ private[celeborn] class Master(
logInfo(s"Offer slots successfully for $numReducers reducers of $shuffleKey" +
s" on ${slots.size()} workers.")

val workersNotSelected = availableWorkers.asScala.filter(!slots.containsKey(_))
val offerSlotsExtraSize = Math.min(conf.masterSlotAssignExtraSlots, workersNotSelected.size)
val workersNotSelectedSize = availableWorkers.size() - slots.size()
val offerSlotsExtraSize = Math.min(conf.masterSlotAssignExtraSlots, workersNotSelectedSize)
if (offerSlotsExtraSize > 0) {
val workersNotSelected = availableWorkers.asScala.filterNot(slots.containsKey)
var index = Random.nextInt(workersNotSelected.size)
(1 to offerSlotsExtraSize).foreach(_ => {
slots.put(
Expand Down Expand Up @@ -1108,7 +1107,7 @@ private[celeborn] class Master(
if (shouldResponse) {
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of HeartbeatFromApplicationResponse
var appRelatedShuffles =
val appRelatedShuffles =
statusSystem.registeredAppAndShuffles.getOrDefault(appId, Collections.emptySet())
context.reply(HeartbeatFromApplicationResponse(
StatusCode.SUCCESS,
Expand Down Expand Up @@ -1210,7 +1209,7 @@ private[celeborn] class Master(
}

private def handleCheckWorkersAvailable(context: RpcCallContext): Unit = {
context.reply(CheckWorkersAvailableResponse(!workersAvailable().isEmpty))
context.reply(CheckWorkersAvailableResponse(!statusSystem.availableWorkers.isEmpty))
}

private def handleWorkerEvent(
Expand All @@ -1224,9 +1223,13 @@ private[celeborn] class Master(

private def workersAvailable(
tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty): util.List[WorkerInfo] = {
statusSystem.workersMap.values().asScala.filter { w =>
statusSystem.isWorkerAvailable(w) && !tmpExcludedWorkerList.contains(w)
}.toList.asJava
if (tmpExcludedWorkerList.isEmpty) {
new util.ArrayList[WorkerInfo](statusSystem.availableWorkers)
} else {
val availableWorkers = new util.HashSet(statusSystem.availableWorkers)
tmpExcludedWorkerList.foreach(availableWorkers.remove)
new util.ArrayList[WorkerInfo](availableWorkers)
}
}

private def handleRequestForApplicationMeta(
Expand Down
Loading

0 comments on commit 469b027

Please sign in to comment.