diff --git a/assets/grafana/celeborn-dashboard.json b/assets/grafana/celeborn-dashboard.json index b9297b88c3e..15d81a7257a 100644 --- a/assets/grafana/celeborn-dashboard.json +++ b/assets/grafana/celeborn-dashboard.json @@ -1466,6 +1466,98 @@ ], "title": "metrics_LostWorkerCount_Value", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "The count of shuffle fallbacks.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 34 + }, + "id": 218, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "metrics_ShuffleFallbackCount_Value{instance=~\"${instance}\"}", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_ShuffleFallbackCount_Value", + "type": "timeseries" } ], "title": "Master", diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java index fdb13102d8b..cbebde09686 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java @@ -162,6 +162,7 @@ public ShuffleHandle registerShuffle( initializeLifecycleManager(appId); if (fallbackPolicyRunner.applyFallbackPolicies(dependency, lifecycleManager)) { + lifecycleManager.shuffleFallbackCount().increment(); if (conf.getBoolean("spark.dynamicAllocation.enabled", false) && !conf.getBoolean("spark.shuffle.service.enabled", false)) { logger.error( diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index b558df59658..ee337deb6a2 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -37,7 +37,7 @@ class ApplicationHeartbeater( appId: String, conf: CelebornConf, masterClient: MasterClient, - shuffleMetrics: () => (Long, Long), + shuffleMetrics: () => ((Long, Long), Long), workerStatusTracker: WorkerStatusTracker, registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean], cancelAllActiveStages: String => Unit) extends Logging { @@ -59,9 +59,10 @@ class ApplicationHeartbeater( override def run(): Unit = { try { require(masterClient != null, "When sending a heartbeat, client shouldn't be null.") - val (tmpTotalWritten, tmpTotalFileCount) = shuffleMetrics() + val ((tmpTotalWritten, tmpTotalFileCount), tmpShuffleFallbackCount) = shuffleMetrics() logInfo("Send app heartbeat with " + - s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount") + s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount, " + + s"shuffle fallback count: $tmpShuffleFallbackCount") // UserResourceConsumption and DiskInfo are eliminated from WorkerInfo // during serialization of HeartbeatFromApplication val appHeartbeat = @@ -69,6 +70,7 @@ class ApplicationHeartbeater( appId, tmpTotalWritten, tmpTotalFileCount, + tmpShuffleFallbackCount, workerStatusTracker.getNeedCheckedWorkers().toList.asJava, ZERO_UUID, true) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index afab1a56e86..bf3ccf63d7a 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -23,7 +23,7 @@ import java.security.SecureRandom import java.util import java.util.{function, List => JList} import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.{AtomicInteger, LongAdder} import java.util.function.{BiConsumer, Consumer} import scala.collection.JavaConverters._ @@ -84,6 +84,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends private val unregisterShuffleTime = JavaUtils.newConcurrentHashMap[Int, Long]() val registeredShuffle = ConcurrentHashMap.newKeySet[Int]() + val shuffleFallbackCount = new LongAdder() // maintain each shuffle's map relation of WorkerInfo and partition location val shuffleAllocatedWorkers = new ShuffleAllocatedWorkers // shuffle id -> (partitionId -> newest PartitionLocation) @@ -209,7 +210,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends appUniqueId, conf, masterClient, - () => commitManager.commitMetrics(), + () => commitManager.commitMetrics() -> shuffleFallbackCount.sumThenReset(), workerStatusTracker, registeredShuffle, reason => cancelAllActiveStages(reason)) diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 9ed21d18030..1228c173291 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -442,6 +442,7 @@ message PbHeartbeatFromApplication { string requestId = 4; repeated PbWorkerInfo needCheckedWorkerList = 5; bool shouldResponse = 6; + int64 shuffleFallbackCount = 7; } message PbHeartbeatFromApplicationResponse { @@ -674,6 +675,7 @@ message PbSnapshotMetaInfo { map workerEventInfos = 15; map applicationMetas = 16; repeated PbWorkerInfo decommissionWorkers = 17; + int64 shuffleTotalFallbackCount = 18; } message PbOpenStream { diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 4a96184b6f7..24d3fe60187 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -412,6 +412,7 @@ object ControlMessages extends Logging { appId: String, totalWritten: Long, fileCount: Long, + shuffleFallbackCount: Long, needCheckedWorkerList: util.List[WorkerInfo], override var requestId: String = ZERO_UUID, shouldResponse: Boolean = false) extends MasterRequestMessage @@ -809,6 +810,7 @@ object ControlMessages extends Logging { appId, totalWritten, fileCount, + shuffleFallbackCount, needCheckedWorkerList, requestId, shouldResponse) => @@ -817,6 +819,7 @@ object ControlMessages extends Logging { .setRequestId(requestId) .setTotalWritten(totalWritten) .setFileCount(fileCount) + .setShuffleFallbackCount(shuffleFallbackCount) .addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map( PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava) .setShouldResponse(shouldResponse) @@ -1209,6 +1212,7 @@ object ControlMessages extends Logging { pbHeartbeatFromApplication.getAppId, pbHeartbeatFromApplication.getTotalWritten, pbHeartbeatFromApplication.getFileCount, + pbHeartbeatFromApplication.getShuffleFallbackCount, new util.ArrayList[WorkerInfo]( pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava), diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala index f9f01b9a7d8..9a14a3dee1b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala @@ -459,6 +459,7 @@ object PbSerDeUtils { workers: java.util.Set[WorkerInfo], partitionTotalWritten: java.lang.Long, partitionTotalFileCount: java.lang.Long, + shuffleTotalFallbackCount: java.lang.Long, appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot], currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot, lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long], @@ -480,6 +481,7 @@ object PbSerDeUtils { .addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true, false)).asJava) .setPartitionTotalWritten(partitionTotalWritten) .setPartitionTotalFileCount(partitionTotalFileCount) + .setShuffleTotalFallbackCount(shuffleTotalFallbackCount) // appDiskUsageMetricSnapshots can have null values, // protobuf repeated value can't support null value in list. .addAllAppDiskUsageMetricSnapshots(appDiskUsageMetricSnapshots.filter(_ != null) diff --git a/docs/monitoring.md b/docs/monitoring.md index b037f6a0f77..c07c16d1e03 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -100,6 +100,7 @@ These metrics are exposed by Celeborn master. | RunningApplicationCount | The count of running applications. | | ActiveShuffleSize | The active shuffle size of workers. | | ActiveShuffleFileCount | The active shuffle file count of workers. | + | ShuffleFallbackCount | The count of shuffle fallbacks. | | WorkerCount | The count of active workers. | | LostWorkerCount | The count of workers in lost list. | | ExcludedWorkerCount | The count of workers in excluded list. | diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index f1d8a37e815..80988cfff1f 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -85,6 +85,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { public double unhealthyDiskRatioThreshold; public final LongAdder partitionTotalWritten = new LongAdder(); public final LongAdder partitionTotalFileCount = new LongAdder(); + public final LongAdder shuffleTotalFallbackCount = new LongAdder(); public AppDiskUsageMetric appDiskUsageMetric = null; public final ConcurrentHashMap applicationMetas = @@ -139,10 +140,12 @@ public void updateBatchUnregisterShuffleMeta(List shuffleKeys) { } } - public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, long fileCount) { + public void updateAppHeartbeatMeta( + String appId, long time, long totalWritten, long fileCount, long shuffleFallbackCount) { appHeartbeatTime.put(appId, time); partitionTotalWritten.add(totalWritten); partitionTotalFileCount.add(fileCount); + shuffleTotalFallbackCount.add(shuffleFallbackCount); } public void updateAppLostMeta(String appId) { @@ -316,6 +319,7 @@ public void writeMetaInfoToFile(File file) throws IOException, RuntimeException new HashSet(workersMap.values()), partitionTotalWritten.sum(), partitionTotalFileCount.sum(), + shuffleTotalFallbackCount.sum(), appDiskUsageMetric.snapShots(), appDiskUsageMetric.currentSnapShot().get(), lostWorkers, @@ -416,6 +420,7 @@ public void restoreMetaFromFile(File file) throws IOException { partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten()); partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount()); + shuffleTotalFallbackCount.add(snapshotMetaInfo.getShuffleTotalFallbackCount()); appDiskUsageMetric.restoreFromSnapshot( snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream() .map(PbSerDeUtils::fromPbAppDiskUsageSnapshot) @@ -457,6 +462,7 @@ private void cleanUpState() { workerLostEvents.clear(); partitionTotalWritten.reset(); partitionTotalFileCount.reset(); + shuffleTotalFallbackCount.reset(); workerEventInfos.clear(); applicationMetas.clear(); } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java index 9e17728d61a..e765a9d4dae 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java @@ -39,7 +39,12 @@ void handleRequestSlots( void handleBatchUnRegisterShuffles(List shuffleKeys, String requestId); void handleAppHeartbeat( - String appId, long totalWritten, long fileCount, long time, String requestId); + String appId, + long totalWritten, + long fileCount, + long shuffleFallbackCount, + long time, + String requestId); void handleAppLost(String appId, String requestId); diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index 228f9c3db8c..04c74e51ed7 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -73,8 +73,13 @@ public void handleBatchUnRegisterShuffles(List shuffleKeys, String reque @Override public void handleAppHeartbeat( - String appId, long totalWritten, long fileCount, long time, String requestId) { - updateAppHeartbeatMeta(appId, time, totalWritten, fileCount); + String appId, + long totalWritten, + long fileCount, + long shuffleFallbackCount, + long time, + String requestId) { + updateAppHeartbeatMeta(appId, time, totalWritten, fileCount, shuffleFallbackCount); } @Override diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index fd229c90479..a87470d9d5e 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -130,7 +130,12 @@ public void handleBatchUnRegisterShuffles(List shuffleKeys, String reque @Override public void handleAppHeartbeat( - String appId, long totalWritten, long fileCount, long time, String requestId) { + String appId, + long totalWritten, + long fileCount, + long shuffleFallbackCount, + long time, + String requestId) { try { ratisServer.submitRequest( ResourceRequest.newBuilder() @@ -142,6 +147,7 @@ public void handleAppHeartbeat( .setTime(time) .setTotalWritten(totalWritten) .setFileCount(fileCount) + .setShuffleFallbackCount(shuffleFallbackCount) .build()) .build()); } catch (CelebornRuntimeException e) { diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java index 4ec1dac371d..626378f0e22 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -141,7 +141,12 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques long time = request.getAppHeartbeatRequest().getTime(); long totalWritten = request.getAppHeartbeatRequest().getTotalWritten(); long fileCount = request.getAppHeartbeatRequest().getFileCount(); - metaSystem.updateAppHeartbeatMeta(appId, time, totalWritten, fileCount); + long shuffleFallbackCount = request.getAppHeartbeatRequest().getShuffleFallbackCount(); + if (shuffleFallbackCount > 0) { + LOG.warn("{} shuffle fallbacks in app {}", shuffleFallbackCount, appId); + } + metaSystem.updateAppHeartbeatMeta( + appId, time, totalWritten, fileCount, shuffleFallbackCount); break; case AppLost: diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index acb1d6097ad..1e65943a9be 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -120,6 +120,7 @@ message AppHeartbeatRequest { required int64 time = 2; required int64 totalWritten = 3; required int64 fileCount = 4; + optional int64 shuffleFallbackCount = 5; } message AppLostRequest { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 117b4875a37..7d029753e85 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -261,6 +261,10 @@ private[celeborn] class Master( }).sum() } + masterSource.addGauge(MasterSource.SHUFFLE_FALLBACK_COUNT) { () => + statusSystem.shuffleTotalFallbackCount.longValue() + } + masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () => statusSystem.workersMap.values().asScala.toList.map(_.totalSpace()).sum } @@ -391,6 +395,7 @@ private[celeborn] class Master( appId, totalWritten, fileCount, + fallbackShuffles, needCheckedWorkerList, requestId, shouldResponse) => @@ -403,6 +408,7 @@ private[celeborn] class Master( appId, totalWritten, fileCount, + fallbackShuffles, needCheckedWorkerList, requestId, shouldResponse)) @@ -1094,6 +1100,7 @@ private[celeborn] class Master( appId: String, totalWritten: Long, fileCount: Long, + shuffleFallbackCount: Long, needCheckedWorkerList: util.List[WorkerInfo], requestId: String, shouldResponse: Boolean): Unit = { @@ -1101,6 +1108,7 @@ private[celeborn] class Master( appId, totalWritten, fileCount, + shuffleFallbackCount, System.currentTimeMillis(), requestId) val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w => diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala index b2e72524486..13f35cc8423 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala @@ -55,6 +55,8 @@ object MasterSource { val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount" + val SHUFFLE_FALLBACK_COUNT = "ShuffleFallbackCount" + val OFFER_SLOTS_TIME = "OfferSlotsTime" // Capacity diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index cde62c4cf0c..810f12d191e 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -637,11 +637,11 @@ public void testHandleBatchUnRegisterShuffle() { @Test public void testHandleAppHeartbeat() { Long dummy = 1235L; - statusSystem.handleAppHeartbeat(APPID1, 1, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, dummy, getNewReqeustId()); assertEquals(dummy, statusSystem.appHeartbeatTime.get(APPID1)); String appId2 = "app02"; - statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId()); assertEquals(dummy, statusSystem.appHeartbeatTime.get(appId2)); assertEquals(2, statusSystem.appHeartbeatTime.size()); @@ -811,23 +811,23 @@ public void testHandleUpdatePartitionSize() { Assert.assertEquals(statusSystem.estimatedPartitionSize, conf.initialEstimatedPartitionSize()); Long dummy = 1235L; - statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, 0, dummy, getNewReqeustId()); String appId2 = "app02"; - statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId()); // Max size statusSystem.handleUpdatePartitionSize(); Assert.assertEquals(statusSystem.estimatedPartitionSize, conf.maxPartitionSizeToEstimate()); - statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy, getNewReqeustId()); - statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, 0, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId()); // Size between minEstimateSize -> maxEstimateSize statusSystem.handleUpdatePartitionSize(); Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000); - statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy, getNewReqeustId()); - statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, dummy, getNewReqeustId()); // Min size statusSystem.handleUpdatePartitionSize(); @@ -897,4 +897,15 @@ public void testHandleReportWorkerDecommission() { assertEquals(1, statusSystem.decommissionWorkers.size()); assertTrue(statusSystem.excludedWorkers.isEmpty()); } + + @Test + public void testShuffleFallbackCount() { + statusSystem.shuffleTotalFallbackCount.reset(); + + Long dummy = 1235L; + statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 2, dummy, getNewReqeustId()); + + assertEquals(statusSystem.shuffleTotalFallbackCount.longValue(), 3); + } } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index e4170a02cf3..ae70ed54b4a 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -946,14 +946,14 @@ public void testHandleAppHeartbeat() throws InterruptedException { Assert.assertNotNull(statusSystem); long dummy = 1235L; - statusSystem.handleAppHeartbeat(APPID1, 1, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, dummy, getNewReqeustId()); Thread.sleep(3000L); Assert.assertEquals(Long.valueOf(dummy), STATUSSYSTEM1.appHeartbeatTime.get(APPID1)); Assert.assertEquals(Long.valueOf(dummy), STATUSSYSTEM2.appHeartbeatTime.get(APPID1)); Assert.assertEquals(Long.valueOf(dummy), STATUSSYSTEM3.appHeartbeatTime.get(APPID1)); String appId2 = "app02"; - statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId()); Thread.sleep(3000L); Assert.assertEquals(Long.valueOf(dummy), STATUSSYSTEM1.appHeartbeatTime.get(appId2)); @@ -1315,23 +1315,23 @@ public void testHandleUpdatePartitionSize() throws InterruptedException { Assert.assertEquals(statusSystem.estimatedPartitionSize, conf.initialEstimatedPartitionSize()); Long dummy = 1235L; - statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, 0, dummy, getNewReqeustId()); String appId2 = "app02"; - statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId()); // Max size statusSystem.handleUpdatePartitionSize(); Assert.assertEquals(statusSystem.estimatedPartitionSize, conf.maxPartitionSizeToEstimate()); - statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy, getNewReqeustId()); - statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, 0, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId()); // Size between minEstimateSize -> maxEstimateSize statusSystem.handleUpdatePartitionSize(); Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000); - statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy, getNewReqeustId()); - statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, dummy, getNewReqeustId()); // Min size statusSystem.handleUpdatePartitionSize(); @@ -1501,4 +1501,17 @@ public void testHandleReportWorkerDecommission() throws InterruptedException { Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size()); Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size()); } + + @Test + public void testShuffleFallbackCount() { + AbstractMetaManager statusSystem = pickLeaderStatusSystem(); + Assert.assertNotNull(statusSystem); + statusSystem.shuffleTotalFallbackCount.reset(); + + Long dummy = 1235L; + statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 1, dummy, getNewReqeustId()); + statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 2, dummy, getNewReqeustId()); + + Assert.assertEquals(statusSystem.shuffleTotalFallbackCount.longValue(), 3); + } }