From 927d3988d3f4370dd86205225706f280b5a07e66 Mon Sep 17 00:00:00 2001 From: littlezhou Date: Thu, 2 Aug 2018 16:27:32 +0800 Subject: [PATCH] Solve #1876, Add host info for actions (#1877) --- .../java/org/smartdata/model/ActionInfo.java | 11 +++++++ .../smartdata/server/cluster/NodeInfo.java | 31 ++++++++++++++++--- .../server/engine/CmdletManager.java | 15 +++++++++ .../engine/cmdlet/CmdletDispatcher.java | 20 ++++++++++-- .../smartdata/metastore/dao/ActionDao.java | 15 ++++++--- .../metastore/utils/MetaStoreUtils.java | 1 + 6 files changed, 81 insertions(+), 12 deletions(-) diff --git a/smart-common/src/main/java/org/smartdata/model/ActionInfo.java b/smart-common/src/main/java/org/smartdata/model/ActionInfo.java index 2b866b45576..3a11051c255 100644 --- a/smart-common/src/main/java/org/smartdata/model/ActionInfo.java +++ b/smart-common/src/main/java/org/smartdata/model/ActionInfo.java @@ -32,6 +32,8 @@ public class ActionInfo { private String result; private String log; + // For action set flexibility + private String execHost; private boolean successful; private long createTime; private boolean finished; @@ -52,6 +54,7 @@ public ActionInfo(long actionId, long cmdletId, String actionName, this.args = args; this.result = result; this.log = log; + this.execHost = ""; this.successful = successful; this.createTime = createTime; this.finished = finished; @@ -159,6 +162,14 @@ public void setProgress(float progress) { this.progress = progress; } + public String getExecHost() { + return execHost; + } + + public void setExecHost(String execHost) { + this.execHost = execHost; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/smart-engine/src/main/java/org/smartdata/server/cluster/NodeInfo.java b/smart-engine/src/main/java/org/smartdata/server/cluster/NodeInfo.java index 7922447127a..42c7074ec72 100644 --- a/smart-engine/src/main/java/org/smartdata/server/cluster/NodeInfo.java +++ b/smart-engine/src/main/java/org/smartdata/server/cluster/NodeInfo.java @@ -26,13 +26,14 @@ */ public class NodeInfo { private String id; - private String location; + private String host; + private int port; private ExecutorType executorType; public NodeInfo(String id, String location, ExecutorType executorType) { this.id = id; - this.location = location; this.executorType = executorType; + doSetLocation(location); } public String getId() { @@ -44,11 +45,31 @@ public void setId(String id) { } public String getLocation() { - return location; + return host + ":" + port; } public void setLocation(String location) { - this.location = location; + doSetLocation(location); + } + + private void doSetLocation(String location) { + host = null; + port = 0; + if (location != null) { + String[] its = location.split(":"); + if (its.length > 1) { + port = Integer.valueOf(its[1]); + } + host = its[0]; + } + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; } public ExecutorType getExecutorType() { @@ -57,7 +78,7 @@ public ExecutorType getExecutorType() { @Override public String toString() { - return String.format("{id=%s, location=%s, executorType=%s}", id, location, executorType); + return String.format("{id=%s, location=%s, executorType=%s}", id, getLocation(), executorType); } } diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java b/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java index 847dfd3ea16..05b92eed52d 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java @@ -980,6 +980,21 @@ public List getFileActions(long rid, int size) throws IOExce } } + public void updateCmdletExecHost(long cmdletId, String host) throws IOException { + CmdletInfo cmdlet = getCmdletInfo(cmdletId); + if (cmdlet == null) { + return; + } + + ActionInfo action; + for (long id : cmdlet.getAids()) { + action = getActionInfo(id); + if (action != null) { + action.setExecHost(host); + } + } + } + /** * Delete all cmdlets related with rid. * @param rid diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/CmdletDispatcher.java b/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/CmdletDispatcher.java index 701e2616d1a..034893e2895 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/CmdletDispatcher.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/CmdletDispatcher.java @@ -30,6 +30,7 @@ import org.smartdata.model.action.ActionScheduler; import org.smartdata.protocol.message.ActionStatus; import org.smartdata.protocol.message.CmdletStatus; +import org.smartdata.server.cluster.NodeInfo; import org.smartdata.server.engine.CmdletManager; import org.smartdata.server.engine.cmdlet.message.LaunchCmdlet; import org.smartdata.server.engine.message.NodeMessage; @@ -72,6 +73,7 @@ public class CmdletDispatcher { private AtomicInteger index = new AtomicInteger(0); private Map regNodes = new HashMap<>(); + private Map regNodeInfos = new HashMap<>(); private List> cmdExecSrvNodeIds = new ArrayList<>(); private String[] completeOn = new String[ExecutorType.values().length]; @@ -158,7 +160,15 @@ public LaunchCmdlet getNextCmdletToRun() throws IOException { return launchCmdlet; } - private void updateCmdActionStatus(LaunchCmdlet cmdlet) { + private void updateCmdActionStatus(LaunchCmdlet cmdlet, String host) { + if (cmdletManager != null) { + try { + cmdletManager.updateCmdletExecHost(cmdlet.getCmdletId(), host); + } catch (IOException e) { + // Ignore this + } + } + try { for (LaunchAction action : cmdlet.getLaunchActions()) { ActionStatus actionStatus = new ActionStatus( @@ -339,14 +349,16 @@ private boolean dispatch(LaunchCmdlet cmdlet) { return false; } - updateCmdActionStatus(cmdlet); + NodeInfo nodeInfo = regNodeInfos.get(nodeId); + String host = nodeInfo == null ? "" : nodeInfo.getHost(); + updateCmdActionStatus(cmdlet, host); dispatchedToSrvs.put(cmdlet.getCmdletId(), selected.getExecutorType()); if (logDispResult) { LOG.info( String.format( "Dispatching cmdlet->[%s] to executor service %s : %s", - cmdlet.getCmdletId(), selected.getExecutorType(), nodeId)); + cmdlet.getCmdletId(), selected.getExecutorType(), host)); } return true; } @@ -422,6 +434,7 @@ public void onNodeMessage(NodeMessage msg, boolean isAdd) { return; } else { regNodes.put(nodeId, new AtomicInteger(defaultSlots)); + regNodeInfos.put(nodeId, msg.getNodeInfo()); cmdExecSrvNodeIds.get(msg.getNodeInfo().getExecutorType().ordinal()).add(nodeId); } } else { @@ -430,6 +443,7 @@ public void onNodeMessage(NodeMessage msg, boolean isAdd) { return; } else { regNodes.remove(nodeId); + regNodeInfos.remove(nodeId); cmdExecSrvNodeIds.get(msg.getNodeInfo().getExecutorType().ordinal()).remove(nodeId); } } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/ActionDao.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/ActionDao.java index eeba6d3d3e3..5eef33d39c3 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/ActionDao.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/ActionDao.java @@ -193,6 +193,7 @@ public List searchAction(String path, long start, long offset, List< + "OR cid LIKE '%" + path + "%' ESCAPE '/' " + "OR args LIKE '%" + path + "%' ESCAPE '/' " + "OR result LIKE '%" + path + "%' ESCAPE '/' " + + "OR exec_host LIKE '%" + path + "%' ESCAPE '/' " + "OR progress LIKE '%" + path + "%' ESCAPE '/' " + "OR log LIKE '%" + path + "%' ESCAPE '/' " + "OR action_name LIKE '%" + path + "%' ESCAPE '/')"; @@ -309,8 +310,9 @@ public int[] replace(final ActionInfo[] actionInfos) { + "create_time, " + "finished, " + "finish_time, " + + "exec_host, " + "progress)" - + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; return jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() { public void setValues(PreparedStatement ps, @@ -325,7 +327,8 @@ public void setValues(PreparedStatement ps, ps.setLong(8, actionInfos[i].getCreateTime()); ps.setBoolean(9, actionInfos[i].isFinished()); ps.setLong(10, actionInfos[i].getFinishTime()); - ps.setFloat(11, actionInfos[i].getProgress()); + ps.setString(11, actionInfos[i].getExecHost()); + ps.setFloat(12, actionInfos[i].getProgress()); } public int getBatchSize() { return actionInfos.length; @@ -349,6 +352,7 @@ public int[] update(final ActionInfo[] actionInfos) { + "create_time = ?, " + "finished = ?, " + "finish_time = ?, " + + "exec_host = ?, " + "progress = ? " + "WHERE aid = ?"; return jdbcTemplate.batchUpdate(sql, @@ -361,8 +365,9 @@ public void setValues(PreparedStatement ps, ps.setLong(4, actionInfos[i].getCreateTime()); ps.setBoolean(5, actionInfos[i].isFinished()); ps.setLong(6, actionInfos[i].getFinishTime()); - ps.setFloat(7, actionInfos[i].getProgress()); - ps.setLong(8, actionInfos[i].getActionId()); + ps.setString(7, actionInfos[i].getExecHost()); + ps.setFloat(8, actionInfos[i].getProgress()); + ps.setLong(9, actionInfos[i].getActionId()); } public int getBatchSize() { @@ -395,6 +400,7 @@ private Map toMap(ActionInfo actionInfo) { parameters.put("create_time", actionInfo.getCreateTime()); parameters.put("finished", actionInfo.isFinished()); parameters.put("finish_time", actionInfo.getFinishTime()); + parameters.put("exec_host", actionInfo.getExecHost()); parameters.put("progress", actionInfo.getProgress()); return parameters; } @@ -416,6 +422,7 @@ public ActionInfo mapRow(ResultSet resultSet, int i) throws SQLException { actionInfo.setCreateTime(resultSet.getLong("create_time")); actionInfo.setFinished(resultSet.getBoolean("finished")); actionInfo.setFinishTime(resultSet.getLong("finish_time")); + actionInfo.setExecHost(resultSet.getString("exec_host")); actionInfo.setProgress(resultSet.getFloat("progress")); return actionInfo; } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/utils/MetaStoreUtils.java b/smart-metastore/src/main/java/org/smartdata/metastore/utils/MetaStoreUtils.java index 9a0f81c3959..7fce86f07f7 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/utils/MetaStoreUtils.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/utils/MetaStoreUtils.java @@ -286,6 +286,7 @@ public static void initializeDataBase( + " create_time bigint(20) NOT NULL,\n" + " finished tinyint(4) NOT NULL,\n" + " finish_time bigint(20) NOT NULL,\n" + + " exec_host varchar(255),\n" + " progress float NOT NULL\n" + ") ;", "CREATE TABLE file_diff (\n"