Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Solve #1728, Dispatch cmdlet to given node with free slot (#1729)
Browse files Browse the repository at this point in the history
  • Loading branch information
littlezhou authored May 10, 2018
1 parent 287ffae commit a602462
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.smartdata.server.engine.message.NodeMessage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -70,7 +71,10 @@ public class CmdletDispatcher {
private final int defaultSlots;
private AtomicInteger index = new AtomicInteger(0);

private Map<String, Integer> regNodes = new HashMap<>();
private Map<String, AtomicInteger> regNodes = new HashMap<>();

private List<List<String>> cmdExecSrvNodeIds = new ArrayList<>();
private String[] completeOn = new String[ExecutorType.values().length];

public CmdletDispatcher(SmartContext smartContext, CmdletManager cmdletManager,
Queue<Long> scheduledCmdlets, Map<Long, LaunchCmdlet> idToLaunchCmdlet,
Expand All @@ -91,6 +95,7 @@ public CmdletDispatcher(SmartContext smartContext, CmdletManager cmdletManager,
execSrvSlotsLeft = new AtomicInteger[ExecutorType.values().length];
for (int i = 0; i < execSrvSlotsLeft.length; i++) {
execSrvSlotsLeft[i] = new AtomicInteger(0);
cmdExecSrvNodeIds.add(new ArrayList<String>());
}
cmdExecSrvTotalInsts = 0;
dispatchedToSrvs = new ConcurrentHashMap<>();
Expand All @@ -112,7 +117,7 @@ public CmdletDispatcher(SmartContext smartContext, CmdletManager cmdletManager,
SmartConfKeys.SMART_CMDLET_DISPATCHERS_DEFAULT);
dispatchTasks = new DispatchTask[numDisp];
for (int i = 0; i < numDisp; i++) {
dispatchTasks[i] = new DispatchTask(this);
dispatchTasks[i] = new DispatchTask(this, i);
}
schExecService = Executors.newScheduledThreadPool(numDisp + 1);
}
Expand All @@ -125,59 +130,6 @@ public boolean canDispatchMore() {
return getTotalSlotsLeft() > 0;
}

public boolean dispatch(LaunchCmdlet cmdlet) {
int mod = index.incrementAndGet() % cmdExecSrvTotalInsts;
int idx = 0;

for (int nround = 0; nround < 2 && mod >= 0; nround++) {
for (idx = 0; idx < cmdExecSrvInsts.length; idx++) {
mod -= cmdExecSrvInsts[idx];
if (mod < 0) {
break;
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// ignore
}
}

if (mod >= 0) {
return false;
}

CmdletExecutorService selected = null;
for (int i = 0; i < ExecutorType.values().length; i++) {
idx = idx % ExecutorType.values().length;
if (execSrvSlotsLeft[idx].get() > 0) {
selected = cmdExecServices[idx];
break;
}
}

if (selected == null) {
LOG.error("No cmdlet executor service available. " + cmdlet);
return false;
}

updateCmdActionStatus(cmdlet);

String id = selected.execute(cmdlet);

execSrvSlotsLeft[selected.getExecutorType().ordinal()].decrementAndGet();

dispatchedToSrvs.put(cmdlet.getCmdletId(), selected.getExecutorType());

if (logDispResult) {
LOG.info(
String.format(
"Dispatching cmdlet->[%s] to executor service %s : %s",
cmdlet.getCmdletId(), selected.getExecutorType(), id));
}
return true;
}

//Todo: pick the right service to stop cmdlet
public void stop(long cmdletId) {
for (CmdletExecutorService service : cmdExecServices) {
Expand Down Expand Up @@ -225,14 +177,18 @@ private void updateCmdActionStatus(LaunchCmdlet cmdlet) {

private class DispatchTask implements Runnable {
private final CmdletDispatcher dispatcher;
private final int taskId;
private int statRound = 0;
private int statFail = 0;
private int statDispatched = 0;
private int statNoMoreCmdlet = 0;
private int statFull = 0;

public DispatchTask(CmdletDispatcher dispatcher) {
private int[] dispInstIdxs = new int[ExecutorType.values().length];

public DispatchTask(CmdletDispatcher dispatcher, int taskId) {
this.dispatcher = dispatcher;
this.taskId = taskId;
}

public CmdletDispatcherStat getStat() {
Expand Down Expand Up @@ -270,7 +226,7 @@ public void run() {
break;
} else {
cmdletPreExecutionProcess(launchCmdlet);
if (!dispatcher.dispatch(launchCmdlet)) {
if (!dispatch(launchCmdlet)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stop this round dispatch due : " + launchCmdlet);
}
Expand All @@ -289,6 +245,91 @@ public void run() {
}
}
}

private boolean dispatch(LaunchCmdlet cmdlet) {
int mod = index.incrementAndGet() % cmdExecSrvTotalInsts;
int idx = 0;

for (int nround = 0; nround < 2 && mod >= 0; nround++) {
for (idx = 0; idx < cmdExecSrvInsts.length; idx++) {
mod -= cmdExecSrvInsts[idx];
if (mod < 0) {
break;
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// ignore
}
}

if (mod >= 0) {
return false;
}

CmdletExecutorService selected = null;
for (int i = 0; i < ExecutorType.values().length; i++) {
idx = idx % ExecutorType.values().length;
int left;
do {
left = execSrvSlotsLeft[idx].get();
if (left > 0) {
if (execSrvSlotsLeft[idx].compareAndSet(left, left - 1)) {
selected = cmdExecServices[idx];
break;
}
}
} while (left > 0);

if (selected != null) {
break;
}
}

if (selected == null) {
LOG.error("No cmdlet executor service available. " + cmdlet);
return false;
}

updateCmdActionStatus(cmdlet);

int srvId = selected.getExecutorType().ordinal();

boolean sFlag = true;
String nodeId;

do {
dispInstIdxs[srvId] = (dispInstIdxs[srvId] + 1) % cmdExecSrvNodeIds.get(srvId).size();
nodeId = cmdExecSrvNodeIds.get(srvId).get(dispInstIdxs[srvId]);
AtomicInteger counter = regNodes.get(nodeId);
int left = counter.get();
if (left > 0) {
if (counter.compareAndSet(left, left - 1)) {
selected = cmdExecServices[idx];
break;
}
}

if (sFlag && completeOn[srvId] != null) {
dispInstIdxs[srvId] = cmdExecSrvNodeIds.get(srvId).indexOf(completeOn[srvId]);
sFlag = false;
}
} while (true);

cmdlet.setNodeId(nodeId);
selected.execute(cmdlet);

dispatchedToSrvs.put(cmdlet.getCmdletId(), selected.getExecutorType());

if (logDispResult) {
LOG.info(
String.format(
"Dispatching cmdlet->[%s] to executor service %s : %s",
cmdlet.getCmdletId(), selected.getExecutorType(), nodeId));
}
return true;
}
}

private class LogStatTask implements Runnable {
Expand Down Expand Up @@ -337,8 +378,11 @@ public void cmdletPreExecutionProcess(LaunchCmdlet cmdlet) {
public void onCmdletFinished(long cmdletId) {
synchronized (dispatchedToSrvs) {
if (dispatchedToSrvs.containsKey(cmdletId)) {
LaunchCmdlet cmdlet = idToLaunchCmdlet.get(cmdletId);
regNodes.get(cmdlet.getNodeId()).incrementAndGet();
ExecutorType t = dispatchedToSrvs.remove(cmdletId);
updateSlotsLeft(t.ordinal(), 1);
completeOn[t.ordinal()] = cmdlet.getNodeId();
}
}
}
Expand All @@ -355,14 +399,16 @@ public void onNodeMessage(NodeMessage msg, boolean isAdd) {
LOG.warn("Skip duplicate add node for {}", msg.getNodeInfo());
return;
} else {
regNodes.put(nodeId, 0);
regNodes.put(nodeId, new AtomicInteger(defaultSlots));
cmdExecSrvNodeIds.get(msg.getNodeInfo().getExecutorType().ordinal()).add(nodeId);
}
} else {
if (!regNodes.containsKey(nodeId)) {
LOG.warn("Skip duplicate remove node for {}", msg.getNodeInfo());
return;
} else {
regNodes.remove(nodeId);
cmdExecSrvNodeIds.get(msg.getNodeInfo().getExecutorType().ordinal()).remove(nodeId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,10 @@ public boolean canAcceptMore() {

@Override
public String execute(LaunchCmdlet cmdlet) {
String[] members = masterToWorkers.keySet().toArray(new String[0]);
String member = members[random.nextInt() % members.length];
String member = cmdlet.getNodeId();
masterToWorkers.get(member).publish(cmdlet);
scheduledCmdlets.get(member).add(cmdlet.getCmdletId());
LOG.info("Executing cmdlet {} on worker {}", cmdlet.getCmdletId(), members);
LOG.debug("Executing cmdlet {} on worker {}", cmdlet.getCmdletId(), member);
return member;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class AgentManager {

private final Map<ActorRef, AgentId> agents = new HashMap<>();
private final Map<ActorRef, NodeInfo> agentNodeInfos = new HashMap<>();
private final Map<String, ActorRef> agentActorRefs = new HashMap<>();
private List<ActorRef> resources = new ArrayList<>();
private List<NodeInfo> nodeInfos = new LinkedList<>();
private int dispatchIndex = 0;
Expand All @@ -45,6 +46,7 @@ void addAgent(ActorRef agent, AgentId id) {
NodeInfo info = new AgentInfo(String.valueOf(id.getId()), location);
nodeInfos.add(info);
agentNodeInfos.put(agent, info);
agentActorRefs.put(info.getId(), agent);
EngineEventBus.post(new AddNodeMessage(info));
}

Expand All @@ -53,6 +55,7 @@ AgentId removeAgent(ActorRef agent) {
resources.remove(agent);
NodeInfo info = agentNodeInfos.remove(agent);
nodeInfos.remove(info);
agentActorRefs.remove(info.getId());
EngineEventBus.post(new RemoveNodeMessage(info));
return id;
}
Expand All @@ -61,10 +64,8 @@ boolean hasFreeAgent() {
return !resources.isEmpty();
}

ActorRef dispatch() {
int id = dispatchIndex % resources.size();
dispatchIndex++;
return resources.get(id);
ActorRef dispatch(String nodeId) {
return agentActorRefs.get(nodeId);
}

Map<ActorRef, AgentId> getAgents() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ private boolean handleClientMessage(Object message) {
if (message instanceof LaunchCmdlet) {
if (agentManager.hasFreeAgent()) {
LaunchCmdlet launch = (LaunchCmdlet) message;
ActorRef agent = this.agentManager.dispatch();
ActorRef agent = this.agentManager.dispatch(launch.getNodeId());
AgentId agentId = this.agentManager.getAgentId(agent);
agent.tell(launch, getSelf());
dispatches.put(launch.getCmdletId(), agent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class LaunchCmdlet implements AgentService.Message {
private long cmdletId;
private List<LaunchAction> launchActions;
private CmdletDispatchPolicy dispPolicy = CmdletDispatchPolicy.ANY;
private String nodeId;

public LaunchCmdlet(long cmdletId, List<LaunchAction> launchActions) {
this.cmdletId = cmdletId;
Expand Down Expand Up @@ -67,4 +68,12 @@ public void setDispPolicy(CmdletDispatchPolicy dispPolicy) {
public String toString() {
return String.format("{cmdletId = %d, dispPolicy = '%s'}", cmdletId, dispPolicy);
}

public String getNodeId() {
return nodeId;
}

public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
}

0 comments on commit a602462

Please sign in to comment.