Skip to content

Commit

Permalink
Record the shuffle fallback count
Browse files Browse the repository at this point in the history
metrics

nit

log shuffle fallback count

invalid intend

docs
  • Loading branch information
turboFei committed Nov 5, 2024
1 parent 64f201d commit 6f0f4a9
Show file tree
Hide file tree
Showing 18 changed files with 194 additions and 27 deletions.
92 changes: 92 additions & 0 deletions assets/grafana/celeborn-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,98 @@
],
"title": "metrics_LostWorkerCount_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The count of shuffle fallbacks.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 34
},
"id": 218,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"maxHeight": 600,
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_ShuffleFallbackCount_Value{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_ShuffleFallbackCount_Value",
"type": "timeseries"
}
],
"title": "Master",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public <K, V, C> ShuffleHandle registerShuffle(
initializeLifecycleManager(appId);

if (fallbackPolicyRunner.applyFallbackPolicies(dependency, lifecycleManager)) {
lifecycleManager.shuffleFallbackCount().increment();
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
&& !conf.getBoolean("spark.shuffle.service.enabled", false)) {
logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ApplicationHeartbeater(
appId: String,
conf: CelebornConf,
masterClient: MasterClient,
shuffleMetrics: () => (Long, Long),
shuffleMetrics: () => ((Long, Long), Long),
workerStatusTracker: WorkerStatusTracker,
registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
cancelAllActiveStages: String => Unit) extends Logging {
Expand All @@ -59,16 +59,18 @@ class ApplicationHeartbeater(
override def run(): Unit = {
try {
require(masterClient != null, "When sending a heartbeat, client shouldn't be null.")
val (tmpTotalWritten, tmpTotalFileCount) = shuffleMetrics()
val ((tmpTotalWritten, tmpTotalFileCount), tmpShuffleFallbackCount) = shuffleMetrics()
logInfo("Send app heartbeat with " +
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount")
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount, " +
s"shuffle fallback count: $tmpShuffleFallbackCount")
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of HeartbeatFromApplication
val appHeartbeat =
HeartbeatFromApplication(
appId,
tmpTotalWritten,
tmpTotalFileCount,
tmpShuffleFallbackCount,
workerStatusTracker.getNeedCheckedWorkers().toList.asJava,
ZERO_UUID,
true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.security.SecureRandom
import java.util
import java.util.{function, List => JList}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicInteger, LongAdder}
import java.util.function.{BiConsumer, Consumer}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -84,6 +84,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private val unregisterShuffleTime = JavaUtils.newConcurrentHashMap[Int, Long]()

val registeredShuffle = ConcurrentHashMap.newKeySet[Int]()
val shuffleFallbackCount = new LongAdder()
// maintain each shuffle's map relation of WorkerInfo and partition location
val shuffleAllocatedWorkers = new ShuffleAllocatedWorkers
// shuffle id -> (partitionId -> newest PartitionLocation)
Expand Down Expand Up @@ -209,7 +210,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
appUniqueId,
conf,
masterClient,
() => commitManager.commitMetrics(),
() => commitManager.commitMetrics() -> shuffleFallbackCount.sumThenReset(),
workerStatusTracker,
registeredShuffle,
reason => cancelAllActiveStages(reason))
Expand Down
2 changes: 2 additions & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ message PbHeartbeatFromApplication {
string requestId = 4;
repeated PbWorkerInfo needCheckedWorkerList = 5;
bool shouldResponse = 6;
int64 shuffleFallbackCount = 7;
}

message PbHeartbeatFromApplicationResponse {
Expand Down Expand Up @@ -674,6 +675,7 @@ message PbSnapshotMetaInfo {
map<string, PbWorkerEventInfo> workerEventInfos = 15;
map<string, PbApplicationMeta> applicationMetas = 16;
repeated PbWorkerInfo decommissionWorkers = 17;
int64 shuffleTotalFallbackCount = 18;
}

message PbOpenStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ object ControlMessages extends Logging {
appId: String,
totalWritten: Long,
fileCount: Long,
shuffleFallbackCount: Long,
needCheckedWorkerList: util.List[WorkerInfo],
override var requestId: String = ZERO_UUID,
shouldResponse: Boolean = false) extends MasterRequestMessage
Expand Down Expand Up @@ -809,6 +810,7 @@ object ControlMessages extends Logging {
appId,
totalWritten,
fileCount,
shuffleFallbackCount,
needCheckedWorkerList,
requestId,
shouldResponse) =>
Expand All @@ -817,6 +819,7 @@ object ControlMessages extends Logging {
.setRequestId(requestId)
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
.setShuffleFallbackCount(shuffleFallbackCount)
.addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map(
PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.setShouldResponse(shouldResponse)
Expand Down Expand Up @@ -1209,6 +1212,7 @@ object ControlMessages extends Logging {
pbHeartbeatFromApplication.getAppId,
pbHeartbeatFromApplication.getTotalWritten,
pbHeartbeatFromApplication.getFileCount,
pbHeartbeatFromApplication.getShuffleFallbackCount,
new util.ArrayList[WorkerInfo](
pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ object PbSerDeUtils {
workers: java.util.Set[WorkerInfo],
partitionTotalWritten: java.lang.Long,
partitionTotalFileCount: java.lang.Long,
shuffleTotalFallbackCount: java.lang.Long,
appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot,
lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
Expand All @@ -480,6 +481,7 @@ object PbSerDeUtils {
.addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true, false)).asJava)
.setPartitionTotalWritten(partitionTotalWritten)
.setPartitionTotalFileCount(partitionTotalFileCount)
.setShuffleTotalFallbackCount(shuffleTotalFallbackCount)
// appDiskUsageMetricSnapshots can have null values,
// protobuf repeated value can't support null value in list.
.addAllAppDiskUsageMetricSnapshots(appDiskUsageMetricSnapshots.filter(_ != null)
Expand Down
1 change: 1 addition & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ These metrics are exposed by Celeborn master.
| RunningApplicationCount | The count of running applications. |
| ActiveShuffleSize | The active shuffle size of workers. |
| ActiveShuffleFileCount | The active shuffle file count of workers. |
| ShuffleFallbackCount | The count of shuffle fallbacks. |
| WorkerCount | The count of active workers. |
| LostWorkerCount | The count of workers in lost list. |
| ExcludedWorkerCount | The count of workers in excluded list. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
public double unhealthyDiskRatioThreshold;
public final LongAdder partitionTotalWritten = new LongAdder();
public final LongAdder partitionTotalFileCount = new LongAdder();
public final LongAdder shuffleTotalFallbackCount = new LongAdder();
public AppDiskUsageMetric appDiskUsageMetric = null;

public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
Expand Down Expand Up @@ -139,10 +140,12 @@ public void updateBatchUnregisterShuffleMeta(List<String> shuffleKeys) {
}
}

public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, long fileCount) {
public void updateAppHeartbeatMeta(
String appId, long time, long totalWritten, long fileCount, long shuffleFallbackCount) {
appHeartbeatTime.put(appId, time);
partitionTotalWritten.add(totalWritten);
partitionTotalFileCount.add(fileCount);
shuffleTotalFallbackCount.add(shuffleFallbackCount);
}

public void updateAppLostMeta(String appId) {
Expand Down Expand Up @@ -316,6 +319,7 @@ public void writeMetaInfoToFile(File file) throws IOException, RuntimeException
new HashSet(workersMap.values()),
partitionTotalWritten.sum(),
partitionTotalFileCount.sum(),
shuffleTotalFallbackCount.sum(),
appDiskUsageMetric.snapShots(),
appDiskUsageMetric.currentSnapShot().get(),
lostWorkers,
Expand Down Expand Up @@ -416,6 +420,7 @@ public void restoreMetaFromFile(File file) throws IOException {

partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
shuffleTotalFallbackCount.add(snapshotMetaInfo.getShuffleTotalFallbackCount());
appDiskUsageMetric.restoreFromSnapshot(
snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
.map(PbSerDeUtils::fromPbAppDiskUsageSnapshot)
Expand Down Expand Up @@ -457,6 +462,7 @@ private void cleanUpState() {
workerLostEvents.clear();
partitionTotalWritten.reset();
partitionTotalFileCount.reset();
shuffleTotalFallbackCount.reset();
workerEventInfos.clear();
applicationMetas.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ void handleRequestSlots(
void handleBatchUnRegisterShuffles(List<String> shuffleKeys, String requestId);

void handleAppHeartbeat(
String appId, long totalWritten, long fileCount, long time, String requestId);
String appId,
long totalWritten,
long fileCount,
long shuffleFallbackCount,
long time,
String requestId);

void handleAppLost(String appId, String requestId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ public void handleBatchUnRegisterShuffles(List<String> shuffleKeys, String reque

@Override
public void handleAppHeartbeat(
String appId, long totalWritten, long fileCount, long time, String requestId) {
updateAppHeartbeatMeta(appId, time, totalWritten, fileCount);
String appId,
long totalWritten,
long fileCount,
long shuffleFallbackCount,
long time,
String requestId) {
updateAppHeartbeatMeta(appId, time, totalWritten, fileCount, shuffleFallbackCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,12 @@ public void handleBatchUnRegisterShuffles(List<String> shuffleKeys, String reque

@Override
public void handleAppHeartbeat(
String appId, long totalWritten, long fileCount, long time, String requestId) {
String appId,
long totalWritten,
long fileCount,
long shuffleFallbackCount,
long time,
String requestId) {
try {
ratisServer.submitRequest(
ResourceRequest.newBuilder()
Expand All @@ -142,6 +147,7 @@ public void handleAppHeartbeat(
.setTime(time)
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
.setShuffleFallbackCount(shuffleFallbackCount)
.build())
.build());
} catch (CelebornRuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,12 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques
long time = request.getAppHeartbeatRequest().getTime();
long totalWritten = request.getAppHeartbeatRequest().getTotalWritten();
long fileCount = request.getAppHeartbeatRequest().getFileCount();
metaSystem.updateAppHeartbeatMeta(appId, time, totalWritten, fileCount);
long shuffleFallbackCount = request.getAppHeartbeatRequest().getShuffleFallbackCount();
if (shuffleFallbackCount > 0) {
LOG.warn("{} shuffle fallbacks in app {}", shuffleFallbackCount, appId);
}
metaSystem.updateAppHeartbeatMeta(
appId, time, totalWritten, fileCount, shuffleFallbackCount);
break;

case AppLost:
Expand Down
1 change: 1 addition & 0 deletions master/src/main/proto/Resource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ message AppHeartbeatRequest {
required int64 time = 2;
required int64 totalWritten = 3;
required int64 fileCount = 4;
optional int64 shuffleFallbackCount = 5;
}

message AppLostRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ private[celeborn] class Master(
}).sum()
}

masterSource.addGauge(MasterSource.SHUFFLE_FALLBACK_COUNT) { () =>
statusSystem.shuffleTotalFallbackCount.longValue()
}

masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () =>
statusSystem.workersMap.values().asScala.toList.map(_.totalSpace()).sum
}
Expand Down Expand Up @@ -391,6 +395,7 @@ private[celeborn] class Master(
appId,
totalWritten,
fileCount,
fallbackShuffles,
needCheckedWorkerList,
requestId,
shouldResponse) =>
Expand All @@ -403,6 +408,7 @@ private[celeborn] class Master(
appId,
totalWritten,
fileCount,
fallbackShuffles,
needCheckedWorkerList,
requestId,
shouldResponse))
Expand Down Expand Up @@ -1094,13 +1100,15 @@ private[celeborn] class Master(
appId: String,
totalWritten: Long,
fileCount: Long,
shuffleFallbackCount: Long,
needCheckedWorkerList: util.List[WorkerInfo],
requestId: String,
shouldResponse: Boolean): Unit = {
statusSystem.handleAppHeartbeat(
appId,
totalWritten,
fileCount,
shuffleFallbackCount,
System.currentTimeMillis(),
requestId)
val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w =>
Expand Down
Loading

0 comments on commit 6f0f4a9

Please sign in to comment.