Skip to content

Commit

Permalink
feat(portal): 将文件直接提交为sbatch命令 (#891)
Browse files Browse the repository at this point in the history
### 做了什么

**需求**
在文件管理中增加对所有文件对象增加”提交“按钮直接sbatch执行
暂时只考虑提交的文本作为作业直接执行
执行后也应该展示在正在运行的作业和已结束的作业中
提交文件大小限制为1M

**实现**
此pr完成上述需求
增加文件是否为文本文件的判断
增加此操作在审计系统中的操作日志

![image](https://github.com/PKUHPC/SCOW/assets/43978285/44df586e-110b-42be-ac99-a224d69dd330)

![image](https://github.com/PKUHPC/SCOW/assets/43978285/c50febf4-dcee-48f5-8a9b-e4cd9ac69210)

![image](https://github.com/PKUHPC/SCOW/assets/43978285/63ceac2c-8398-4ced-b7ab-fe19c670b6a8)

提交失败时

![fileSize-error](https://github.com/PKUHPC/SCOW/assets/43978285/aa6efe74-2955-430d-bd61-79086b5a82af)

![not-text](https://github.com/PKUHPC/SCOW/assets/43978285/b33edf48-aaa0-42ac-b62d-34dfe12118ce)

![sbatchfailed](https://github.com/PKUHPC/SCOW/assets/43978285/b562d79c-58b8-4419-9e98-0354e615237f)
操作日志追加

![image](https://github.com/PKUHPC/SCOW/assets/43978285/9a55c5c5-e5fd-4236-804c-9f16f56febd3)


调度器适配器接口仓库分支暂时变更为临时分支,测试结束后合并调度器适配器接口分支进主分支,修改复原SCOW中调度器适配器仓库
  • Loading branch information
piccaSun authored Nov 11, 2023
1 parent 22441e3 commit 135f2b1
Show file tree
Hide file tree
Showing 22 changed files with 387 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .changeset/cold-moons-bathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@scow/grpc-api": minor
---

新增 submitFileAsJob 接口,直接把文件作为作业提交调度器执行
11 changes: 11 additions & 0 deletions .changeset/proud-seahorses-repeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@scow/scheduler-adapter-protos": minor
"@scow/lib-scheduler-adapter": minor
"@scow/portal-server": minor
"@scow/test-adapter": minor
"@scow/portal-web": minor
"@scow/mis-web": minor
"@scow/utils": minor
---

在门户系统的文件管理下,新增将文件直接作为作业文本提交调度器执行的功能,如果调度器API版本低于此接口版本报错
2 changes: 2 additions & 0 deletions apps/mis-web/src/i18n/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,7 @@ export default {
setPlatformBilling: "Set Platform Job Billing",
createTenant: "Create Tenant",
tenantPay: "Tenant Recharge",
submitFileItemAsJob: "Script Submission",
},
operationDetails: {
login: "User Login",
Expand Down Expand Up @@ -1041,6 +1042,7 @@ export default {
createTenant: "Create tenant {}, administrator: {}",
tenantPay: "Recharge tenant {} by {} yuan",
setPlatformBilling: "Set platform billing item {} price to {} yuan",
submitFileItemAsJob: "Cluster: {}, Submit Script: {}",
},
},
userRoles: {
Expand Down
2 changes: 2 additions & 0 deletions apps/mis-web/src/i18n/zh_cn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,7 @@ export default {
setPlatformBilling: "设置平台作业计费",
createTenant: "创建租户",
tenantPay: "租户充值",
submitFileItemAsJob: "提交脚本",
},
operationDetails: {
login: "用户登录",
Expand Down Expand Up @@ -1040,6 +1041,7 @@ export default {
createTenant: "创建租户{}, 租户管理员为: {}",
tenantPay: "为租户{}充值{}元",
setPlatformBilling: "设置平台的计费项{}价格为{}元",
submitFileItemAsJob: "集群:{},提交脚本:{}",
},
},
userRoles: {
Expand Down
5 changes: 5 additions & 0 deletions apps/mis-web/src/models/operationLog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export const OperationType: OperationTypeEnum = {
setPlatformBilling: "setPlatformBilling",
createTenant: "createTenant",
tenantPay: "tenantPay",
submitFileItemAsJob: "submitFileItemAsJob",
};

export const OperationLog = Type.Object({
Expand Down Expand Up @@ -163,6 +164,7 @@ export const getOperationTypeTexts = (t: OperationTextsTransType): { [key in Lib
setPlatformBilling: t(pTypes("setPlatformBilling")),
createTenant: t(pTypes("createTenant")),
tenantPay: t(pTypes("tenantPay")),
submitFileItemAsJob: t(pTypes("submitFileItemAsJob")),
};

};
Expand All @@ -186,6 +188,7 @@ export const OperationCodeMap: { [key in LibOperationType]: string } = {
deleteDirectory: "010505",
moveFileItem: "010506",
copyFileItem: "010507",
submitFileItemAsJob: "010508",
setJobTimeLimit: "010601",
createUser: "020201",
addUserToAccount: "020202",
Expand Down Expand Up @@ -363,6 +366,8 @@ export const getOperationDetail = (
case "setPlatformBilling":
return t(pDetails("setPlatformBilling"),
[operationEvent[logEvent].path, nullableMoneyToString(operationEvent[logEvent].price)]);
case "submitFileItemAsJob":
return t(pDetails("submitFileItemAsJob"), [operationEvent[logEvent].clusterId, operationEvent[logEvent].path]);
default:
return "-";
}
Expand Down
74 changes: 72 additions & 2 deletions apps/portal-server/src/services/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import { ServiceError } from "@ddadaal/tsgrpc-common";
import { plugin } from "@ddadaal/tsgrpc-server";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { jobInfoToPortalJobInfo, jobInfoToRunningjob } from "@scow/lib-scheduler-adapter";
import { createDirectoriesRecursively } from "@scow/lib-ssh";
import { createDirectoriesRecursively, sftpReadFile, sftpStat } from "@scow/lib-ssh";
import { JobServiceServer, JobServiceService } from "@scow/protos/build/portal/job";
import { parseErrorDetails } from "@scow/rich-error-model";
import { ApiVersion } from "@scow/utils/build/version";
import { getClusterOps } from "src/clusterops";
import { JobTemplate } from "src/clusterops/api/job";
import { getAdapterClient } from "src/utils/clusters";
import { checkSchedulerApiVersion, getAdapterClient } from "src/utils/clusters";
import { clusterNotFound } from "src/utils/errors";
import { getClusterLoginNode, sshConnect } from "src/utils/ssh";

Expand Down Expand Up @@ -218,6 +219,75 @@ export const jobServiceServer = plugin((server) => {
return [{ jobId: reply.jobId }];
},


submitFileAsJob: async ({ request, logger }) => {
const { cluster, userId, filePath } = request;

const client = getAdapterClient(cluster);
if (!client) { throw clusterNotFound(cluster); }

// 当前接口要求的最低调度器接口版本
const minRequiredApiVersion: ApiVersion = { major: 1, minor: 2, patch: 0 };
// 检验调度器的API版本是否符合要求,不符合要求报错
await checkSchedulerApiVersion(client, minRequiredApiVersion);

const host = getClusterLoginNode(cluster);
if (!host) { throw clusterNotFound(cluster); }

const script = await sshConnect(host, userId, logger, async (ssh) => {

const sftp = await ssh.requestSFTP();

// 判断文件操作权限
const stat = await sftpStat(sftp)(filePath).catch((e) => {
logger.error(e, "stat %s as %s failed", filePath, userId);
throw <ServiceError> {
code: Status.PERMISSION_DENIED, message: `${filePath} is not accessible`,
};
});
// 文件SIZE大于1M不能提交sbatch执行
if (stat.size / (1024 * 1024) > 1) {
throw <ServiceError> {
code: Status.INVALID_ARGUMENT, message: `${filePath} is too large. Maximum file size is 1M`,
};
}

const isTextFile = await ssh.exec("file", [filePath]).then((res) => {
return res.match(/text/);
});
// 文件不是文本文件不能提交Sbatch执行
if (!isTextFile) {
throw <ServiceError> {
code: Status.INVALID_ARGUMENT, message: `${filePath} is not a text file`,
};
}

return await sftpReadFile(sftp)(filePath)
.then((buffer) => {
return buffer.toString("utf-8");
});
});

const reply = await asyncClientCall(client.job, "submitScriptAsJob", {
userId, script,
}).catch((e) => {
const ex = e as ServiceError;
const errors = parseErrorDetails(ex.metadata);
if (errors[0] && errors[0].$type === "google.rpc.ErrorInfo" && errors[0].reason === "SBATCH_FAILED") {
throw <ServiceError> {
code: Status.INTERNAL,
message: "sbatch failed",
details: e.details,
};
} else {
throw e;
}
});

return [{ jobId: reply.jobId }];
},


});

});
62 changes: 61 additions & 1 deletion apps/portal-server/src/utils/clusters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
* See the Mulan PSL v2 for more details.
*/

import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { ServiceError, status } from "@grpc/grpc-js";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { getSchedulerAdapterClient, SchedulerAdapterClient } from "@scow/lib-scheduler-adapter";
import { parseErrorDetails } from "@scow/rich-error-model";
import { ApiVersion } from "@scow/utils/build/version";
import { clusters } from "src/config/clusters";


const adapterClientForClusters = Object.entries(clusters).reduce((prev, [cluster, c]) => {
const client = getSchedulerAdapterClient(c.adapterUrl);
prev[cluster] = client;
Expand All @@ -23,3 +27,59 @@ const adapterClientForClusters = Object.entries(clusters).reduce((prev, [cluster
export const getAdapterClient = (cluster: string) => {
return adapterClientForClusters[cluster];
};

/**
* 判断当前集群下的调度器API版本对比传入的接口是否已过时
* @param client
* @param minVersion
*/
export async function checkSchedulerApiVersion(client: SchedulerAdapterClient,
minVersion: ApiVersion): Promise<void> {

let scheduleApiVersion: ApiVersion | null;
try {
scheduleApiVersion = await asyncClientCall(client.version, "getVersion", {});
} catch (e) {
const ex = e as ServiceError;
const errors = parseErrorDetails(ex.metadata);
// 如果找不到获取版本号的接口,指定版本为接口存在前的最新版1.0.0
if (((e as any).code === status.UNIMPLEMENTED) ||
(errors[0] && errors[0].$type === "google.rpc.ErrorInfo" && errors[0].reason === "UNIMPLEMENTED")) {
scheduleApiVersion = { major: 1, minor: 0, patch: 0 };
} else {
throw <ServiceError> {
code: Status.UNIMPLEMENTED,
message: "unimplemented",
details: "The scheduler API version can not be confirmed."
+ "To use this method, the scheduler adapter must be upgraded to the version "
+ `${minVersion.major}.${minVersion.minor}.${minVersion.patch} `
+ "or higher.",
};
}
}

if (scheduleApiVersion) {

// 检查调度器接口版本是否大于等于最低要求版本
let geMinVersion: boolean;
if (scheduleApiVersion.major !== minVersion.major) {
geMinVersion = (scheduleApiVersion.major > minVersion.major);
} else if (scheduleApiVersion.minor !== minVersion.minor) {
geMinVersion = (scheduleApiVersion.minor > minVersion.minor);
} else {
geMinVersion = true;
}

if (!geMinVersion) {
throw <ServiceError> {
code: Status.FAILED_PRECONDITION,
message: "precondition failed",
details: "The method is not supported with the current scheduler adapter version. "
+ "To use this method, the scheduler adapter must be upgraded to the version "
+ `${minVersion.major}.${minVersion.minor}.${minVersion.patch} `
+ "or higher.",
};
}
}

};
2 changes: 2 additions & 0 deletions apps/portal-web/src/apis/api.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ export const mockApi: MockApi<typeof api> = {

submitJob: async () => ({ jobId: 10 }),

submitFileAsJob: async () => ({ jobId: 10 }),

getAppLastSubmission: async () => ({
lastSubmissionInfo: {
userId: "test123",
Expand Down
2 changes: 2 additions & 0 deletions apps/portal-web/src/apis/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import type { GetJobTemplateSchema } from "src/pages/api/job/getJobTemplate";
import type { GetRunningJobsSchema } from "src/pages/api/job/getRunningJobs";
import type { ListJobTemplatesSchema } from "src/pages/api/job/listJobTemplates";
import type { RenameJobTemplateSchema } from "src/pages/api/job/renameJobTemplate";
import { SubmitFileAsJobSchema } from "src/pages/api/job/submitFileAsJob";
import type { SubmitJobSchema } from "src/pages/api/job/submitJob";
import type { ChangePasswordSchema } from "src/pages/api/profile/changePassword";
import type { CheckPasswordSchema } from "src/pages/api/profile/checkPassword";
Expand Down Expand Up @@ -97,6 +98,7 @@ export const api = {
listJobTemplates: apiClient.fromTypeboxRoute<typeof ListJobTemplatesSchema>("GET", "/api/job/listJobTemplates"),
renameJobTemplate: apiClient.fromTypeboxRoute<typeof RenameJobTemplateSchema>("POST", "/api/job/renameJobTemplate"),
submitJob: apiClient.fromTypeboxRoute<typeof SubmitJobSchema>("POST", "/api/job/submitJob"),
submitFileAsJob: apiClient.fromTypeboxRoute<typeof SubmitFileAsJobSchema>("POST", "/api/job/submitFileAsJob"),
changePassword: apiClient.fromTypeboxRoute<typeof ChangePasswordSchema>("PATCH", "/api/profile/changePassword"),
checkPassword: apiClient.fromTypeboxRoute<typeof CheckPasswordSchema>("GET", "/api/profile/checkPassword"),
startFileTransfer: apiClient.fromTypeboxRoute<typeof StartFileTransferSchema>("PATCH", "/api/file/startFileTransfer"),
Expand Down
5 changes: 5 additions & 0 deletions apps/portal-web/src/i18n/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ export default {
deleteConfirmContent: "Confirm deletion of {}?",
deleteConfirmOk: "Confirm",
deleteSuccessMessage: "Deleted successfully",
submitConfirmTitle: "Submit Confirmation",
submitConfirmContent: "Confirm submission of {} to {}?",
submitConfirmOk: "Confirm",
submitSuccessMessage: "Submitted successfully! Your new job ID is: {}",
submitFailedMessage: "Submitted Failed",
},
},
fileTable: {
Expand Down
5 changes: 5 additions & 0 deletions apps/portal-web/src/i18n/zh_cn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ export default {
deleteConfirmContent: "确认删除{}?",
deleteConfirmOk: "确认",
deleteSuccessMessage: "删除成功",
submitConfirmTitle: "确认提交",
submitConfirmContent: "确认提交{}至{}?",
submitConfirmOk: "确认",
submitSuccessMessage: "提交成功!您的新作业ID为:{}",
submitFailedMessage: "提交失败",
},
},
fileTable: {
Expand Down
1 change: 1 addition & 0 deletions apps/portal-web/src/models/operationLog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,5 @@ export const OperationType: OperationTypeEnum = {
tenantPay: "tenantPay",
blockAccount: "blockAccount",
unblockAccount: "unblockAccount",
submitFileItemAsJob: "submitFileItemAsJob",
};
41 changes: 41 additions & 0 deletions apps/portal-web/src/pageComponents/filemanager/FileManager.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,47 @@ export const FileManager: React.FC<Props> = ({ cluster, path, urlPrefix }) => {
>
{t("button.deleteButton")}
</a>
{
i.type === "FILE" ? (
<a onClick={() => {
const fullPath = join(path, i.name);
modal.confirm({
title: t(p("tableInfo.submitConfirmTitle")),
content: t(p("tableInfo.submitConfirmContent"),
[i.name, getI18nConfigCurrentText(cluster.name, languageId)]),
okText: t(p("tableInfo.submitConfirmOk")),
onOk: async () => {
await api.submitFileAsJob({
body: {
cluster: cluster.id,
filePath: fullPath,
},
})
.httpError(500, (e) => {
e.code === "SCHEDULER_FAILED" || e.code === "FAILED_PRECONDITION" ? modal.error({
title: t(p("tableInfo.submitFailedMessage")),
content: e.message,
}) : (() => { throw e; })();
})
.httpError(400, (e) => {
e.code === "INVALID_ARGUMENT" || e.code === "INVALID_PATH" ? modal.error({
title: t(p("tableInfo.submitFailedMessage")),
content: e.message,
}) : (() => { throw e; })();
})
.then((result) => {
message.success(t(p("tableInfo.submitSuccessMessage"), [result.jobId]));
resetSelectedAndOperation();
reload();
});
},
});
}}
>
{t("button.submitButton")}
</a>
) : undefined
}
</Space>
)}
/>
Expand Down
Loading

0 comments on commit 135f2b1

Please sign in to comment.