[CELEBORN-1680] Introduce ShuffleFallbackCount metrics
### What changes were proposed in this pull request?

As the title describes, this introduces the metrics_ShuffleFallbackCount_Value metric.

### Why are the changes needed?
To provide insight into how many shuffles fall back to Spark's built-in shuffle service. This is helpful for deprecating the ESS progressively.

Currently, we plan to set `celeborn.client.spark.shuffle.fallback.numPartitionsThreshold` so that shuffles with too many partitions (for example, 50k) fall back to ESS.

In the future, we plan to limit the maximum acceptable number of shuffle partitions so that bad jobs are rejected outright and do not impact the Celeborn master's health. A minimal sketch of the planned threshold check follows.
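The sketch below shows the intended fallback decision, assuming a simple threshold comparison; only the config key and the 50k example come from this PR, while the object name and wiring are invented for illustration.

```scala
object FallbackSketch {
  // Stand-in for celeborn.client.spark.shuffle.fallback.numPartitionsThreshold
  val numPartitionsThreshold: Int = 50000

  // A shuffle with too many partitions falls back to Spark's built-in ESS
  def shouldFallbackToEss(numPartitions: Int): Boolean =
    numPartitions >= numPartitionsThreshold

  def main(args: Array[String]): Unit = {
    println(shouldFallbackToEss(60000)) // true  -> falls back to ESS
    println(shouldFallbackToEss(2000))  // false -> stays on Celeborn
  }
}
```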

### Does this PR introduce _any_ user-facing change?
Yes, new metrics.

### How was this patch tested?
UT.
<img width="1188" alt="image" src="https://github.com/user-attachments/assets/8193c12c-5dc9-4783-b64b-6a8449a1bea4">

Closes #2866 from turboFei/record_fallback.

Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: Fei Wang <[email protected]>
Signed-off-by: mingji <[email protected]>
2 people authored and FMX committed Nov 7, 2024
1 parent d44b23c commit f1bda46
Showing 19 changed files with 253 additions and 27 deletions.
92 changes: 92 additions & 0 deletions assets/grafana/celeborn-dashboard.json
@@ -1466,6 +1466,98 @@
],
"title": "metrics_LostWorkerCount_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The count of shuffle fallbacks.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 34
},
"id": 218,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"maxHeight": 600,
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_ShuffleFallbackCount_Value{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_ShuffleFallbackCount_Value",
"type": "timeseries"
}
],
"title": "Master",
@@ -162,6 +162,7 @@ public <K, V, C> ShuffleHandle registerShuffle(
initializeLifecycleManager(appId);

if (fallbackPolicyRunner.applyFallbackPolicies(dependency, lifecycleManager)) {
lifecycleManager.shuffleFallbackCount().increment();
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
&& !conf.getBoolean("spark.shuffle.service.enabled", false)) {
logger.error(
@@ -37,7 +37,7 @@ class ApplicationHeartbeater(
appId: String,
conf: CelebornConf,
masterClient: MasterClient,
shuffleMetrics: () => (Long, Long),
shuffleMetrics: () => ((Long, Long), Long),
workerStatusTracker: WorkerStatusTracker,
registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
cancelAllActiveStages: String => Unit) extends Logging {
@@ -59,16 +59,18 @@
override def run(): Unit = {
try {
require(masterClient != null, "When sending a heartbeat, client shouldn't be null.")
val (tmpTotalWritten, tmpTotalFileCount) = shuffleMetrics()
val ((tmpTotalWritten, tmpTotalFileCount), tmpShuffleFallbackCount) = shuffleMetrics()
logInfo("Send app heartbeat with " +
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount")
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount, " +
s"shuffle fallback count: $tmpShuffleFallbackCount")
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of HeartbeatFromApplication
val appHeartbeat =
HeartbeatFromApplication(
appId,
tmpTotalWritten,
tmpTotalFileCount,
tmpShuffleFallbackCount,
workerStatusTracker.getNeedCheckedWorkers().toList.asJava,
ZERO_UUID,
true)
@@ -23,7 +23,7 @@ import java.security.SecureRandom
import java.util
import java.util.{function, List => JList}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicInteger, LongAdder}
import java.util.function.{BiConsumer, Consumer}

import scala.collection.JavaConverters._
@@ -84,6 +84,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private val unregisterShuffleTime = JavaUtils.newConcurrentHashMap[Int, Long]()

val registeredShuffle = ConcurrentHashMap.newKeySet[Int]()
val shuffleFallbackCount = new LongAdder()
// maintain each shuffle's map relation of WorkerInfo and partition location
val shuffleAllocatedWorkers = new ShuffleAllocatedWorkers
// shuffle id -> (partitionId -> newest PartitionLocation)
@@ -209,7 +210,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
appUniqueId,
conf,
masterClient,
() => commitManager.commitMetrics(),
() => commitManager.commitMetrics() -> shuffleFallbackCount.sumThenReset(),
workerStatusTracker,
registeredShuffle,
reason => cancelAllActiveStages(reason))
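The `((Long, Long), Long)` shape threaded through `ApplicationHeartbeater` is easy to misread, so here is a self-contained sketch of the client-side plumbing, assuming it mirrors the two hunks above (the `commitMetrics` values are stand-ins):

```scala
import java.util.concurrent.atomic.LongAdder

object ShuffleMetricsSketch {
  val shuffleFallbackCount = new LongAdder()

  // Stand-in for commitManager.commitMetrics(): (totalWritten, totalFileCount)
  def commitMetrics(): (Long, Long) = (1024L, 2L)

  // `a -> b` builds the outer pair, hence () => ((Long, Long), Long);
  // sumThenReset() means each heartbeat reports a delta, not a running total
  val shuffleMetrics: () => ((Long, Long), Long) =
    () => commitMetrics() -> shuffleFallbackCount.sumThenReset()

  def main(args: Array[String]): Unit = {
    shuffleFallbackCount.increment() // one shuffle fell back since last beat
    val ((written, fileCount), fallback) = shuffleMetrics()
    println(s"written=$written, files=$fileCount, fallback=$fallback")
  }
}
```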
2 changes: 2 additions & 0 deletions common/src/main/proto/TransportMessages.proto
@@ -442,6 +442,7 @@ message PbHeartbeatFromApplication {
string requestId = 4;
repeated PbWorkerInfo needCheckedWorkerList = 5;
bool shouldResponse = 6;
int64 shuffleFallbackCount = 7;
}

message PbHeartbeatFromApplicationResponse {
@@ -674,6 +675,7 @@ message PbSnapshotMetaInfo {
map<string, PbWorkerEventInfo> workerEventInfos = 15;
map<string, PbApplicationMeta> applicationMetas = 16;
repeated PbWorkerInfo decommissionWorkers = 17;
int64 shuffleTotalFallbackCount = 18;
}

message PbOpenStream {
@@ -39,6 +39,9 @@ case class NamedCounter(name: String, counter: Counter, labels: Map[String, String])
case class NamedGauge[T](name: String, gauge: Gauge[T], labels: Map[String, String])
extends MetricLabels

case class NamedMeter(name: String, meter: Meter, labels: Map[String, String])
extends MetricLabels

case class NamedHistogram(name: String, histogram: Histogram, labels: Map[String, String])
extends MetricLabels

@@ -79,6 +82,9 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] =
JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]()

protected val namedMeters: ConcurrentHashMap[String, NamedMeter] =
JavaUtils.newConcurrentHashMap[String, NamedMeter]()

def addGauge[T](
name: String,
labels: Map[String, String],
@@ -112,6 +118,33 @@
addGauge(name, Map.empty[String, String], gauge)
}

def addMeter(
name: String,
labels: Map[String, String],
meter: Meter): Unit = {
namedMeters.putIfAbsent(
metricNameWithCustomizedLabels(name, labels),
NamedMeter(name, meter, labels ++ staticLabels))
}

def addMeter(
name: String,
labels: JMap[String, String],
meter: Meter): Unit = {
addMeter(name, labels.asScala.toMap, meter)
}

def addMeter(name: String, labels: Map[String, String] = Map.empty)(f: () => Long): Unit = {
addMeter(
name,
labels,
metricRegistry.meter(metricNameWithCustomizedLabels(name, labels), new MeterSupplier(f)))
}

def addMeter(name: String, meter: Meter): Unit = {
addMeter(name, Map.empty[String, String], meter)
}

protected val namedTimers
: ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])] =
JavaUtils.newConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]()
@@ -152,6 +185,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
namedGauges.values().asScala.toList
}

def meters(): List[NamedMeter] = {
namedMeters.values().asScala.toList
}

def histograms(): List[NamedHistogram] = {
List.empty[NamedHistogram]
}
@@ -321,6 +358,22 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
updateInnerMetrics(sb.toString())
}

def recordMeter(nm: NamedMeter): Unit = {
val timestamp = System.currentTimeMillis
val sb = new StringBuilder
val label = nm.labelString
sb.append(s"${normalizeKey(nm.name)}Count$label ${nm.meter.getCount} $timestamp\n")
sb.append(s"${normalizeKey(nm.name)}MeanRate$label ${nm.meter.getMeanRate} $timestamp\n")
sb.append(
s"${normalizeKey(nm.name)}OneMinuteRate$label ${nm.meter.getOneMinuteRate} $timestamp\n")
sb.append(
s"${normalizeKey(nm.name)}FiveMinuteRate$label ${nm.meter.getFiveMinuteRate} $timestamp\n")
sb.append(
s"${normalizeKey(nm.name)}FifteenMinuteRate$label ${nm.meter.getFifteenMinuteRate} $timestamp\n")

updateInnerMetrics(sb.toString())
}

def recordHistogram(nh: NamedHistogram): Unit = {
val timestamp = System.currentTimeMillis
val sb = new mutable.StringBuilder
@@ -377,6 +430,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
innerMetrics.synchronized {
counters().foreach(c => recordCounter(c))
gauges().foreach(g => recordGauge(g))
meters().foreach(m => recordMeter(m))
histograms().foreach(h => {
recordHistogram(h)
h.asInstanceOf[CelebornHistogram].reservoir
@@ -400,6 +454,7 @@
metricsCleaner.shutdown()
namedCounters.clear()
namedGauges.clear()
namedMeters.clear()
namedTimers.clear()
innerMetrics.clear()
metricRegistry.removeMatching(new MetricFilter {
@@ -436,3 +491,7 @@ class TimerSupplier(val slidingWindowSize: Int)
class GaugeSupplier[T](f: () => T) extends MetricRegistry.MetricSupplier[Gauge[_]] {
override def newMetric(): Gauge[T] = new Gauge[T] { override def getValue: T = f() }
}

class MeterSupplier(f: () => Long) extends MetricRegistry.MetricSupplier[Meter] {
override def newMetric(): Meter = new Meter { override def getCount: Long = f() }
}
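The function-backed `addMeter(name)(f)` overload added above makes it cheap to surface an existing counter without extra bookkeeping. A hedged usage sketch, where `source` stands for any concrete `AbstractSource` and the wiring is assumed rather than taken from this PR:

```scala
import java.util.concurrent.atomic.LongAdder

val shuffleTotalFallbackCount = new LongAdder()

// MeterSupplier re-evaluates the function on each metrics collection,
// so the meter's count always reflects the adder's current sum.
source.addMeter("ShuffleFallbackCount") { () =>
  shuffleTotalFallbackCount.sum()
}
```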
@@ -412,6 +412,7 @@ object ControlMessages extends Logging {
appId: String,
totalWritten: Long,
fileCount: Long,
shuffleFallbackCount: Long,
needCheckedWorkerList: util.List[WorkerInfo],
override var requestId: String = ZERO_UUID,
shouldResponse: Boolean = false) extends MasterRequestMessage
@@ -809,6 +810,7 @@ object ControlMessages extends Logging {
appId,
totalWritten,
fileCount,
shuffleFallbackCount,
needCheckedWorkerList,
requestId,
shouldResponse) =>
@@ -817,6 +819,7 @@
.setRequestId(requestId)
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
.setShuffleFallbackCount(shuffleFallbackCount)
.addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map(
PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.setShouldResponse(shouldResponse)
@@ -1209,6 +1212,7 @@ object ControlMessages extends Logging {
pbHeartbeatFromApplication.getAppId,
pbHeartbeatFromApplication.getTotalWritten,
pbHeartbeatFromApplication.getFileCount,
pbHeartbeatFromApplication.getShuffleFallbackCount,
new util.ArrayList[WorkerInfo](
pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
@@ -459,6 +459,7 @@ object PbSerDeUtils {
workers: java.util.Set[WorkerInfo],
partitionTotalWritten: java.lang.Long,
partitionTotalFileCount: java.lang.Long,
shuffleTotalFallbackCount: java.lang.Long,
appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot,
lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
@@ -480,6 +481,7 @@
.addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true, false)).asJava)
.setPartitionTotalWritten(partitionTotalWritten)
.setPartitionTotalFileCount(partitionTotalFileCount)
.setShuffleTotalFallbackCount(shuffleTotalFallbackCount)
// appDiskUsageMetricSnapshots can have null values,
// protobuf repeated value can't support null value in list.
.addAllAppDiskUsageMetricSnapshots(appDiskUsageMetricSnapshots.filter(_ != null)
1 change: 1 addition & 0 deletions docs/monitoring.md
@@ -100,6 +100,7 @@ These metrics are exposed by Celeborn master.
| RunningApplicationCount | The count of running applications. |
| ActiveShuffleSize | The active shuffle size of workers. |
| ActiveShuffleFileCount | The active shuffle file count of workers. |
| ShuffleFallbackCount | The count of shuffle fallbacks. |
| WorkerCount | The count of active workers. |
| LostWorkerCount | The count of workers in lost list. |
| ExcludedWorkerCount | The count of workers in excluded list. |
@@ -85,6 +85,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
public double unhealthyDiskRatioThreshold;
public final LongAdder partitionTotalWritten = new LongAdder();
public final LongAdder partitionTotalFileCount = new LongAdder();
public final LongAdder shuffleTotalFallbackCount = new LongAdder();
public AppDiskUsageMetric appDiskUsageMetric = null;

public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
@@ -139,10 +140,12 @@ public void updateBatchUnregisterShuffleMeta(List<String> shuffleKeys) {
}
}

public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, long fileCount) {
public void updateAppHeartbeatMeta(
String appId, long time, long totalWritten, long fileCount, long shuffleFallbackCount) {
appHeartbeatTime.put(appId, time);
partitionTotalWritten.add(totalWritten);
partitionTotalFileCount.add(fileCount);
shuffleTotalFallbackCount.add(shuffleFallbackCount);
}

public void updateAppLostMeta(String appId) {
@@ -316,6 +319,7 @@ public void writeMetaInfoToFile(File file) throws IOException, RuntimeException
new HashSet(workersMap.values()),
partitionTotalWritten.sum(),
partitionTotalFileCount.sum(),
shuffleTotalFallbackCount.sum(),
appDiskUsageMetric.snapShots(),
appDiskUsageMetric.currentSnapShot().get(),
lostWorkers,
@@ -416,6 +420,7 @@ public void restoreMetaFromFile(File file) throws IOException {

partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
shuffleTotalFallbackCount.add(snapshotMetaInfo.getShuffleTotalFallbackCount());
appDiskUsageMetric.restoreFromSnapshot(
snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
.map(PbSerDeUtils::fromPbAppDiskUsageSnapshot)
@@ -457,6 +462,7 @@ private void cleanUpState() {
workerLostEvents.clear();
partitionTotalWritten.reset();
partitionTotalFileCount.reset();
shuffleTotalFallbackCount.reset();
workerEventInfos.clear();
applicationMetas.clear();
}
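Putting the two halves together: the client drains its adder on each heartbeat, and the master folds those deltas into `shuffleTotalFallbackCount`, which is also written to and restored from `PbSnapshotMetaInfo` (field 18) so the total survives failover. A self-contained sketch of that accounting, with stand-in names but the same semantics as the diffs above:

```scala
import java.util.concurrent.atomic.LongAdder

object FallbackAccountingSketch {
  val clientCounter = new LongAdder() // LifecycleManager.shuffleFallbackCount
  val masterTotal   = new LongAdder() // AbstractMetaManager.shuffleTotalFallbackCount

  def main(args: Array[String]): Unit = {
    clientCounter.increment() // two shuffles fell back since the last heartbeat
    clientCounter.increment()

    val delta = clientCounter.sumThenReset() // heartbeat carries a delta of 2
    masterTotal.add(delta)                   // updateAppHeartbeatMeta(...)

    println(masterTotal.sum()) // 2; clientCounter is back at 0
  }
}
```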