diff --git a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala index 507d5374f1b..69c243f946c 100644 --- a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala +++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala @@ -71,4 +71,11 @@ class CommonOptions { paramLabel = "username", description = Array("The username of the TENANT_USER level.")) private[cli] var configName: String = _ + + @Option( + names = Array("--apps"), + paramLabel = "appId", + description = Array("The application Id list seperated by comma.")) + private[cli] var apps: String = _ + } diff --git a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala index d69a2e6bf8b..840a54d2350 100644 --- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala +++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala @@ -110,4 +110,14 @@ final class MasterOptions { names = Array("--remove-workers-unavailable-info"), description = Array("Remove the workers unavailable info from the master.")) private[master] var removeWorkersUnavailableInfo: Boolean = _ + + @Option( + names = Array("--revise-lost-shuffles"), + description = Array("Revise lost shuffles or remove shuffles for an application.")) + private[master] var reviseLostShuffles: Boolean = _ + + @Option( + names = Array("--delete-apps"), + description = Array("Delete resource of an application.")) + private[master] var deleteApps: Boolean = _ } diff --git a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala index 22265039642..a875621e2f4 100644 --- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala +++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala @@ -37,6 +37,9 @@ trait MasterSubcommand extends CliLogging { @ArgGroup(exclusive = true, multiplicity = "1") private[master] var masterOptions: MasterOptions = _ + @ArgGroup(exclusive = false) + private[master] var reviseLostShuffleOptions: ReviseLostShuffleOptions = _ + @Mixin private[master] var commonOptions: CommonOptions = _ @@ -110,4 +113,8 @@ trait MasterSubcommand extends CliLogging { private[master] def runShowThreadDump: ThreadStackResponse + private[master] def reviseLostShuffles: HandleResponse + + private[master] def deleteApps: HandleResponse + } diff --git a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala index 32f540f4b1f..70bd32c63c5 100644 --- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala +++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala @@ -51,6 +51,8 @@ class MasterSubcommandImpl extends Runnable with MasterSubcommand { if (masterOptions.showContainerInfo) log(runShowContainerInfo) if (masterOptions.showDynamicConf) log(runShowDynamicConf) if (masterOptions.showThreadDump) log(runShowThreadDump) + if (masterOptions.reviseLostShuffles) log(reviseLostShuffles) + if (masterOptions.deleteApps) log(deleteApps) if (masterOptions.addClusterAlias != null && masterOptions.addClusterAlias.nonEmpty) runAddClusterAlias if (masterOptions.removeClusterAlias != null && masterOptions.removeClusterAlias.nonEmpty) @@ -220,4 +222,20 @@ class MasterSubcommandImpl extends Runnable with MasterSubcommand { } private[master] def runShowContainerInfo: ContainerInfo = defaultApi.getContainerInfo + + override private[master] def reviseLostShuffles: HandleResponse = { + val app = commonOptions.apps + if (app.contains(",")) { + throw new ParameterException( + spec.commandLine(), + "Only one application id can be provided for this command.") + } + val shuffleIds = reviseLostShuffleOptions.shuffleIds + applicationApi.reviseLostShuffles(app, shuffleIds) + } + + override private[master] def deleteApps: HandleResponse = { + val apps = commonOptions.apps + applicationApi.deleteApps(apps) + } } diff --git a/cli/src/main/scala/org/apache/celeborn/cli/master/ReviseLostShuffleOptions.scala b/cli/src/main/scala/org/apache/celeborn/cli/master/ReviseLostShuffleOptions.scala new file mode 100644 index 00000000000..a0c4963feaa --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/master/ReviseLostShuffleOptions.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.master + +import picocli.CommandLine.Option + +final class ReviseLostShuffleOptions { + + @Option( + names = Array("--shuffleIds"), + description = Array("The shuffle ids to manipulate.")) + private[master] var shuffleIds: String = _ + +} diff --git a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala index 56b4ea213b9..1f450a19f69 100644 --- a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala +++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala @@ -247,6 +247,32 @@ class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature { captureOutputAndValidateResponse(args, "success: true") } + test("master --delete-apps case1") { + val args = prepareMasterArgs() ++ Array( + "--delete-apps", + "--apps", + "app1") + captureOutputAndValidateResponse(args, "success: true") + } + + test("master --delete-apps case2") { + val args = prepareMasterArgs() ++ Array( + "--delete-apps", + "--apps", + "app1,app2") + captureOutputAndValidateResponse(args, "success: true") + } + + test("master --revise-lost-shuffles case1") { + val args = prepareMasterArgs() ++ Array( + "--revise-lost-shuffles", + "--apps", + "app1", + "--shuffleIds", + "1,2,3,4,5,6") + captureOutputAndValidateResponse(args, "success: true") + } + private def prepareMasterArgs(): Array[String] = { Array( "master", diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index 786ac9fb137..71486b9ef60 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -17,14 +17,19 @@ package org.apache.celeborn.client -import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util +import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} +import java.util.function.Consumer import scala.collection.JavaConverters._ +import org.apache.commons.lang3.StringUtils + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, ApplicationLostResponse, HeartbeatFromApplication, HeartbeatFromApplicationResponse, ZERO_UUID} +import org.apache.celeborn.common.protocol.PbReviseLostShufflesResponse +import org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, ApplicationLostResponse, HeartbeatFromApplication, HeartbeatFromApplicationResponse, ReviseLostShuffles, ZERO_UUID} import org.apache.celeborn.common.protocol.message.StatusCode import org.apache.celeborn.common.util.{ThreadUtils, Utils} @@ -33,9 +38,11 @@ class ApplicationHeartbeater( conf: CelebornConf, masterClient: MasterClient, shuffleMetrics: () => (Long, Long), - workerStatusTracker: WorkerStatusTracker) extends Logging { + workerStatusTracker: WorkerStatusTracker, + registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean]) extends Logging { private var stopped = false + private val reviseLostShuffles = conf.reviseLostShufflesEnabled // Use independent app heartbeat threads to avoid being blocked by other operations. private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs @@ -70,6 +77,30 @@ class ApplicationHeartbeater( if (response.statusCode == StatusCode.SUCCESS) { logDebug("Successfully send app heartbeat.") workerStatusTracker.handleHeartbeatResponse(response) + // revise shuffle id if there are lost shuffles + if (reviseLostShuffles) { + val masterRecordedShuffleIds = response.registeredShuffles + val localOnlyShuffles = new util.ArrayList[Integer]() + registeredShuffles.forEach(new Consumer[Int] { + override def accept(key: Int): Unit = { + localOnlyShuffles.add(key) + } + }) + localOnlyShuffles.removeAll(masterRecordedShuffleIds) + if (!localOnlyShuffles.isEmpty) { + logWarning( + s"There are lost shuffle found ${StringUtils.join(localOnlyShuffles, ",")}, revise lost shuffles.") + val reviseLostShufflesResponse = masterClient.askSync( + ReviseLostShuffles.apply(appId, localOnlyShuffles, MasterClient.genRequestId()), + classOf[PbReviseLostShufflesResponse]) + if (!reviseLostShufflesResponse.getSuccess) { + logWarning( + s"Revise lost shuffles failed. Error message :${reviseLostShufflesResponse.getMessage}") + } else { + logInfo("Revise lost shuffles succeed.") + } + } + } } } catch { case it: InterruptedException => diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 9416b6b585f..b6403f3cebc 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -210,7 +210,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends conf, masterClient, () => commitManager.commitMetrics(), - workerStatusTracker) + workerStatusTracker, + registeredShuffle) private val changePartitionManager = new ChangePartitionManager(conf, this) private val releasePartitionManager = new ReleasePartitionManager(conf, this) diff --git a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala index 2fd3bd3a6e4..d2c21245dbd 100644 --- a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala +++ b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala @@ -23,89 +23,16 @@ import org.junit.Assert import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.CelebornConf.{APPLICATION_HEARTBEAT_WITH_AVAILABLE_WORKERS_ENABLE, CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT} +import org.apache.celeborn.common.CelebornConf.CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT import org.apache.celeborn.common.meta.WorkerInfo import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse import org.apache.celeborn.common.protocol.message.StatusCode class WorkerStatusTrackerSuite extends CelebornFunSuite { - test("handleHeartbeatResponse without availableWorkers") { - val celebornConf = new CelebornConf() - celebornConf.set(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT, 2000L) - celebornConf.set(APPLICATION_HEARTBEAT_WITH_AVAILABLE_WORKERS_ENABLE, false) - val statusTracker = new WorkerStatusTracker(celebornConf, null) - - val registerTime = System.currentTimeMillis() - statusTracker.excludedWorkers.put(mock("host1"), (StatusCode.WORKER_UNKNOWN, registerTime)) - statusTracker.excludedWorkers.put(mock("host2"), (StatusCode.WORKER_SHUTDOWN, registerTime)) - - // test reserve (only statusCode list in handleHeartbeatResponse) - val empty = buildResponse(Array.empty, Array.empty, Array.empty, Array.empty) - statusTracker.handleHeartbeatResponse(empty) - - // only reserve host1 - Assert.assertEquals( - statusTracker.excludedWorkers.get(mock("host1")), - (StatusCode.WORKER_UNKNOWN, registerTime)) - Assert.assertFalse(statusTracker.excludedWorkers.containsKey(mock("host2"))) - - // add shutdown/excluded worker - val response1 = - buildResponse(Array("host0"), Array("host1", "host3"), Array("host4"), Array.empty) - statusTracker.handleHeartbeatResponse(response1) - - // test keep Unknown register time - Assert.assertEquals( - statusTracker.excludedWorkers.get(mock("host1")), - (StatusCode.WORKER_UNKNOWN, registerTime)) - - // test new added shutdown/excluded workers - Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host0"))) - Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3"))) - Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4"))) - Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4"))) - - // test re heartbeat with shutdown workers - val response2 = buildResponse(Array.empty, Array.empty, Array("host4"), Array.empty) - statusTracker.handleHeartbeatResponse(response2) - Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4"))) - Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4"))) - - // test remove - val workers = new util.HashSet[WorkerInfo] - workers.add(mock("host3")) - statusTracker.removeFromExcludedWorkers(workers) - Assert.assertFalse(statusTracker.excludedWorkers.containsKey(mock("host3"))) - - // test register time elapsed - Thread.sleep(3000) - val response3 = buildResponse(Array.empty, Array("host5", "host6"), Array.empty, Array.empty) - statusTracker.handleHeartbeatResponse(response3) - Assert.assertEquals(statusTracker.excludedWorkers.size(), 2) - Assert.assertFalse(statusTracker.excludedWorkers.containsKey(mock("host1"))) - - // test available workers - Assert.assertEquals(statusTracker.availableWorkers.size(), 0) - val response4 = buildResponse( - Array.empty, - Array.empty, - Array.empty, - Array("host5", "host6", "host7", "host8")) - statusTracker.handleHeartbeatResponse(response4) - - // availableWorkers wont update through heartbeat - // when APPLICATION_HEARTBEAT_WITH_AVAILABLE_WORKERS_ENABLE set to false - Assert.assertEquals(statusTracker.availableWorkers.size(), 0) - // available workers won't overwrite excluded workers - Assert.assertEquals(statusTracker.excludedWorkers.size(), 2) - Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host5"))) - Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host6"))) - } - test("handleHeartbeatResponse with availableWorkers") { + test("handleHeartbeatResponse") { val celebornConf = new CelebornConf() celebornConf.set(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT, 2000L) - celebornConf.set(APPLICATION_HEARTBEAT_WITH_AVAILABLE_WORKERS_ENABLE, true) val statusTracker = new WorkerStatusTracker(celebornConf, null) val registerTime = System.currentTimeMillis() @@ -113,7 +40,7 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite { statusTracker.excludedWorkers.put(mock("host2"), (StatusCode.WORKER_SHUTDOWN, registerTime)) // test reserve (only statusCode list in handleHeartbeatResponse) - val empty = buildResponse(Array.empty, Array.empty, Array.empty, Array.empty) + val empty = buildResponse(Array.empty, Array.empty, Array.empty) statusTracker.handleHeartbeatResponse(empty) // only reserve host1 @@ -123,14 +50,14 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite { Assert.assertFalse(statusTracker.excludedWorkers.containsKey(mock("host2"))) // add shutdown/excluded worker - val response1 = - buildResponse(Array("host0"), Array("host1", "host3"), Array("host4"), Array.empty) + val response1 = buildResponse(Array("host0"), Array("host1", "host3"), Array("host4")) statusTracker.handleHeartbeatResponse(response1) // test keep Unknown register time Assert.assertEquals( statusTracker.excludedWorkers.get(mock("host1")), (StatusCode.WORKER_UNKNOWN, registerTime)) + // test new added workers Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host0"))) Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3"))) @@ -138,8 +65,8 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite { Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4"))) // test re heartbeat with shutdown workers - val response2 = buildResponse(Array.empty, Array.empty, Array("host4"), Array.empty) - statusTracker.handleHeartbeatResponse(response2) + val response3 = buildResponse(Array.empty, Array.empty, Array("host4")) + statusTracker.handleHeartbeatResponse(response3) Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4"))) Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4"))) @@ -151,49 +78,25 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite { // test register time elapsed Thread.sleep(3000) - val response3 = buildResponse(Array.empty, Array("host5", "host6"), Array.empty, Array.empty) - statusTracker.handleHeartbeatResponse(response3) + val response2 = buildResponse(Array.empty, Array("host5", "host6"), Array.empty) + statusTracker.handleHeartbeatResponse(response2) Assert.assertEquals(statusTracker.excludedWorkers.size(), 2) Assert.assertFalse(statusTracker.excludedWorkers.containsKey(mock("host1"))) - - // test available workers - Assert.assertEquals(statusTracker.availableWorkers.size(), 0) - val response4 = buildResponse( - Array.empty, - Array.empty, - Array.empty, - Array("host5", "host6", "host7", "host8")) - statusTracker.handleHeartbeatResponse(response4) - Assert.assertEquals(statusTracker.availableWorkers.size(), 2) - // available workers won't overwrite excluded workers - Assert.assertEquals(statusTracker.excludedWorkers.size(), 2) - Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host5"))) - Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host6"))) - - // test re heartbeat with available workers - val response5 = buildResponse(Array.empty, Array.empty, Array.empty, Array("host8", "host9")) - statusTracker.handleHeartbeatResponse(response5) - Assert.assertEquals(statusTracker.availableWorkers.size(), 2) - Assert.assertFalse(statusTracker.availableWorkers.contains(mock("host7"))) - Assert.assertTrue(statusTracker.availableWorkers.contains(mock("host8"))) - Assert.assertTrue(statusTracker.availableWorkers.contains(mock("host9"))) } private def buildResponse( excludedWorkerHosts: Array[String], unknownWorkerHosts: Array[String], - shuttingWorkerHosts: Array[String], - availableWorkerHosts: Array[String]): HeartbeatFromApplicationResponse = { + shuttingWorkerHosts: Array[String]): HeartbeatFromApplicationResponse = { val excludedWorkers = mockWorkers(excludedWorkerHosts) val unknownWorkers = mockWorkers(unknownWorkerHosts) val shuttingWorkers = mockWorkers(shuttingWorkerHosts) - val availableWorkers = mockWorkers(availableWorkerHosts) HeartbeatFromApplicationResponse( StatusCode.SUCCESS, excludedWorkers, unknownWorkers, shuttingWorkers, - availableWorkers) + new util.ArrayList[Integer]()) } private def mockWorkers(workerHosts: Array[String]): util.ArrayList[WorkerInfo] = { diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 63676b44c5d..bc8da08abff 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -109,6 +109,8 @@ enum MessageType { NOTIFY_REQUIRED_SEGMENT = 86; BATCH_UNREGISTER_SHUFFLES = 87; BATCH_UNREGISTER_SHUFFLE_RESPONSE= 88; + REVISE_LOST_SHUFFLES = 89; + REVISE_LOST_SHUFFLES_RESPONSE = 90; } enum StreamType { @@ -439,8 +441,7 @@ message PbHeartbeatFromApplication { int64 fileCount = 3 ; string requestId = 4; repeated PbWorkerInfo needCheckedWorkerList = 5; - bool needAvailableWorkers = 6; - bool shouldResponse = 7; + bool shouldResponse = 6; } message PbHeartbeatFromApplicationResponse { @@ -448,7 +449,7 @@ message PbHeartbeatFromApplicationResponse { repeated PbWorkerInfo excludedWorkers = 2; repeated PbWorkerInfo unknownWorkers = 3; repeated PbWorkerInfo shuttingWorkers = 4; - repeated PbWorkerInfo availableWorkers = 5; + repeated int32 registeredShuffles = 5; } message PbCheckQuota { @@ -858,3 +859,14 @@ message PbReportWorkerDecommission { repeated PbWorkerInfo workers = 1; string requestId = 2; } + +message PbReviseLostShuffles{ + string appId = 1; + repeated int32 lostShuffles = 2; + string requestId = 3; +} + +message PbReviseLostShufflesResponse{ + bool success = 1; + string message = 2; +} \ No newline at end of file diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 7dcaf649051..32d793dcb68 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1075,6 +1075,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def registerShuffleFilterExcludedWorkerEnabled: Boolean = get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED) + def reviseLostShufflesEnabled: Boolean = get(REVISE_LOST_SHUFFLES_ENABLED) // ////////////////////////////////////////////////////// // Worker // @@ -5670,6 +5671,14 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val REVISE_LOST_SHUFFLES_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.client.shuffle.reviseLostShuffles.enabled") + .categories("client") + .version("0.6.0") + .doc("Whether to revise lost shuffles.") + .booleanConf + .createWithDefault(false) + val NETWORK_IO_SASL_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn..io.saslTimeout") .categories("network") diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 01b70df95fd..d0eb85b02df 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -335,6 +335,26 @@ object ControlMessages extends Logging { .build() } + object ReviseLostShuffles { + def apply( + appId: String, + lostShuffles: java.util.List[Integer], + requestId: String): PbReviseLostShuffles = + PbReviseLostShuffles.newBuilder() + .setAppId(appId) + .addAllLostShuffles(lostShuffles) + .setRequestId(requestId) + .build() + } + + object ReviseLostShufflesResponse { + def apply(success: Boolean, message: String): PbReviseLostShufflesResponse = + PbReviseLostShufflesResponse.newBuilder() + .setSuccess(success) + .setMessage(message) + .build() + } + case class StageEnd(shuffleId: Int) extends MasterMessage case class StageEndResponse(status: StatusCode) @@ -393,7 +413,6 @@ object ControlMessages extends Logging { totalWritten: Long, fileCount: Long, needCheckedWorkerList: util.List[WorkerInfo], - needAvailableWorkers: Boolean, override var requestId: String = ZERO_UUID, shouldResponse: Boolean = false) extends MasterRequestMessage @@ -402,7 +421,7 @@ object ControlMessages extends Logging { excludedWorkers: util.List[WorkerInfo], unknownWorkers: util.List[WorkerInfo], shuttingWorkers: util.List[WorkerInfo], - availableWorkers: util.List[WorkerInfo]) extends Message + registeredShuffles: util.List[Integer]) extends Message case class CheckQuota(userIdentifier: UserIdentifier) extends Message @@ -567,6 +586,12 @@ object ControlMessages extends Logging { case pb: PbReportShuffleFetchFailureResponse => new TransportMessage(MessageType.REPORT_SHUFFLE_FETCH_FAILURE_RESPONSE, pb.toByteArray) + case pb: PbReviseLostShuffles => + new TransportMessage(MessageType.REVISE_LOST_SHUFFLES, pb.toByteArray) + + case pb: PbReviseLostShufflesResponse => + new TransportMessage(MessageType.REVISE_LOST_SHUFFLES_RESPONSE, pb.toByteArray) + case pb: PbReportBarrierStageAttemptFailure => new TransportMessage(MessageType.REPORT_BARRIER_STAGE_ATTEMPT_FAILURE, pb.toByteArray) @@ -784,7 +809,6 @@ object ControlMessages extends Logging { totalWritten, fileCount, needCheckedWorkerList, - needAvailableWorkers, requestId, shouldResponse) => val payload = PbHeartbeatFromApplication.newBuilder() @@ -794,7 +818,6 @@ object ControlMessages extends Logging { .setFileCount(fileCount) .addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map( PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava) - .setNeedAvailableWorkers(needAvailableWorkers) .setShouldResponse(shouldResponse) .build().toByteArray new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION, payload) @@ -804,7 +827,7 @@ object ControlMessages extends Logging { excludedWorkers, unknownWorkers, shuttingWorkers, - availableWorkers) => + registeredShuffles) => val payload = PbHeartbeatFromApplicationResponse.newBuilder() .setStatus(statusCode.getValue) .addAllExcludedWorkers( @@ -813,8 +836,7 @@ object ControlMessages extends Logging { unknownWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava) .addAllShuttingWorkers( shuttingWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava) - .addAllAvailableWorkers( - availableWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava) + .addAllRegisteredShuffles(registeredShuffles) .build().toByteArray new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION_RESPONSE, payload) @@ -1185,7 +1207,6 @@ object ControlMessages extends Logging { new util.ArrayList[WorkerInfo]( pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava), - pbHeartbeatFromApplication.getNeedAvailableWorkers, pbHeartbeatFromApplication.getRequestId, pbHeartbeatFromApplication.getShouldResponse) @@ -1200,8 +1221,7 @@ object ControlMessages extends Logging { .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava, pbHeartbeatFromApplicationResponse.getShuttingWorkersList.asScala .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava, - pbHeartbeatFromApplicationResponse.getAvailableWorkersList.asScala - .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava) + pbHeartbeatFromApplicationResponse.getRegisteredShufflesList) case CHECK_QUOTA_VALUE => val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala index 30f00f179f9..f9f01b9a7d8 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala @@ -450,7 +450,7 @@ object PbSerDeUtils { def toPbSnapshotMetaInfo( estimatedPartitionSize: java.lang.Long, - registeredShuffle: java.util.Set[String], + registeredShuffle: java.util.Map[String, java.util.Set[Integer]], hostnameSet: java.util.Set[String], excludedWorkers: java.util.Set[WorkerInfo], manuallyExcludedWorkers: java.util.Set[WorkerInfo], @@ -468,7 +468,9 @@ object PbSerDeUtils { decommissionWorkers: java.util.Set[WorkerInfo]): PbSnapshotMetaInfo = { val builder = PbSnapshotMetaInfo.newBuilder() .setEstimatedPartitionSize(estimatedPartitionSize) - .addAllRegisteredShuffle(registeredShuffle) + .addAllRegisteredShuffle(registeredShuffle.asScala.flatMap { appIdAndShuffleId => + appIdAndShuffleId._2.asScala.map(i => Utils.makeShuffleKey(appIdAndShuffleId._1, i)) + }.asJava) .addAllHostnameSet(hostnameSet) .addAllExcludedWorkers(excludedWorkers.asScala.map(toPbWorkerInfo(_, true, false)).asJava) .addAllManuallyExcludedWorkers(manuallyExcludedWorkers.asScala diff --git a/docs/configuration/client.md b/docs/configuration/client.md index dc3b2e152e3..ea2b420ec59 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -104,6 +104,7 @@ license: | | celeborn.client.shuffle.partitionSplit.threshold | 1G | false | Shuffle file size threshold, if file size exceeds this, trigger split. | 0.3.0 | celeborn.shuffle.partitionSplit.threshold | | celeborn.client.shuffle.rangeReadFilter.enabled | false | false | If a spark application have skewed partition, this value can set to true to improve performance. | 0.2.0 | celeborn.shuffle.rangeReadFilter.enabled | | celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | false | Whether to filter excluded worker when register shuffle. | 0.4.0 | | +| celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether to revise lost shuffles. | 0.6.0 | | | celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 0.3.1 | | | celeborn.client.spark.fetch.throwsFetchFailure | false | false | client throws FetchFailedException instead of CelebornIOException | 0.4.0 | | | celeborn.client.spark.push.dynamicWriteMode.enabled | false | false | Whether to dynamically switch push write mode based on conditions.If true, shuffle mode will be only determined by partition count | 0.5.0 | | diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 1631e90d34c..5c36dd23fa5 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -22,10 +22,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Files; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -33,6 +30,7 @@ import java.util.stream.Collectors; import scala.Option; +import scala.Tuple2; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; @@ -63,7 +61,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler { private static final Logger LOG = LoggerFactory.getLogger(AbstractMetaManager.class); // Metadata for master service - public final Set registeredShuffle = ConcurrentHashMap.newKeySet(); + public final Map> registeredAppAndShuffles = + JavaUtils.newConcurrentHashMap(); public final Set hostnameSet = ConcurrentHashMap.newKeySet(); public final Set workers = ConcurrentHashMap.newKeySet(); @@ -92,9 +91,12 @@ public abstract class AbstractMetaManager implements IMetadataHandler { public void updateRequestSlotsMeta( String shuffleKey, String hostName, Map> workerWithAllocations) { - registeredShuffle.add(shuffleKey); + Tuple2 appIdShuffleId = Utils.splitShuffleKey(shuffleKey); + registeredAppAndShuffles + .computeIfAbsent(appIdShuffleId._1(), v -> new HashSet<>()) + .add((Integer) appIdShuffleId._2); - String appId = Utils.splitShuffleKey(shuffleKey)._1; + String appId = appIdShuffleId._1; appHeartbeatTime.compute( appId, (applicationId, oldTimestamp) -> { @@ -111,11 +113,29 @@ public void updateRequestSlotsMeta( } public void updateUnregisterShuffleMeta(String shuffleKey) { - registeredShuffle.remove(shuffleKey); + Tuple2 appIdShuffleId = Utils.splitShuffleKey(shuffleKey); + Set shuffleIds = registeredAppAndShuffles.get(appIdShuffleId._1()); + if (shuffleIds != null) { + shuffleIds.remove(appIdShuffleId._2); + registeredAppAndShuffles.compute( + appIdShuffleId._1(), + (s, shuffles) -> { + if (shuffles.size() == 0) { + return null; + } + return shuffles; + }); + } } public void updateBatchUnregisterShuffleMeta(List shuffleKeys) { - registeredShuffle.removeAll(shuffleKeys); + for (String shuffleKey : shuffleKeys) { + Tuple2 appIdShuffleId = Utils.splitShuffleKey(shuffleKey); + String appId = appIdShuffleId._1; + if (registeredAppAndShuffles.containsKey(appId)) { + registeredAppAndShuffles.get(appId).remove(appIdShuffleId._2); + } + } } public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, long fileCount) { @@ -125,7 +145,7 @@ public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, l } public void updateAppLostMeta(String appId) { - registeredShuffle.removeIf(shuffleKey -> shuffleKey.startsWith(appId)); + registeredAppAndShuffles.remove(appId); appHeartbeatTime.remove(appId); applicationMetas.remove(appId); } @@ -136,6 +156,14 @@ public void updateWorkerExcludeMeta( workersToRemove.forEach(manuallyExcludedWorkers::remove); } + public void reviseLostShuffles(String appId, List lostShuffles) { + registeredAppAndShuffles.computeIfAbsent(appId, v -> new HashSet<>()).addAll(lostShuffles); + } + + public void deleteApp(String appId) { + registeredAppAndShuffles.remove(appId); + } + public void updateWorkerLostMeta( String host, int rpcPort, int pushPort, int fetchPort, int replicatePort) { WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort); @@ -280,7 +308,7 @@ public void writeMetaInfoToFile(File file) throws IOException, RuntimeException byte[] snapshotBytes = PbSerDeUtils.toPbSnapshotMetaInfo( estimatedPartitionSize, - registeredShuffle, + registeredAppAndShuffles, hostnameSet, excludedWorkers, manuallyExcludedWorkers, @@ -313,7 +341,12 @@ public void restoreMetaFromFile(File file) throws IOException { estimatedPartitionSize = snapshotMetaInfo.getEstimatedPartitionSize(); - registeredShuffle.addAll(snapshotMetaInfo.getRegisteredShuffleList()); + for (String shuffleKey : snapshotMetaInfo.getRegisteredShuffleList()) { + Tuple2 appIdShuffleId = Utils.splitShuffleKey(shuffleKey); + registeredAppAndShuffles + .computeIfAbsent(appIdShuffleId._1, v -> new HashSet<>()) + .add((Integer) appIdShuffleId._2); + } hostnameSet.addAll(snapshotMetaInfo.getHostnameSetList()); excludedWorkers.addAll( snapshotMetaInfo.getExcludedWorkersList().stream() @@ -329,9 +362,8 @@ public void restoreMetaFromFile(File file) throws IOException { .collect(Collectors.toSet())); appHeartbeatTime.putAll(snapshotMetaInfo.getAppHeartbeatTimeMap()); - registeredShuffle.forEach( - shuffleKey -> { - String appId = Utils.splitShuffleKey(shuffleKey)._1; + registeredAppAndShuffles.forEach( + (appId, shuffleId) -> { if (!appHeartbeatTime.containsKey(appId)) { appHeartbeatTime.put(appId, System.currentTimeMillis()); } @@ -405,15 +437,16 @@ public void restoreMetaFromFile(File file) throws IOException { LOG.info( "Worker size: {}, Registered shuffle size: {}. Worker excluded list size: {}. Manually Excluded list size: {}", workers.size(), - registeredShuffle.size(), + registeredAppAndShuffles.size(), excludedWorkers.size(), manuallyExcludedWorkers.size()); workers.forEach(workerInfo -> LOG.info(workerInfo.toString())); - registeredShuffle.forEach(shuffle -> LOG.info("RegisteredShuffle {}", shuffle)); + registeredAppAndShuffles.forEach( + (appId, shuffleId) -> LOG.info("RegisteredShuffle {}-{}", appId, shuffleId)); } private void cleanUpState() { - registeredShuffle.clear(); + registeredAppAndShuffles.clear(); hostnameSet.clear(); workers.clear(); lostWorkers.clear(); @@ -504,4 +537,8 @@ public boolean isWorkerAvailable(WorkerInfo workerInfo) { public void updateApplicationMeta(ApplicationMeta applicationMeta) { applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta); } + + public int registeredShuffleCount() { + return registeredAppAndShuffles.values().stream().mapToInt(Set::size).sum(); + } } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java index 7eb72679e04..9e17728d61a 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java @@ -46,6 +46,8 @@ void handleAppHeartbeat( void handleWorkerExclude( List workersToAdd, List workersToRemove, String requestId); + void handleReviseLostShuffles(String appId, List shuffles, String requestId); + void handleWorkerLost( String host, int rpcPort, int pushPort, int fetchPort, int replicatePort, String requestId); diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index b311b1b45d1..39bac5330d6 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -87,6 +87,11 @@ public void handleWorkerExclude( updateWorkerExcludeMeta(workersToAdd, workersToRemove); } + @Override + public void handleReviseLostShuffles(String appId, List shuffles, String requestId) { + reviseLostShuffles(appId, shuffles); + } + @Override public void handleWorkerLost( String host, int rpcPort, int pushPort, int fetchPort, int replicatePort, String requestId) { diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index d6c95afd4ce..1d5117991d0 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -194,6 +194,25 @@ public void handleWorkerExclude( } } + @Override + public void handleReviseLostShuffles(String appId, List shuffles, String requestId) { + try { + ratisServer.submitRequest( + ResourceRequest.newBuilder() + .setCmdType(Type.ReviseLostShuffles) + .setRequestId(requestId) + .setReviseLostShufflesRequest( + ResourceProtos.ReviseLostShufflesRequest.newBuilder() + .setAppId(appId) + .addAllLostShuffles(shuffles) + .build()) + .build()); + } catch (CelebornRuntimeException e) { + LOG.error("Handle revise lost shuffle failed!", e); + throw e; + } + } + @Override public void handleWorkerLost( String host, int rpcPort, int pushPort, int fetchPort, int replicatePort, String requestId) { diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java index a7948dde7ce..4ec1dac371d 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -22,6 +22,7 @@ import java.util.*; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,7 +102,16 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques Map userResourceConsumption; Map estimatedAppDiskUsage = new HashMap<>(); WorkerStatus workerStatus; + List lostShuffles; switch (cmdType) { + case ReviseLostShuffles: + appId = request.getReviseLostShufflesRequest().getAppId(); + lostShuffles = request.getReviseLostShufflesRequest().getLostShufflesList(); + LOG.info( + "Handle revise lost shuffles for {} {}", appId, StringUtils.join(lostShuffles, ",")); + metaSystem.reviseLostShuffles(appId, lostShuffles); + break; + case RequestSlots: shuffleKey = request.getRequestSlotsRequest().getShuffleKey(); LOG.debug("Handle request slots for {}", shuffleKey); diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index b83599409f8..acb1d6097ad 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -41,6 +41,8 @@ enum Type { ApplicationMeta = 26; ReportWorkerDecommission = 27; BatchUnRegisterShuffle = 28; + + ReviseLostShuffles = 29; } enum WorkerEventType { @@ -75,6 +77,7 @@ message ResourceRequest { optional ApplicationMetaRequest applicationMetaRequest = 23; optional ReportWorkerDecommissionRequest reportWorkerDecommissionRequest = 24; optional BatchUnregisterShuffleRequest batchUnregisterShuffleRequest = 25; + optional ReviseLostShufflesRequest reviseLostShufflesRequest = 102; } message DiskInfo { @@ -243,3 +246,8 @@ message ApplicationMetaRequest { required string appId = 1; optional string secret = 2; } + +message ReviseLostShufflesRequest { + required string appId = 1 ; + repeated int32 lostShuffles = 2 ; +} \ No newline at end of file diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 9a4946ca9d7..86baaa92129 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master import java.io.IOException import java.net.BindException import java.util +import java.util.Collections import java.util.concurrent.{ExecutorService, ScheduledFuture, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import java.util.function.ToLongFunction @@ -224,7 +225,7 @@ private[celeborn] class Master( private var hadoopFs: util.Map[StorageInfo.Type, FileSystem] = _ masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () => - statusSystem.registeredShuffle.size + statusSystem.registeredShuffleCount } masterSource.addGauge(MasterSource.WORKER_COUNT) { () => statusSystem.workers.size } masterSource.addGauge(MasterSource.LOST_WORKER_COUNT) { () => statusSystem.lostWorkers.size } @@ -406,7 +407,6 @@ private[celeborn] class Master( totalWritten, fileCount, needCheckedWorkerList, - needAvailableWorkers, requestId, shouldResponse) => logDebug(s"Received heartbeat from app $appId") @@ -419,7 +419,6 @@ private[celeborn] class Master( totalWritten, fileCount, needCheckedWorkerList, - needAvailableWorkers, requestId, shouldResponse)) @@ -532,6 +531,11 @@ private[celeborn] class Master( context, handleWorkerDecommission(context, workers, requestId)) + case pb: PbReviseLostShuffles => + executeWithLeaderChecker( + context, + handleReviseLostShuffle(context, pb.getAppId, pb.getLostShufflesList, pb.getRequestId)) + case pb: PbWorkerExclude => val workersToAdd = new util.ArrayList[WorkerInfo](pb.getWorkersToAddList .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava) @@ -686,7 +690,11 @@ private[celeborn] class Master( val expiredShuffleKeys = new util.HashSet[String] activeShuffleKeys.asScala.foreach { shuffleKey => - if (!statusSystem.registeredShuffle.contains(shuffleKey)) { + val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey) + val shuffleIds = statusSystem.registeredAppAndShuffles.get(appId) + if (shuffleIds == null || !shuffleIds.contains(shuffleId)) { + logWarning( + s"Shuffle $shuffleKey expired on $host:$rpcPort:$pushPort:$fetchPort:$replicatePort.") expiredShuffleKeys.add(shuffleKey) } } @@ -706,6 +714,23 @@ private[celeborn] class Master( } } + private def handleReviseLostShuffle( + context: RpcCallContext, + appId: String, + lostShuffles: java.util.List[Integer], + requestId: String) = { + try { + logInfo(s"Handle lost shuffles for ${appId} ${lostShuffles} ") + statusSystem.handleReviseLostShuffles(appId, lostShuffles, requestId); + if (context != null) { + context.reply(ReviseLostShufflesResponse(true, "")) + } + } catch { + case e: Exception => + context.reply(ReviseLostShufflesResponse(false, e.getMessage)) + } + } + private def handleWorkerExclude( context: RpcCallContext, workersToAdd: util.List[WorkerInfo], @@ -1088,7 +1113,6 @@ private[celeborn] class Master( totalWritten: Long, fileCount: Long, needCheckedWorkerList: util.List[WorkerInfo], - needAvailableWorkers: Boolean, requestId: String, shouldResponse: Boolean): Unit = { statusSystem.handleAppHeartbeat( @@ -1102,12 +1126,8 @@ private[celeborn] class Master( if (shouldResponse) { // UserResourceConsumption and DiskInfo are eliminated from WorkerInfo // during serialization of HeartbeatFromApplicationResponse - var availableWorksSentToClient = new util.ArrayList[WorkerInfo]() - if (needAvailableWorkers) { - availableWorksSentToClient = new util.ArrayList[WorkerInfo]( - statusSystem.workers.asScala.filter(worker => - statusSystem.isWorkerAvailable(worker)).asJava) - } + var appRelatedShuffles = + statusSystem.registeredAppAndShuffles.getOrDefault(appId, Collections.emptySet()) context.reply(HeartbeatFromApplicationResponse( StatusCode.SUCCESS, new util.ArrayList( @@ -1115,7 +1135,7 @@ private[celeborn] class Master( needCheckedWorkerList, new util.ArrayList[WorkerInfo]( (statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava), - availableWorksSentToClient)) + new util.ArrayList(appRelatedShuffles))) } else { context.reply(OneWayMessageResponse) } @@ -1349,8 +1369,11 @@ private[celeborn] class Master( override def getShuffleList: String = { val sb = new StringBuilder sb.append("======================= Shuffle Key List ============================\n") - statusSystem.registeredShuffle.asScala.foreach { shuffleKey => - sb.append(s"$shuffleKey\n") + statusSystem.registeredAppAndShuffles.asScala.foreach { shuffleKey => + val appId = shuffleKey._1 + shuffleKey._2.asScala.foreach { id => + sb.append(s"$appId-${id}\n") + } } sb.toString() } @@ -1452,6 +1475,14 @@ private[celeborn] class Master( } } + override def reviseLostShuffles(appId: String, shuffles: java.util.List[Integer]): Unit = { + statusSystem.reviseLostShuffles(appId, shuffles) + } + + override def deleteApps(appIds: String): Unit = { + appIds.split(",").foreach(id => statusSystem.deleteApp(id)) + } + override def getWorkerEventInfo(): String = { val sb = new StringBuilder sb.append("======================= Workers Event in Master ========================\n") diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala index ac326c5fb94..447a9b1b233 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala @@ -142,4 +142,5 @@ class ApiMasterResource extends ApiRequestContext { workerList)._2) sb.toString() } + } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala index 7c08e335e14..18cab93e8a6 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala @@ -17,7 +17,8 @@ package org.apache.celeborn.service.deploy.master.http.api.v1 -import javax.ws.rs.{Consumes, GET, Path, Produces} +import java.util +import javax.ws.rs.{Consumes, GET, Path, Produces, QueryParam} import javax.ws.rs.core.MediaType import scala.collection.JavaConverters._ @@ -27,7 +28,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse import io.swagger.v3.oas.annotations.tags.Tag import org.apache.celeborn.common.util.Utils -import org.apache.celeborn.rest.v1.model.{AppDiskUsageData, AppDiskUsageSnapshotData, AppDiskUsageSnapshotsResponse, ApplicationHeartbeatData, ApplicationsHeartbeatResponse, HostnamesResponse} +import org.apache.celeborn.rest.v1.model.{AppDiskUsageData, AppDiskUsageSnapshotData, AppDiskUsageSnapshotsResponse, ApplicationHeartbeatData, ApplicationsHeartbeatResponse, HandleResponse, HostnamesResponse} import org.apache.celeborn.server.common.http.api.ApiRequestContext import org.apache.celeborn.service.deploy.master.Master @@ -94,4 +95,42 @@ class ApplicationResource extends ApiRequestContext { def hostnames(): HostnamesResponse = { new HostnamesResponse().hostnames(statusSystem.hostnameSet.asScala.toSeq.asJava) } + + @Path("/reviseLostShuffles") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.APPLICATION_JSON, + schema = new Schema(implementation = classOf[HandleResponse]))), + description = + "Revise lost shuffles") + @GET + def reviseLostShuffles( + @QueryParam("app") appId: String, + @QueryParam("shuffleIds") shufflesIds: String): HandleResponse = { + val shuffles = new util.ArrayList[Integer]() + shufflesIds.split(",").foreach { p => + shuffles.add(Integer.parseInt(p)) + } + if (!shuffles.isEmpty) { + httpService.reviseLostShuffles(appId, shuffles) + } + new HandleResponse().success(true).message("revise lost shuffle done") + } + + @Path("/deleteApps") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.APPLICATION_JSON, + schema = new Schema(implementation = classOf[HandleResponse]))), + description = + "Delete resource of an app") + @GET + def deleteApp( + @QueryParam("apps") apps: String): HandleResponse = { + httpService.deleteApps(apps) + new HandleResponse().success(true).message(s"delete shuffles of app ${apps}") + } + } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala index 4b7319d4675..fbdcd920ecf 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala @@ -17,6 +17,7 @@ package org.apache.celeborn.service.deploy.master.http.api.v1 +import java.util import javax.ws.rs.{Consumes, GET, Produces} import javax.ws.rs.core.MediaType @@ -44,6 +45,13 @@ class ShuffleResource extends ApiRequestContext { "List all running shuffle keys of the service. It will return all running shuffle's key of the cluster.") @GET def shuffles: ShufflesResponse = { - new ShufflesResponse().shuffleIds(statusSystem.registeredShuffle.asScala.toSeq.asJava) + val shuffles = new util.ArrayList[String]() + statusSystem.registeredAppAndShuffles.asScala.foreach { shuffleKey => + val appId = shuffleKey._1 + shuffleKey._2.asScala.foreach { id => + shuffles.add(s"$appId-${id}") + } + } + new ShufflesResponse().shuffleIds(shuffles) } } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index b571e49f6df..65660158ed6 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -472,11 +472,11 @@ public void testHandleAppLost() { statusSystem.handleApplicationMeta(new ApplicationMeta(APPID1, "testSecret")); statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, getNewReqeustId()); - assertEquals(1, statusSystem.registeredShuffle.size()); + assertEquals(1, statusSystem.registeredAppAndShuffles.size()); assertEquals(1, statusSystem.applicationMetas.size()); statusSystem.handleAppLost(APPID1, getNewReqeustId()); - assertTrue(statusSystem.registeredShuffle.isEmpty()); + assertTrue(statusSystem.registeredAppAndShuffles.isEmpty()); assertTrue(statusSystem.applicationMetas.isEmpty()); } @@ -545,11 +545,11 @@ public void testHandleUnRegisterShuffle() { statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, getNewReqeustId()); - assertEquals(1, statusSystem.registeredShuffle.size()); + assertEquals(1, statusSystem.registeredAppAndShuffles.size()); statusSystem.handleUnRegisterShuffle(SHUFFLEKEY1, getNewReqeustId()); - assertTrue(statusSystem.registeredShuffle.isEmpty()); + assertTrue(statusSystem.registeredAppAndShuffles.isEmpty()); } @Test @@ -621,17 +621,17 @@ public void testHandleBatchUnRegisterShuffle() { shuffleKeysAll.add(shuffleKey); statusSystem.handleRequestSlots(shuffleKey, HOSTNAME1, workersToAllocate, getNewReqeustId()); } - Assert.assertEquals(4, statusSystem.registeredShuffle.size()); + Assert.assertEquals(4, statusSystem.registeredShuffleCount()); List shuffleKeys1 = new ArrayList<>(); shuffleKeys1.add(shuffleKeysAll.get(0)); statusSystem.handleBatchUnRegisterShuffles(shuffleKeys1, getNewReqeustId()); - Assert.assertEquals(3, statusSystem.registeredShuffle.size()); + Assert.assertEquals(3, statusSystem.registeredShuffleCount()); statusSystem.handleBatchUnRegisterShuffles(shuffleKeysAll, getNewReqeustId()); - Assert.assertTrue(statusSystem.registeredShuffle.isEmpty()); + Assert.assertTrue(statusSystem.registeredShuffleCount() == 0); } @Test diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index 851d87c1c41..4f2fae78abe 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -751,16 +751,16 @@ public void testHandleAppLost() throws InterruptedException { statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, getNewReqeustId()); Thread.sleep(3000L); - Assert.assertEquals(1, STATUSSYSTEM1.registeredShuffle.size()); - Assert.assertEquals(1, STATUSSYSTEM2.registeredShuffle.size()); - Assert.assertEquals(1, STATUSSYSTEM3.registeredShuffle.size()); + Assert.assertEquals(1, STATUSSYSTEM1.registeredAppAndShuffles.size()); + Assert.assertEquals(1, STATUSSYSTEM2.registeredAppAndShuffles.size()); + Assert.assertEquals(1, STATUSSYSTEM3.registeredAppAndShuffles.size()); statusSystem.handleAppLost(APPID1, getNewReqeustId()); Thread.sleep(3000L); - Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty()); - Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty()); - Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty()); + Assert.assertTrue(STATUSSYSTEM1.registeredAppAndShuffles.isEmpty()); + Assert.assertTrue(STATUSSYSTEM2.registeredAppAndShuffles.isEmpty()); + Assert.assertTrue(STATUSSYSTEM3.registeredAppAndShuffles.isEmpty()); } @Test @@ -832,16 +832,16 @@ public void testHandleUnRegisterShuffle() throws InterruptedException { statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, getNewReqeustId()); Thread.sleep(3000L); - Assert.assertEquals(1, STATUSSYSTEM1.registeredShuffle.size()); - Assert.assertEquals(1, STATUSSYSTEM2.registeredShuffle.size()); - Assert.assertEquals(1, STATUSSYSTEM3.registeredShuffle.size()); + Assert.assertEquals(1, STATUSSYSTEM1.registeredAppAndShuffles.size()); + Assert.assertEquals(1, STATUSSYSTEM2.registeredAppAndShuffles.size()); + Assert.assertEquals(1, STATUSSYSTEM3.registeredAppAndShuffles.size()); statusSystem.handleUnRegisterShuffle(SHUFFLEKEY1, getNewReqeustId()); Thread.sleep(3000L); - Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty()); - Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty()); - Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty()); + Assert.assertTrue(STATUSSYSTEM1.registeredAppAndShuffles.isEmpty()); + Assert.assertTrue(STATUSSYSTEM2.registeredAppAndShuffles.isEmpty()); + Assert.assertTrue(STATUSSYSTEM3.registeredAppAndShuffles.isEmpty()); } @Test @@ -919,25 +919,25 @@ public void testHandleBatchUnRegisterShuffle() throws InterruptedException { Thread.sleep(3000L); - Assert.assertEquals(4, STATUSSYSTEM1.registeredShuffle.size()); - Assert.assertEquals(4, STATUSSYSTEM2.registeredShuffle.size()); - Assert.assertEquals(4, STATUSSYSTEM3.registeredShuffle.size()); + Assert.assertEquals(4, STATUSSYSTEM1.registeredShuffleCount()); + Assert.assertEquals(4, STATUSSYSTEM2.registeredShuffleCount()); + Assert.assertEquals(4, STATUSSYSTEM3.registeredShuffleCount()); List shuffleKeys1 = new ArrayList<>(); shuffleKeys1.add(shuffleKeysAll.get(0)); statusSystem.handleBatchUnRegisterShuffles(shuffleKeys1, getNewReqeustId()); Thread.sleep(3000L); - Assert.assertEquals(3, STATUSSYSTEM1.registeredShuffle.size()); - Assert.assertEquals(3, STATUSSYSTEM2.registeredShuffle.size()); - Assert.assertEquals(3, STATUSSYSTEM3.registeredShuffle.size()); + Assert.assertEquals(3, STATUSSYSTEM1.registeredShuffleCount()); + Assert.assertEquals(3, STATUSSYSTEM2.registeredShuffleCount()); + Assert.assertEquals(3, STATUSSYSTEM3.registeredShuffleCount()); statusSystem.handleBatchUnRegisterShuffles(shuffleKeysAll, getNewReqeustId()); Thread.sleep(3000L); - Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty()); - Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty()); - Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty()); + Assert.assertTrue(STATUSSYSTEM1.registeredShuffleCount() == 0); + Assert.assertTrue(STATUSSYSTEM2.registeredShuffleCount() == 0); + Assert.assertTrue(STATUSSYSTEM3.registeredShuffleCount() == 0); } @Test @@ -1085,21 +1085,21 @@ public void testHandleWorkerHeartbeat() throws InterruptedException { @Before public void resetStatus() { - STATUSSYSTEM1.registeredShuffle.clear(); + STATUSSYSTEM1.registeredAppAndShuffles.clear(); STATUSSYSTEM1.hostnameSet.clear(); STATUSSYSTEM1.workers.clear(); STATUSSYSTEM1.appHeartbeatTime.clear(); STATUSSYSTEM1.excludedWorkers.clear(); STATUSSYSTEM1.workerLostEvents.clear(); - STATUSSYSTEM2.registeredShuffle.clear(); + STATUSSYSTEM2.registeredAppAndShuffles.clear(); STATUSSYSTEM2.hostnameSet.clear(); STATUSSYSTEM2.workers.clear(); STATUSSYSTEM2.appHeartbeatTime.clear(); STATUSSYSTEM2.excludedWorkers.clear(); STATUSSYSTEM2.workerLostEvents.clear(); - STATUSSYSTEM3.registeredShuffle.clear(); + STATUSSYSTEM3.registeredAppAndShuffles.clear(); STATUSSYSTEM3.hostnameSet.clear(); STATUSSYSTEM3.workers.clear(); STATUSSYSTEM3.appHeartbeatTime.clear(); @@ -1414,6 +1414,19 @@ public void testHandleWorkerEvent() throws InterruptedException { STATUSSYSTEM1.workerEventInfos.get(workerInfo2).getEventType()); } + @Test + public void testReviseShuffles() throws InterruptedException { + AbstractMetaManager statusSystem = pickLeaderStatusSystem(); + Assert.assertNotNull(statusSystem); + + statusSystem.handleReviseLostShuffles( + "app-1", Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), getNewReqeustId()); + Thread.sleep(1000l); + Assert.assertEquals(STATUSSYSTEM1.registeredShuffleCount(), 8); + Assert.assertEquals(STATUSSYSTEM2.registeredShuffleCount(), 8); + Assert.assertEquals(STATUSSYSTEM3.registeredShuffleCount(), 8); + } + @AfterClass public static void testNotifyLogFailed() { List list = Arrays.asList(RATISSERVER1, RATISSERVER2, RATISSERVER3); diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java index 281b821b277..2afac4729ed 100644 --- a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java +++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java @@ -27,6 +27,7 @@ import org.apache.celeborn.rest.v1.model.AppDiskUsageSnapshotsResponse; import org.apache.celeborn.rest.v1.model.ApplicationsHeartbeatResponse; +import org.apache.celeborn.rest.v1.model.HandleResponse; import org.apache.celeborn.rest.v1.model.HostnamesResponse; @@ -48,6 +49,76 @@ public ApplicationApi(ApiClient apiClient) { super(apiClient); } + /** + * + * Delete resource of apps + * @param apps (optional) + * @return HandleResponse + * @throws ApiException if fails to make API call + */ + public HandleResponse deleteApps(String apps) throws ApiException { + return this.deleteApps(apps, Collections.emptyMap()); + } + + + /** + * + * Delete resource of apps + * @param apps (optional) + * @param additionalHeaders additionalHeaders for this call + * @return HandleResponse + * @throws ApiException if fails to make API call + */ + public HandleResponse deleteApps(String apps, Map additionalHeaders) throws ApiException { + Object localVarPostBody = null; + + // create path and map variables + String localVarPath = "/api/v1/applications/deleteApps"; + + StringJoiner localVarQueryStringJoiner = new StringJoiner("&"); + String localVarQueryParameterBaseName; + List localVarQueryParams = new ArrayList(); + List localVarCollectionQueryParams = new ArrayList(); + Map localVarHeaderParams = new HashMap(); + Map localVarCookieParams = new HashMap(); + Map localVarFormParams = new HashMap(); + + localVarQueryParams.addAll(apiClient.parameterToPair("apps", apps)); + + localVarHeaderParams.putAll(additionalHeaders); + + + + final String[] localVarAccepts = { + "application/json" + }; + final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + + final String[] localVarContentTypes = { + + }; + final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + + String[] localVarAuthNames = new String[] { "basic" }; + + TypeReference localVarReturnType = new TypeReference() {}; + return apiClient.invokeAPI( + localVarPath, + "GET", + localVarQueryParams, + localVarCollectionQueryParams, + localVarQueryStringJoiner.toString(), + localVarPostBody, + localVarHeaderParams, + localVarCookieParams, + localVarFormParams, + localVarAccept, + localVarContentType, + localVarAuthNames, + localVarReturnType + ); + } + /** * * List all running application's LifecycleManager's hostnames of the cluster. @@ -249,6 +320,79 @@ public AppDiskUsageSnapshotsResponse getApplicationsDiskUsageSnapshots(Map additionalHeaders) throws ApiException { + Object localVarPostBody = null; + + // create path and map variables + String localVarPath = "/api/v1/applications/reviseLostShuffles"; + + StringJoiner localVarQueryStringJoiner = new StringJoiner("&"); + String localVarQueryParameterBaseName; + List localVarQueryParams = new ArrayList(); + List localVarCollectionQueryParams = new ArrayList(); + Map localVarHeaderParams = new HashMap(); + Map localVarCookieParams = new HashMap(); + Map localVarFormParams = new HashMap(); + + localVarQueryParams.addAll(apiClient.parameterToPair("app", app)); + localVarQueryParams.addAll(apiClient.parameterToPair("shuffleIds", shuffleIds)); + + localVarHeaderParams.putAll(additionalHeaders); + + + + final String[] localVarAccepts = { + "application/json" + }; + final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + + final String[] localVarContentTypes = { + + }; + final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + + String[] localVarAuthNames = new String[] { "basic" }; + + TypeReference localVarReturnType = new TypeReference() {}; + return apiClient.invokeAPI( + localVarPath, + "GET", + localVarQueryParams, + localVarCollectionQueryParams, + localVarQueryStringJoiner.toString(), + localVarPostBody, + localVarHeaderParams, + localVarCookieParams, + localVarFormParams, + localVarAccept, + localVarContentType, + localVarAuthNames, + localVarReturnType + ); + } + @Override public T invokeAPI(String url, String method, Object request, TypeReference returnType, Map additionalHeaders) throws ApiException { String localVarPath = url.replace(apiClient.getBaseURL(), ""); diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java index e1ef4fb8cf6..4c9dfe81fd0 100644 --- a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java +++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java @@ -44,7 +44,7 @@ ContainerInfo.JSON_PROPERTY_CONTAINER_CLUSTER, ContainerInfo.JSON_PROPERTY_CONTAINER_TAGS }) -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.7.0") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0") public class ContainerInfo { public static final String JSON_PROPERTY_CONTAINER_NAME = "containerName"; private String containerName; diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml index 7c8d5cd6764..60528efcbbd 100644 --- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml +++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml @@ -270,6 +270,52 @@ paths: schema: $ref: '#/components/schemas/HostnamesResponse' + /api/v1/applications/reviseLostShuffles: + get: + tags: + - Application + operationId: reviseLostShuffles + description: Revise lost shuffles or delete shuffles of an application. + parameters: + - name: app + in: query + required: false + schema: + type: string + - name: shuffleIds + in: query + required: false + schema: + type: string + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/HandleResponse' + + /api/v1/applications/deleteApps: + get: + tags: + - Application + operationId: deleteApps + description: Delete resource of apps + parameters: + - name: apps + in: query + required: false + schema: + type: string + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/HandleResponse' + + /api/v1/ratis/election/transfer: post: tags: diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala index bb98e88f66d..e8610dfdc13 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala +++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala @@ -194,6 +194,10 @@ abstract class HttpService extends Service with Logging { def getWorkerEventInfo(): String = throw new UnsupportedOperationException() + def reviseLostShuffles(appId: String, shuffles: java.util.List[Integer]) + + def deleteApps(appIds: String) + def startHttpServer(): Unit = { httpServer = HttpServer( serviceName, diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala index 3b7d74df8a8..d689ead7e0c 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala @@ -28,7 +28,6 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.celeborn.client.{LifecycleManager, WithShuffleClientSuite} import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.protocol.message.StatusCode -import org.apache.celeborn.common.util.Utils import org.apache.celeborn.service.deploy.MiniClusterFeature class LifecycleManagerUnregisterShuffleSuite extends WithShuffleClientSuite @@ -60,8 +59,8 @@ class LifecycleManagerUnregisterShuffleSuite extends WithShuffleClientSuite assert(res.status == StatusCode.SUCCESS) lifecycleManager.registeredShuffle.add(shuffleId) assert(lifecycleManager.registeredShuffle.contains(shuffleId)) - val shuffleKey = Utils.makeShuffleKey(APP, shuffleId) - assert(masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey)) + assert(masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey(APP)) + assert(masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId)) lifecycleManager.commitManager.setStageEnd(shuffleId) } @@ -71,9 +70,10 @@ class LifecycleManagerUnregisterShuffleSuite extends WithShuffleClientSuite // after unregister shuffle eventually(timeout(120.seconds), interval(2.seconds)) { shuffleIds.foreach { shuffleId: Int => - val shuffleKey = Utils.makeShuffleKey(APP, shuffleId) assert(!lifecycleManager.registeredShuffle.contains(shuffleId)) - assert(!masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey)) + val containShuffleKey = masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey( + APP) && masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId) + assert(!containShuffleKey) } } @@ -93,8 +93,8 @@ class LifecycleManagerUnregisterShuffleSuite extends WithShuffleClientSuite assert(res.status == StatusCode.SUCCESS) lifecycleManager.registeredShuffle.add(shuffleId) assert(lifecycleManager.registeredShuffle.contains(shuffleId)) - val shuffleKey = Utils.makeShuffleKey(APP, shuffleId) - assert(masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey)) + assert(masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey(APP)) + assert(masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId)) lifecycleManager.commitManager.setStageEnd(shuffleId) } val previousTime = System.currentTimeMillis() @@ -104,9 +104,10 @@ class LifecycleManagerUnregisterShuffleSuite extends WithShuffleClientSuite // after unregister shuffle eventually(timeout(120.seconds), interval(2.seconds)) { shuffleIds.foreach { shuffleId: Int => - val shuffleKey = Utils.makeShuffleKey(APP, shuffleId) assert(!lifecycleManager.registeredShuffle.contains(shuffleId)) - assert(!masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey)) + val containShuffleKey = masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey( + APP) && masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId) + assert(!containShuffleKey) } } val currentTime = System.currentTimeMillis() diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index b9bfff05b04..62602cd06b9 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -866,6 +866,11 @@ private[celeborn] class Worker( sb.toString() } + override def reviseLostShuffles(appId: String, shuffles: java.util.List[Integer]): Unit = + throw new UnsupportedOperationException() + + override def deleteApps(appIds: String): Unit = throw new UnsupportedOperationException() + override def exit(exitType: String): String = { exitType.toUpperCase(Locale.ROOT) match { case "DECOMMISSION" =>