From 14604d0c97a4695334a9108f195a911120864960 Mon Sep 17 00:00:00 2001 From: Nuryagdy Mustapayev Date: Sun, 13 Jun 2021 13:03:29 +0300 Subject: [PATCH] v0.2.0 release support for local to remote synchronization --- README.md | 80 ++++++---- example.js | 11 +- package-lock.json | 2 +- package.json | 6 +- src/{sync.js => base-sync.js} | 283 ++++++++++++++++------------------ src/index.js | 7 + src/local-to-remote-sync.js | 133 ++++++++++++++++ src/remote-to-local-sync.js | 137 ++++++++++++++++ src/remote-util.js | 110 +++++++++++-- 9 files changed, 574 insertions(+), 195 deletions(-) rename src/{sync.js => base-sync.js} (77%) create mode 100644 src/index.js create mode 100644 src/local-to-remote-sync.js create mode 100644 src/remote-to-local-sync.js diff --git a/README.md b/README.md index d49ed33..3918504 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,11 @@ # node-ftpsync -An Remote to Local FTP synchronization library for NodeJS based on +A Remote to Local and Local to Remote FTP synchronization library for NodeJS based on [basic-ftp](https://www.npmjs.com/package/basic-ftp). +**Notice: This application will delete files and directories on the remote server/local device to match the local/remote machine. +Use this library in production at your own risk.** + ### Requirements - NodeJS `>=12.10.0` @@ -11,13 +14,30 @@ An Remote to Local FTP synchronization library for NodeJS based on `npm i node-ftpsync` #### Usage -You can find usage example in [example.js](example.js) file. - -#### Run the example script - -`node example.js` +```js +const {Rem2LocSync, Loc2RemSync} = require("node-ftpsync"); +const config = require("./config.json"); +/** + * console as a logger or any other logger that supports `info`, `debug`, `error` methods + * @type {BaseSync} + */ +// for Remote to Local synchronization +//const synchronizer = new Rem2LocSync(config, console); + +// for Local to Remote synchronization +const synchronizer = new Loc2RemSync(config, console); + +const interval = setInterval(() => { + console.log("synchronizer status", synchronizer.getUpdateStatus()); +}, 5000); +synchronizer.run((err, results) => { + clearInterval(interval); + console.log("run response", synchronizer.getUpdateStatus()); +}) +``` +You can find usage more examples in [example.js](example.js) file. -example configuration +#### Configuration ```json { @@ -52,7 +72,7 @@ example configuration The file and directory listings for the local host. - `ftpsync.local.dirs` - contains a string array. Each path represents a local directory. -- `ftpsync.local.files` - contains a list of objects. Each object in the list represents a file and contains a `id`, `size`, and `time` attribute with the requisite values for that file. +- `ftpsync.local.files` - contains a list of objects. Each object in the list represents a file and contains a `id` (path), `size`, and `time` attribute with the requisite values for that file. Populated by running `ftpsync.collect()` or `ftpsync.localUtil.walk()`. @@ -61,41 +81,41 @@ Populated by running `ftpsync.collect()` or `ftpsync.localUtil.walk()`. The file and directory listings for the remote host. - `ftpsync.remote.dirs` - contains a string array. Each path represents a remote directory. -- `ftpsync.remote.files` - contains a list of objects. Each object in the list represents a file and contains a `id`, `size`, and `time` attribute with the requisite values for that file. +- `ftpsync.remote.files` - contains a list of objects. Each object in the list represents a file and contains a `id` (path), `size`, and `time` attribute with the requisite values for that file. Populated by running `ftpsync.collect()` or `ftpsync.remoteUtil.walk()`. #### ftpsync.mkdirQueue[] -The list of directories queued for creation on the local device. +The list of directories queued for creation. Populated by running `ftpsync.consolidate()`. #### ftpsync.rmdirQueue[] -The list of directories queued for deletion on the local device. +The list of directories queued for deletion. -**Note:** If parent and its sub directory is going to be deleted, then this array will contain only parent directory. +**Note:** On Remote to Local synchronization if parent and its sub directory is going to be deleted, then this array will contain only parent directory. Populated by running `ftpsync.consolidate()`. #### ftpsync.addFileQueue[] -The list of files queued for addition on the local device. +The list of files queued for addition. Populated by running `ftpsync.consolidate()`. #### ftpsync.updateFileQueue[] -The list of files queued for an update on the local device. +The list of files queued for an update. Populated by running `ftpsync.consolidate()`. #### ftpsync.removeFileQueue[] -The list of files queued for removal from the local device. +The list of files queued for removal. -**Note:** if a directory is going to be removed then files in this directory will not be listed in this list. +**Note:** On Remote to Local synchronization if a directory is going to be removed then files in this directory will not be listed in this list. Populated by running `ftpsync.consolidate()`. @@ -112,16 +132,20 @@ It tries to open an FTP connection. #### ftpsync.collect(callback) -Walks the file trees for both the local host and remote server and prepares them for further processing. The resulting file lists are stored in `ftpsync.local[]`, and `ftpsync.remote[]` upon successful completion. +Walks file trees for both the local host and remote server and prepares them for further processing. The resulting file lists are stored in `ftpsync.local[]`, and `ftpsync.remote[]` upon successful completion. #### ftpsync.consolidate(callback) Runs comparisons on the local and remote file listings. - -- Files/directories that exist in the local directory but not in the remote server are queued up for removal. -- Files/directories that exist in on the remote directory but not the local are queued for addition. -- Files that exist in both but are different (determined by file size and time stamp) are queued for update. - The resulting queues can be found in `mkdirQueue[]`, `rmdirQueue[]`, `addFileQueue[]`, `updateFileQueue[]`, and `removeFileQueue[]` upon successful completion. +- Files that exist in both on remote and local but are different (determined by file size and time stamp) are queued for update. +- ignored paths will not be touched. +##### Remote To Local Sync +- Files/directories that exist on the local directory but not on the remote directory are queued for removal. +- Files/directories that exist on the remote directory but not on the local directory are queued for addition. +##### Local To Remote Sync +- Files/directories that exist on the remote directory but not on the local directory are queued up for removal. +- Files/directories that exist on the local directory but not on the remote directory are queued for addition. #### ftpsync.commit(callback) @@ -132,7 +156,7 @@ Processes 4. `removeFileQueue[]` 5. `rmdirQueue[]` -these queues in order. +these queues one by one. #### ftpsync.getUpdateStatus() @@ -143,17 +167,17 @@ Returns following object: "numOfChanges": 233, "numOfLocalFiles": 121, "numOfRemoteFiles": 176, - "totalDownloadSize": 91791972, + "totalTransferSize": 91791972, "totalDownloadedSize": 0, "totalLocalSize": 38663190, "totalRemoteSize": 95514914 } ``` -- `numOfChanges` - `== ftpsync.removeFileQueue.length + ftpsync.rmdirQueue.length + ftpsync.addFileQueue.length + ftpsync.updateFileQueue.length`; +- `numOfChanges` - `== ftpsync.removeFileQueue.length + ftpsync.rmdirQueue.length + ftpsync.addFileQueue.length + ftpsync.updateFileQueue.length;` - `numOfLocalFiles` - `== ftpsync.local.files.length`. - `numOfRemoteFiles` - `== ftpsync.remote.files.length`. -- `totalDownloadSize` - `== sumFileSizes(ftpsync.addFileQueue) + sumFileSizes(ftpsync.updateFileQueue)`. total bytes that are going to be downloaded. -- `totalDownloadedSize` - in bytes, updated as files successfully downloaded. Should be equal to totalDownloadSize when commit() finishes successfully. +- `totalTransferSize` - `== sumFileSizes(ftpsync.addFileQueue) + sumFileSizes(ftpsync.updateFileQueue)`. total bytes that are going to be downloaded/uploaded. +- `totalTransferredSize` - in bytes, updated as files successfully downloaded/uploaded. Should be equal to `totalTransferSize` when commit() finishes successfully. - `totalLocalSize` - `== sumFileSizes(ftpsync.local.files)` in bytes - `totalRemoteSize` - `== sumFileSizes(ftpsync.remote.files)` in bytes @@ -161,5 +185,5 @@ Roadmap ------- ### Short Term - support for multiple FTP connections -### Long Term - - remote to local sync functionality. + - unit tests + - command line support diff --git a/example.js b/example.js index a3c59da..d8a0b56 100644 --- a/example.js +++ b/example.js @@ -3,7 +3,7 @@ const http = require("http"); http.createServer(function (req, res) { }).listen(3000, "127.0.0.1"); -const Sync = require("./src/sync"); +const {Rem2LocSync, Loc2RemSync} = require("./src/index"); let config = { "host": "example.com", @@ -20,17 +20,18 @@ let config = { "connections": 1, //retry times on ETIMEDOUT error "retryLimit": 3, - "verbose": false + "verbose": true }; /** * console as a logger or any other logger that supports `info`, `debug`, `error` methods - * @type {Sync} + * @type {BaseSync} */ -const synchronizer = new Sync(config, console); +//const synchronizer = new Rem2LocSync(config, console); +const synchronizer = new Loc2RemSync(config, console); const interval = setInterval(() => { console.log("synchronizer status", synchronizer.getUpdateStatus()); -}, 5000) +}, 5000); synchronizer.run((err, results) => { clearInterval(interval); console.log("run response", synchronizer.getUpdateStatus()); diff --git a/package-lock.json b/package-lock.json index ea66c86..03bcb0c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "node-ftpsync", - "version": "0.1.2", + "version": "0.2.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 13c0474..3308e4c 100644 --- a/package.json +++ b/package.json @@ -1,8 +1,8 @@ { "name": "node-ftpsync", - "version": "0.1.2", - "description": "Remote to local file synchronization over FTP", - "main": "src/sync.js", + "version": "0.2.0", + "description": "A Remote to Local and Local to Remote synchronization over FTP", + "main": "src/index.js", "keywords": [ "ftp", "client", diff --git a/src/sync.js b/src/base-sync.js similarity index 77% rename from src/sync.js rename to src/base-sync.js index b870fe5..d09cf4e 100644 --- a/src/sync.js +++ b/src/base-sync.js @@ -2,7 +2,7 @@ const async = require("async"); const LocalUtil = require("./local-util"); const RemoteUtil = require("./remote-util"); -class Sync { +class BaseSync { settings; ftpConnectionConfig; @@ -42,7 +42,7 @@ class Sync { */ removeFileQueue = []; - totalDownloadSize = 0; + totalTransferSize = 0; totalNumOfChanges = 0; totalLocalSize = 0; totalRemoteSize = 0; @@ -74,6 +74,17 @@ class Sync { this.remoteUtil = new RemoteUtil(this.ftpConnectionConfig, this.settings.remote, this.settings.local, this.settings.ignore, this.logger, this.settings.retryLimit, this.settings.verbose); } + /** + * compare local vs remote file sizes + * @param localFile + * @param remoteFile + * + * @returns {boolean} + */ + static isDifferent = (localFile, remoteFile) => { + return localFile.size !== remoteFile.size; + } + setUp = (callback) => { this.logger.debug("Setup"); if (this.settings.verbose) { @@ -139,7 +150,7 @@ class Sync { } consolidate = (callback) => { - this.totalDownloadSize = 0; + this.totalTransferSize = 0; this.totalNumOfChanges = 0; this.logger.debug("Consolidating"); if (this.settings.verbose) { @@ -148,11 +159,8 @@ class Sync { this.consolidateDirectories(this.remote.dirs, this.local.dirs); - //if directory will be removed we can skip files located in those directories. - const localFiles = this.filtersFilesInGivenDirs(this.local.files, this.rmdirQueue); - - this.consolidateFiles(this.remote.files, localFiles); - this.totalDownloadSize = this.calculateTotalDownloadSize(); + this.consolidateFiles(this.remote.files, this.local.files); + this.totalTransferSize = this.calculateTotalTransferSize(); this.totalNumOfChanges = this.calculateTotalNumberOfChanges(); this.logger.debug(`Mkdir ${this.mkdirQueue.length} directories:`); @@ -179,18 +187,7 @@ class Sync { if (this.settings.verbose) { this.logger.info("-------------------------------------------------------------"); } - async.series([ - // add directories - this.processMkdirQueue, - // add files - this.processAddFileQueue, - // update files - this.processUpdateQueue, - // remove dirs - this.processRemoveDirQueue, - // remove files - this.processRemoveFileQueue, - ], + async.series(this.commitCmdQueue, (err, results) => { if (err) { this.logger.error("Commit failed.", err); @@ -214,15 +211,14 @@ class Sync { ], callback); } - getUpdateStatus = () => { + getUpdateStatus() { return { numOfChanges: this.totalNumOfChanges, numOfLocalFiles: this.local.files.length, numOfRemoteFiles: this.remote.files.length, - totalDownloadSize: this.totalDownloadSize, - totalDownloadedSize: this.remoteUtil.totalDownloadedSize, totalLocalSize: this.totalLocalSize, totalRemoteSize: this.totalRemoteSize, + totalTransferSize: this.totalTransferSize, } } @@ -231,6 +227,7 @@ class Sync { this.totalLocalSize = 0; this.totalRemoteSize = 0; this.remoteUtil.totalDownloadedSize = 0; + this.remoteUtil.totalUploadedSize = 0; this.local = { files: [], dirs: [], @@ -242,158 +239,80 @@ class Sync { } /** - * @private + * @protected + * @returns {Function[]} + */ + get commitCmdQueue() { + return []; + } + + /** + * @protected */ processMkdirQueue = (callback) => { - if (this.mkdirQueue.length === 0) { - callback(null, "no mkdirs"); - return; - } - async.mapLimit(this.mkdirQueue, this.settings.connections, this.localUtil.mkdir, (err) => { - if (err) { - this.logger.error("MKDIRs failed."); - return callback(err); - } - this.logger.debug("MKDIRs complete."); - callback(null); - }); + callback(null); } /** - * @private + * @protected */ processAddFileQueue = (callback) => { - if (this.addFileQueue.length === 0) { - callback(null, "no additions"); - return; - } - this.logger.debug("Additions started."); - async.mapLimit(this.addFileQueue, this.settings.connections, this.remoteUtil.download, (err) => { - if (err) { - this.logger.error("Additions failed."); - return callback(err); - } - this.logger.debug("Additions complete."); - callback(null); - }); + callback(null); } /** - * @private + * @protected */ processUpdateQueue = (callback) => { - if (this.updateFileQueue.length === 0) { - callback(null, "no updates"); - return; - } - this.logger.debug("Updates started."); - async.mapLimit(this.updateFileQueue, this.settings.connections, this.remoteUtil.download, (err) => { - if (err) { - this.logger.error("Updates failed."); - return callback(err); - } - this.logger.debug("Updates complete."); - callback(null); - }); + callback(null); } /** - * @private + * @protected */ processRemoveFileQueue = (callback) => { - if (this.removeFileQueue.length === 0) { - callback(null, "no removals"); - return; - } - async.mapLimit(this.removeFileQueue, this.settings.connections, this.localUtil.remove, (err) => { - if (err) { - this.logger.error("Removals failed."); - return callback(err); - } - this.logger.debug("Removals complete"); - callback(null); - }); + callback(null); } /** - * @private + * @protected */ processRemoveDirQueue = (callback) => { - if (this.rmdirQueue.length === 0) { - callback(null, "no rmdirs"); - return; - } - async.mapLimit(this.rmdirQueue, this.settings.connections, this.localUtil.rmdir, (err) => { - if (err) { - this.logger.error("RMDIRs failed.", err); - return callback(err); - } - this.logger.debug("RMDIRs complete."); - callback(null); - }); + callback(null); } /** - * creates list of directories to be created and removed by comparing remote and local directories - * @private + * marks common directories + * @protected * @param {string[]} remoteDirs * @param {string[]} localDirs */ consolidateDirectories(remoteDirs, localDirs) { - // compare directories for modifications - remoteDirs.forEach((dir) => { + remoteDirs.forEach((dir, rIDX) => { // if a match is found - let lIDX = localDirs.indexOf(dir); + const lIDX = localDirs.indexOf(dir); if (lIDX !== -1) { - let rIDX = remoteDirs.indexOf(dir); localDirs[lIDX] = ""; remoteDirs[rIDX] = ""; } }); - - // process the rest - let rmdirQueue = localDirs.filter((dir) => dir !== ""); - this.mkdirQueue = remoteDirs.filter((dir) => dir !== ""); - - this.rmdirQueue = this.filtersSubDirsFromArray(rmdirQueue); } /** * creates list of files to be added and removed by comparing remote and local files - * @private + * @protected * @param {*[]} remoteFiles * @param {*[]} localFiles */ consolidateFiles(remoteFiles, localFiles) { - - const processedLocalFileIndexes = []; - // compare files for modifications - remoteFiles.forEach((rFile, rIDX) => { - let lIDX = localFiles.findIndex((f) => (f.id === rFile.id)); - // if a match is found - if (lIDX !== -1) { - const lFile = localFiles[lIDX]; - if (Sync.isDifferent(lFile, rFile) || - Sync.isModified(lFile, rFile)) { - this.updateFileQueue.push(rFile); - } - // mark updates as processed - - processedLocalFileIndexes.push(lIDX); - } else { - this.addFileQueue.push(rFile); - } - }); - - this.removeFileQueue = localFiles.filter((f, index) => !processedLocalFileIndexes.includes(index)); } /** * if there is subdirectories they will be removed. * for example if there are ["/dir1", "/parent", "/parent/child", "/parent/child2", "/parent/child/sub-child"] * result will be ["/dir1", "/parent"] - * @private + * @protected * @param {string[]} dirs */ filtersSubDirsFromArray(dirs) { @@ -416,7 +335,7 @@ class Sync { /** * files that are located in given dir list will be filtered. - * @private + * @protected * @param {*[]} files * @param {string[]} dirs * @@ -446,10 +365,11 @@ class Sync { } /** - * @private + * calculates total download/upload size + * @protected * @return {number} */ - calculateTotalDownloadSize = () => { + calculateTotalTransferSize = () => { let total = 0; total += this.sumFileSizes(this.addFileQueue); total += this.sumFileSizes(this.updateFileQueue); @@ -458,7 +378,7 @@ class Sync { } /** - * @private + * @protected * @param {{size: number}[]} files * @return {number} */ @@ -474,33 +394,100 @@ class Sync { return this.removeFileQueue.length + this.rmdirQueue.length + this.addFileQueue.length + this.updateFileQueue.length; } + /** + * @protected + */ + doProcessMkdirQueue = (callback, mkDirFn) => { + if (this.mkdirQueue.length === 0) { + callback(null, "no mkdirs"); + return; + } + this.logger.debug("mkdirs started."); + async.mapLimit(this.mkdirQueue, this.settings.connections, mkDirFn, (err) => { + if (err) { + this.logger.error("mkdirs failed."); + return callback(err); + } + this.logger.debug("mkdirs complete."); + callback(null); + }); + } /** - * compare local vs remote file sizes - * @param localFile - * @param remoteFile - * - * @returns {boolean} + * @protected */ - static isDifferent = (localFile, remoteFile) => { - return localFile.size !== remoteFile.size; + doProcessAddFileQueue = (callback, addFileFn) => { + if (this.addFileQueue.length === 0) { + callback(null, "no additions"); + return; + } + this.logger.debug("Additions started."); + async.mapLimit(this.addFileQueue, this.settings.connections, addFileFn, (err) => { + if (err) { + this.logger.error("Additions failed."); + return callback(err); + } + this.logger.debug("Additions complete."); + callback(null); + }); } /** - * compare a local vs remote file time for modification - * - * @param localFile - * @param remoteFile - * @returns {boolean} return TRUE if remote file's modified date is later than local file's + * @protected */ - static isModified = (localFile, remoteFile) => { - // round to the nearest minute - const minutes = 1000 * 60; - const lTime = new Date((Math.round(localFile.time.getTime() / minutes) * minutes)); - const rTime = new Date((Math.round(remoteFile.time.getTime() / minutes) * minutes)); + doProcessUpdateQueue = (callback, updateFileFn) => { + if (this.updateFileQueue.length === 0) { + callback(null, "no updates"); + return; + } + this.logger.debug("Updates started."); + async.mapLimit(this.updateFileQueue, this.settings.connections, updateFileFn, (err) => { + if (err) { + this.logger.error("Updates failed."); + return callback(err); + } + this.logger.debug("Updates complete."); + callback(null); + }); + } - return lTime < rTime; + /** + * @protected + */ + doProcessRemoveFileQueue = (callback, rmFileFn) => { + if (this.removeFileQueue.length === 0) { + callback(null, "no removals"); + return; + } + this.logger.debug("Removals started."); + async.mapLimit(this.removeFileQueue, this.settings.connections, rmFileFn, (err) => { + if (err) { + this.logger.error("Removals failed."); + return callback(err); + } + this.logger.debug("Removals complete"); + callback(null); + }); + } + + /** + * @protected + */ + doProcessRemoveDirQueue = (callback, rmDirFn) => { + if (this.rmdirQueue.length === 0) { + callback(null, "no rmdirs"); + return; + } + this.logger.debug("rmdirs started."); + async.mapLimit(this.rmdirQueue, this.settings.connections, rmDirFn, (err) => { + if (err) { + this.logger.error("rmdirs failed.", err); + return callback(err); + } + this.logger.debug("rmdirs complete."); + callback(null); + }); } } -module.exports = Sync; +module.exports = BaseSync; diff --git a/src/index.js b/src/index.js new file mode 100644 index 0000000..382e58d --- /dev/null +++ b/src/index.js @@ -0,0 +1,7 @@ +const Rem2LocSync = require('./remote-to-local-sync'); +const Loc2RemSync = require('./local-to-remote-sync'); + +module.exports = { + Rem2LocSync, + Loc2RemSync +}; diff --git a/src/local-to-remote-sync.js b/src/local-to-remote-sync.js new file mode 100644 index 0000000..d9b2b1d --- /dev/null +++ b/src/local-to-remote-sync.js @@ -0,0 +1,133 @@ +const BaseSync = require("./base-sync"); + +/** + * synchronizes local files to remote files. + */ +class Loc2RemSync extends BaseSync { + + /** + * compare a local vs remote file time for modification + * + * @param {{time: Date}} localFile + * @param {{time: Date}} remoteFile + * @returns {boolean} return TRUE if local file's modified date is later than remote file's + */ + static isModified = (localFile, remoteFile) => { + // round to the nearest minute + const minutes = 1000 * 60; + const lTime = new Date((Math.round(localFile.time.getTime() / minutes) * minutes)); + const rTime = new Date((Math.round(remoteFile.time.getTime() / minutes) * minutes)); + + return lTime > rTime; + } + + getUpdateStatus() { + const status = super.getUpdateStatus(); + status.totalTransferredSize = this.remoteUtil.totalUploadedSize; + + return status; + } + + /** + * @protected + * @returns {Function[]} + */ + get commitCmdQueue() { + return [ + // add directories + this.processMkdirQueue, + // add files + this.processAddFileQueue, + // update files + this.processUpdateQueue, + // remove files + this.processRemoveFileQueue, + // remove dirs + this.processRemoveDirQueue, + ]; + } + + /** + * @protected + */ + processMkdirQueue = (callback) => { + this.doProcessMkdirQueue(callback, this.remoteUtil.mkdir); + } + + /** + * @protected + */ + processAddFileQueue = (callback) => { + this.doProcessAddFileQueue(callback, this.remoteUtil.upload); + } + + /** + * @protected + */ + processUpdateQueue = (callback) => { + this.doProcessUpdateQueue(callback, this.remoteUtil.upload); + } + + /** + * @protected + */ + processRemoveFileQueue = (callback) => { + this.doProcessRemoveFileQueue(callback, this.remoteUtil.remove); + } + + /** + * @protected + */ + processRemoveDirQueue = (callback) => { + this.doProcessRemoveDirQueue(callback, this.remoteUtil.rmdir); + } + + /** + * creates list of directories to be created and removed by comparing remote and local directories + * @protected + * @param {string[]} remoteDirs + * @param {string[]} localDirs + */ + consolidateDirectories(remoteDirs, localDirs) { + + super.consolidateDirectories(remoteDirs, localDirs); + + // process the rest + this.mkdirQueue = localDirs.filter((dir) => dir !== ""); + this.rmdirQueue = remoteDirs.filter((dir) => dir !== ""); + //reversing puts sub directories to the beginning of the array to delete sub directories first and then parent dir + this.rmdirQueue.reverse(); + } + + /** + * creates list of files to be added and removed by comparing remote and local files + * @protected + * @param {*[]} remoteFiles + * @param {*[]} localFiles + */ + consolidateFiles(remoteFiles, localFiles) { + + const processedLocalFileIndexes = []; + // compare files for modifications + remoteFiles.forEach((rFile) => { + let lIDX = localFiles.findIndex((f) => (f.id === rFile.id)); + // if a match is found + if (lIDX !== -1) { + const lFile = localFiles[lIDX]; + if (Loc2RemSync.isDifferent(lFile, rFile) || + Loc2RemSync.isModified(lFile, rFile)) { + this.updateFileQueue.push(rFile); + } + // mark updates as processed + + processedLocalFileIndexes.push(lIDX); + } else { + this.removeFileQueue.push(rFile); + } + }); + + this.addFileQueue = localFiles.filter((f, index) => !processedLocalFileIndexes.includes(index)); + } +} + +module.exports = Loc2RemSync; diff --git a/src/remote-to-local-sync.js b/src/remote-to-local-sync.js new file mode 100644 index 0000000..93836e9 --- /dev/null +++ b/src/remote-to-local-sync.js @@ -0,0 +1,137 @@ +const BaseSync = require("./base-sync"); + +/** + * synchronizes remote files to local files. + */ +class Rem2LocSync extends BaseSync { + + /** + * compare a local vs remote file time for modification + * + * @param {{time: Date}} localFile + * @param {{time: Date}} remoteFile + * @returns {boolean} return TRUE if remote file's modified date is later than local file's + */ + static isModified = (localFile, remoteFile) => { + // round to the nearest minute + const minutes = 1000 * 60; + const lTime = new Date((Math.round(localFile.time.getTime() / minutes) * minutes)); + const rTime = new Date((Math.round(remoteFile.time.getTime() / minutes) * minutes)); + + return lTime < rTime; + } + + getUpdateStatus() { + const status = super.getUpdateStatus(); + status.totalTransferredSize = this.remoteUtil.totalDownloadedSize; + + return status; + } + + /** + * @protected + * @returns {Function[]} + */ + get commitCmdQueue() { + return [ + // add directories + this.processMkdirQueue, + // add files + this.processAddFileQueue, + // update files + this.processUpdateQueue, + // remove dirs + this.processRemoveDirQueue, + // remove files + this.processRemoveFileQueue, + ]; + } + + /** + * @protected + */ + processMkdirQueue = (callback) => { + this.doProcessMkdirQueue(callback, this.localUtil.mkdir); + } + + /** + * @protected + */ + processAddFileQueue = (callback) => { + this.doProcessAddFileQueue(callback, this.remoteUtil.download); + } + + /** + * @protected + */ + processUpdateQueue = (callback) => { + this.doProcessUpdateQueue(callback, this.remoteUtil.download); + } + + /** + * @protected + */ + processRemoveFileQueue = (callback) => { + this.doProcessRemoveFileQueue(callback, this.localUtil.remove); + } + + /** + * @protected + */ + processRemoveDirQueue = (callback) => { + this.doProcessRemoveDirQueue(callback, this.localUtil.rmdir); + } + + + /** + * creates list of directories to be created and removed by comparing remote and local directories + * @protected + * @param {string[]} remoteDirs + * @param {string[]} localDirs + */ + consolidateDirectories(remoteDirs, localDirs) { + + super.consolidateDirectories(remoteDirs, localDirs); + + // process the rest + let rmdirQueue = localDirs.filter((dir) => dir !== ""); + this.mkdirQueue = remoteDirs.filter((dir) => dir !== ""); + + this.rmdirQueue = this.filtersSubDirsFromArray(rmdirQueue); + } + + /** + * creates list of files to be added and removed by comparing remote and local files + * @protected + * @param {*[]} remoteFiles + * @param {*[]} localFiles + */ + consolidateFiles(remoteFiles, localFiles) { + + //if directory will be removed we can skip files located in those directories. + localFiles = this.filtersFilesInGivenDirs(localFiles, this.rmdirQueue); + + const processedLocalFileIndexes = []; + // compare files for modifications + remoteFiles.forEach((rFile) => { + let lIDX = localFiles.findIndex((f) => (f.id === rFile.id)); + // if a match is found + if (lIDX !== -1) { + const lFile = localFiles[lIDX]; + if (Rem2LocSync.isDifferent(lFile, rFile) || + Rem2LocSync.isModified(lFile, rFile)) { + this.updateFileQueue.push(rFile); + } + // mark updates as processed + + processedLocalFileIndexes.push(lIDX); + } else { + this.addFileQueue.push(rFile); + } + }); + + this.removeFileQueue = localFiles.filter((f, index) => !processedLocalFileIndexes.includes(index)); + } +} + +module.exports = Rem2LocSync; diff --git a/src/remote-util.js b/src/remote-util.js index 13af3e3..93cee15 100644 --- a/src/remote-util.js +++ b/src/remote-util.js @@ -26,6 +26,7 @@ class RemoteUtil { _logger; totalDownloadedSize = 0; + totalUploadedSize = 0; _retryLimit; /** @@ -56,12 +57,11 @@ class RemoteUtil { this._logger.debug("walk remote complete."); callback(null, result); }).catch((e) => { - this._logger.error("walk remote failed.", e); - callback("error"); + this._logger.error("walk remote failed.", JSON.stringify(e)); + callback("error", e); }); } - /** * recursively walks over directories * @private @@ -115,18 +115,108 @@ class RemoteUtil { async setUpConnection() { - const accessFn = this._ftp.access.bind(this._ftp, this._ftpConfig); - return await this.retryConnect(accessFn, this._retryLimit); + const ftpFn = this._ftp.access.bind(this._ftp, this._ftpConfig); + return await this.retryConnect(ftpFn, this._retryLimit); } async ftpList(currentPath) { - const listFn = this._ftp.list.bind(this._ftp, currentPath); - return await this.retry(listFn, this._retryLimit); + const ftpFn = this._ftp.list.bind(this._ftp, currentPath); + return await this.retry(ftpFn, this._retryLimit); } async ftpDownloadTo(remote, local) { - const downloadFn = this._ftp.downloadTo.bind(this._ftp, remote, local); - return await this.retry(downloadFn, this._retryLimit); + const ftpFn = this._ftp.downloadTo.bind(this._ftp, remote, local); + return await this.retry(ftpFn, this._retryLimit); + } + + async ftpUploadFrom(local, remote) { + const ftpFn = this._ftp.uploadFrom.bind(this._ftp, local, remote); + return await this.retry(ftpFn, this._retryLimit); + } + + async ftpRemove(path) { + const ftpFn = this._ftp.remove.bind(this._ftp, path); + return await this.retry(ftpFn, this._retryLimit); + } + + async ftpMkDir(remoteDirPath) { + const ftpFn = this._ftp.ensureDir.bind(this._ftp, remoteDirPath); + return await this.retry(ftpFn, this._retryLimit); + } + + async ftpRmDir(remoteDirPath) { + const ftpFn = this._ftp.removeDir.bind(this._ftp, remoteDirPath); + return await this.retry(ftpFn, this._retryLimit); + } + + /** + * @param {string} dir + * @param {function} callback + */ + rmdir = (dir, callback) => { + this.ftpRmDir(this._basePath + dir).then(() => { + if (this._verbose) { + this._logger.info("-", dir, "remote rmdir successfully"); + } + callback(null, dir); + }).catch((err) => { + this._logger.error("remote rmdir failed.", err); + callback(err); + }); + } + + /** + * @param {string} dir + * @param {function} callback + */ + mkdir = (dir, callback) => { + this.ftpMkDir(this._basePath + dir).then((err, data) => { + if (this._verbose) { + this._logger.info("-", dir, "remote mkdir successfully"); + } + callback(null, dir); + }).catch((err) => { + this._logger.error("remote mkdir failed.", err); + callback(err); + }); + } + + /** + * @param {{id: string}} file + * @param {function} callback + */ + remove = (file, callback) => { + this.ftpRemove(this._basePath + file.id).then((err, data) => { + if (this._verbose) { + this._logger.info("-", file.id, "remote remove successfully"); + } + callback(null, file); + }).catch((err) => { + this._logger.error("remote remove failed.", err); + callback(err); + }); + } + + /** + * @param {{id: string, size: number}} file + * @param {function} callback + */ + upload = (file, callback) => { + const local = this._localBasePath + file.id; + const remote = this._basePath + file.id; + if (this._verbose) { + this._logger.info("uploading: ", local, remote); + } + this.ftpUploadFrom(local, remote).then((err, data) => { + if (this._verbose) { + this._logger.info("-", file.id, "remote remove successfully"); + } + this.totalUploadedSize += file.size; + callback(null, file); + }).catch((err) => { + this._logger.error("remote remove failed.", err); + callback(err); + }); } /** @@ -165,7 +255,7 @@ class RemoteUtil { */ retryConnect = (fn, retries= 3, err= null) => { if (err) { - this._logger.error('FTP error', err); + this._logger.error('FTP error', JSON.stringify(err)); if ("ETIMEDOUT" !== err.code) { return Promise.reject(err); }