Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/197 multiple axns #212

Merged
merged 5 commits into from
Jan 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
- Fix: Alarms for database (replicaset) (#230)
- Change: Default value for refresh rules to core in config.js, now 5 minutes
- Change: Default value for DB checking in config.js, now 5 seconds
- Add: Several actions per rule (#197)
4 changes: 3 additions & 1 deletion documentation/plain_rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ The fields (all must be present) are

The rule name must consist of the ASCII characters from A to Z, from a to z, digits (0-9), underscore (_) and dash (-). It can have a maximum length of 50 characters.

The field `action` can be also an array of "actions", objects with the same structure than the single action described in the rest of the documentation. Each of those actions will be executed when the rule is fired, avoiding to duplicate a rule only for getting several actions executed. For practical purposes, it is the same result that would be obtained with multiple rules with the same condition.

## EPL text
The field ```text``` of the rule must be a valid EPL statement and additionally must honor several restrictions to match expectations of perseo and perseo-core.

Expand Down Expand Up @@ -661,4 +663,4 @@ A rule that will check if the employee has been hired in the last half hour, cou
}
}
}
```
```
27 changes: 19 additions & 8 deletions examples/blood_rule_email.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
{
"name": "blood_rule_email",
"text": "select *,\"blood_rule_email\" as ruleName, *,ev.BloodPressure? as Pressure, ev.id? as Meter from pattern [every ev=iotEvent(cast(cast(BloodPressure?,String),float)>1.5 and type=\"BloodMeter\")]",
"action": {
"type": "email",
"template": "Meter ${Meter} has pressure ${Pressure} (GEN RULE)",
"parameters": {
"to": "[email protected]",
"from": "[email protected]",
"subject": "${Meter} has changed"
"action": [
{
"type": "email",
"template": "Meter ${Meter} has pressure ${Pressure} (GEN RULE)",
"parameters": {
"to": "[email protected]",
"from": "[email protected]",
"subject": "${Meter} has changed"
}
},
{
"type": "email",
"template": "Meter ${Meter} has pressure ${Pressure} (GEN RULE)",
"parameters": {
"to": "[email protected]",
"from": "[email protected]",
"subject": "${Meter} has changed (remember)"
}
}
}
]
}
254 changes: 118 additions & 136 deletions lib/models/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,49 +39,6 @@ var util = require('util'),

var inProcess = {};

function getInProcessArray(service, servicePath, ruleName, id, type) {
var inPService,
inPServicePath,
inPRule,
inPEntity,
inPType;

inPService = inProcess[service];
if (!inPService) {
inPService = inProcess[service] = {};
}
inPServicePath = inPService[servicePath];
if (!inPServicePath) {
inPServicePath = inPService[servicePath] = {};
}
inPRule = inPServicePath[ruleName];
if (!inPRule) {
inPRule = inPServicePath[ruleName] = {};
}
inPEntity = inPRule[id];
if (!inPEntity) {
inPEntity = inPRule[id] = {};
}
inPType = inPEntity[type];
if (!inPType) {
inPType = inPEntity[type] = [];
}
return inPType;
}

function deleteInProcessArray(service, servicePath, ruleName, id, type) {
var
servicePathMap = inProcess[service][servicePath],
ruleMap = servicePathMap[ruleName];

delete ruleMap[id][type];
if (Object.keys(ruleMap[id]).length === 0) {
delete ruleMap[id];
}
if (Object.keys(ruleMap).length === 0) {
delete servicePathMap[ruleMap];
}
}

function errorsAction(event) {
var ruleName = event.ruleName;
Expand All @@ -92,42 +49,50 @@ function errorsAction(event) {
}

function validateAction(axn) {
//There is an action
if (typeof axn.type !== 'string') {
return new errors.MissingActionType(axn);
var axnArr,
axnElem,
i;
if (util.isArray(axn)) {
axnArr = axn;
} else {
axnArr = [axn];
}
for (i = 0; i < axnArr.length; i++) {
axnElem = axnArr[i];
//There is an action
if (typeof axnElem.type !== 'string') {
return new errors.MissingActionType(axnElem);
}

//Action type
switch (axn.type) {
case 'email':
case 'sms':
case 'update':
case 'post':
case 'twitter':
break;
default:
return new errors.UnknownActionType(axn.type);
}
//Action type
switch (axnElem.type) {
case 'email':
case 'sms':
case 'update':
case 'post':
case 'twitter':
break;
default:
return new errors.UnknownActionType(axnElem.type);
}

//Do not use id/type as attribute
if (axn.type === 'update' && axn.parameters) {
if (axn.parameters.name === 'id') {
return new errors.IdAsAttribute(JSON.stringify(axn.parameters));
} else if (axn.parameters.name === 'type') {
return new errors.IdAsAttribute(JSON.stringify(axn.parameters));
//Do not use id/type as attribute
if (axnElem.type === 'update' && axnElem.parameters) {
if (axnElem.parameters.name === 'id') {
return new errors.IdAsAttribute(JSON.stringify(axnElem.parameters));
} else if (axnElem.parameters.name === 'type') {
return new errors.IdAsAttribute(JSON.stringify(axnElem.parameters));
}
}
}
//Everything is OK
return null;
}

function conditionalExec(action, event, lastTime, callbackW) {
function conditionalExec(task, lastTime, callbackW) {
var rightNow = Date.now(),
ruleName = event.ruleName,
eventId = event.id,
subservice = event.subservice,
service = event.service,
noticeId = event.noticeId,
action = task.action,
event = task.event,
localError;
action.interval = Number(action.interval);
if (isNaN(action.interval)) {
Expand Down Expand Up @@ -158,7 +123,7 @@ function conditionalExec(action, event, lastTime, callbackW) {
return callbackS(localError, null);
}
},
executionsStore.Update.bind(null, service, subservice, ruleName, eventId, noticeId)
executionsStore.Update.bind(null, task)
], callbackW);
}
else {
Expand All @@ -167,16 +132,12 @@ function conditionalExec(action, event, lastTime, callbackW) {
callbackW(null);
}
}
function slaveExec(action, event, alreadyExecuted, callback) {
var ruleName = event.ruleName,
eventId = event.id,
subservice = event.subservice,
service = event.service;
function slaveExec(task, alreadyExecuted, callback) {
if (!alreadyExecuted) {
async.waterfall(
[
executionsStore.LastTime.bind(null, service, subservice, ruleName, eventId),
conditionalExec.bind(null, action, event)
executionsStore.LastTime.bind(null, task),
conditionalExec.bind(null, task)
],
function(err, tasks) {
callback(err);
Expand All @@ -186,76 +147,73 @@ function slaveExec(action, event, alreadyExecuted, callback) {
}
}

function execAxn(action, event, cbCe) {
var ruleName = event.ruleName,
eventId = event.id,
subservice = event.subservice,
service = event.service,
noticeId = event.noticeId;
function execAxn(task, cbCe) {

logger.debug('executing axn task %j', task);
if (config.isMaster) {
async.waterfall(
[
executionsStore.LastTime.bind(null, service, subservice, ruleName, eventId),
conditionalExec.bind(null, action, event)
executionsStore.LastTime.bind(null, task),
conditionalExec.bind(null, task)
],
function(err, tasks) {
function(err, results) {
cbCe(err);
});
} else {
async.waterfall(
[
executionsStore.AlreadyDone.bind(null, service, subservice, ruleName, eventId, noticeId),
slaveExec.bind(null, action, event)
executionsStore.AlreadyDone.bind(null, task),
slaveExec.bind(null, task)
],
function(err, tasks) {
function(err, results) {
cbCe(err);
});
}
}

function doOneAction(eventArray, callback) {
function deleteInProcessArray(service, servicePath, ruleName, id, type) {
var
servicePathMap = inProcess[service][servicePath],
ruleMap = servicePathMap[ruleName];

var event,
ruleName,
subservice,
service,
id,
type,
localError;
event = eventArray[0];
ruleName = event.ruleName;
subservice = event.subservice;
service = event.service;
id = event.id;
type = event.type;
logger.debug('event %j', event);
localError = errorsAction(event);
if (localError !== null) {
myutils.logErrorIf(localError);
return callback(localError, null);
delete ruleMap[id][type];
if (Object.keys(ruleMap[id]).length === 0) {
delete ruleMap[id];
}
if (Object.keys(ruleMap).length === 0) {
delete servicePathMap[ruleMap];
}
actionStore.Find(service, subservice, ruleName, function(err, action) {
if (err) {
return callback(err, null);
}
localError = validateAction(action);
if (localError !== null) {
myutils.logErrorIf(localError);
return callback(localError, null);
}
return execAxn(action, event, function(error) {
eventArray.shift();
if (eventArray.length === 0) { //no more elements to process
deleteInProcessArray(service, subservice, ruleName, id, type);
return callback(error);
}
else { // recursive call
return doOneAction(eventArray, callback);
}
}
);
});
}

function getInProcessArray(service, servicePath, ruleName, id, type) {
var inPService,
inPServicePath,
inPRule,
inPEntity,
inPType;

inPService = inProcess[service];
if (!inPService) {
inPService = inProcess[service] = {};
}
inPServicePath = inPService[servicePath];
if (!inPServicePath) {
inPServicePath = inPService[servicePath] = {};
}
inPRule = inPServicePath[ruleName];
if (!inPRule) {
inPRule = inPServicePath[ruleName] = {};
}
inPEntity = inPRule[id];
if (!inPEntity) {
inPEntity = inPRule[id] = {};
}
inPType = inPEntity[type];
if (!inPType) {
inPType = inPEntity[type] = async.queue(execAxn);
inPType.drain = deleteInProcessArray.bind({}, service, servicePath, ruleName, id, type);
}
return inPType;
}

function DoAction(event, callback) {
Expand All @@ -264,20 +222,44 @@ function DoAction(event, callback) {
service = event.service,
id = event.id,
type = event.type,
localError,
eventArray;
localError;

logger.debug('service %s, subservice %s, event %j', service, subservice, event);
localError = errorsAction(event);
if (localError !== null) {
myutils.logErrorIf(localError);
return callback(localError, null);
}
eventArray = getInProcessArray(service, subservice, ruleName, id, type);
eventArray.push(event);
if (eventArray.length === 1) { // the one inserted right now
doOneAction(eventArray, callback); // start process
}

actionStore.Find(service, subservice, ruleName, function(err, actions) {
var localError,
queue;
if (err) {
return callback(err, null);
}
if (!util.isArray(actions)) {
actions = [actions];
}
// check all axns are right
for (var i = 0; i < actions.length; i++) {
localError = validateAction(actions[i]);
if (localError !== null) {
myutils.logErrorIf(localError);
return callback(localError, null);
}
}

queue = getInProcessArray(service, subservice, ruleName, id, type);
for (i = 0; i < actions.length; i++) {
actions[i].index = i;
queue.push({
action: actions[i],
event: event
});
}
});

return callback(null);
}

module.exports.Do = DoAction;
Expand Down
Loading