Skip to content

Commit

Permalink
Merge pull request #130 from strongloop/feature/fix-sql-count
Browse files Browse the repository at this point in the history
Fix Model.count base implementation and Normalize/validate the query filter
  • Loading branch information
raymondfeng committed Jun 4, 2014
2 parents 653aab8 + 5f3c856 commit ea5c766
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 29 deletions.
139 changes: 112 additions & 27 deletions lib/dao.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var Memory = require('./connectors/memory').Memory;
var utils = require('./utils');
var fieldsToArray = utils.fieldsToArray;
var removeUndefined = utils.removeUndefined;
var util = require('util');

/**
* Base class for all persistent objects.
Expand All @@ -37,8 +38,6 @@ function DataAccessObject() {
}
}



function idName(m) {
return m.getDataSource().idName
? m.getDataSource().idName(m.modelName) : 'id';
Expand Down Expand Up @@ -400,13 +399,88 @@ var operators = {
nlike: 'NOT LIKE'
};

/*
* Normalize the filter object and throw errors if invalid values are detected
* @param {Object} filter The query filter object
* @returns {Object} The normalized filter object
* @private
*/
DataAccessObject._normalize = function (filter) {
if (!filter) {
return undefined;
}
var err = null;
if ((typeof filter !== 'object') || Array.isArray(filter)) {
err = new Error(util.format('The query filter %j is not an object', filter));
err.statusCode = 400;
throw err;
}
if (filter.limit || filter.skip || filter.offset) {
var limit = Number(filter.limit || 100);
var offset = Number(filter.skip || filter.offset || 0);
if (isNaN(limit) || limit <= 0 || Math.ceil(limit) !== limit) {
err = new Error(util.format('The limit parameter %j is not valid',
filter.limit));
err.statusCode = 400;
throw err;
}
if (isNaN(offset) || offset < 0 || Math.ceil(offset) !== offset) {
err = new Error(util.format('The offset/skip parameter %j is not valid',
filter.skip || filter.offset));
err.statusCode = 400;
throw err;
}
filter.limit = limit;
filter.offset = offset;
delete filter.skip;
}

// normalize fields as array of included property names
if (filter.fields) {
filter.fields = fieldsToArray(filter.fields,
Object.keys(this.definition.properties));
}

filter = removeUndefined(filter);
this._coerce(filter.where);
return filter;
};

/*
* Coerce values based the property types
* @param {Object} where The where clause
* @returns {Object} The coerced where clause
* @private
*/
DataAccessObject._coerce = function (where) {
var self = this;
if (!where) {
return where;
}

var props = this.getDataSource().getModelDefinition(this.modelName).properties;
var err;
if (typeof where !== 'object' || Array.isArray(where)) {
err = new Error(util.format('The where clause %j is not an object', where));
err.statusCode = 400;
throw err;
}

var props = self.definition.properties;
for (var p in where) {
// Handle logical operators
if (p === 'and' || p === 'or' || p === 'nor') {
var clauses = where[p];
if (Array.isArray(clauses)) {
for (var i = 0; i < clauses.length; i++) {
self._coerce(clauses[i]);
}
} else {
err = new Error(util.format('The %p operator has invalid clauses %j', p, clauses));
err.statusCode = 400;
throw err;
}
return where;
}
var DataType = props[p] && props[p].type;
if (!DataType) {
continue;
Expand Down Expand Up @@ -556,24 +630,21 @@ DataAccessObject.find = function find(query, cb) {
cb = query;
query = null;
}
var constr = this;
var self = this;

query = query || {};

if (query.where) {
query.where = this._coerce(query.where);
try {
this._normalize(query);
} catch (err) {
return process.nextTick(function () {
cb && cb(err);
});
}

var fields = query.fields;
var near = query && geo.nearFilter(query.where);
var supportsGeo = !!this.getDataSource().connector.buildNearFilter;

// normalize fields as array of included property names
if (fields) {
query.fields = fieldsToArray(fields, Object.keys(this.definition.properties));
}

query = removeUndefined(query);
if (near) {
if (supportsGeo) {
// convert it
Expand All @@ -583,15 +654,15 @@ DataAccessObject.find = function find(query, cb) {
// using all documents
this.getDataSource().connector.all(this.modelName, {}, function (err, data) {
var memory = new Memory();
var modelName = constr.modelName;
var modelName = self.modelName;

if (err) {
cb(err);
} else if (Array.isArray(data)) {
memory.define({
properties: constr.dataSource.definitions[constr.modelName].properties,
settings: constr.dataSource.definitions[constr.modelName].settings,
model: constr
properties: self.dataSource.definitions[self.modelName].properties,
settings: self.dataSource.definitions[self.modelName].settings,
model: self
});

data.forEach(function (obj) {
Expand All @@ -614,7 +685,7 @@ DataAccessObject.find = function find(query, cb) {
this.getDataSource().connector.all(this.modelName, query, function (err, data) {
if (data && data.forEach) {
data.forEach(function (d, i) {
var obj = new constr();
var obj = new self();

obj._initProperties(d, {fields: query.fields});

Expand Down Expand Up @@ -726,9 +797,15 @@ DataAccessObject.remove = DataAccessObject.deleteAll = DataAccessObject.destroyA
if(!err) Model.emit('deletedAll');
}.bind(this));
} else {
// Support an optional where object
where = removeUndefined(where);
where = this._coerce(where);
try {
// Support an optional where object
where = removeUndefined(where);
where = this._coerce(where);
} catch (err) {
return process.nextTick(function() {
cb && cb(err);
});
}
this.getDataSource().connector.destroyAll(this.modelName, where, function (err, data) {
cb && cb(err, data);
if(!err) Model.emit('deletedAll', where);
Expand Down Expand Up @@ -783,8 +860,14 @@ DataAccessObject.count = function (where, cb) {
cb = where;
where = null;
}
where = removeUndefined(where);
where = this._coerce(where);
try {
where = removeUndefined(where);
where = this._coerce(where);
} catch (err) {
return process.nextTick(function () {
cb && cb(err);
});
}
this.getDataSource().connector.count(this.modelName, cb, where);
};

Expand Down Expand Up @@ -999,10 +1082,11 @@ setRemoting(DataAccessObject.prototype.updateAttributes, {
http: {verb: 'put', path: '/'}
});

/**
/*
* Reload object from persistence
* Requires `id` member of `object` to be able to call `find`
* @param {Function} callback Called with (err, instance) arguments
* @private
*/
DataAccessObject.prototype.reload = function reload(callback) {
if (stillConnecting(this.getDataSource(), this, arguments)) {
Expand All @@ -1013,12 +1097,13 @@ DataAccessObject.prototype.reload = function reload(callback) {
};


/*!
/*
* Define readonly property on object
*
* @param {Object} obj
* @param {String} key
* @param {Mixed} value
* @private
*/
function defineReadonlyProp(obj, key, value) {
Object.defineProperty(obj, key, {
Expand All @@ -1044,12 +1129,12 @@ DataAccessObject.scope = function (name, query, targetClass) {
defineScope(this, targetClass || this, name, query);
};

/*!
/*
* Add 'include'
*/
jutil.mixin(DataAccessObject, Inclusion);

/*!
/*
* Add 'relation'
*/
jutil.mixin(DataAccessObject, Relation);
8 changes: 7 additions & 1 deletion lib/sql.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,14 @@ BaseSQL.prototype.count = function count(model, callback, where) {
var self = this;
var props = this._models[model].properties;

var whereClause = '';
if (typeof this.buildWhere === 'function') {
whereClause = this.buildWhere(model, where);
} else {
whereClause = buildWhere(where);
}
this.queryOne('SELECT count(*) as cnt FROM ' +
this.tableEscaped(model) + ' ' + buildWhere(where), function (err, res) {
this.tableEscaped(model) + ' ' + whereClause, function (err, res) {
if (err) {
return callback(err);
}
Expand Down
Loading

0 comments on commit ea5c766

Please sign in to comment.