Skip to content

Commit

Permalink
Better scan on target/files and target/s3files (#47)
Browse files Browse the repository at this point in the history
* always define argv on res

* better s3 scan & overwrite

* better files scan and overwrite
  • Loading branch information
ZJONSSON authored Aug 4, 2024
1 parent 8973d1c commit b6bb6b6
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 48 deletions.
2 changes: 1 addition & 1 deletion output.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ module.exports = async function(obj, argv) {
const res = { Σ_in, Σ_out };
if (argv.test) {
res.data = argv.test;
Object.defineProperty(res, 'argv', { value: argv });
}
Object.defineProperty(res, 'argv', { value: argv });
if (argv.Σ_skipped) res.Σ_skipped = argv.Σ_skipped;
return res;
};
96 changes: 57 additions & 39 deletions targets/files.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
const etl = require('etl');
const fs = require('fs-extra');
const Bluebird = require('bluebird');
const { createWriteStream, rename, utimes, stat, ensureDir } = require('fs-extra');
const recursive = require('recursive-readdir');
const recursiveAsync = Bluebird.promisify(recursive);
const renameAsync = Bluebird.promisify(fs.rename);
const utimesAsync = Bluebird.promisify(fs.utimes);

const convert = require('./lib/convert');
const path = require('path');
const os = require('os');
Expand All @@ -16,44 +13,65 @@ module.exports = async function(stream, argv) {
if (target_dir.startsWith('~')) {
target_dir = path.join(os.homedir(), target_dir.slice(1));
}
let files = new Set([]);

let files = new Set([]);
argv.target_files_scanned = false;

return etl.toStream(async () => {
if (!argv.no_skip) files = new Set(await recursiveAsync(target_dir));
return stream;
})
.pipe(etl.map(argv.concurrency || 1, async d => {
const Key = path.join(target_dir, d.filename);
if (files.has(Key)) return { message: 'skipping', Key };
if (filter_files && !filter_files.test(Key)) return { message: 'ignoring', Key };

let Body = typeof d.body === 'function' ? await d.body() : d.body;
if (typeof Body == 'function') Body = Body();
if (!Body) return { Key, message: 'No body' };
Body = convert(Body, d.filename, argv);
await fs.ensureDir(path.dirname(Key));
const tmpKey = `${Key}.download`;
await new Promise( (resolve, reject) => {
Body
.on('error', reject)
.pipe(fs.createWriteStream(tmpKey))
.on('close', async () => {
await renameAsync(tmpKey, Key);
if (d.timestamp) {
const timestamp = new Date(+d.timestamp);
if (!isNaN(+timestamp)) await utimesAsync(Key, timestamp, timestamp);
}
resolve();
})
.on('error', e => reject(e));
if (!argv.target_overwrite && !argv.target_skip_scan) {
if (argv.target_await_scan) {
files = new Set(await recursive(target_dir));
argv.target_files_scanned = true;
} else {
recursive(target_dir).then(d => {
files = new Set(d);
argv.target_files_scanned = true;
});
return { Key, message: 'OK' };
}
}

}, {
catch: function(e) {
console.error(e);
return stream.pipe(etl.map(argv.concurrency || 1, async d => {
const Key = path.join(target_dir, d.filename);
let skip = files.has(Key);
if (!skip && !argv.target_files_scanned && !argv.target_overwrite) {
try {
await stat(Key);
skip = true;
} catch(e) {
if (e.code !== 'ENOENT') throw e;
}
}));
}
if (skip) {
argv.Σ_skipped += 1;
return { message: 'skipping', Key };
}

if (filter_files && !filter_files.test(Key)) return { message: 'ignoring', Key };

let Body = typeof d.body === 'function' ? await d.body() : d.body;
if (typeof Body == 'function') Body = Body();
if (!Body) return { Key, message: 'No body' };
Body = convert(Body, d.filename, argv);
await ensureDir(path.dirname(Key));
const tmpKey = `${Key}.download`;
await new Promise( (resolve, reject) => {
Body
.on('error', reject)
.pipe(createWriteStream(tmpKey))
.on('close', async () => {
await rename(tmpKey, Key);
if (d.timestamp) {
const timestamp = new Date(+d.timestamp);
if (!isNaN(+timestamp)) await utimes(Key, timestamp, timestamp);
}
resolve();
})
.on('error', e => reject(e));
});
return { Key, message: 'OK' };

}, {
catch: function(e) {
console.error(e);
}
}));
};
45 changes: 37 additions & 8 deletions targets/s3files.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ const etl = require('etl');
const path = require('path');
const { Upload } = require("@aws-sdk/lib-storage");
const { paginateListObjectsV2,
S3Client
S3Client,
HeadObjectCommand
} = require('@aws-sdk/client-s3');
const { createConfig } = require('../util');

Expand All @@ -12,25 +13,53 @@ module.exports = async function(stream, argv) {
const client = new S3Client(config);
const Bucket = argv.target_params[0] || config.bucket;
const Prefix = argv.target_params.slice(1).join('/') || config.prefix || '';
//const reFilter = RegExp(config.filter);

if (!Bucket) throw 'S3 Bucket missing';


const files = new Set([]);

const query = { Bucket, Prefix };

if (!config.overwrite) {
for await (const res of paginateListObjectsV2({ client }, query)) {
res?.Contents?.forEach(d => {
files.add(d.Key);
});
argv.target_files_scanned = false;

if (!config.overwrite && !config.skip_scan) {
// We scan in the background
// Walk every page of ListObjectsV2 results, recording each existing Key so
// uploads can be skipped; mark argv once the full listing has been consumed.
async function scan() {
  for await (const page of paginateListObjectsV2({ client }, query)) {
    for (const { Key } of page?.Contents ?? []) {
      files.add(Key);
    }
  }
  argv.target_files_scanned = true;
}
if (config.await_scan) {
await scan();
} else {
scan();
}
}

return stream.pipe(etl.map(argv.concurrency || 1, async d => {
const Key = path.join(Prefix, d.filename);
if (files.has(Key)) {

let skip = files.has(Key);

// If we haven't scanned all the files yet we check if the file
// exists with HeadObjectCommand
if (!skip && !argv.target_files_scanned && !config.overwrite) {
const command = new HeadObjectCommand({ Bucket, Key });
try {
await client.send(command);
skip = true;
} catch(e) {
if (e.name !== 'NotFound') {
throw e;
}
}
}

if (skip) {
argv.Σ_skipped += 1;
return { message: 'skipping', Key };
}
Expand Down
33 changes: 33 additions & 0 deletions test/files-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,39 @@ tap.test('files', async t => {
t.same(res, { 'Σ_in': 2, 'Σ_out': 2 });
});

// Re-running the identical copy: both files already exist in the target,
// so everything should be counted as skipped.
t.test('writing files again', async () => {
  const command = `etl files/${__dirname}/support/testfiles files/${tmpDir}/testfiles/ --silent`;
  const result = await cli(command);
  t.same(result, { 'Σ_in': 2, 'Σ_out': 2, Σ_skipped: 2 });
});

// Same skip behaviour when source/target dirs are supplied as CLI flags
// rather than positional path parameters.
t.test('writing files again via params', async () => {
  const command = `etl files --source_dir=${__dirname}/support/testfiles files --target_dir=${tmpDir}/testfiles/ --silent`;
  const result = await cli(command);
  t.same(result, { 'Σ_in': 2, 'Σ_out': 2, Σ_skipped: 2 });
});

// --target_skip_scan disables the upfront directory scan; existing files
// are still detected (and skipped) via the per-file existence check.
t.test('writing files skipping scan', async () => {
  const command = `etl files/${__dirname}/support/testfiles files/${tmpDir}/testfiles/ --silent --target_skip_scan=true`;
  const result = await cli(command);
  t.same(result.argv.target_files_scanned, false);
  t.same(result, { 'Σ_in': 2, 'Σ_out': 2, Σ_skipped: 2 });
});

// --target_await_scan blocks until the directory scan completes, so the
// scanned flag must be true and both files skipped from the scan result.
t.test('writing files await scan', async () => {
  const command = `etl files/${__dirname}/support/testfiles files/${tmpDir}/testfiles/ --silent --target_await_scan=true`;
  const result = await cli(command);
  t.same(result.argv.target_files_scanned, true);
  t.same(result, { 'Σ_in': 2, 'Σ_out': 2, Σ_skipped: 2 });
});

// --target_overwrite bypasses scanning and skipping entirely: no scan runs
// and both files are rewritten (no Σ_skipped in the summary).
t.test('writing files with overwrite', async () => {
  const command = `etl files/${__dirname}/support/testfiles files/${tmpDir}/testfiles/ --silent --target_overwrite=true`;
  const result = await cli(command);
  t.same(result.argv.target_files_scanned, false);
  t.same(result, { 'Σ_in': 2, 'Σ_out': 2 });
});

t.test('reading them back', async () => {
const cmd = `etl files/${tmpDir}/testfiles test --silent`;
const res = await cli(cmd);
Expand Down
27 changes: 27 additions & 0 deletions test/s3-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,33 @@ tap.test('s3files', async t => {
t.same(res, { Σ_in: 2, Σ_out: 2, Σ_skipped: 2 }, 'skips uploading files that already exist');
});


// Re-upload with bucket/prefix given as CLI flags: the bucket scan finds
// the existing keys, so both uploads are skipped.
t.test('uploading files again to s3 via params', async t => {
  const command = `etl files/${__dirname}/support/testfiles s3files --target_bucket=${Bucket} --target_prefix="test/subdirectory" --target_endpoint=http://localhost:9090 --target_forcePathStyle=true`;
  const result = await cli(command);
  t.same(result, { Σ_in: 2, Σ_out: 2, Σ_skipped: 2 }, 'skips uploading files that already exist');
});

// --target_skip_scan: no bucket listing is performed, yet existing objects
// are still skipped (via the per-key existence check).
t.test('uploading files again to s3 skipping scan', async t => {
  const command = `etl files/${__dirname}/support/testfiles s3files/${Bucket}/test/subdirectory --target_endpoint=http://localhost:9090 --target_forcePathStyle=true --target_skip_scan=true`;
  const result = await cli(command);
  t.same(result.argv.target_files_scanned, false);
  t.same(result, { Σ_in: 2, Σ_out: 2, Σ_skipped: 2 }, 'skips uploading files that already exist');
});

// --target_await_scan: the full bucket listing completes before uploads
// start, so the scanned flag is true and both objects are skipped.
t.test('uploading files again to s3 awaiting scan', async t => {
  const command = `etl files/${__dirname}/support/testfiles s3files/${Bucket}/test/subdirectory --target_endpoint=http://localhost:9090 --target_forcePathStyle=true --target_await_scan=true`;
  const result = await cli(command);
  t.same(result.argv.target_files_scanned, true);
  t.same(result, { Σ_in: 2, Σ_out: 2, Σ_skipped: 2 }, 'skips uploading files that already exist');
});

// --target_overwrite: existing objects are replaced rather than skipped,
// so the summary carries no Σ_skipped count.
t.test('uploading files again to s3 with target_overwrite', async t => {
  const command = `etl files/${__dirname}/support/testfiles s3files/${Bucket}/test/subdirectory --target_endpoint=http://localhost:9090 --target_forcePathStyle=true --target_overwrite=true`;
  const result = await cli(command);
  t.same(result, { Σ_in: 2, Σ_out: 2 }, 'overwrite files that already exist');
});

t.test('downloading files from s3', async t => {
const cmd = `etl s3files/${Bucket}/test test --silent --source_endpoint=http://localhost:9090 --source_forcePathStyle=true`;
const res = await cli(cmd);
Expand Down

0 comments on commit b6bb6b6

Please sign in to comment.