Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] FIX migrate MongoDB driver to 4.x #689

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
4 changes: 3 additions & 1 deletion config.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ config.mongo = {
// The URI to use for the database connection. It supports replica set URIs.
// mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
// I.e.: 'mongodb://user:pass@host1:27017,host2:27018,host3:27019/cep?replicaSet=myrep'
url: 'mongodb://localhost:27017/cep'
url: 'mongodb://localhost:27017/cep',
// MongoDB connection timeout. Default is 30000ms
connectTimeoutMS: 30000
};

/**
Expand Down
38 changes: 18 additions & 20 deletions lib/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ function getDbAux(url, component, callback) {
client.connect(
url,
{
bufferMaxEntries: config.checkDB.bufferMaxEntries,
domainsEnabled: true,
reconnectTries: config.checkDB.reconnectTries,
reconnectInterval: config.checkDB.reconnectInterval
socketTimeoutMS: config.mongo.connectTimeoutMS,
connectTimeoutMS: config.mongo.connectTimeoutMS
//bufferMaxEntries: config.checkDB.bufferMaxEntries,
//domainsEnabled: true,
//reconnectTries: config.checkDB.reconnectTries,
//reconnectInterval: config.checkDB.reconnectInterval
//useUnifiedTopology: true
},
function(err, client) {
Expand All @@ -69,10 +71,10 @@ function getDbAux(url, component, callback) {
// The driver has given up getting a connection, so we will die (restart perseo usually)
// and re-try from scratch.
// The ReplSet does not emit 'reconnectFailed'
db.serverConfig.on('reconnectFailed', function() {
logger.fatal('too many tries to reconnect to database, dying ...');
process.exit(-2);
});
//db.serverConfig.on('reconnectFailed', function() {
// logger.fatal('too many tries to reconnect to database, dying ...');
// process.exit(-2);
//});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From https://stackoverflow.com/questions/72699235/mongodb-node-driver-connect-ignores-connecttimeoutms-and-sockettimeoutms-set/73214803#73214803

You are always considered "connected" by the driver and then under the hood it auto reconnects for you

So all the reconnection logic in our code can be just removed. The driver will do it for us.


checkDbHealthFunc = function checkDbHealth() {
pingAux(db, component, function(err, result) {
Expand All @@ -98,21 +100,17 @@ function getOrionDb(callback) {
}

function ensureIndex(collection, fields, callback) {
database.collection(collection, function(err, collection) {
myutils.logErrorIf(err, collection, context);
collection.createIndex(fields, { unique: true }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
var col = database.collection(collection);
col.createIndex(fields, { unique: true }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
}
function ensureIndexTTL(collection, fields, ttl, callback) {
database.collection(collection, function(err, collection) {
myutils.logErrorIf(err, collection);
collection.createIndex(fields, { expireAfterSeconds: ttl }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
var col = database.collection(collection);
col.createIndex(fields, { expireAfterSeconds: ttl }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
}

Expand Down
46 changes: 21 additions & 25 deletions lib/models/entitiesStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ function orionServiceDb(service) {
}

function findSilentEntities(service, subservice, ruleData, func, callback) {
var cb = function(err, result) {
logger.debug(context, 'findSilentEntities %s', myutils.firstChars(result));
return callback(err, result);
};

var db,
context = { op: 'checkNoSignal', comp: constants.COMPONENT_NAME },
criterion = {};
Expand All @@ -60,32 +65,23 @@ function findSilentEntities(service, subservice, ruleData, func, callback) {
criterion['_id.type'] = ruleData.type;
}
logger.debug(context, 'findSilentEntities criterion %j', criterion);
async.waterfall(
[
db.collection.bind(db, entitiesCollectionName, { strict: true }),
function(col, cb) {
var count = 0;
col.find(criterion)
.batchSize(config.orionDb.batchSize)
.each(function(err, one) {
if (err) {
return cb(err, null);
}
if (one === null) {
//cursor exhausted
return cb(err, 'silent ones count ' + count);
}
logger.debug(context, 'silent entity %j', one._id);
func(one);
count++;
});
var col = db.collection(entitiesCollectionName);

var count = 0;
col.find(criterion)
.batchSize(config.orionDb.batchSize)
.each(function(err, one) {
if (err) {
return cb(err, null);
}
],
function(err, result) {
logger.debug(context, 'findSilentEntities %s', myutils.firstChars(result));
return callback(err, result);
}
);
if (one === null) {
//cursor exhausted
return cb(err, 'silent ones count ' + count);
}
logger.debug(context, 'silent entity %j', one._id);
func(one);
count++;
});
}

module.exports = {
Expand Down
114 changes: 49 additions & 65 deletions lib/models/executionsStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,93 +37,77 @@ module.exports = {
id = task.event.id,
index = task.action.index;

db.collection(execCollectionName, { strict: true }, function(err, col) {
const col = db.collection(execCollectionName);

var cursor = col
.find(
{
name: ruleName,
subservice: subservice,
service: service,
id: id,
index: index
},
{}
)
.sort({ lastTime: -1 });
cursor = cursor.limit(1);
cursor.toArray(function(err, results) {
myutils.logErrorIf(err);
if (err) {
myutils.logErrorIf(err);
return callback(err, null);
}
var cursor = col
.find(
{
name: ruleName,
subservice: subservice,
service: service,
id: id,
index: index
},
{}
)
.sort({ lastTime: -1 });
cursor = cursor.limit(1);
cursor.toArray(function(err, results) {
myutils.logErrorIf(err);
if (err) {
return callback(err, null);
}
var data = results.length ? results[0] : null;
return callback(null, (data && data.lastTime && data.lastTime.getTime()) || 0);
});
var data = results.length ? results[0] : null;
return callback(null, (data && data.lastTime && data.lastTime.getTime()) || 0);
});
},
AlreadyDone: function AlreadyDone(task, callback) {
var db = appContext.Db(),
var col = appContext.Db().collection(execCollectionName),
service = task.event.service,
subservice = task.event.subservice,
ruleName = task.event.ruleName,
id = task.event.id,
index = task.action.index,
noticeId = task.event.noticeId;
db.collection(execCollectionName, { strict: true }, function(err, col) {
if (err) {

col.findOne(
{
name: ruleName,
subservice: subservice,
service: service,
id: id,
notice: noticeId,
index: index
},
function(err, data) {
myutils.logErrorIf(err);
return callback(err, null);
}
col.findOne(
{
name: ruleName,
subservice: subservice,
service: service,
id: id,
notice: noticeId,
index: index
},
function(err, data) {
myutils.logErrorIf(err);
if (err) {
return callback(err, null);
}
return callback(null, data);
if (err) {
return callback(err, null);
}
);
});
return callback(null, data);
}
);
},
Update: function Update(task, callback) {
var db = appContext.Db(),
var col = appContext.Db().collection(execCollectionName),
service = task.event.service,
subservice = task.event.subservice,
ruleName = task.event.ruleName,
id = task.event.id,
index = task.action.index,
noticeId = task.event.noticeId;
async.waterfall(
[
db.collection.bind(db, execCollectionName, { strict: true }),
function(col, cb) {
col.update(
{
name: ruleName,
subservice: subservice,
service: service,
id: id,
notice: noticeId,
index: index
},
{ $currentDate: { lastTime: true } },
{ upsert: true },
cb
);
}
],

col.update(
{
name: ruleName,
subservice: subservice,
service: service,
id: id,
notice: noticeId,
index: index
},
{ $currentDate: { lastTime: true } },
{ upsert: true },
function(err, result) {
myutils.logErrorIf(err);
logger.info('executionsStore.Update %j', result);
Expand Down
Loading