Skip to content

Commit

Permalink
ft: ZENKO-597 account for transient source in TDM
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderchan-scality committed Jun 29, 2018
1 parent d907c99 commit 382f72a
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 31 deletions.
1 change: 1 addition & 0 deletions lib/storage/metadata/MetadataWrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class MetadataWrapper {
database: params.mongodb.database,
replicationGroupId: params.replicationGroupId,
path: params.mongodb.path,
config: params.config,
logger,
});
this.implName = 'mongoclient';
Expand Down
23 changes: 22 additions & 1 deletion lib/storage/metadata/mongoclient/DataCounter.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ class DataCounter {
byLocation: {},
};
this.populated = false;
this.transientList = {};
}

/**
* updateTransientList - update data counter list of transient locations
* @param {Object} newLocations - list of locations constraint details
* @return {undefined}
*/
updateTransientList(newLocations) {
if (newLocations) {
const tempList = {};
Object.keys(newLocations).forEach(loc => {
tempList[loc] = newLocations[loc].isTransient;
});
this.transientList = tempList;
}
}

/**
Expand Down Expand Up @@ -187,10 +203,11 @@ class DataCounter {
* @return {undefined}
*/
_updateObject(currMD, prevMD, type) {
const transientList = Object.assign({}, this.transientList);
if (currMD && prevMD) {
// check for changes in replication
const { replicationInfo: currLocs,
'content-length': size } = currMD;
'content-length': size, dataStoreName } = currMD;
const { replicationInfo: prevLocs } = prevMD;
const { backends: prevBackends } = prevLocs || {};
const { backends: currBackends } = currLocs || {};
Expand All @@ -210,6 +227,10 @@ class DataCounter {
}
});
}
if (currLocs.status === 'COMPLETED' &&
transientList[dataStoreName]) {
this._delLocation(dataStoreName, size, type);
}
}
}

Expand Down
54 changes: 48 additions & 6 deletions lib/storage/metadata/mongoclient/MongoClientInterface.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
*/
const async = require('async');

const { EventEmitter } = require('events');
const constants = require('../../../constants');

const errors = require('../../../errors');
Expand Down Expand Up @@ -101,7 +102,7 @@ function isUserBucket(bucketName) {
class MongoClientInterface {
constructor(params) {
const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
database, logger, replicationGroupId } = params;
database, logger, replicationGroupId, config } = params;
this.mongoUrl = `mongodb://${replicaSetHosts}/?w=${writeConcern}&` +
`replicaSet=${replicaSet}&readPreference=${readPreference}`;
this.logger = logger;
Expand All @@ -112,6 +113,17 @@ class MongoClientInterface {
this.database = database;
this.lastItemScanTime = null;
this.dataCount = new DataCounter();
if (config && config instanceof EventEmitter) {
this.config = config;
this.config.on('location-constraints-update', () => {
this.dataCount
.updateTransientList(this.config.locationConstraints);
if (this.config.isTest) {
this.config.emit('MongoClientTestDone');
}
});
this.dataCount.updateTransientList(this.config.locationConstraints);
}
}

setup(cb) {
Expand Down Expand Up @@ -1212,10 +1224,11 @@ class MongoClientInterface {
return results;
}

_handleMongo(c, filter, log, cb) {
_handleMongo(c, filter, isTransient, log, cb) {
const reducedFields = {
'_id': 1,
'value.versionId': 1,
'value.replicationInfo.status': 1,
'value.replicationInfo.backends': 1,
'value.content-length': 1,
'value.dataStoreName': 1,
Expand Down Expand Up @@ -1252,13 +1265,29 @@ class MongoClientInterface {
} },
];

const aggCompleted = [
{ $project: {
'value.dataStoreName': 1,
'value.content-length': 1,
'inComplete': {
$eq: ['$value.replicationInfo.status', 'COMPLETED'],
},
} },
{ $match: { inComplete: true } },
{ $group: {
_id: '$value.dataStoreName',
bytes: { $sum: '$value.content-length' },
} },
];

return c.aggregate([
{ $project: reducedFields },
{ $match: filter },
{ $facet: {
count: aggCount,
data: aggData,
repData: aggRepData,
compData: isTransient ? aggCompleted : undefined,
} },
]).toArray((err, res) => {
if (err) {
Expand Down Expand Up @@ -1292,6 +1321,14 @@ class MongoClientInterface {
}
retResult.data[site] += repDataEntries[site];
});
if (isTransient && agg.compData) {
const compDataEntries = this._handleEntries(agg.compData);
Object.keys(compDataEntries).forEach(site => {
if (retResult.data[site]) {
retResult.data[site] -= compDataEntries[site];
}
});
}
return cb(null, retResult);
});
}
Expand All @@ -1304,7 +1341,9 @@ class MongoClientInterface {
isVersioned: !!bucketInfo.getVersioningConfiguration(),
ownerCanonicalId: bucketInfo.getOwner(),
};

const isTransient =
this.config.getLocationConstraint(retBucketInfo.location)
.isTransient;
const mstFilter = {
'_id': { $regex: /^[^\0]+$/ },
'value.versionId': { $exists: true },
Expand All @@ -1316,9 +1355,12 @@ class MongoClientInterface {
};

async.parallel({
version: done => this._handleMongo(c, verFilter, log, done),
null: done => this._handleMongo(c, nullFilter, log, done),
master: done => this._handleMongo(c, mstFilter, log, done),
version: done =>
this._handleMongo(c, verFilter, isTransient, log, done),
null: done =>
this._handleMongo(c, nullFilter, isTransient, log, done),
master: done =>
this._handleMongo(c, mstFilter, isTransient, log, done),
}, (err, res) => {
if (err) {
return callback(err);
Expand Down
131 changes: 125 additions & 6 deletions tests/unit/storage/metadata/mongoclient/DataCounter.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,36 @@ const refMultiObj = {
},
};

// eslint-disable-next-line quote-props
const singleSite = size => ({
'content-length': size,
dataStoreName: 'locationOne',
replicationInfo: {
'dataStoreName': 'locationOne',
'replicationInfo': {
backends: [],
},
});

// eslint-disable-next-line quote-props
const multiSite = (size, isComplete) => ({
'content-length': size,
dataStoreName: 'locationOne',
replicationInfo: {
'dataStoreName': 'locationOne',
'replicationInfo': {
backends: [{
site: 'locationTwo',
status: isComplete ? 'COMPLETED' : 'PENDING',
}],
},
});

const transientSite = (size, status, backends) => ({
'content-length': size,
'dataStoreName': 'locationOne',
'replicationInfo': { status, backends },
});

const locationConstraints = {
locationOne: { isTransient: true },
locationTwo: { isTransient: false },
};

const dataCounter = new DataCounter();

describe('DataCounter Class', () => {
Expand All @@ -112,6 +121,16 @@ describe('DataCounter Class', () => {
});
});

describe('DateCounter::updateTransientList', () => {
afterEach(() => dataCounter.updateTransientList({}));
it('should set transient list', () => {
assert.deepStrictEqual(dataCounter.transientList, {});
dataCounter.updateTransientList(locationConstraints);
const expectedRes = { locationOne: true, locationTwo: false };
assert.deepStrictEqual(dataCounter.transientList, expectedRes);
});
});

describe('DataCounter::addObject', () => {
const tests = [
{
Expand Down Expand Up @@ -279,6 +298,106 @@ describe('DataCounter::addObject', () => {
}));
});

describe('DataCounter, update with transient location', () => {
before(() => dataCounter.updateTransientList(locationConstraints));
after(() => dataCounter.updateTransientList({}));

const pCurrMD = transientSite(100, 'PENDING', [
{ site: 'site1', status: 'PENDING' },
{ site: 'site2', status: 'COMPLETED' },
]);
const cCurrMD = transientSite(100, 'COMPLETED', [
{ site: 'site1', status: 'COMPLETED' },
{ site: 'site2', status: 'COMPLETED' },
]);
const prevMD = transientSite(100, 'PENDING', [
{ site: 'site1', status: 'PENDING' },
{ site: 'site2', status: 'PENDING' },
]);
const transientTest = [
{
it: 'should correctly update DataCounter, ' +
'version object, replication status = PENDING',
init: refSingleObjVer,
input: [pCurrMD, prevMD, UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
site2: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'version object, replication status = COMPLETED',
init: refSingleObjVer,
input: [cCurrMD, prevMD, UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
site1: { curr: 0, prev: 100 },
site2: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'master object, replication status = PENDING',
init: refSingleObjVer,
input: [pCurrMD, prevMD, UPDATE_MST],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
site2: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'master object, replication status = COMPLETED',
init: refSingleObjVer,
input: [cCurrMD, prevMD, UPDATE_MST],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 0, prev: 100 },
site1: { curr: 100, prev: 0 },
site2: { curr: 100, prev: 0 },
},
},
},
},
];

transientTest.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.addObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});

describe('DataCounter::delObject', () => {
const tests = [
{
Expand Down
Loading

0 comments on commit 382f72a

Please sign in to comment.