Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CELEBORN-1689: CLI Support for Ratis APIs #2882

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 19 additions & 23 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,46 @@ final class MasterOptions {
names = Array("--delete-apps"),
description = Array("Delete resource of an application."))
private[master] var deleteApps: Boolean = _

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two more commands to implement.

  • show raft group info
  • print local raft meta conf

@Option(
names = Array("--transfer-ratis-leader"),
description =
Array("Transfer the group leader to the specified server. Specify leader via --peerAddress."))
private[master] var transferRatisLeader: Boolean = _

@Option(
names = Array("--step-down-ratis-leader"),
description = Array("Step down from group leadership."))
private[master] var stepDownRatisLeader: Boolean = _

@Option(
names = Array("--pause-leader-election"),
description = Array("Pause leader election at the current server."))
private[master] var pauseLeaderElection: Boolean = _

@Option(
names = Array("--resume-leader-election"),
description = Array("Resume leader election at the current server."))
private[master] var resumeLeaderElection: Boolean = _

@Option(
names = Array("--add-ratis-peers"),
description = Array("Add new peers to the raft group. Specify peers list via --peers."))
private[master] var addRatisPeers: Boolean = _

@Option(
names = Array("--remove-ratis-peers"),
description = Array("Remove new peers from the raft group. Specify peers list via --peers."))
private[master] var removeRatisPeers: Boolean = _

@Option(
names = Array("--set-ratis-peers-priorities"),
description = Array(
"Set the priority of the peers in the raft group. Specify priorities via --priorities."))
private[master] var setRatisPeersPriorities: Boolean = _

@Option(
names = Array("--create-snapshot"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about using similar command names?

image
--election-transfer
--election-step-down
--election-pause
--election-resume

--peer-add
--peer-remove
--peer-set-priority

--snapshot-create

--local-raft-conf

description = Array("Trigger the current server to take snapshot."))
private[master] var createSnapshot: Boolean = _
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import picocli.CommandLine.Model.CommandSpec
import org.apache.celeborn.cli.CelebornCli
import org.apache.celeborn.cli.common.{CliLogging, CommonOptions}
import org.apache.celeborn.cli.config.CliConfigManager
import org.apache.celeborn.rest.v1.master.{ApplicationApi, ConfApi, DefaultApi, MasterApi, ShuffleApi, WorkerApi}
import org.apache.celeborn.rest.v1.master.{ApplicationApi, ConfApi, DefaultApi, MasterApi, RatisApi, ShuffleApi, WorkerApi}
import org.apache.celeborn.rest.v1.master.invoker.ApiClient
import org.apache.celeborn.rest.v1.model._

Expand All @@ -40,6 +40,9 @@ trait MasterSubcommand extends CliLogging {
@ArgGroup(exclusive = false)
private[master] var reviseLostShuffleOptions: ReviseLostShuffleOptions = _

@ArgGroup(exclusive = false)
private[master] var ratisOptions: RatisOptions = _

@Mixin
private[master] var commonOptions: CommonOptions = _

Expand Down Expand Up @@ -71,6 +74,8 @@ trait MasterSubcommand extends CliLogging {
private[master] def shuffleApi: ShuffleApi = new ShuffleApi(apiClient)
private[master] def workerApi: WorkerApi = new WorkerApi(apiClient)

private[master] def ratisApi: RatisApi = new RatisApi(apiClient)

private[master] def runShowMastersInfo: MasterInfoResponse

private[master] def runShowClusterApps: ApplicationsHeartbeatResponse
Expand Down Expand Up @@ -117,4 +122,20 @@ trait MasterSubcommand extends CliLogging {

private[master] def deleteApps: HandleResponse

private[master] def transferRatisLeader: HandleResponse

private[master] def stepDownRatisLeader: HandleResponse

private[master] def pauseLeaderElection: HandleResponse

private[master] def resumeLeaderElection: HandleResponse

private[master] def addRatisPeers: HandleResponse

private[master] def removeRatisPeers: HandleResponse

private[master] def setRatisPeersPriorities: HandleResponse

private[master] def createSnapshot: HandleResponse

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ class MasterSubcommandImpl extends Runnable with MasterSubcommand {
if (masterOptions.showThreadDump) log(runShowThreadDump)
if (masterOptions.reviseLostShuffles) log(reviseLostShuffles)
if (masterOptions.deleteApps) log(deleteApps)
if (masterOptions.transferRatisLeader) log(transferRatisLeader)
if (masterOptions.stepDownRatisLeader) log(stepDownRatisLeader)
if (masterOptions.pauseLeaderElection) log(pauseLeaderElection)
if (masterOptions.resumeLeaderElection) log(resumeLeaderElection)
if (masterOptions.addRatisPeers) log(addRatisPeers)
if (masterOptions.removeRatisPeers) log(removeRatisPeers)
if (masterOptions.setRatisPeersPriorities) log(setRatisPeersPriorities)
if (masterOptions.createSnapshot) log(createSnapshot)
if (masterOptions.addClusterAlias != null && masterOptions.addClusterAlias.nonEmpty)
runAddClusterAlias
if (masterOptions.removeClusterAlias != null && masterOptions.removeClusterAlias.nonEmpty)
Expand Down Expand Up @@ -238,4 +246,67 @@ class MasterSubcommandImpl extends Runnable with MasterSubcommand {
val apps = commonOptions.apps
applicationApi.deleteApps(apps)
}

override private[master] def transferRatisLeader: HandleResponse = {
if (ratisOptions.peerAddress.isEmpty || ratisOptions.peerAddress == null) {
throw new ParameterException(
spec.commandLine(),
"Peer address in the form of host:ratis_port must be specified. Typically ratis port is 9872.")
}
val ratisElectionTransferRequest = new RatisElectionTransferRequest()
ratisElectionTransferRequest.peerAddress(ratisOptions.peerAddress)
ratisApi.transferRatisLeader(ratisElectionTransferRequest)
}

override private[master] def stepDownRatisLeader: HandleResponse = ratisApi.stepDownRatisLeader

override private[master] def pauseLeaderElection = ratisApi.pauseRatisElection()

override private[master] def resumeLeaderElection: HandleResponse = ratisApi.resumeRatisElection()

override private[master] def addRatisPeers: HandleResponse = {
ratisApi.addRatisPeer(new RatisPeerAddRequest().peers(getRatisPeers))
}

override private[master] def removeRatisPeers: HandleResponse = {
ratisApi.removeRatisPeer(new RatisPeerRemoveRequest().peers(getRatisPeers))
}

override private[master] def setRatisPeersPriorities: HandleResponse = {
ratisApi.setRatisPeerPriority(new RatisPeerSetPriorityRequest()
.addressPriorities(getRatisPeerPrioritiesMap))
}

override private[master] def createSnapshot: HandleResponse = ratisApi.createRatisSnapshot()

private[master] def getRatisPeers: util.List[RatisPeer] = {
if (ratisOptions.peers == null || ratisOptions.peers.isEmpty) {
throw new ParameterException(
spec.commandLine(),
"Peers must be provided for this command via --peers.")
}
parseCliPeersMapping(ratisOptions.peers).map { idToHostPort =>
new RatisPeer().id(idToHostPort._1).address(idToHostPort._2)
}.asJava
}

private[master] def getRatisPeerPrioritiesMap: util.Map[String, Integer] = {
if (ratisOptions.priorities == null || ratisOptions.priorities.isEmpty) {
throw new ParameterException(
spec.commandLine(),
"Priorities must be provided for this command via --priorities.")
}
val javaMap: util.Map[String, Integer] = new util.HashMap[String, Integer]()
parseCliPeersMapping(ratisOptions.priorities).map(e => (e._1, e._2.toInt)).toMap.foreach {
case (k, v) => javaMap.put(k, Integer.valueOf(v))
}
javaMap
}

private def parseCliPeersMapping(cliMappingString: String): List[(String, String)] = {
cliMappingString.split(",").map { entry =>
val parsedEntry = entry.split("=")
(parsedEntry(0), parsedEntry(1))
}.toList
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.cli.master

import picocli.CommandLine.Option

final class RatisOptions {

@Option(
names = Array("--peerAddress"),
description = Array("The peer address in form of host:port."))
private[master] var peerAddress: String = _

@Option(
names = Array("--peers"),
description = Array("The comma separated list of peers in" +
" the format id=host:port, e.g. `a=host1:9872,b=host2:9872`."))
private[master] var peers: String = _

@Option(
names = Array("--priorities"),
description = Array("The comma separated list of peers in" +
" the format host:port=priority, e.g. `host1:9872=0,host2:9872=1`."))
private[master] var priorities: String = _

}
Loading
Loading