Skip to content

Commit

Permalink
ZENKO-1124: mongo listing, avoid to loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeremy Desanlis committed Sep 10, 2018
1 parent 7088812 commit 566e3fd
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 39 deletions.
65 changes: 65 additions & 0 deletions lib/algos/list/skip.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
const assert = require('assert');

const { FILTER_END, FILTER_SKIP, SKIP_NONE } = require('./tools');


const MAX_STREAK_LENGTH = 100;


class Skip {
constructor(params) {
assert(params.extension);

this.extension = params.extension;
this.gteParams = params.gte;

this.listingEndCb = null;
this.skipRangeCb = null;
this.streakLength = 0;
}

setListingEndCb(cb) {
this.listingEndCb = cb;
}

setSkipRangeCb(cb) {
this.skipRangeCb = cb;
}

filter(entry) {
const filteringResult = this.extension.filter(entry);
const skippingRange = this.extension.skipping();

if (filteringResult === FILTER_END) {
if (this.listingEndCb) {
this.listingEndCb();
}
} else if (filteringResult === FILTER_SKIP
&& skippingRange !== SKIP_NONE) {
if (++this.streakLength >= MAX_STREAK_LENGTH) {
/* Avoid to loop on the same range again and again. */
const newRange = this._inc(skippingRange);
if (newRange === this.gteParams) {
this.streakLength = 0;
} else if (this.skipRangeCb) {
this.skipRangeCb(newRange);
}
}
} else {
this.streakLength = 0;
}
}

_inc(str) {
if (!str) {
return str;
}

const lastCharValue = str.charCodeAt(str.length - 1);
const lastCharNewValue = String.fromCharCode(lastCharValue + 1);
return `${str.slice(0, str.length - 1)}${lastCharNewValue}`;
}
}


module.exports = { Skip };
65 changes: 26 additions & 39 deletions lib/storage/metadata/mongoclient/MongoClientInterface.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const {
DEL_MST,
DataCounter,
} = require('./DataCounter');
const Skip = require('./skip');

const USERSBUCKET = '__usersbucket';
const METASTORE = '__metastore';
Expand All @@ -44,7 +45,6 @@ const __UUID = 'uuid';
const PENSIEVE = 'PENSIEVE';
const ASYNC_REPAIR_TIMEOUT = 15000;
const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
const MAX_STREAK_LENGTH = 100;
const CONNECT_TIMEOUT_MS = 5000;

const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
Expand Down Expand Up @@ -962,47 +962,34 @@ class MongoClientInterface {

internalListObject(bucketName, params, extension, log, cb) {
const c = this.getCollection(bucketName);
const stream = new MongoReadStream(c, params, params.mongifiedSearch);
const skip = new Skip({
extension,
gte: params.gte,
});
let cbDone = false;
let streakLength = 0;
const stream = new MongoReadStream(c, params,
params.mongifiedSearch);

skip.setListingEndCb(() => {
stream.emit('end');
stream.destroy();
});
skip.setSkipRangeCb(range => {
// stop listing this key range
stream.destroy();
// update the new listing parameters here
// eslint-disable-next-line no-param-reassign
params.start = undefined; // 'start' is deprecated
// eslint-disable-next-line no-param-reassign
params.gt = undefined;
// eslint-disable-next-line no-param-reassign
params.gte = range;
// then continue listing the next key range
this.internalListObject(bucketName, params, extension, log, cb);
});

stream
.on('data', e => {
// filtering result:
// - filteringResult > 0: entry accepted and included in the
// listing result
// - filteringResult = 0: entry accepted but not included in
// the listing result (skipping)
// - filteringResult < 0: entry is rejected, listing finishes
const filteringResult = extension.filter(e);
// skipping: provides insight into why the extension is
// skipping the entry, for example: entry starts with a
// commonPrefix or entry is a specific version; in such
// case, repd needs to skip this range and go on with the next
const skippingRange = extension.skipping();
if (filteringResult < 0) {
stream.emit('end');
stream.destroy();
} else if (filteringResult === 0 && skippingRange) {
// check if MAX_STREAK_LENGTH consecutive keys have been
// skipped
if (++streakLength === MAX_STREAK_LENGTH) {
// stop listing this key range
stream.destroy();
// update the new listing parameters here
// eslint-disable-next-line no-param-reassign
params.start = undefined; // 'start' is deprecated
// eslint-disable-next-line no-param-reassign
params.gt = undefined;
// eslint-disable-next-line no-param-reassign
params.gte = inc(skippingRange);
// then continue listing the next key range
this.internalListObject(bucketName, params,
extension, log, cb);
}
} else {
streakLength = 1;
}
skip.filter(e);
})
.on('error', err => {
if (!cbDone) {
Expand Down

0 comments on commit 566e3fd

Please sign in to comment.