From 8da4d166ef9e1d17a1357328e8eb251d45cdf5c3 Mon Sep 17 00:00:00 2001 From: rodrigobasilio2022 Date: Mon, 30 Dec 2024 16:49:53 -0300 Subject: [PATCH 1/4] upload files in parallel --- packages/s3-deploy/lib/S3Ops.mjs | 166 ++++++++--- .../static-wado-deploy/lib/DeployGroup.mjs | 259 ++++++++++++++++-- .../lib/compressUploadDeploy.mjs | 115 +++++++- .../static-wado-deploy/lib/uploadIndex.mjs | 90 +++++- yarn.lock | 8 +- 5 files changed, 543 insertions(+), 95 deletions(-) diff --git a/packages/s3-deploy/lib/S3Ops.mjs b/packages/s3-deploy/lib/S3Ops.mjs index 098c335..e1e6d76 100644 --- a/packages/s3-deploy/lib/S3Ops.mjs +++ b/packages/s3-deploy/lib/S3Ops.mjs @@ -3,7 +3,8 @@ import fs from "fs"; import mime from "mime-types"; import { configGroup, endsWith } from "@radicalimaging/static-wado-util"; import ConfigPoint from "config-point"; -import { execFileSync } from "node:child_process"; +import { createHash } from "crypto"; +import { createReadStream } from "fs"; import copyTo from "./copyTo.mjs"; @@ -126,7 +127,18 @@ class S3Ops { return this.group.path ? `s3://${this.group.Bucket}${this.group.path}/${uri}` : `s3://${this.group.Bucket}/${uri}`; } - shouldSkip(item, fileName) { + async calculateMD5(filePath) { + return new Promise((resolve, reject) => { + const hash = createHash('md5'); + const stream = createReadStream(filePath); + + stream.on('data', data => hash.update(data)); + stream.on('end', () => resolve(hash.digest('hex'))); + stream.on('error', error => reject(error)); + }); + } + + async shouldSkip(item, fileName) { if (!item) return false; if (!fs.existsSync(fileName)) { console.verbose("Doesn't exist, not skipping", fileName); @@ -146,15 +158,15 @@ class S3Ops { } const { ETag } = item; if (!ETag) return true; - const md5 = execFileSync(`md5sum "${fileName}"`, { shell: true }); - for (let i = 1; i < ETag.length - 1; i++) { - if (md5[i] != ETag.charCodeAt(i)) { - // Leave this for now as there might be more file types needing specific encoding checks - console.warn("md5 different at", i, md5[i], ETag.charCodeAt(i), ETag, md5.toString()); - return false; - } + + try { + const md5 = await this.calculateMD5(fileName); + const etagMd5 = ETag.replace(/['"]/g, ''); // Remove quotes from ETag + return md5 === etagMd5; + } catch (error) { + console.warn("Error calculating MD5:", error); + return false; } - return true; } /** Retrieves the given s3 URI to the specified destination path */ @@ -253,50 +265,124 @@ class S3Ops { * Uploads file into the group s3 bucket. 
* Asynchronous */ + async createUploadStream(fileName, Key) { + return new Promise((resolve, reject) => { + const stream = fs.createReadStream(fileName); + let hasData = false; + + const cleanup = () => { + stream.removeAllListeners(); + stream.destroy(); + }; + + stream.once('error', (err) => { + cleanup(); + reject(new Error(`Failed to read file ${fileName}: ${err.message}`)); + }); + + stream.once('end', () => { + if (!hasData) { + cleanup(); + reject(new Error(`File ${fileName} is empty or unreadable`)); + } + }); + + stream.once('readable', () => { + hasData = true; + resolve(stream); + }); + }); + } + async upload(dir, file, hash, ContentSize, excludeExisting = {}) { + if (!file || !dir) { + throw new Error('File and directory are required'); + } + // Exclude the Mac garbage - if (file && file.indexOf(".DS_STORE") !== -1) return false; + if (file.indexOf(".DS_STORE") !== -1) return false; + const Key = this.fileToKey(file); - const ContentType = this.fileToContentType(file); - const Metadata = this.fileToMetadata(file, hash); - const ContentEncoding = this.fileToContentEncoding(file); const fileName = this.toFile(dir, file); - const isNoCacheKey = Key.match(noCachePattern); - const CacheControl = isNoCacheKey ? "no-cache" : undefined; - if (this.shouldSkip(excludeExisting[Key], fileName)) { + // Validate file exists + if (!fs.existsSync(fileName)) { + throw new Error(`File not found: ${fileName}`); + } + + // Check if we should skip + if (await this.shouldSkip(excludeExisting[Key], fileName)) { console.info("Exists", Key); return false; } - const Body = fs.createReadStream(fileName); - const command = new PutObjectCommand({ - Body, - Bucket: this.group.Bucket, - ContentType, - ContentEncoding, - Key, - CacheControl, - Metadata, - ContentSize, - }); - console.verbose("uploading", file, ContentType, ContentEncoding, Key, ContentSize, Metadata, this.group.Bucket); - console.info("Stored", Key); + // Handle dry run if (this.options.dryRun) { console.log("Dry run - not stored", Key); - // Pretend this uploaded - causes count to change return true; } - try { - await this.client.send(command); - console.info("Uploaded", Key); - return true; - } catch (error) { - console.log("Error sending", file, error); - return false; - } finally { - await Body.close(); + + const ContentType = this.fileToContentType(file); + const Metadata = this.fileToMetadata(file, hash); + const ContentEncoding = this.fileToContentEncoding(file); + const isNoCacheKey = Key.match(noCachePattern); + const CacheControl = isNoCacheKey ? "no-cache" : undefined; + + const maxRetries = 3; + let retryCount = 0; + let lastError; + + while (retryCount < maxRetries) { + let Body; + try { + // Create and validate the stream + Body = await this.createUploadStream(fileName, Key); + + const command = new PutObjectCommand({ + Body, + Bucket: this.group.Bucket, + ContentType, + ContentEncoding, + Key, + CacheControl, + Metadata, + ContentSize, + }); + + await this.client.send(command); + console.info("Successfully uploaded", Key); + return true; + } catch (error) { + lastError = error; + retryCount++; + + const isStreamError = error.message.includes('Failed to read file') || + error.message.includes('empty or unreadable'); + + // Don't retry on stream errors + if (isStreamError) { + console.error(`Stream error for ${Key}:`, error.message); + throw error; + } + + if (retryCount < maxRetries) { + const delay = Math.pow(2, retryCount) * 1000; + console.warn( + `Upload failed for ${Key} (attempt ${retryCount}/${maxRetries}). 
` + + `Retrying in ${delay/1000}s. Error: ${error.message}` + ); + await new Promise(resolve => setTimeout(resolve, delay)); + } + } finally { + if (Body) { + Body.destroy(); + } + } } + + const errorMsg = `Failed to upload ${Key} after ${maxRetries} attempts. Last error: ${lastError?.message || 'Unknown error'}`; + console.error(errorMsg); + throw new Error(errorMsg); } } diff --git a/packages/static-wado-deploy/lib/DeployGroup.mjs b/packages/static-wado-deploy/lib/DeployGroup.mjs index ff81c75..68bb97d 100644 --- a/packages/static-wado-deploy/lib/DeployGroup.mjs +++ b/packages/static-wado-deploy/lib/DeployGroup.mjs @@ -14,11 +14,14 @@ import joinUri from "./joinUri.mjs"; */ class DeployGroup { - constructor(config, groupName, options, deployPlugin) { + constructor(config, groupName, options = {}, deployPlugin) { this.config = config; this.deployPlugin = deployPlugin; this.groupName = groupName; - this.options = options; + this.options = { + concurrentUploads: 1000, // Default number of concurrent uploads + ...options + }; this.group = configGroup(config, groupName); if (!this.group) throw new Error(`No group ${groupName}`); this.baseDir = handleHomeRelative(this.group.dir); @@ -36,34 +39,246 @@ class DeployGroup { } /** - * Stores the entire directory inside basePath / subdir. - * asynchronous function - * @params parentDir is the part of the path to include in the upload name - * @params name is the item to add + * Process a batch of files in parallel + * @param {Array} files Array of {parentDir, name, relativeName, size} objects + * @param {Object} excludeExisting Exclusion map + * @returns {Promise} Number of files uploaded */ - async store(parentDir = "", name = "", excludeExisting = {}) { - const fileName = path.join(this.baseDir, parentDir, name); - const lstat = await fs.promises.lstat(fileName); - const relativeName = (name && `${parentDir}/${name}`) || parentDir || ""; - let count = 0; - if (lstat.isDirectory()) { - const names = await fs.promises.readdir(fileName); - for (const childName of names) { - count += await this.store(relativeName, childName, excludeExisting); + async processBatch(files, excludeExisting, totalFiles, processStats) { + const batchPromises = files.map(async ({ baseDir, relativeName, size }) => { + const result = await this.ops.upload(baseDir, relativeName, null, size, excludeExisting); + processStats.count += 1; + + // Calculate progress metrics + const elapsedSeconds = (Date.now() - processStats.startTime) / 1000; + const overallSpeed = processStats.count / elapsedSeconds; + const progress = ((processStats.count / totalFiles) * 100).toFixed(1); + + // Update progress every 10 files or when batch completes + if (processStats.count % 10 === 0 || processStats.count === totalFiles) { + const remainingFiles = totalFiles - processStats.count; + const estimatedSecondsLeft = remainingFiles / overallSpeed; + + // Format time remaining in a human-readable format + const etaMinutes = Math.floor(estimatedSecondsLeft / 60); + const etaSeconds = Math.ceil(estimatedSecondsLeft % 60); + const etaDisplay = etaMinutes > 0 + ? `${etaMinutes}m ${etaSeconds}s` + : `${etaSeconds}s`; + + console.log( + `Progress: ${progress}% (${processStats.count}/${totalFiles}) | ` + + `Speed: ${overallSpeed.toFixed(1)} files/sec | ` + + `ETA: ${etaDisplay}` + ); + } + + return result; + }); + + const results = await Promise.all(batchPromises); + return results.reduce((sum, result) => sum + (result ? 
1 : 0), 0); + } + + /** + * Collects all files to be uploaded from a directory + * @param {string} parentDir Parent directory path + * @param {string} name File/directory name + * @returns {Promise} Array of file objects + */ + /** + * Processes a directory in batches + * @param {string} dirPath Directory path + * @param {string} relativePath Relative path for file names + * @param {Set} excludePatterns Patterns to exclude + * @param {Object} stats Statistics object + * @returns {Promise} Array of file objects + */ + async processDirectory(dirPath, relativePath, excludePatterns, stats) { + const entries = await fs.promises.readdir(dirPath, { withFileTypes: true }); + const files = []; + const directories = []; + + // Separate files and directories for optimized processing + for (const entry of entries) { + const fullPath = path.join(dirPath, entry.name); + const entryRelativePath = path.join(relativePath, entry.name).replace(/\\/g, '/'); + + // Early exclusion check + const shouldExclude = Array.from(excludePatterns).some(pattern => + entryRelativePath.indexOf(pattern) !== -1 + ); + + if (shouldExclude) continue; + + if (entry.isDirectory()) { + directories.push({ path: fullPath, relativePath: entryRelativePath }); + } else { + const stat = await fs.promises.stat(fullPath); + files.push({ + baseDir: this.baseDir, + relativeName: entryRelativePath, + size: stat.size + }); + + // Update scanning progress + stats.filesFound++; + if (stats.filesFound % 100 === 0) { + const elapsed = (Date.now() - stats.startTime) / 1000; + const rate = stats.filesFound / elapsed; + console.log( + `Scanning: found ${stats.filesFound} files ` + + `(${rate.toFixed(1)} files/sec)` + ); + } } - return count; } + // Process subdirectories in parallel with concurrency limit + const batchSize = 5; // Process 5 directories at a time + const results = []; + + for (let i = 0; i < directories.length; i += batchSize) { + const batch = directories.slice(i, i + batchSize); + const batchResults = await Promise.all( + batch.map(dir => + this.processDirectory(dir.path, dir.relativePath, excludePatterns, stats) + ) + ); + results.push(...batchResults.flat()); + } + + return [...files, ...results]; + } + + /** + * Collects all files to be uploaded from a directory with optimized scanning + * @param {string} parentDir Parent directory path + * @param {string} name File/directory name + * @returns {Promise} Array of file objects + */ + async collectFiles(parentDir = "", name = "") { + const startPath = path.join(this.baseDir, parentDir, name); + const stats = { + startTime: Date.now(), + filesFound: 0 + }; + + // Convert exclude patterns to Set for faster lookups const { exclude = ["temp"] } = this.options; - const isExcluded = exclude.find((it) => relativeName.indexOf(it) !== -1); - if (isExcluded) { - return 0; + const excludePatterns = new Set(exclude); + + console.log("Starting directory scan..."); + + try { + const files = await this.processDirectory( + startPath, + parentDir || "", + excludePatterns, + stats + ); + + const elapsed = ((Date.now() - stats.startTime) / 1000).toFixed(1); + const rate = (stats.filesFound / elapsed).toFixed(1); + + console.log( + `\nDirectory scan complete:` + + `\n- Found ${stats.filesFound} files` + + `\n- Scan time: ${elapsed}s` + + `\n- Scan rate: ${rate} files/sec` + ); + + return files; + } catch (error) { + console.error("Error scanning directory:", error); + throw error; } + } - if (await this.ops.upload(this.baseDir, relativeName, null, lstat.size, excludeExisting)) { - count += 1; + /** + 
* Stores the entire directory inside basePath / subdir. + * Uses parallel processing for improved performance. + * @param {string} parentDir Parent directory path + * @param {string} name File/directory name + * @param {Object} excludeExisting Exclusion map + * @returns {Promise} Number of files uploaded + */ + /** + * Stores either a single file or an entire directory. + * @param {string} parentDir Parent directory path + * @param {string} name File/directory name + * @param {Object} excludeExisting Exclusion map + * @returns {Promise} Number of files uploaded + */ + async store(parentDir = "", name = "", excludeExisting = {}) { + const fullPath = path.join(this.baseDir, parentDir, name); + + try { + const stats = await fs.promises.stat(fullPath); + + // Handle single file upload + if (stats.isFile()) { + console.log("Processing single file:", name); + const relativePath = path.join(parentDir, name).replace(/\\/g, '/'); + const result = await this.ops.upload(this.baseDir, relativePath, null, stats.size, excludeExisting); + console.log(result ? "File uploaded successfully" : "File upload skipped"); + return result ? 1 : 0; + } + + // Handle directory upload + if (stats.isDirectory()) { + console.log("Collecting files from directory..."); + const files = await this.collectFiles(parentDir, name); + const totalFiles = files.length; + + if (totalFiles === 0) { + console.log("No files to upload"); + return 0; + } + + console.log(`Found ${totalFiles} files to process`); + const batchSize = this.options.concurrentUploads || 1000; + let count = 0; + + // Stats object to track overall progress + const processStats = { + count: 0, + startTime: Date.now() + }; + + for (let i = 0; i < files.length; i += batchSize) { + const batch = files.slice(i, i + batchSize); + count += await this.processBatch(batch, excludeExisting, totalFiles, processStats); + } + + const totalTime = ((Date.now() - processStats.startTime) / 1000).toFixed(1); + const avgSpeed = (count / totalTime).toFixed(1); + + // Format total time in a human-readable format + const totalMinutes = Math.floor(totalTime / 60); + const totalSeconds = Math.ceil(totalTime % 60); + const totalTimeDisplay = totalMinutes > 0 + ? 
`${totalMinutes}m ${totalSeconds}s` + : `${totalSeconds}s`; + + console.log( + `\nUpload complete:` + + `\n- ${count} files uploaded successfully` + + `\n- ${totalFiles - count} files skipped/failed` + + `\n- Total time: ${totalTimeDisplay}` + + `\n- Average speed: ${avgSpeed} files/sec` + ); + + return count; + } + + throw new Error(`Path is neither a file nor a directory: ${fullPath}`); + } catch (error) { + if (error.code === 'ENOENT') { + throw new Error(`Path does not exist: ${fullPath}`); + } + throw error; } - return count; } async dir(uri) { diff --git a/packages/static-wado-deploy/lib/compressUploadDeploy.mjs b/packages/static-wado-deploy/lib/compressUploadDeploy.mjs index ae2b735..421d0e7 100644 --- a/packages/static-wado-deploy/lib/compressUploadDeploy.mjs +++ b/packages/static-wado-deploy/lib/compressUploadDeploy.mjs @@ -1,22 +1,109 @@ -import { execFileSync } from "node:child_process"; -import { sleep } from "@radicalimaging/static-wado-util"; +import { createReadStream, createWriteStream, promises as fs } from "fs"; +import { createGzip } from "zlib"; +import path from "path"; +import { pipeline } from "stream/promises"; import DeployGroup from "./DeployGroup.mjs"; import uploadDeploy from "./uploadDeploy.mjs"; -export default async function compressUploadDeploy(directory, config, name, options, deployPlugin) { - const deployer = new DeployGroup(config, name, options, deployPlugin); +// File types that benefit from maximum compression +const HIGH_COMPRESSION_TYPES = ['.json', '.js', '.css', '.html', '.txt', '.xml']; + +/** + * Determines optimal compression level based on file type + * @param {string} filePath File path + * @returns {number} Compression level (1-9) + */ +function getCompressionLevel(filePath) { + const ext = path.extname(filePath).toLowerCase(); + return HIGH_COMPRESSION_TYPES.includes(ext) ? 9 : 6; +} - const args = ["gzip", "-9", "-r", `"${deployer.baseDir}"`]; +/** + * Compresses a single file using streaming + * @param {string} inputPath Source file path + * @param {string} outputPath Destination file path + * @param {number} level Compression level + */ +async function compressFile(inputPath, outputPath, level) { + const gzip = createGzip({ level }); + await pipeline( + createReadStream(inputPath), + gzip, + createWriteStream(outputPath) + ); +} - console.log("Waiting to compress", name, "directory", deployer.baseDir, directory); - execFileSync(args.join(" "), { shell: true, stdio: "inherit" }); - // execFileSync(`dir ${deployer.baseDir}`, { shell: true, stdio: "inherit" }); - console.log("Uploading compressed client", deployer.baseDir, directory); - await uploadDeploy(directory, config, name, options, deployPlugin); +/** + * Recursively finds all files in a directory + * @param {string} dir Directory path + * @returns {Promise} Array of file paths + */ +async function findFiles(dir) { + const files = []; + const entries = await fs.readdir(dir, { withFileTypes: true }); + + for (const entry of entries) { + const fullPath = path.join(dir, entry.name); + if (entry.isDirectory()) { + files.push(...await findFiles(fullPath)); + } else { + files.push(fullPath); + } + } + + return files; +} - // Shouldn't be needed, but... 
- await sleep(5000); - args[1] = "-d"; - execFileSync(args.join(" "), { shell: true, stdio: "inherit" }); +export default async function compressUploadDeploy(directory, config, name, options, deployPlugin) { + const deployer = new DeployGroup(config, name, options, deployPlugin); + const baseDir = deployer.baseDir; + + try { + console.log("Starting compression for", name, "directory", baseDir); + + // Find all files + const files = await findFiles(baseDir); + const total = files.length; + let processed = 0; + + // Process files in parallel batches + const batchSize = 5; // Process 5 files at a time + for (let i = 0; i < files.length; i += batchSize) { + const batch = files.slice(i, Math.min(i + batchSize, files.length)); + const compressionTasks = batch.map(async file => { + const level = getCompressionLevel(file); + const gzipPath = `${file}.gz`; + + try { + await compressFile(file, gzipPath, level); + processed++; + if (processed % 10 === 0 || processed === total) { + console.log(`Compression progress: ${Math.round((processed/total) * 100)}% (${processed}/${total})`); + } + } catch (err) { + console.warn(`Failed to compress ${file}:`, err); + return false; + } + return true; + }); + + await Promise.all(compressionTasks); + } + + console.log("Uploading compressed files..."); + await uploadDeploy(directory, config, name, options, deployPlugin); + + // Clean up compressed files + console.log("Cleaning up compressed files..."); + await Promise.all( + files.map(file => fs.unlink(`${file}.gz`).catch(err => { + console.warn(`Failed to clean up ${file}.gz:`, err); + })) + ); + + } catch (error) { + console.error("Compression failed:", error); + throw error; + } } diff --git a/packages/static-wado-deploy/lib/uploadIndex.mjs b/packages/static-wado-deploy/lib/uploadIndex.mjs index c8b53d0..1eeea8f 100644 --- a/packages/static-wado-deploy/lib/uploadIndex.mjs +++ b/packages/static-wado-deploy/lib/uploadIndex.mjs @@ -1,6 +1,57 @@ import { JSONReader, JSONWriter } from "@radicalimaging/static-wado-util"; import DeployGroup from "./DeployGroup.mjs"; +// Simple in-memory cache for the duration of the upload +const indexCache = new Map(); + +/** + * Batch processes multiple study indices + * @param {Array} indices Array of study indices to process + * @param {Array} allStudies Existing studies array + * @returns {Array} Updated studies array + */ +function batchProcessIndices(indices, allStudies) { + const sopMap = new Map(allStudies.map((study, index) => [study["0020000D"].Value[0], index])); + + indices.forEach(studyIndex => { + const studyItem = studyIndex[0] || studyIndex; + const sop = studyItem["0020000D"].Value[0]; + const existingIndex = sopMap.get(sop); + + if (existingIndex === undefined) { + allStudies.push(studyItem); + sopMap.set(sop, allStudies.length - 1); + } else { + allStudies[existingIndex] = studyItem; + } + }); + + return allStudies; +} + +/** + * Reads and caches index file + * @param {string} rootDir Root directory + * @param {string} indexPath Index file path + * @returns {Promise} Index content + */ +async function getCachedIndex(rootDir, indexPath) { + const cacheKey = `${rootDir}:${indexPath}`; + let index = indexCache.get(cacheKey); + + if (!index) { + index = await JSONReader(rootDir, indexPath, []); + indexCache.set(cacheKey, index); + + // Clear cache after 5 minutes + setTimeout(() => { + indexCache.delete(cacheKey); + }, 5 * 60 * 1000); + } + + return index; +} + /** * Reads the storeDirectory to get the index file, and adds that to the index directory */ @@ -16,20 +67,29 
@@ export default async function uploadIndex(storeDirectory, config, name, options, console.log("Starting to update indices for", storeDirectory); const { config: deployConfig } = deployer; - // console.log("Retrieve remote index", indexFullName); - // deployer.retrieve(options, deployConfig.rootDir, indexFullName); - - // Read the index file, or create a dummy one: - const allStudies = await JSONReader(deployConfig.rootDir, indexFullName, []); - const studyIndex = await JSONReader(deployConfig.rootDir, `${storeDirectory}/index.json.gz`); - const studyItem = studyIndex[0] || studyIndex; - const sop = studyItem["0020000D"].Value[0]; - const allIndex = allStudies.findIndex((it) => it["0020000D"].Value[0] === sop); - if (allIndex === -1) { - allStudies.push(studyItem); - } else { - allStudies[allIndex] = studyItem; + try { + // Read indices with caching + const [allStudies, studyIndex] = await Promise.all([ + getCachedIndex(deployConfig.rootDir, indexFullName), + getCachedIndex(deployConfig.rootDir, `${storeDirectory}/index.json.gz`) + ]); + + // Process indices in batch + const updatedStudies = batchProcessIndices([studyIndex], allStudies); + + // Write updated index and upload + await JSONWriter(deployConfig.rootDir, indexFullName, updatedStudies, { + index: false, + compression: 'gzip' // Enable compression for index files + }); + + await deployer.store(indexFullName); + + // Update cache with new data + indexCache.set(`${deployConfig.rootDir}:${indexFullName}`, updatedStudies); + + } catch (error) { + console.error("Failed to update index:", error); + throw error; } - await JSONWriter(deployConfig.rootDir, indexFullName, allStudies, { index: false }); - await deployer.store(indexFullName); } diff --git a/yarn.lock b/yarn.lock index eb2db14..53d4443 100644 --- a/yarn.lock +++ b/yarn.lock @@ -9038,10 +9038,10 @@ uuid@^10.0.0: resolved "https://registry.yarnpkg.com/uuid/-/uuid-10.0.0.tgz#5a95aa454e6e002725c79055fd42aaba30ca6294" integrity sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ== -uuid@^8.3.2: - version "8.3.2" - resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2" - integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg== +uuid@^11.0.2: + version "11.0.3" + resolved "https://registry.yarnpkg.com/uuid/-/uuid-11.0.3.tgz#248451cac9d1a4a4128033e765d137e2b2c49a3d" + integrity sha512-d0z310fCWv5dJwnX1Y/MncBAqGMKEzlBb1AOf7z9K8ALnd0utBX/msg/fA0+sbyN1ihbMsLhrBlnl1ak7Wa0rg== uuid@^9.0.1: version "9.0.1" From e24253ec3173f85f79f89e93b3d5d1fb832fa83b Mon Sep 17 00:00:00 2001 From: rodrigobasilio2022 Date: Sat, 18 Jan 2025 14:33:33 -0300 Subject: [PATCH 2/4] Fix index update in deploy --- .../static-wado-deploy/lib/uploadIndex.mjs | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/packages/static-wado-deploy/lib/uploadIndex.mjs b/packages/static-wado-deploy/lib/uploadIndex.mjs index 1eeea8f..853cf50 100644 --- a/packages/static-wado-deploy/lib/uploadIndex.mjs +++ b/packages/static-wado-deploy/lib/uploadIndex.mjs @@ -14,16 +14,20 @@ function batchProcessIndices(indices, allStudies) { const sopMap = new Map(allStudies.map((study, index) => [study["0020000D"].Value[0], index])); indices.forEach(studyIndex => { - const studyItem = studyIndex[0] || studyIndex; - const sop = studyItem["0020000D"].Value[0]; - const existingIndex = sopMap.get(sop); + // Handle both single study and array of studies + const studies = 
Array.isArray(studyIndex) ? studyIndex : [studyIndex]; - if (existingIndex === undefined) { - allStudies.push(studyItem); - sopMap.set(sop, allStudies.length - 1); - } else { - allStudies[existingIndex] = studyItem; - } + studies.forEach(studyItem => { + const sop = studyItem["0020000D"].Value[0]; + const existingIndex = sopMap.get(sop); + + if (existingIndex === undefined) { + allStudies.push(studyItem); + sopMap.set(sop, allStudies.length - 1); + } else { + allStudies[existingIndex] = studyItem; + } + }); }); return allStudies; From 3f97a9932fe55538de007b2aa566893811164ed0 Mon Sep 17 00:00:00 2001 From: rodrigobasilio2022 Date: Sat, 18 Jan 2025 14:52:59 -0300 Subject: [PATCH 3/4] Fix node hangs after upload --- packages/static-wado-deploy/lib/uploadIndex.mjs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/static-wado-deploy/lib/uploadIndex.mjs b/packages/static-wado-deploy/lib/uploadIndex.mjs index 853cf50..0a2b212 100644 --- a/packages/static-wado-deploy/lib/uploadIndex.mjs +++ b/packages/static-wado-deploy/lib/uploadIndex.mjs @@ -46,16 +46,16 @@ async function getCachedIndex(rootDir, indexPath) { if (!index) { index = await JSONReader(rootDir, indexPath, []); indexCache.set(cacheKey, index); - - // Clear cache after 5 minutes - setTimeout(() => { - indexCache.delete(cacheKey); - }, 5 * 60 * 1000); } return index; } +// Clear the entire cache after the operation is complete +function clearCache() { + indexCache.clear(); +} + /** * Reads the storeDirectory to get the index file, and adds that to the index directory */ @@ -95,5 +95,8 @@ export default async function uploadIndex(storeDirectory, config, name, options, } catch (error) { console.error("Failed to update index:", error); throw error; + } finally { + // Clear the cache to allow process to exit cleanly + clearCache(); } } From f442a42f1533cc4f83ca8d083e737688a0cfe2a6 Mon Sep 17 00:00:00 2001 From: rodrigobasilio2022 Date: Mon, 20 Jan 2025 09:10:46 -0300 Subject: [PATCH 4/4] Add progress bar in processing images --- .../static-wado-creator/lib/StaticWado.js | 35 +++++++++++++++++-- .../lib/mkdicomwebConfig.js | 5 +++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/packages/static-wado-creator/lib/StaticWado.js b/packages/static-wado-creator/lib/StaticWado.js index 5b0add8..c2b6ba9 100644 --- a/packages/static-wado-creator/lib/StaticWado.js +++ b/packages/static-wado-creator/lib/StaticWado.js @@ -46,10 +46,13 @@ function internalGenerateImage(originalImageFrame, dataset, metadata, transferSy class StaticWado { constructor(configuration) { - const { rootDir = "~/dicomweb", pathDeduplicated = "deduplicated", pathInstances = "instances", verbose } = configuration; + const { rootDir = "~/dicomweb", pathDeduplicated = "deduplicated", pathInstances = "instances", verbose, showProgress = true } = configuration; dicomCodec.setConfig({ verbose }); const directoryName = handleHomeRelative(rootDir); + this.showProgress = showProgress; + this.processedFiles = 0; + this.totalFiles = 0; this.options = { ...configuration, @@ -106,20 +109,48 @@ class StaticWado { * @param {*} callback * @param {*} params */ + updateProgress() { + if (!this.showProgress) return; + this.processedFiles++; + const percentage = Math.round((this.processedFiles / this.totalFiles) * 100); + const progressBar = '='.repeat(Math.floor(percentage / 4)) + '-'.repeat(25 - Math.floor(percentage / 4)); + process.stdout.write(`\r[${progressBar}] ${percentage}% | ${this.processedFiles}/${this.totalFiles} files`); + } + async 
processFiles(files, params) { - return dirScanner(files, { + if (this.showProgress) { + // Count total files first + for (const file of files) { + if (fs.statSync(file).isDirectory()) { + const dirFiles = fs.readdirSync(file, { recursive: true }); + this.totalFiles += dirFiles.filter(f => !fs.statSync(path.join(file, f)).isDirectory()).length; + } else { + this.totalFiles++; + } + } + console.log(`\nProcessing ${this.totalFiles} DICOM files...\n`); + } + + const result = await dirScanner(files, { ...params, callback: async (file) => { try { const dicomp10stream = fs.createReadStream(file); await this.importBinaryDicom(dicomp10stream, { ...params, file }); Stats.StudyStats.add("DICOM P10", "Parse DICOM P10 file"); + this.updateProgress(); } catch (e) { console.error("Couldn't process", file); console.verbose("Error", e); + this.updateProgress(); } }, }); + + if (this.showProgress) { + console.log('\n'); // Move to next line after progress bar + } + return result; } /** diff --git a/packages/static-wado-creator/lib/mkdicomwebConfig.js b/packages/static-wado-creator/lib/mkdicomwebConfig.js index c38a116..8ebd3ad 100644 --- a/packages/static-wado-creator/lib/mkdicomwebConfig.js +++ b/packages/static-wado-creator/lib/mkdicomwebConfig.js @@ -69,6 +69,11 @@ const { mkdicomwebConfig } = ConfigPoint.register({ description: "Write verbose output", defaultValue: false, }, + { + key: "--show-progress", + description: "Show progress during DICOM file processing", + defaultValue: true, + }, { key: "-t, --content-type ", description: 'Destination type to compress to (choices: "jpeg", "jls", "lei", "jls-lossy", "jhc", "jxl" or DICOM Transfer Syntax UID - default: "jls")',
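
The upload path introduced in this series processes files in fixed-size batches that are awaited with Promise.all, so at most `concurrentUploads` requests are in flight at any time while progress and ETA are reported as batches complete. A minimal standalone sketch of that batching pattern is shown below; `uploadOne` and `uploadInBatches` are hypothetical names for illustration only, not the project's actual S3Ops/DeployGroup API.

// Sketch: upload files in sequential batches, each batch running concurrently.
// `uploadOne(file)` is a hypothetical per-file upload callback returning a boolean.
async function uploadInBatches(files, uploadOne, batchSize = 1000) {
  const startTime = Date.now();
  let done = 0;
  let uploaded = 0;

  for (let i = 0; i < files.length; i += batchSize) {
    const batch = files.slice(i, i + batchSize);
    // Each batch runs concurrently; batches themselves run one after another,
    // which bounds the number of in-flight uploads to `batchSize`.
    const results = await Promise.all(
      batch.map(async (file) => {
        const ok = await uploadOne(file);
        done += 1;
        if (done % 10 === 0 || done === files.length) {
          const elapsed = (Date.now() - startTime) / 1000;
          const rate = done / elapsed;
          const eta = Math.ceil((files.length - done) / rate);
          console.log(
            `Progress: ${((done / files.length) * 100).toFixed(1)}% | ` +
            `${rate.toFixed(1)} files/sec | ETA ~${eta}s`
          );
        }
        return ok;
      })
    );
    uploaded += results.filter(Boolean).length;
  }
  return uploaded;
}

Sequential batches of concurrent promises are a simple way to cap concurrency without an extra dependency; a work-queue library would keep the pipeline fuller between batches, at the cost of more code.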