Replace callbacks with promises #12

Open · wants to merge 6 commits into master
124 changes: 41 additions & 83 deletions lib/filestorage.js
@@ -1,6 +1,6 @@
'use strict';

const fs = require('fs');
const fs = require('./fs');
const path = require('path');
const common = require('@metarhia/common');
const utils = require('./utils');
@@ -12,141 +12,99 @@ const getFilepath = Symbol('getFilepath');
class FileStorage {
// Create new FileStorage
// options - <Object>
// dir - <string>, data storage directory, which should be created
// path - <string>, data storage directory, which should be created
// before FileStorage is used
// minCompressSize - <number>, minimal file size
// to be compressed, default = 1024
constructor(options) {
this.dir = path.resolve(options.dir);
this.path = path.resolve(options.path);
this.minCompressSize = options.minCompressSize || MIN_COMPRESS_SIZE;
}

// Write file to storage
// id - <common.Uint64>, id of file
// id - <common.Uint64> | <number> | <BigInt>, id of file
// data - <string> | <Buffer> | <Uint8Array>, data to be written
// opts - <Object>
// checksum - <string>, checksum type
// dedupHash - <string>, second checksum type
// cb - <Function>, callback
// err - <Error>
// stats - <Object>
// checksum - <string>, data checksum
// dedupHash - <string>, second data checksum
// size - <number>, data size
// Returns: <Promise>
// Throws: <TypeError>, if `opts.checksum` or `opts.dedupHash` is incorrect
write(id, data, opts, cb) {
async write(id, data, opts) {
const file = this[getFilepath](id);
common.mkdirp(path.dirname(file), err => {
if (err) {
cb(err);
return;
}
fs.writeFile(file, data, err => {
if (err) {
cb(err);
return;
}
const stats = utils.getDataStats(data, opts.checksum, opts.dedupHash);
cb(null, stats);
});
});
await utils.mkdirpPromise(path.dirname(file));
await fs.writeFile(file, data);

return utils.getDataStats(data, opts.checksum, opts.dedupHash);
}

// Update file in the storage
// id - <common.Uint64>, id of file
// id - <common.Uint64> | <number> | <BigInt>, id of file
// data - <string> | <Buffer> | <Uint8Array>, data to be written
// opts - <Object>
// checksum - <string>, checksum type
// dedupHash - <string>, second checksum type
// cb - <Function>, callback
// err - <Error>
// stats - <Object>
// checksum - <string>, data checksum
// dedupHash - <string>, second data checksum
// size - <number>, data size
// originalSize - <number>, size of original file
// Returns: <Promise>
// Throws: <TypeError>, if `opts.checksum` or `opts.dedupHash` is incorrect
update(id, data, opts, cb) {
async update(id, data, opts) {
const file = this[getFilepath](id);
fs.stat(file, (err, fstats) => {
if (err) {
cb(err);
return;
}
fs.writeFile(file, data, err => {
if (err) {
cb(err);
return;
}
const stats = utils.getDataStats(data, opts.checksum, opts.dedupHash);
stats.originalSize = fstats.size;
cb(null, stats);
});
});
const fstats = await fs.stat(file);
await fs.writeFile(file, data);

const stats = utils.getDataStats(data, opts.checksum, opts.dedupHash);
stats.originalSize = fstats.size;
return stats;
}

// Get information about file
// id - <common.Uint64>, id of file
// cb - <Function>, callback
// err - <Error>
// stats - <fs.Stats>
stat(id, cb) {
// id - <common.Uint64> | <number> | <BigInt>, id of file
// Returns: <Promise>
async stat(id) {
const file = this[getFilepath](id);
fs.stat(file, cb);
return fs.stat(file);
}

// Read file from storage
// id - <common.Uint64>, id of file
// id - <common.Uint64> | <number> | <BigInt>, id of file
// opts - <Object>
// encoding - <string>
// compression - <string>
// cb - <Function>, callback
// err - <Error>
// data - <Buffer> | <string>
read(id, opts, cb) {
// Returns: <Promise>
async read(id, opts) {
const file = this[getFilepath](id);
if (opts.compression) utils.uncompress(file, opts, cb);
else fs.readFile(file, opts.encoding, cb);
if (opts.compression) return utils.uncompress(file, opts);
return fs.readFile(file, opts.encoding);
}

// Delete file from storage
// id - <common.Uint64>, id of file
// cb - <Function>, callback
// err - <Error>
rm(id, cb) {
// id - <common.Uint64> | <number> | <BigInt>, id of file
async rm(id) {
const file = this[getFilepath](id);
fs.unlink(file, cb);
await fs.unlink(file);
}

// Compress file in storage
// id - <common.Uint64>, id of file
// id - <common.Uint64> | <number> | <BigInt>, id of file
// compression - <string>, compression type
// cb - <Function>, callback
// err - <Error>
// compressed - <boolean>, whether file was compressed
// Returns: <Promise>
// Throws: <TypeError>, if compression is incorrect
compress(id, compression, cb) {
async compress(id, compression) {
const file = this[getFilepath](id);
utils.compress(file, this.minCompressSize, compression, cb);
return utils.compress(file, this.minCompressSize, compression);
}

[getFilepath](id) {
return utils.getFilepath(this.dir, common.idToPath(id));
return utils.getFilepath(this.path, common.idToPath(id));
}
}

// Create new FileStorage and root directory if it doesn't exist
// options - <Object>
// dir - <string>, data storage directory
// path - <string>, data storage directory
// minCompressSize - <number>, minimal file size to be compressed
// cb - <Function>, callback
// err - <Error>
// storage - <FileStorage>
const create = (options, cb) => {
common.mkdirp(path.resolve(options.dir), err => {
if (err) cb(err);
else cb(null, new FileStorage(options));
});
// Returns: <Promise>
const create = async options => {
await utils.mkdirpPromise(path.resolve(options.path));
return new FileStorage(options);
};

module.exports = { FileStorage, create };
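
Taken together, the converted API is consumed with async/await. A minimal usage sketch of the promise-based methods from this diff; the require path, the id, and the option values are assumptions for illustration:

```js
'use strict';

// Hypothetical usage of the promise-based FileStorage API from this PR.
const { create } = require('./lib/filestorage');

const main = async () => {
  // create() now resolves with a FileStorage instead of using a callback
  const storage = await create({ path: './data', minCompressSize: 1024 });

  // write() resolves with the stats it previously passed to cb
  const stats = await storage.write(1, 'hello', {
    checksum: 'CRC32',
    dedupHash: 'SHA256',
  });
  console.log(stats); // { checksum, dedupHash, size }

  // read() resolves with the file contents
  const data = await storage.read(1, { encoding: 'utf8' });
  console.log(data); // 'hello'
};

main().catch(console.error);
```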
30 changes: 30 additions & 0 deletions lib/fs.js
@@ -0,0 +1,30 @@
'use strict';

// TODO: this file should be removed and `fs.promises` used instead
// when support for Node.js 8 is dropped

const fs = require('fs');
const { promisify } = require('util');

const { iter } = require('@metarhia/common');

const list = [
'readFile',
'writeFile',
'unlink',
'rename',
'stat',
'access',
'mkdir',
'rmdir',
'readdir',
];

if (fs.promises) {
module.exports = fs.promises;
} else {
module.exports = iter(list).collectWith(
{},
(obj, name) => (obj[name] = promisify(fs[name]))
);
}
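
For readers unfamiliar with `@metarhia/common`, `iter(list).collectWith(...)` here just builds an object of promisified fs functions. A dependency-free sketch of the same fallback, shown for illustration only:

```js
'use strict';

// Equivalent fallback without @metarhia/common: promisify each listed
// fs function into a plain object, preferring fs.promises when present.
const fs = require('fs');
const { promisify } = require('util');

const list = ['readFile', 'writeFile', 'unlink', 'stat'];

module.exports =
  fs.promises ||
  list.reduce((obj, name) => {
    obj[name] = promisify(fs[name]);
    return obj;
  }, {});
```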
98 changes: 34 additions & 64 deletions lib/utils.js
@@ -1,11 +1,12 @@
'use strict';

const fs = require('fs');
const fs = require('./fs');
const path = require('path');
const crypto = require('crypto');
const { crc32 } = require('crc');
const util = require('util');
const common = require('@metarhia/common');
const { zip, gzip } = require('compressing');
const metasync = require('metasync');

const CHECKSUM = 'CRC32';
const DEDUP_HASH = 'SHA256';
@@ -28,32 +28,6 @@ const FS_EXT = 'f';

const getFilepath = (...parts) => `${path.join(...parts)}.${FS_EXT}`;

const rmdirp = (dir, cb) => {
fs.stat(dir, (err, stats) => {
if (err) {
if (err.code === 'ENOENT') cb(null);
else cb(err);
return;
}

if (stats.isDirectory()) {
fs.readdir(dir, (err, files) => {
if (err) {
cb(err);
return;
}
files = files.map(file => path.join(dir, file));
metasync.each(files, rmdirp, err => {
if (err) cb(err);
else fs.rmdir(dir, cb);
});
});
} else {
fs.unlink(dir, cb);
}
});
};

const computeHash = (data, checksum) => {
const hasher = hashers[checksum];
if (!hasher) {
@@ -68,45 +68,40 @@ const getDataStats = (data, checksum, dedupHash) => ({
size: Buffer.byteLength(data),
});

const compress = (file, minCompressSize, compression, cb) => {
fs.stat(file, (err, stats) => {
if (err || stats.size <= minCompressSize) {
cb(err, false);
return;
}
const filec = file + 'z';
const compressor = compressors[compression];
if (!compressor) {
throw new Error(`Unknown compression type ${compression} specified`);
}
compressor
.compressFile(file, filec)
.then(() =>
fs.rename(filec, file, err => {
if (err) cb(err, false);
else cb(null, true);
})
)
.catch(err => cb(err, false));
});
const compress = async (file, minCompressSize, compression) => {
const compressor = compressors[compression];
if (!compressor) {
throw new Error(`Unknown compression type ${compression} specified`);
}

const stats = await fs.stat(file);
if (stats.size <= minCompressSize) return false;
Member commented:

Unrelated to this PR, but now that I think of it, wouldn't this require the user of Filestorage to keep a map of compressed/uncompressed files? Without such a map, if someone calls compress(file, 100, 'xz') and the file size is less than 100, the file will not be compressed; if the user then calls read(file, { compression: 'xz' }), it will fail because the file was never compressed.

Member Author replied:

The idea was that metadata including the file name, hash sum, deduplication hash, compression type, etc. will be stored in the DB. I don't see a problem with the need for such a map, since you already have to store each file's compression type somewhere.
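
A minimal sketch of that caller-side pattern; `db` and its methods are hypothetical placeholders for whatever metadata store the application uses, and 'ZIP' is an assumed compression key:

```js
// Record whether compression actually happened, then replay that metadata
// on read. db.saveFileMeta/db.getFileMeta are placeholders, not part of
// this library.
const compressed = await storage.compress(id, 'ZIP');
await db.saveFileMeta(id, { compression: compressed ? 'ZIP' : null });

// ...later...
const meta = await db.getFileMeta(id);
const data = await storage.read(id, {
  encoding: 'utf8',
  compression: meta.compression, // falsy value skips decompression
});
```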


const filec = file + 'z';
await compressor.compressFile(file, filec);
await fs.rename(filec, file);
return true;
};

const uncompress = (file, opts, cb) => {
fs.access(file, err => {
if (err) {
cb(err);
return;
}
const compressor = compressors[opts.compression];
if (!compressor) {
throw new Error(`Unknown compression type ${opts.compression} specified`);
}
const uncompress = async (file, opts) => {
const compressor = compressors[opts.compression];
if (!compressor) {
throw new Error(`Unknown compression type ${opts.compression} specified`);
}

if (opts.compression === 'GZIP') {
const memoryStream = new common.MemoryWritable();
compressor.uncompress(file, memoryStream);
return memoryStream.getData(opts.encoding);
}

return new Promise((resolve, reject) => {
const buffers = [];
new compressor.UncompressStream({ source: file })
.on('error', cb)
.on('finish', () => {
if (opts.encoding) cb(null, buffers.join());
else cb(null, Buffer.concat(buffers));
.once('error', reject)
.once('finish', () => {
if (opts.encoding) resolve(buffers.join(''));
else resolve(Buffer.concat(buffers));
})
.on('entry', (header, stream, next) => {
Member commented:

I don't think this will work with a gzip stream, since it doesn't emit the 'entry' event:

> Note: tar, tgz and zip have the same uncompressing streaming API as above: it's a writable stream, and entries will be emitted while uncompressing one after another, while that of gzip is slightly different: gzip.UncompressStream is a transform stream, so no entry event will be emitted and you can just pipe to another stream.

Also, I think you should just use the newly added common.MemoryWritable here.
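
A sketch of the gzip path the quoted docs describe, using MemoryWritable as suggested; it assumes, as the diff above does, that getData() resolves once the sink finishes:

```js
// gzip.UncompressStream is a transform stream: pipe it into a sink instead
// of listening for 'entry' events (those are emitted only by tar/tgz/zip).
const { gzip } = require('compressing');
const { MemoryWritable } = require('@metarhia/common');

const uncompressGzip = (file, encoding) => {
  const sink = new MemoryWritable();
  new gzip.UncompressStream({ source: file })
    .once('error', err => sink.destroy(err))
    .pipe(sink);
  return sink.getData(encoding);
};
```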

if (opts.encoding) stream.setEncoding(opts.encoding);
@@ -120,7 +90,7 @@ module.exports = {
getFilepath,
computeHash,
getDataStats,
rmdirp,
compress,
uncompress,
mkdirpPromise: util.promisify(common.mkdirp),
};
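
For downstream users, the call-site migration this diff implies, sketched with stat; handleError is a placeholder:

```js
// Before this PR (callback style):
storage.stat(id, (err, stats) => {
  if (err) return handleError(err);
  console.log(stats.size);
});

// After this PR (promise style): errors surface as rejections.
try {
  const stats = await storage.stat(id);
  console.log(stats.size);
} catch (err) {
  handleError(err); // e.g. err.code === 'ENOENT' for a missing file
}
```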