diff --git a/lib/algos/list/skip.js b/lib/algos/list/skip.js new file mode 100644 index 000000000..ee40ad7f9 --- /dev/null +++ b/lib/algos/list/skip.js @@ -0,0 +1,65 @@ +const assert = require('assert'); + +const { FILTER_END, FILTER_SKIP, SKIP_NONE } = require('./tools'); + + +const MAX_STREAK_LENGTH = 100; + + +class Skip { + constructor(params) { + assert(params.extension); + + this.extension = params.extension; + this.gteParams = params.gte; + + this.listingEndCb = null; + this.skipRangeCb = null; + this.streakLength = 0; + } + + setListingEndCb(cb) { + this.listingEndCb = cb; + } + + setSkipRangeCb(cb) { + this.skipRangeCb = cb; + } + + filter(entry) { + const filteringResult = this.extension.filter(entry); + const skippingRange = this.extension.skipping(); + + if (filteringResult === FILTER_END) { + if (this.listingEndCb) { + this.listingEndCb(); + } + } else if (filteringResult === FILTER_SKIP + && skippingRange !== SKIP_NONE) { + if (++this.streakLength >= MAX_STREAK_LENGTH) { + /* Avoid to loop on the same range again and again. */ + const newRange = this._inc(skippingRange); + if (newRange === this.gteParams) { + this.streakLength = 0; + } else if (this.skipRangeCb) { + this.skipRangeCb(newRange); + } + } + } else { + this.streakLength = 0; + } + } + + _inc(str) { + if (!str) { + return str; + } + + const lastCharValue = str.charCodeAt(str.length - 1); + const lastCharNewValue = String.fromCharCode(lastCharValue + 1); + return `${str.slice(0, str.length - 1)}${lastCharNewValue}`; + } +} + + +module.exports = { Skip }; diff --git a/lib/storage/metadata/mongoclient/MongoClientInterface.js b/lib/storage/metadata/mongoclient/MongoClientInterface.js index c3bc0907e..4d86b7a18 100644 --- a/lib/storage/metadata/mongoclient/MongoClientInterface.js +++ b/lib/storage/metadata/mongoclient/MongoClientInterface.js @@ -36,6 +36,7 @@ const { DEL_MST, DataCounter, } = require('./DataCounter'); +const Skip = require('../../../algos/list/skip'); const USERSBUCKET = '__usersbucket'; const METASTORE = '__metastore'; @@ -44,7 +45,6 @@ const __UUID = 'uuid'; const PENSIEVE = 'PENSIEVE'; const ASYNC_REPAIR_TIMEOUT = 15000; const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour -const MAX_STREAK_LENGTH = 100; const CONNECT_TIMEOUT_MS = 5000; const initialInstanceID = process.env.INITIAL_INSTANCE_ID; @@ -962,47 +962,34 @@ class MongoClientInterface { internalListObject(bucketName, params, extension, log, cb) { const c = this.getCollection(bucketName); + const stream = new MongoReadStream(c, params, params.mongifiedSearch); + const skip = new Skip({ + extension, + gte: params.gte, + }); let cbDone = false; - let streakLength = 0; - const stream = new MongoReadStream(c, params, - params.mongifiedSearch); + + skip.setListingEndCb(() => { + stream.emit('end'); + stream.destroy(); + }); + skip.setSkipRangeCb(range => { + // stop listing this key range + stream.destroy(); + // update the new listing parameters here + // eslint-disable-next-line no-param-reassign + params.start = undefined; // 'start' is deprecated + // eslint-disable-next-line no-param-reassign + params.gt = undefined; + // eslint-disable-next-line no-param-reassign + params.gte = range; + // then continue listing the next key range + this.internalListObject(bucketName, params, extension, log, cb); + }); + stream .on('data', e => { - // filtering result: - // - filteringResult > 0: entry accepted and included in the - // listing result - // - filteringResult = 0: entry accepted but not included in - // the listing result (skipping) - // - filteringResult < 0: entry is rejected, listing finishes - const filteringResult = extension.filter(e); - // skipping: provides insight into why the extension is - // skipping the entry, for example: entry starts with a - // commonPrefix or entry is a specific version; in such - // case, repd needs to skip this range and go on with the next - const skippingRange = extension.skipping(); - if (filteringResult < 0) { - stream.emit('end'); - stream.destroy(); - } else if (filteringResult === 0 && skippingRange) { - // check if MAX_STREAK_LENGTH consecutive keys have been - // skipped - if (++streakLength === MAX_STREAK_LENGTH) { - // stop listing this key range - stream.destroy(); - // update the new listing parameters here - // eslint-disable-next-line no-param-reassign - params.start = undefined; // 'start' is deprecated - // eslint-disable-next-line no-param-reassign - params.gt = undefined; - // eslint-disable-next-line no-param-reassign - params.gte = inc(skippingRange); - // then continue listing the next key range - this.internalListObject(bucketName, params, - extension, log, cb); - } - } else { - streakLength = 1; - } + skip.filter(e); }) .on('error', err => { if (!cbDone) {