diff --git a/.changeset/warm-gifts-know.md b/.changeset/warm-gifts-know.md new file mode 100644 index 0000000000..2460c6e97d --- /dev/null +++ b/.changeset/warm-gifts-know.md @@ -0,0 +1,11 @@ +--- +"@scow/portal-server": patch +"@scow/lib-operation-log": patch +"@scow/scowd-protos": patch +"@scow/portal-web": patch +"@scow/mis-web": patch +"@scow/lib-server": patch +"@scow/grpc-api": minor +--- + +接入 scowd 文件分片上传 diff --git a/apps/mis-web/src/i18n/en.ts b/apps/mis-web/src/i18n/en.ts index 68051b7652..1e4df4b744 100644 --- a/apps/mis-web/src/i18n/en.ts +++ b/apps/mis-web/src/i18n/en.ts @@ -1244,6 +1244,8 @@ export default { customEvent: "Custom Operation Event", activateCluster: "Activate Cluster", deactivateCluster: "Deactivate Cluster", + mergeFileChunks: "Merge and upload temporary file blocks", + initMultipartUpload: "Initial multipart upload file", }, operationDetails: { login: "User Login", diff --git a/apps/mis-web/src/i18n/zh_cn.ts b/apps/mis-web/src/i18n/zh_cn.ts index 4c2b146483..9448aa1f25 100644 --- a/apps/mis-web/src/i18n/zh_cn.ts +++ b/apps/mis-web/src/i18n/zh_cn.ts @@ -1242,6 +1242,8 @@ export default { customEvent: "自定义操作行为", activateCluster: "启用集群", deactivateCluster: "停用集群", + mergeFileChunks: "合并临时文件块", + initMultipartUpload: "初始化分片上传", }, operationDetails: { login: "用户登录", diff --git a/apps/mis-web/src/models/operationLog.ts b/apps/mis-web/src/models/operationLog.ts index 7ef8c6111b..794daef88c 100644 --- a/apps/mis-web/src/models/operationLog.ts +++ b/apps/mis-web/src/models/operationLog.ts @@ -175,6 +175,8 @@ export const getOperationTypeTexts = (t: OperationTextsTransType): {[key in LibO activateCluster: t(pTypes("activateCluster")), deactivateCluster: t(pTypes("deactivateCluster")), customEvent: t(pTypes("customEvent")), + mergeFileChunks: t(pTypes("mergeFileChunks")), + initMultipartUpload: t(pTypes("initMultipartUpload")), }; }; @@ -202,6 +204,8 @@ export const OperationCodeMap: {[key in LibOperationType]: string } = { moveFileItem: "010506", copyFileItem: "010507", submitFileItemAsJob: "010508", + mergeFileChunks: "010509", + initMultipartUpload: "010510", setJobTimeLimit: "010601", createImage:"010701", updateImage:"010702", diff --git a/apps/portal-server/src/clusterops/file/index.ts b/apps/portal-server/src/clusterops/file/index.ts index a609024f67..0a8a76c820 100644 --- a/apps/portal-server/src/clusterops/file/index.ts +++ b/apps/portal-server/src/clusterops/file/index.ts @@ -25,7 +25,7 @@ export const fileOps = (cluster: string): FileOps => { const clusterInfo = configClusters[cluster]; if (clusterInfo.scowd?.enabled) { const client = getScowdClient(cluster); - + return { ...scowdFileServices(client), }; @@ -33,7 +33,7 @@ export const fileOps = (cluster: string): FileOps => { const host = getClusterLoginNode(cluster); if (!host) { throw clusterNotFound(cluster); } - + return { ...sshFileServices(host), }; diff --git a/apps/portal-server/src/clusterops/file/scowdFile.ts b/apps/portal-server/src/clusterops/file/scowdFile.ts index 8295fb0bcf..d4a3b1ed93 100644 --- a/apps/portal-server/src/clusterops/file/scowdFile.ts +++ b/apps/portal-server/src/clusterops/file/scowdFile.ts @@ -110,7 +110,7 @@ export const scowdFileServices = (client: ScowdClient): FileOps => ({ type: fileInfo_FileTypeFromJSON(info.fileType), mtime: info.modTime, mode: info.mode, - size: Number(info.size), + size: Number(info.sizeByte), }; }); return { results }; @@ -124,7 +124,7 @@ export const scowdFileServices = (client: ScowdClient): FileOps => ({ try { const readStream = client.file.download({ - userId, path, chunkSize: config.DOWNLOAD_CHUNK_SIZE, + userId, path, chunkSizeByte: config.DOWNLOAD_CHUNK_SIZE, }); for await (const response of readStream) { @@ -183,7 +183,7 @@ export const scowdFileServices = (client: ScowdClient): FileOps => ({ try { const res = await client.file.getFileMetadata({ userId, filePath: path }); - return { size: Number(res.size), type: res.type === FileType.DIR ? "dir" : "file" }; + return { size: Number(res.sizeByte), type: res.type === FileType.DIR ? "dir" : "file" }; } catch (err) { throw mapTRPCExceptionToGRPC(err); diff --git a/apps/portal-server/src/services/file.ts b/apps/portal-server/src/services/file.ts index efe361e467..6ed60c428c 100644 --- a/apps/portal-server/src/services/file.ts +++ b/apps/portal-server/src/services/file.ts @@ -12,13 +12,17 @@ import { plugin } from "@ddadaal/tsgrpc-server"; import { ServiceError, status } from "@grpc/grpc-js"; +import { Status } from "@grpc/grpc-js/build/src/constants"; import { loggedExec, sftpAppendFile, sftpExists, sftpMkdir, sftpReadFile, sftpRealPath, sshRmrf } from "@scow/lib-ssh"; -import { FileServiceServer, FileServiceService, TransferInfo } from "@scow/protos/build/portal/file"; +import { + FileInfo, fileInfo_FileTypeFromJSON, FileServiceServer, FileServiceService, TransferInfo, +} from "@scow/protos/build/portal/file"; import { getClusterOps } from "src/clusterops"; import { configClusters } from "src/config/clusters"; import { checkActivatedClusters } from "src/utils/clusters"; import { clusterNotFound } from "src/utils/errors"; +import { getScowdClient, mapTRPCExceptionToGRPC } from "src/utils/scowd"; import { getClusterLoginNode, getClusterTransferNode, sshConnect, tryGetClusterTransferNode } from "src/utils/ssh"; export const fileServiceServer = plugin((server) => { @@ -193,6 +197,78 @@ export const fileServiceServer = plugin((server) => { }, + initMultipartUpload: async ({ request }) => { + + const { cluster, userId, path, name } = request; + await checkActivatedClusters({ clusterIds: cluster }); + + const host = getClusterLoginNode(cluster); + + if (!host) { throw clusterNotFound(cluster); } + + const clusterInfo = configClusters[cluster]; + + if (!clusterInfo.scowd?.enabled) { + throw { + code: Status.UNIMPLEMENTED, + message: "To use this interface, you need to enable scowd.", + } as ServiceError; + } + + const client = getScowdClient(cluster); + + try { + const initData = await client.file.initMultipartUpload({ userId, path, name }); + + return [{ + ...initData, + chunkSizeByte: Number(initData.chunkSizeByte), + filesInfo: initData.filesInfo.map((info): FileInfo => { + return { + name: info.name, + type: fileInfo_FileTypeFromJSON(info.fileType), + mtime: info.modTime, + mode: info.mode, + size: Number(info.sizeByte), + }; + }), + }]; + + } catch (err) { + throw mapTRPCExceptionToGRPC(err); + } + }, + + mergeFileChunks: async ({ request }) => { + const { cluster, userId, path, name, sizeByte } = request; + await checkActivatedClusters({ clusterIds: cluster }); + + const host = getClusterLoginNode(cluster); + + if (!host) { throw clusterNotFound(cluster); } + + const clusterInfo = configClusters[cluster]; + + if (!clusterInfo.scowd?.enabled) { + throw { + code: Status.UNIMPLEMENTED, + message: "To use this interface, you need to enable scowd.", + } as ServiceError; + } + + const client = getScowdClient(cluster); + + try { + await client.file.mergeFileChunks({ userId, path, name, sizeByte: BigInt(sizeByte) }); + + return [{}]; + + } catch (err) { + throw mapTRPCExceptionToGRPC(err); + } + }, + + getFileMetadata: async ({ request, logger }) => { const { userId, cluster, path } = request; await checkActivatedClusters({ clusterIds: cluster }); diff --git a/apps/portal-server/src/utils/scowd.ts b/apps/portal-server/src/utils/scowd.ts index 61811991a9..8d73adae16 100644 --- a/apps/portal-server/src/utils/scowd.ts +++ b/apps/portal-server/src/utils/scowd.ts @@ -29,6 +29,7 @@ export function getLoginNodeScowdUrl(cluster: string, host: string): string | un if (!loginNode) return undefined; const { address, scowdPort } = loginNode; + return config.SCOWD_SSL_ENABLED ? `https://${removePort(address)}:${scowdPort}` : `http://${removePort(address)}:${scowdPort}`; } diff --git a/apps/portal-web/package.json b/apps/portal-web/package.json index 051840bdd4..b288e99ece 100644 --- a/apps/portal-web/package.json +++ b/apps/portal-web/package.json @@ -52,8 +52,11 @@ "@sinclair/typebox": "0.32.34", "@uiw/codemirror-theme-github": "4.22.2", "@uiw/react-codemirror": "4.21.20", + "@xterm/addon-fit": "0.10.0", + "@xterm/xterm": "5.5.0", "antd": "5.18.3", "busboy": "1.6.0", + "crypto-js": "4.2.0", "dayjs": "1.11.11", "google-protobuf": "3.21.2", "http-proxy": "1.18.1", @@ -63,6 +66,7 @@ "next-compose-plugins": "2.2.1", "nookies": "2.5.2", "nprogress": "0.2.0", + "p-limit": "6.1.0", "react": "18.3.1", "react-async": "10.0.1", "react-dom": "18.3.1", @@ -73,9 +77,7 @@ "styled-components": "6.1.11", "tslib": "2.6.3", "typescript": "5.5.2", - "ws": "8.17.1", - "@xterm/xterm": "5.5.0", - "@xterm/addon-fit": "0.10.0" + "ws": "8.17.1" }, "devDependencies": { "@ddadaal/next-typed-api-routes-cli": "0.9.1", diff --git a/apps/portal-web/src/apis/api.mock.ts b/apps/portal-web/src/apis/api.mock.ts index 21bd57797c..de57dee982 100644 --- a/apps/portal-web/src/apis/api.mock.ts +++ b/apps/portal-web/src/apis/api.mock.ts @@ -315,6 +315,12 @@ export const mockApi: MockApi = { }, }), + mergeFileChunks: null, + initMultipartUpload: async () => ({ + tempFileDir: "home/user/scow/tempDir", + chunkSizeByte: 5 * 1024 * 1024, + filesInfo: [], + }), getClustersRuntimeInfo: async () => ({ results: [{ clusterId: "hpc01", diff --git a/apps/portal-web/src/apis/api.ts b/apps/portal-web/src/apis/api.ts index feaf15a167..fbd1c94a5b 100644 --- a/apps/portal-web/src/apis/api.ts +++ b/apps/portal-web/src/apis/api.ts @@ -44,8 +44,10 @@ import type { DownloadFileSchema } from "src/pages/api/file/download"; import type { FileExistSchema } from "src/pages/api/file/fileExist"; import type { GetFileTypeSchema } from "src/pages/api/file/getFileType"; import type { GetHomeDirectorySchema } from "src/pages/api/file/getHome"; +import type { InitMultipartUploadSchema } from "src/pages/api/file/initMultipartUpload"; import type { ListFileSchema } from "src/pages/api/file/list"; import type { ListAvailableTransferClustersSchema } from "src/pages/api/file/listAvailableTransferClusters"; +import type { MergeFileChunksSchema } from "src/pages/api/file/mergeFileChunks"; import type { MkdirSchema } from "src/pages/api/file/mkdir"; import type { MoveFileItemSchema } from "src/pages/api/file/move"; import type { QueryFileTransferProgressSchema } from "src/pages/api/file/queryFileTransferProgress"; @@ -97,8 +99,10 @@ export const api = { fileExist: apiClient.fromTypeboxRoute("GET", "/api/file/fileExist"), getFileType: apiClient.fromTypeboxRoute("GET", "/api/file/getFileType"), getHomeDirectory: apiClient.fromTypeboxRoute("GET", "/api/file/getHome"), + initMultipartUpload: apiClient.fromTypeboxRoute("POST", "/api/file/initMultipartUpload"), listFile: apiClient.fromTypeboxRoute("GET", "/api/file/list"), listAvailableTransferClusters: apiClient.fromTypeboxRoute("GET", "/api/file/listAvailableTransferClusters"), + mergeFileChunks: apiClient.fromTypeboxRoute("POST", "/api/file/mergeFileChunks"), mkdir: apiClient.fromTypeboxRoute("POST", "/api/file/mkdir"), moveFileItem: apiClient.fromTypeboxRoute("PATCH", "/api/file/move"), queryFileTransferProgress: apiClient.fromTypeboxRoute("GET", "/api/file/queryFileTransferProgress"), diff --git a/apps/portal-web/src/i18n/en.ts b/apps/portal-web/src/i18n/en.ts index 2ddfbeb301..8ab7dc9710 100644 --- a/apps/portal-web/src/i18n/en.ts +++ b/apps/portal-web/src/i18n/en.ts @@ -343,6 +343,10 @@ export default { existedModalOk: "Confirm", dragText: "Click or drag files here", hintText: "Supports uploading single or multiple files", + multipartUploadError: "Upload file failed: {}", + calculateHashError: "Error calculating hash: {}", + uploadFileListNotExist: "The uploaded file list does not exist: {}", + mergeFileChunksErrorText: "Failed to merge file {}, please try again", }, }, // desktop diff --git a/apps/portal-web/src/i18n/zh_cn.ts b/apps/portal-web/src/i18n/zh_cn.ts index 3f94eb3303..fd924603ab 100644 --- a/apps/portal-web/src/i18n/zh_cn.ts +++ b/apps/portal-web/src/i18n/zh_cn.ts @@ -343,6 +343,10 @@ export default { existedModalOk: "确认", dragText: "点击或者将文件拖动到这里", hintText: "支持上传单个或者多个文件", + multipartUploadError: "文件上传失败: {}", + calculateHashError: "计算哈希值错误: {}", + uploadFileListNotExist: "上传文件列表中不存在: {}", + mergeFileChunksErrorText: "合并文件 {} 失败,请重试", }, }, // desktop diff --git a/apps/portal-web/src/pageComponents/filemanager/FileManager.tsx b/apps/portal-web/src/pageComponents/filemanager/FileManager.tsx index ffe5c06191..fe39aa1d0b 100644 --- a/apps/portal-web/src/pageComponents/filemanager/FileManager.tsx +++ b/apps/portal-web/src/pageComponents/filemanager/FileManager.tsx @@ -54,6 +54,7 @@ interface Props { cluster: Cluster; path: string; urlPrefix: string; + scowdEnabled: boolean; } interface PromiseSettledResult { @@ -93,7 +94,7 @@ interface Operation { const p = prefix("pageComp.fileManagerComp.fileManager."); -export const FileManager: React.FC = ({ cluster, path, urlPrefix }) => { +export const FileManager: React.FC = ({ cluster, path, urlPrefix, scowdEnabled }) => { const router = useRouter(); @@ -411,6 +412,7 @@ export const FileManager: React.FC = ({ cluster, path, urlPrefix }) => { cluster={cluster.id} path={path} reload={reload} + scowdEnabled={scowdEnabled} > {t(p("tableInfo.uploadButton"))} diff --git a/apps/portal-web/src/pageComponents/filemanager/UploadModal.tsx b/apps/portal-web/src/pageComponents/filemanager/UploadModal.tsx index 6a1065ae03..eee6201b99 100644 --- a/apps/portal-web/src/pageComponents/filemanager/UploadModal.tsx +++ b/apps/portal-web/src/pageComponents/filemanager/UploadModal.tsx @@ -12,36 +12,158 @@ import { DeleteOutlined, InboxOutlined } from "@ant-design/icons"; import { App, Button, Modal, Upload, UploadFile } from "antd"; +import pLimit from "p-limit"; import { join } from "path"; -import { useState } from "react"; +import { useEffect, useRef, useState } from "react"; import { api } from "src/apis"; import { prefix, useI18nTranslateToString } from "src/i18n"; import { urlToUpload } from "src/pageComponents/filemanager/api"; import { publicConfig } from "src/utils/config"; +import { calculateBlobSHA256 } from "src/utils/file"; import { convertToBytes } from "src/utils/format"; - interface Props { open: boolean; onClose: () => void; reload: () => void; cluster: string; path: string; + scowdEnabled: boolean; +} + +interface UploadProgressEvent { + percent: number; } const p = prefix("pageComp.fileManagerComp.uploadModal."); -export const UploadModal: React.FC = ({ open, onClose, path, reload, cluster }) => { +type OnProgressCallback = undefined | ((progressEvent: UploadProgressEvent) => void); +export const UploadModal: React.FC = ({ open, onClose, path, reload, cluster, scowdEnabled }) => { const { message, modal } = App.useApp(); const [ uploadFileList, setUploadFileList ] = useState([]); + const uploadControllers = useRef(new Map()); const t = useI18nTranslateToString(); + useEffect(() => { + return () => { + setUploadFileList([]); + }; + }, [open]); + const onModalClose = () => { - setUploadFileList([]); + for (const controller of Array.from(uploadControllers.current.values())) { + controller.abort(); + } + + uploadControllers.current.clear(); onClose(); }; + const handleRemove = (file: UploadFile) => { + const controller = uploadControllers.current.get(file.uid); + if (controller) { + controller.abort(); + uploadControllers.current.delete(file.uid); + } + + return true; + }; + + const startMultipartUpload = async (file: File, onProgress: OnProgressCallback) => { + const { tempFileDir, chunkSizeByte, filesInfo } = await api.initMultipartUpload({ + body: { cluster, path, name: file.name }, + }); + + const uploadedChunkIndices = new Set( + filesInfo.map((item) => { + const reg = /_(\d+).scowuploadtemp/; + const match = reg.exec(item.name); + return match ? parseInt(match[1]) : null; + }).filter((index) => index !== null), + ); + + const totalCount = Math.ceil(file.size / chunkSizeByte); + const concurrentChunks = 3; + let uploadedCount = uploadedChunkIndices.size; + + const uploadFile = uploadFileList.find((uploadFile) => uploadFile.name === file.name); + if (!uploadFile) { + message.error(t(p("uploadFileListNotExist"), [file.name])); + return; + } + + const updateProgress = (count: number) => { + uploadedCount += count; + onProgress?.({ percent: Number(((uploadedCount / totalCount) * 100).toFixed(2)) }); + }; + + const controller = new AbortController(); + uploadControllers.current.set(uploadFile.uid, controller); + + const limit = pLimit(concurrentChunks); + + const uploadChunk = async (start: number): Promise => { + if (controller.signal.aborted) { + return; + } + + if (uploadedChunkIndices.has(start + 1)) { + // 如果文件块已经上传,直接跳过 + return; + } + + const chunk = file.slice(start * chunkSizeByte, (start + 1) * chunkSizeByte); + const hash = await calculateBlobSHA256(chunk); + const fileName = `${hash}_${start + 1}.scowuploadtemp`; + + const formData = new FormData(); + formData.append("file", chunk); + + const response = await fetch(urlToUpload(cluster, join(tempFileDir, fileName)), { + method: "POST", + body: formData, + signal: controller.signal, + }); + + if (!response.ok) { + throw new Error(response.statusText); + } + + updateProgress(1); + + }; + + try { + const batchSize = 10; // 每次上传10个文件块 + for (let i = 0; i < totalCount; i += batchSize) { + const batchPromises: Promise[] = []; + for (let j = i; j < Math.min(i + batchSize, totalCount); j++) { + if (controller.signal.aborted) { + break; + } + batchPromises.push(limit(() => uploadChunk(j))); + } + await Promise.all(batchPromises); + } + + if (!controller.signal.aborted) { + await api.mergeFileChunks({ body: { cluster, path, name: file.name, sizeByte: file.size } }) + .httpError(520, (err) => { + message.error(t(p("mergeFileChunksErrorText"), [file.name])); + throw err; + }); + } + + } catch (err) { + message.error(t(p("multipartUploadError"), [err.message])); + throw err; + } finally { + uploadControllers.current.delete(uploadFile.uid); + } + }; + + return ( = ({ open, onClose, path, reload, clus urlToUpload(cluster, join(path, file.name))} + {...(scowdEnabled ? { + customRequest: ({ file, onSuccess, onError, onProgress }) => { + startMultipartUpload(file as File, onProgress).then(onSuccess).catch(onError); + }, + } : { + action: async (file) => urlToUpload(cluster, join(path, file.name)), + })} withCredentials showUploadList={{ removeIcon: (file) => { return ( - + file.status === "uploading" + ? ( + handleRemove(file) : undefined} + title={t(p("cancelUpload"))} + /> + ) + : ); }, }} @@ -134,3 +267,4 @@ export const UploadModal: React.FC = ({ open, onClose, path, reload, clus ); }; + diff --git a/apps/portal-web/src/pages/api/file/initMultipartUpload.ts b/apps/portal-web/src/pages/api/file/initMultipartUpload.ts new file mode 100644 index 0000000000..73d6394283 --- /dev/null +++ b/apps/portal-web/src/pages/api/file/initMultipartUpload.ts @@ -0,0 +1,90 @@ +/** + * Copyright (c) 2022 Peking University and Peking University Institute for Computing and Digital Economy + * SCOW is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +import { typeboxRouteSchema } from "@ddadaal/next-typed-api-routes-runtime"; +import { asyncUnaryCall } from "@ddadaal/tsgrpc-client"; +import { status } from "@grpc/grpc-js"; +import { OperationType } from "@scow/lib-operation-log"; +import { FileServiceClient } from "@scow/protos/build/portal/file"; +import { Type } from "@sinclair/typebox"; +import { authenticate } from "src/auth/server"; +import { OperationResult } from "src/models/operationLog"; +import { callLog } from "src/server/operationLog"; +import { getClient } from "src/utils/client"; +import { route } from "src/utils/route"; +import { handlegRPCError, parseIp } from "src/utils/server"; + +import { FileInfo, mapType } from "./list"; + +export const InitMultipartUploadSchema = typeboxRouteSchema({ + method: "POST", + + body: Type.Object({ + cluster: Type.String(), + path: Type.String(), + name: Type.String(), + }), + + responses: { + 200: Type.Object({ + tempFileDir: Type.String(), + chunkSizeByte: Type.Number(), + filesInfo: Type.Array(FileInfo), + }), + 403: Type.Object({ code: Type.Literal("PERMISSION_DENIED") }), + 500: Type.Object({ code: Type.Literal("INITIAL_UPLOAD_FAILED") }), + 501: Type.Object({ code: Type.Literal("UNIMPLEMENTED") }), + 520: Type.Object({ code: Type.Literal("UNKNOWN_ERROR") }), + }, +}); + +const auth = authenticate(() => true); + +export default route(InitMultipartUploadSchema, async (req, res) => { + + const info = await auth(req, res); + + if (!info) { return; } + + const { cluster, path, name } = req.body; + + const client = getClient(FileServiceClient); + + const logInfo = { + operatorUserId: info.identityId, + operatorIp: parseIp(req) ?? "", + operationTypeName: OperationType.initMultipartUpload, + operationTypePayload:{ + clusterId: cluster, path, name, + }, + }; + + return asyncUnaryCall(client, "initMultipartUpload", { + cluster, path, userId: info.identityId, name, + }).then(async (res) => { + await callLog(logInfo, OperationResult.SUCCESS); + return { 200: { + ...res, + filesInfo: res.filesInfo.map(({ mode, mtime, name, size, type }) => ({ + mode, mtime, name, size, type: mapType[type], + })), + } }; + }, handlegRPCError({ + [status.INTERNAL]: () => ({ 500: { code: "INITIAL_UPLOAD_FAILED" as const } }), + [status.PERMISSION_DENIED]: () => ({ 403: { code: "PERMISSION_DENIED" as const } }), + [status.UNKNOWN]: () => ({ 520: { code: "UNKNOWN_ERROR" as const } }), + [status.UNIMPLEMENTED]: () => ({ 501: { code: "UNIMPLEMENTED" as const } }), + }, + async () => await callLog(logInfo, OperationResult.FAIL), + )); + +}); diff --git a/apps/portal-web/src/pages/api/file/list.ts b/apps/portal-web/src/pages/api/file/list.ts index 2257375d20..3e9765742d 100644 --- a/apps/portal-web/src/pages/api/file/list.ts +++ b/apps/portal-web/src/pages/api/file/list.ts @@ -55,7 +55,7 @@ export const ListFileSchema = typeboxRouteSchema({ const auth = authenticate(() => true); -const mapType = { +export const mapType = { [FileInfo_FileType.DIR]: "DIR", [FileInfo_FileType.FILE]: "FILE", } as const; @@ -81,5 +81,4 @@ export default route(ListFileSchema, async (req, res) => { [status.PERMISSION_DENIED]: () => ({ 403: { code: "NOT_ACCESSIBLE" as const } }), [status.INVALID_ARGUMENT]: () => ({ 412: { code: "DIRECTORY_NOT_FOUND" as const } }), })); - }); diff --git a/apps/portal-web/src/pages/api/file/mergeFileChunks.ts b/apps/portal-web/src/pages/api/file/mergeFileChunks.ts new file mode 100644 index 0000000000..41a0504396 --- /dev/null +++ b/apps/portal-web/src/pages/api/file/mergeFileChunks.ts @@ -0,0 +1,80 @@ +/** + * Copyright (c) 2022 Peking University and Peking University Institute for Computing and Digital Economy + * SCOW is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +import { typeboxRouteSchema } from "@ddadaal/next-typed-api-routes-runtime"; +import { asyncUnaryCall } from "@ddadaal/tsgrpc-client"; +import { status } from "@grpc/grpc-js"; +import { OperationType } from "@scow/lib-operation-log"; +import { FileServiceClient } from "@scow/protos/build/portal/file"; +import { Type } from "@sinclair/typebox"; +import { authenticate } from "src/auth/server"; +import { OperationResult } from "src/models/operationLog"; +import { callLog } from "src/server/operationLog"; +import { getClient } from "src/utils/client"; +import { route } from "src/utils/route"; +import { handlegRPCError, parseIp } from "src/utils/server"; + +export const MergeFileChunksSchema = typeboxRouteSchema({ + method: "POST", + + body: Type.Object({ + cluster: Type.String(), + path: Type.String(), + name: Type.String(), + sizeByte: Type.Number(), + }), + + responses: { + 204: Type.Null(), + 403: Type.Object({ code: Type.Literal("PERMISSION_DENIED") }), + 404: Type.Object({ code: Type.Literal("FILE_NOT_EXISTS") }), + 501: Type.Object({ code: Type.Literal("UNIMPLEMENTED") }), + 520: Type.Object({ code: Type.Literal("MERGE_CHUNKS_FAILED") }), + }, +}); + +const auth = authenticate(() => true); + +export default route(MergeFileChunksSchema, async (req, res) => { + + const info = await auth(req, res); + + if (!info) { return; } + + const { cluster, path, name, sizeByte } = req.body; + + const client = getClient(FileServiceClient); + + const logInfo = { + operatorUserId: info.identityId, + operatorIp: parseIp(req) ?? "", + operationTypeName: OperationType.mergeFileChunks, + operationTypePayload:{ + clusterId: cluster, path, name, sizeByte, + }, + }; + + return asyncUnaryCall(client, "mergeFileChunks", { + cluster, path, userId: info.identityId, name, sizeByte, + }).then(async () => { + await callLog(logInfo, OperationResult.SUCCESS); + return { 204: null }; + }, handlegRPCError({ + [status.NOT_FOUND]: () => ({ 404: { code: "FILE_NOT_EXISTS" as const } }), + [status.PERMISSION_DENIED]: () => ({ 403: { code: "PERMISSION_DENIED" as const } }), + [status.UNKNOWN]: () => ({ 520: { code: "MERGE_CHUNKS_FAILED" as const } }), + [status.UNIMPLEMENTED]: () => ({ 501: { code: "UNIMPLEMENTED" as const } }), + }, + async () => await callLog(logInfo, OperationResult.FAIL), + )); + +}); diff --git a/apps/portal-web/src/pages/files/[cluster]/[[...path]].tsx b/apps/portal-web/src/pages/files/[cluster]/[[...path]].tsx index aa18d31952..f8bf9a8c92 100644 --- a/apps/portal-web/src/pages/files/[cluster]/[[...path]].tsx +++ b/apps/portal-web/src/pages/files/[cluster]/[[...path]].tsx @@ -13,16 +13,31 @@ import { queryToArray, queryToString } from "@scow/lib-web/build/utils/querystring"; import { getI18nConfigCurrentText } from "@scow/lib-web/build/utils/systemLanguage"; import { Result } from "antd"; -import { NextPage } from "next"; +import { GetServerSideProps, NextPage } from "next"; import { useRouter } from "next/router"; +import { useState } from "react"; import { useStore } from "simstate"; +import { api } from "src/apis"; +import { USE_MOCK } from "src/apis/useMock"; +import { getTokenFromCookie } from "src/auth/cookie"; import { requireAuth } from "src/auth/requireAuth"; +import { AuthResultError, ssrAuthenticate } from "src/auth/server"; +import { UnifiedErrorPage } from "src/components/errorPages/UnifiedErrorPage"; import { useI18n, useI18nTranslateToString } from "src/i18n"; import { FileManager } from "src/pageComponents/filemanager/FileManager"; import { ClusterInfoStore } from "src/stores/ClusterInfoStore"; import { Head } from "src/utils/head"; -export const FileManagerPage: NextPage = requireAuth(() => true)(() => { +type Props = { + error: AuthResultError; +} | { + scowdEnabledClusters: string[]; +}; + +export const FileManagerPage: NextPage = requireAuth(() => true)((props: Props) => { + if ("error" in props) { + return ; + } const languageId = useI18n().currentLanguage.id; @@ -30,6 +45,7 @@ export const FileManagerPage: NextPage = requireAuth(() => true)(() => { const pathParts = queryToArray(router.query.path); const cluster = queryToString(router.query.cluster); + const [ scowdEnabled, _ ] = useState(!!props.scowdEnabledClusters?.includes(cluster)); const t = useI18nTranslateToString(); @@ -57,9 +73,43 @@ export const FileManagerPage: NextPage = requireAuth(() => true)(() => { cluster={clusterObj} path={fullPath} urlPrefix="/files" + scowdEnabled={scowdEnabled} /> ); }); +export const getServerSideProps: GetServerSideProps = async ({ req }) => { + + const auth = ssrAuthenticate(() => true); + + const info = await auth(req); + if (typeof info === "number") { + return { props: { error: info } }; + } + + // Cannot directly call api routes here, so mock is not available directly. + // manually call mock + if (USE_MOCK) { + return { + props: { + scowdEnabledClusters: [ "hpc01" ], + }, + }; + } + + const token = getTokenFromCookie({ req }); + const resp = await api.getClusterConfigFiles({ query: { token } }); + + const scowdEnabledClusters: string[] = Object.entries(resp.clusterConfigs) + .filter(([_, config]) => !!config.scowd?.enabled) + .map(([cluster, _]) => cluster); + + return { + props: { + scowdEnabledClusters, + }, + }; +}; + export default FileManagerPage; diff --git a/apps/portal-web/src/utils/file.ts b/apps/portal-web/src/utils/file.ts index 972dd2c580..6cd572a211 100644 --- a/apps/portal-web/src/utils/file.ts +++ b/apps/portal-web/src/utils/file.ts @@ -11,12 +11,12 @@ */ import { CloseOutlined, FileOutlined, FolderOutlined } from "@ant-design/icons"; +import * as crypto from "crypto"; import { join } from "path"; import { FilterFormContainer } from "src/components/FilterFormContainer"; import { FileInfo, FileType } from "src/pages/api/file/list"; import { styled } from "styled-components"; - export type FileInfoKey = React.Key; export const fileInfoKey = (f: FileInfo, path: string): FileInfoKey => join(path, f.name); @@ -50,3 +50,21 @@ export const nodeModeToString = (mode: number) => { export const openPreviewLink = (href: string) => { window.open(href, "ViewFile", "location=yes,resizable=yes,scrollbars=yes,status=yes"); }; + +export async function calculateBlobSHA256(blob: Blob): Promise { + + try { + + const arrayBuffer = await blob.arrayBuffer(); + const buffer = Buffer.from(arrayBuffer); + const hash = crypto.createHash("sha256"); + hash.update(buffer); + const hashHex = hash.digest("hex"); + + return hashHex; + } catch (error) { + + throw new Error(`Failed to calculate hash: ${error.message}`); + } +} + diff --git a/libs/operation-log/src/constant.ts b/libs/operation-log/src/constant.ts index cbb1212b31..f5f69f9927 100644 --- a/libs/operation-log/src/constant.ts +++ b/libs/operation-log/src/constant.ts @@ -37,7 +37,9 @@ export const OperationType: OperationTypeEnum = { createApp: "createApp", createFile: "createFile", deleteFile: "deleteFile", + initMultipartUpload: "initMultipartUpload", uploadFile: "uploadFile", + mergeFileChunks: "mergeFileChunks", createDirectory: "createDirectory", deleteDirectory: "deleteDirectory", moveFileItem: "moveFileItem", diff --git a/libs/protos/scowd/protos/api/storage/file.proto b/libs/protos/scowd/protos/api/storage/file.proto index ba2a24bef7..5b0c45b186 100644 --- a/libs/protos/scowd/protos/api/storage/file.proto +++ b/libs/protos/scowd/protos/api/storage/file.proto @@ -13,7 +13,7 @@ message FileInfo { FileType file_type = 1; string name = 2; string mod_time = 3; - uint64 size = 4; + uint64 size_byte = 4; uint32 mode = 5; } @@ -64,7 +64,7 @@ message GetFileMetadataRequest { } message GetFileMetadataResponse { - uint64 size = 1; + uint64 size_byte = 1; FileType type = 2; } @@ -103,6 +103,18 @@ message CopyRequest { message CopyResponse { } +message InitMultipartUploadRequest { + string user_id = 1; + string path = 2; + string name = 3; +} + +message InitMultipartUploadResponse { + string temp_file_dir = 1; + uint64 chunk_size_byte = 2; + repeated FileInfo files_info = 3; +} + message UploadRequest { message Info { @@ -120,10 +132,20 @@ message UploadResponse { uint64 written_bytes = 1; } +message MergeFileChunksRequest { + string user_id = 1; + string path = 2; + string name = 3; + uint64 size_byte = 4; +} + +message MergeFileChunksResponse { +} + message DownloadRequest { string user_id = 1; string path = 2; - uint32 chunk_size = 3; + uint32 chunk_size_byte = 3; } message DownloadResponse { @@ -192,7 +214,9 @@ service FileService { rpc DeleteFile(DeleteFileRequest) returns (DeleteFileResponse) {} rpc Download(DownloadRequest) returns (stream DownloadResponse); + rpc InitMultipartUpload(InitMultipartUploadRequest) returns (InitMultipartUploadResponse); rpc Upload(stream UploadRequest) returns (UploadResponse); + rpc MergeFileChunks(MergeFileChunksRequest) returns (MergeFileChunksResponse); rpc GetFileMetadata(GetFileMetadataRequest) returns (GetFileMetadataResponse); rpc GetHomeDirectory(GetHomeDirectoryRequest) returns (GetHomeDirectoryResponse); diff --git a/libs/server/src/typeConversion.ts b/libs/server/src/typeConversion.ts index 9287cdfb84..2614241c7f 100644 --- a/libs/server/src/typeConversion.ts +++ b/libs/server/src/typeConversion.ts @@ -76,6 +76,9 @@ export const convertClusterConfigsToServerProtoType = ( displayName: getI18nSeverTypeFormat(item.displayName)!, adapterUrl: item.adapterUrl, priority: item.priority, + scowd: item.scowd ? { + enabled: item.scowd?.enabled ?? false, + } : undefined, proxyGateway: item.proxyGateway ? { url: item.proxyGateway.url || "", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bc02c556c5..bbf8c53d26 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1047,6 +1047,9 @@ importers: busboy: specifier: 1.6.0 version: 1.6.0 + crypto-js: + specifier: 4.2.0 + version: 4.2.0 dayjs: specifier: 1.11.11 version: 1.11.11 @@ -1077,6 +1080,9 @@ importers: nprogress: specifier: 0.2.0 version: 0.2.0 + p-limit: + specifier: 6.1.0 + version: 6.1.0 react: specifier: 18.3.1 version: 18.3.1 @@ -5568,6 +5574,9 @@ packages: uWebSockets.js: optional: true + crypto-js@4.2.0: + resolution: {integrity: sha512-KALDyEYgpY+Rlob/iriUtjV6d5Eq+Y191A5g4UqLAi8CyGP9N1+FdVbkc1SxKc2r4YAYqG8JzO2KGL+AizD70Q==} + crypto-random-string@4.0.0: resolution: {integrity: sha512-x8dy3RnvYdlUcPOjkEHqozhiwzKNSq7GcPuXFbnyMOCHxX8V3OgIg/pYuabl2sbUPfIJaeAQB7PMOK8DFIdoRA==} engines: {node: '>=12'} @@ -8856,6 +8865,10 @@ packages: resolution: {integrity: sha512-5b0R4txpzjPWVw/cXXUResoD4hb6U/x9BH08L7nw+GN1sezDzPdxeRvpc9c433fZhBan/wusjbCsqwqm4EIBIQ==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + p-limit@6.1.0: + resolution: {integrity: sha512-H0jc0q1vOzlEk0TqAKXKZxdl7kX3OFUzCnNVUnq5Pc3DGo0kpeaMuPqxQn235HibwBEb0/pm9dgKTjXy66fBkg==} + engines: {node: '>=18'} + p-locate@3.0.0: resolution: {integrity: sha512-x+12w/To+4GFfgJhBEpiDcLozRJGegY+Ei7/z0tSLkMmxGZNybVMSfWj9aJn8Z5Fc7dBUNJOOVgPv2H7IwulSQ==} engines: {node: '>=6'} @@ -11985,6 +11998,10 @@ packages: resolution: {integrity: sha512-9bnSc/HEW2uRy67wc+T8UwauLuPJVn28jb+GtJY16iiKWyvmYJRXVT4UamsAEGQfPohgr2q4Tq0sQbQlxTfi1g==} engines: {node: '>=12.20'} + yocto-queue@1.1.1: + resolution: {integrity: sha512-b4JR1PFR10y1mKjhHY9LaGo6tmrgjit7hxVIeAmyMw3jegXR4dhYqLaQF5zMXZxY7tLpMyJeLjr1C4rLmkVe8g==} + engines: {node: '>=12.20'} + z-schema@5.0.5: resolution: {integrity: sha512-D7eujBWkLa3p2sIpJA0d1pr7es+a7m0vFAnZLlCEKq/Ij2k0MLi9Br2UPxoxdYystm5K1yeBGzub0FlYUEWj2Q==} engines: {node: '>=8.0.0'} @@ -17981,6 +17998,8 @@ snapshots: crossws@0.2.4: {} + crypto-js@4.2.0: {} + crypto-random-string@4.0.0: dependencies: type-fest: 1.4.0 @@ -22054,6 +22073,10 @@ snapshots: dependencies: yocto-queue: 1.0.0 + p-limit@6.1.0: + dependencies: + yocto-queue: 1.1.1 + p-locate@3.0.0: dependencies: p-limit: 2.3.0 @@ -25730,6 +25753,8 @@ snapshots: yocto-queue@1.0.0: {} + yocto-queue@1.1.1: {} + z-schema@5.0.5: dependencies: lodash.get: 4.4.2 diff --git a/protos/audit/operation_log.proto b/protos/audit/operation_log.proto index 131554104d..7f313d9ce1 100644 --- a/protos/audit/operation_log.proto +++ b/protos/audit/operation_log.proto @@ -137,11 +137,24 @@ message DeleteFile { string path = 2; } +message InitMultipartUpload { + string cluster_id = 1; + string path = 2; + string name = 3; +} + message UploadFile { string cluster_id = 1; string path = 2; } +message MergeFileChunks { + string cluster_id = 1; + string path = 2; + string name = 3; + uint64 size_byte = 4; +} + message CreateDirectory { string cluster_id = 1; string path = 2; @@ -700,6 +713,8 @@ message CreateOperationLogRequest { ShareModelVersion share_model_version = 94; DeleteModelVersion delete_model_version = 95; CopyModelVersion copy_model_version = 96; + MergeFileChunks merge_file_chunks = 97; + InitMultipartUpload init_multipart_upload = 98; } } @@ -804,6 +819,8 @@ message OperationLog { ShareModelVersion share_model_version = 96; DeleteModelVersion delete_model_version = 97; CopyModelVersion copy_model_version = 98; + MergeFileChunks merge_file_chunks = 99; + InitMultipartUpload init_multipart_upload = 100; } } diff --git a/protos/portal/file.proto b/protos/portal/file.proto index f3acf7ac59..ad817aa85d 100644 --- a/protos/portal/file.proto +++ b/protos/portal/file.proto @@ -129,6 +129,19 @@ message MakeDirectoryRequest { message MakeDirectoryResponse { } +message InitMultipartUploadRequest { + string cluster = 1; + string user_id = 2; + string path = 3; + string name = 4; +} + +message InitMultipartUploadResponse { + string temp_file_dir = 1; + uint64 chunk_size_byte = 2; + repeated FileInfo files_info = 3; +} + message UploadRequest { message Info { @@ -148,6 +161,17 @@ message UploadResponse { uint64 written_bytes = 1; } +message MergeFileChunksRequest { + string user_id = 1; + string cluster = 2; + string path = 3; + string name = 4; + uint64 size_byte = 5; +} + +message MergeFileChunksResponse { +} + message GetFileMetadataRequest { string user_id = 1; string cluster = 2; @@ -234,7 +258,18 @@ service FileService { rpc MakeDirectory(MakeDirectoryRequest) returns (MakeDirectoryResponse); + /** + * InitMultipartUpload 用于初始化文件分片上传。配置开启 scowd 才可使用 + * 获取文件分片临时存储目录、文件分片的大小和已上传的分片 + */ + rpc InitMultipartUpload(InitMultipartUploadRequest) returns (InitMultipartUploadResponse); rpc Upload(stream UploadRequest) returns (UploadResponse); + /** + * MergeFileChunks 用于在文件全部分片上传完成后,合并文件所有分片,形成最终完整文件。 + * 配置开启 scowd 才可使用 + * 合并过程中会校验每个文件分片的完整性 + */ + rpc MergeFileChunks(MergeFileChunksRequest) returns (MergeFileChunksResponse); rpc GetFileMetadata(GetFileMetadataRequest) returns (GetFileMetadataResponse);