Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upload files in parallel #72

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 126 additions & 40 deletions packages/s3-deploy/lib/S3Ops.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import fs from "fs";
import mime from "mime-types";
import { configGroup, endsWith } from "@radicalimaging/static-wado-util";
import ConfigPoint from "config-point";
import { execFileSync } from "node:child_process";
import { createHash } from "crypto";
import { createReadStream } from "fs";

import copyTo from "./copyTo.mjs";

Expand Down Expand Up @@ -126,7 +127,18 @@ class S3Ops {
return this.group.path ? `s3://${this.group.Bucket}${this.group.path}/${uri}` : `s3://${this.group.Bucket}/${uri}`;
}

shouldSkip(item, fileName) {
async calculateMD5(filePath) {
return new Promise((resolve, reject) => {
const hash = createHash('md5');
const stream = createReadStream(filePath);

stream.on('data', data => hash.update(data));
stream.on('end', () => resolve(hash.digest('hex')));
stream.on('error', error => reject(error));
});
}

async shouldSkip(item, fileName) {
if (!item) return false;
if (!fs.existsSync(fileName)) {
console.verbose("Doesn't exist, not skipping", fileName);
Expand All @@ -146,15 +158,15 @@ class S3Ops {
}
const { ETag } = item;
if (!ETag) return true;
const md5 = execFileSync(`md5sum "${fileName}"`, { shell: true });
for (let i = 1; i < ETag.length - 1; i++) {
if (md5[i] != ETag.charCodeAt(i)) {
// Leave this for now as there might be more file types needing specific encoding checks
console.warn("md5 different at", i, md5[i], ETag.charCodeAt(i), ETag, md5.toString());
return false;
}

try {
const md5 = await this.calculateMD5(fileName);
const etagMd5 = ETag.replace(/['"]/g, ''); // Remove quotes from ETag
return md5 === etagMd5;
} catch (error) {
console.warn("Error calculating MD5:", error);
return false;
}
return true;
}

/** Retrieves the given s3 URI to the specified destination path */
Expand Down Expand Up @@ -253,50 +265,124 @@ class S3Ops {
* Uploads file into the group s3 bucket.
* Asynchronous
*/
async createUploadStream(fileName, Key) {
return new Promise((resolve, reject) => {
const stream = fs.createReadStream(fileName);
let hasData = false;

const cleanup = () => {
stream.removeAllListeners();
stream.destroy();
};

stream.once('error', (err) => {
cleanup();
reject(new Error(`Failed to read file ${fileName}: ${err.message}`));
});

stream.once('end', () => {
if (!hasData) {
cleanup();
reject(new Error(`File ${fileName} is empty or unreadable`));
}
});

stream.once('readable', () => {
hasData = true;
resolve(stream);
});
});
}

async upload(dir, file, hash, ContentSize, excludeExisting = {}) {
if (!file || !dir) {
throw new Error('File and directory are required');
}

// Exclude the Mac garbage
if (file && file.indexOf(".DS_STORE") !== -1) return false;
if (file.indexOf(".DS_STORE") !== -1) return false;

const Key = this.fileToKey(file);
const ContentType = this.fileToContentType(file);
const Metadata = this.fileToMetadata(file, hash);
const ContentEncoding = this.fileToContentEncoding(file);
const fileName = this.toFile(dir, file);
const isNoCacheKey = Key.match(noCachePattern);
const CacheControl = isNoCacheKey ? "no-cache" : undefined;

if (this.shouldSkip(excludeExisting[Key], fileName)) {
// Validate file exists
if (!fs.existsSync(fileName)) {
throw new Error(`File not found: ${fileName}`);
}

// Check if we should skip
if (await this.shouldSkip(excludeExisting[Key], fileName)) {
console.info("Exists", Key);
return false;
}

const Body = fs.createReadStream(fileName);
const command = new PutObjectCommand({
Body,
Bucket: this.group.Bucket,
ContentType,
ContentEncoding,
Key,
CacheControl,
Metadata,
ContentSize,
});
console.verbose("uploading", file, ContentType, ContentEncoding, Key, ContentSize, Metadata, this.group.Bucket);
console.info("Stored", Key);
// Handle dry run
if (this.options.dryRun) {
console.log("Dry run - not stored", Key);
// Pretend this uploaded - causes count to change
return true;
}
try {
await this.client.send(command);
console.info("Uploaded", Key);
return true;
} catch (error) {
console.log("Error sending", file, error);
return false;
} finally {
await Body.close();

const ContentType = this.fileToContentType(file);
const Metadata = this.fileToMetadata(file, hash);
const ContentEncoding = this.fileToContentEncoding(file);
const isNoCacheKey = Key.match(noCachePattern);
const CacheControl = isNoCacheKey ? "no-cache" : undefined;

const maxRetries = 3;
let retryCount = 0;
let lastError;

while (retryCount < maxRetries) {
let Body;
try {
// Create and validate the stream
Body = await this.createUploadStream(fileName, Key);

const command = new PutObjectCommand({
Body,
Bucket: this.group.Bucket,
ContentType,
ContentEncoding,
Key,
CacheControl,
Metadata,
ContentSize,
});

await this.client.send(command);
console.info("Successfully uploaded", Key);
return true;
} catch (error) {
lastError = error;
retryCount++;

const isStreamError = error.message.includes('Failed to read file') ||
error.message.includes('empty or unreadable');

// Don't retry on stream errors
if (isStreamError) {
console.error(`Stream error for ${Key}:`, error.message);
throw error;
}

if (retryCount < maxRetries) {
const delay = Math.pow(2, retryCount) * 1000;
console.warn(
`Upload failed for ${Key} (attempt ${retryCount}/${maxRetries}). ` +
`Retrying in ${delay/1000}s. Error: ${error.message}`
);
await new Promise(resolve => setTimeout(resolve, delay));
}
} finally {
if (Body) {
Body.destroy();
}
}
}

const errorMsg = `Failed to upload ${Key} after ${maxRetries} attempts. Last error: ${lastError?.message || 'Unknown error'}`;
console.error(errorMsg);
throw new Error(errorMsg);
}
}

Expand Down
35 changes: 33 additions & 2 deletions packages/static-wado-creator/lib/StaticWado.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ function internalGenerateImage(originalImageFrame, dataset, metadata, transferSy

class StaticWado {
constructor(configuration) {
const { rootDir = "~/dicomweb", pathDeduplicated = "deduplicated", pathInstances = "instances", verbose } = configuration;
const { rootDir = "~/dicomweb", pathDeduplicated = "deduplicated", pathInstances = "instances", verbose, showProgress = true } = configuration;

dicomCodec.setConfig({ verbose });
const directoryName = handleHomeRelative(rootDir);
this.showProgress = showProgress;
this.processedFiles = 0;
this.totalFiles = 0;

this.options = {
...configuration,
Expand Down Expand Up @@ -106,20 +109,48 @@ class StaticWado {
* @param {*} callback
* @param {*} params
*/
updateProgress() {
if (!this.showProgress) return;
this.processedFiles++;
const percentage = Math.round((this.processedFiles / this.totalFiles) * 100);
const progressBar = '='.repeat(Math.floor(percentage / 4)) + '-'.repeat(25 - Math.floor(percentage / 4));
process.stdout.write(`\r[${progressBar}] ${percentage}% | ${this.processedFiles}/${this.totalFiles} files`);
}

async processFiles(files, params) {
return dirScanner(files, {
if (this.showProgress) {
// Count total files first
for (const file of files) {
if (fs.statSync(file).isDirectory()) {
const dirFiles = fs.readdirSync(file, { recursive: true });
this.totalFiles += dirFiles.filter(f => !fs.statSync(path.join(file, f)).isDirectory()).length;
} else {
this.totalFiles++;
}
}
console.log(`\nProcessing ${this.totalFiles} DICOM files...\n`);
}

const result = await dirScanner(files, {
...params,
callback: async (file) => {
try {
const dicomp10stream = fs.createReadStream(file);
await this.importBinaryDicom(dicomp10stream, { ...params, file });
Stats.StudyStats.add("DICOM P10", "Parse DICOM P10 file");
this.updateProgress();
} catch (e) {
console.error("Couldn't process", file);
console.verbose("Error", e);
this.updateProgress();
}
},
});

if (this.showProgress) {
console.log('\n'); // Move to next line after progress bar
}
return result;
}

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/static-wado-creator/lib/mkdicomwebConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ const { mkdicomwebConfig } = ConfigPoint.register({
description: "Write verbose output",
defaultValue: false,
},
{
key: "--show-progress",
description: "Show progress during DICOM file processing",
defaultValue: true,
},
{
key: "-t, --content-type <type>",
description: 'Destination type to compress to (choices: "jpeg", "jls", "lei", "jls-lossy", "jhc", "jxl" or DICOM Transfer Syntax UID - default: "jls")',
Expand Down
Loading