Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] FIX migrate MongoDB driver to 4.x #689

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
- Fix: typo in non signal event internalCurentTime -> internalCurrentTime
- Upgrade mongodb dep from 3.6.12 to 4.7.0
- Upgrade NodeJS version from 14-slim to 16-slim in Dockerfile
4 changes: 3 additions & 1 deletion config.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ config.mongo = {
// The URI to use for the database connection. It supports replica set URIs.
// mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
// I.e.: 'mongodb://user:pass@host1:27017,host2:27018,host3:27019/cep?replicaSet=myrep'
url: 'mongodb://localhost:27017/cep'
url: 'mongodb://localhost:27017/cep',
// MongoDB connection timeout. Default is 30000ms
connectTimeoutMS: 30000
};

/**
Expand Down
43 changes: 23 additions & 20 deletions lib/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,17 @@ function getDbAux(url, component, callback) {
client.connect(
url,
{
bufferMaxEntries: config.checkDB.bufferMaxEntries,
domainsEnabled: true,
reconnectTries: config.checkDB.reconnectTries,
reconnectInterval: config.checkDB.reconnectInterval
// connectTimeoutMS is no longer supported in MongoDB 4.x
// (see https://stackoverflow.com/q/72699235/1485926)
// we keep connectTimeoutMS as configuration parameter, but
// the driver parameters are now socketTimeoutMS and
// serverSelectionTimeoutMS
socketTimeoutMS: config.mongo.connectTimeoutMS,
serverSelectionTimeoutMS: config.mongo.connectTimeoutMS
//bufferMaxEntries: config.checkDB.bufferMaxEntries,
//domainsEnabled: true,
//reconnectTries: config.checkDB.reconnectTries,
//reconnectInterval: config.checkDB.reconnectInterval
//useUnifiedTopology: true
},
function(err, client) {
Expand All @@ -69,10 +76,10 @@ function getDbAux(url, component, callback) {
// The driver has given up getting a connection, so we will die (restart perseo usually)
// and re-try from scratch.
// The ReplSet does not emit 'reconnectFailed'
db.serverConfig.on('reconnectFailed', function() {
logger.fatal('too many tries to reconnect to database, dying ...');
process.exit(-2);
});
//db.serverConfig.on('reconnectFailed', function() {
// logger.fatal('too many tries to reconnect to database, dying ...');
// process.exit(-2);
//});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From https://stackoverflow.com/questions/72699235/mongodb-node-driver-connect-ignores-connecttimeoutms-and-sockettimeoutms-set/73214803#73214803

You are always considered "connected" by the driver and then under the hood it auto reconnects for you

So all the reconnection logic in our code can be just removed. The driver will do it for us.


checkDbHealthFunc = function checkDbHealth() {
pingAux(db, component, function(err, result) {
Expand All @@ -98,21 +105,17 @@ function getOrionDb(callback) {
}

function ensureIndex(collection, fields, callback) {
database.collection(collection, function(err, collection) {
myutils.logErrorIf(err, collection, context);
collection.createIndex(fields, { unique: true }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
var col = database.collection(collection);
col.createIndex(fields, { unique: true }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
}
function ensureIndexTTL(collection, fields, ttl, callback) {
database.collection(collection, function(err, collection) {
myutils.logErrorIf(err, collection);
collection.createIndex(fields, { expireAfterSeconds: ttl }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
var col = database.collection(collection);
col.createIndex(fields, { expireAfterSeconds: ttl }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
}

Expand Down
58 changes: 31 additions & 27 deletions lib/models/entitiesStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
*/
'use strict';

var async = require('async'),
appContext = require('../appContext'),
var appContext = require('../appContext'),
config = require('../../config'),
entitiesCollectionName = require('../../config').orionDb.collection,
myutils = require('../myutils'),
Expand All @@ -39,6 +38,11 @@ function findSilentEntities(service, subservice, ruleData, func, callback) {
context = { op: 'checkNoSignal', comp: constants.COMPONENT_NAME },
criterion = {};

var cb = function(err, result) {
logger.debug(context, 'findSilentEntities %s', myutils.firstChars(result));
return callback(err, result);
};

db = orionServiceDb(service);
criterion['attrs.' + ruleData.attribute + '.modDate'] = {
$lt: Date.now() / 1000 - ruleData.reportInterval
Expand All @@ -60,32 +64,32 @@ function findSilentEntities(service, subservice, ruleData, func, callback) {
criterion['_id.type'] = ruleData.type;
}
logger.debug(context, 'findSilentEntities criterion %j', criterion);
async.waterfall(
[
db.collection.bind(db, entitiesCollectionName, { strict: true }),
function(col, cb) {
var count = 0;
col.find(criterion)
.batchSize(config.orionDb.batchSize)
.each(function(err, one) {
if (err) {
return cb(err, null);
}
if (one === null) {
//cursor exhausted
return cb(err, 'silent ones count ' + count);
}
logger.debug(context, 'silent entity %j', one._id);
func(one);
count++;
});
}
],
function(err, result) {
logger.debug(context, 'findSilentEntities %s', myutils.firstChars(result));
return callback(err, result);

myutils.collectionExists(db, entitiesCollectionName, function(exists) {
if (!exists) {
return cb('collection ' + entitiesCollectionName + ' does not exist');
}
);

var col = db.collection(entitiesCollectionName);

var count = 0;
col.find(criterion)
.batchSize(config.orionDb.batchSize)
.forEach(
function(one) {
logger.debug(context, 'silent entity %j', one._id);
func(one);
count++;
},
function(err) {
if (err) {
return cb(err, null);
} else {
return cb(null, 'silent ones count ' + count);
}
}
);
});
}

module.exports = {
Expand Down
97 changes: 49 additions & 48 deletions lib/models/executionsStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
*/
'use strict';

var async = require('async'),
appContext = require('../appContext'),
var appContext = require('../appContext'),
logger = require('logops'),
execCollectionName = require('../../config').collections.executions,
myutils = require('../myutils');
Expand All @@ -37,11 +36,13 @@ module.exports = {
id = task.event.id,
index = task.action.index;

db.collection(execCollectionName, { strict: true }, function(err, col) {
if (err) {
myutils.logErrorIf(err);
return callback(err, null);
myutils.collectionExists(db, execCollectionName, function(exists) {
if (!exists) {
return callback('collection ' + execCollectionName + ' does not exist');
}

const col = db.collection(execCollectionName);

var cursor = col
.find(
{
Expand All @@ -66,18 +67,19 @@ module.exports = {
});
},
AlreadyDone: function AlreadyDone(task, callback) {
var db = appContext.Db(),
service = task.event.service,
subservice = task.event.subservice,
ruleName = task.event.ruleName,
id = task.event.id,
index = task.action.index,
noticeId = task.event.noticeId;
db.collection(execCollectionName, { strict: true }, function(err, col) {
if (err) {
myutils.logErrorIf(err);
return callback(err, null);
myutils.collectionExists(appContext.Db(), execCollectionName, function(exists) {
if (!exists) {
return callback('collection ' + execCollectionName + ' does not exist');
}

var col = appContext.Db().collection(execCollectionName),
service = task.event.service,
subservice = task.event.subservice,
ruleName = task.event.ruleName,
id = task.event.id,
index = task.action.index,
noticeId = task.event.noticeId;

col.findOne(
{
name: ruleName,
Expand All @@ -98,37 +100,36 @@ module.exports = {
});
},
Update: function Update(task, callback) {
var db = appContext.Db(),
service = task.event.service,
subservice = task.event.subservice,
ruleName = task.event.ruleName,
id = task.event.id,
index = task.action.index,
noticeId = task.event.noticeId;
async.waterfall(
[
db.collection.bind(db, execCollectionName, { strict: true }),
function(col, cb) {
col.update(
{
name: ruleName,
subservice: subservice,
service: service,
id: id,
notice: noticeId,
index: index
},
{ $currentDate: { lastTime: true } },
{ upsert: true },
cb
);
}
],
function(err, result) {
myutils.logErrorIf(err);
logger.info('executionsStore.Update %j', result);
return callback(err, result);
myutils.collectionExists(appContext.Db(), execCollectionName, function(exists) {
if (!exists) {
return callback('collection ' + execCollectionName + ' does not exist');
}
);

var col = appContext.Db().collection(execCollectionName),
service = task.event.service,
subservice = task.event.subservice,
ruleName = task.event.ruleName,
id = task.event.id,
index = task.action.index,
noticeId = task.event.noticeId;

col.update(
{
name: ruleName,
subservice: subservice,
service: service,
id: id,
notice: noticeId,
index: index
},
{ $currentDate: { lastTime: true } },
{ upsert: true },
function(err, result) {
myutils.logErrorIf(err);
logger.info('executionsStore.Update %j', result);
return callback(err, result);
}
);
});
}
};
Loading