Skip to content

Commit

Permalink
feat: message event type
Browse files Browse the repository at this point in the history
  • Loading branch information
atian25 committed Sep 29, 2017
1 parent 92958f8 commit 4fd1f03
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 43 deletions.
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,10 @@ The rule of thumbs is one job per file.

## Task

Task is a generator function, and accept one parameter, `ctx`. The syntax is, `exports.task = function* (ctx) { ... };`

When the scheduled task runs, the scheduled job information will be logged and written to a local file in a folder called `/logs`. The log file contains many useful information, for example,
Task is a generator/async function, and accept one parameter `ctx` which is an anonymous context with:

- ctx.method: `SCHEDULE`
- ctx.path: `/__schedule/${schedulePath}`. example path: `/__schedule?path=/FULL_PATH_TO/cleandb.js&type=worker&interval=3h`
- ctx.query: `scheule config(type=worker&cron=*%2F5%20*%20*%20*%20*%20*)`
- ctx.path: `/__schedule?path=${schedulePath}&${schedule}`.


To create a task, it is as simple as write a generator / async function. For example:
Expand Down
3 changes: 2 additions & 1 deletion agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ module.exports = agent => {
});

agent.messenger.once('egg-ready', () => {
// wait for other plugin to register custom strategy
const keys = Object.keys(handlers);
for (const type of keys) {
agent.schedule.use(type, handler2Class(type, handlers[type]));
}
// start schedule
// start schedule after worker ready
agent.schedule.start();
});

Expand Down
59 changes: 34 additions & 25 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,44 @@ module.exports = app => {
return schedule.task(ctx);
};

// log schedule list
for (const s in schedules) {
const schedule = schedules[s];
if (schedule.schedule.disable) continue;
if (!schedule.schedule.disable) app.coreLogger.info('[egg-schedule]: register schedule %s', schedule.key);
}

const task = schedule.task;
const key = schedule.key;
app.coreLogger.info('[egg-schedule]: register schedule %s', key);
app.messenger.on(key, () => {
app.coreLogger.info('[egg-schedule]: get message %s', key);
// register schedule event
app.messenger.on('egg-schedule', data => {
app.coreLogger.info('[egg-schedule]: get message: %j', data);
const key = data.key;
const schedule = schedules[key];

// run with anonymous context
const ctx = app.createAnonymousContext({
method: 'SCHEDULE',
url: `/__schedule?path=${s}&${qs.stringify(schedule.schedule)}`,
});
if (!schedule) {
app.coreLogger.warn(`[egg-schedule] unknown task: ${key}`);
return;
}
/* istanbul ignore next */
if (schedule.schedule.disable) return;

const start = Date.now();
task(ctx)
.then(() => true) // succeed
.catch(err => {
err.message = `[egg-schedule] ${key} excute error. ${err.message}`;
app.logger.error(err);
return false; // failed
})
.then(success => {
const rt = Date.now() - start;
const status = success ? 'succeed' : 'failed';
app.coreLogger.info(`[egg-schedule] ${key} excute ${status}, used ${rt}ms`);
});
// run with anonymous context
const ctx = app.createAnonymousContext({
method: 'SCHEDULE',
url: `/__schedule?path=${key}&${qs.stringify(schedule.schedule)}`,
});
}

const start = Date.now();
const task = schedule.task;
task(ctx)
.then(() => true) // succeed
.catch(err => {
err.message = `[egg-schedule] ${key} excute error. ${err.message}`;
app.logger.error(err);
return false; // failed
})
.then(success => {
const rt = Date.now() - start;
const status = success ? 'succeed' : 'failed';
app.coreLogger.info(`[egg-schedule] ${key} excute ${status}, used ${rt}ms`);
});
});
};
2 changes: 1 addition & 1 deletion lib/load_schedule.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function getScheduleLoader(app) {
target[fullpath] = {
schedule: schedule.schedule,
task: co.wrap(schedule.task),
key: `egg-schedule:${fullpath}`,
key: fullpath,
};
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/strategy/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module.exports = class BaseStrategy {
return;
}
this.agent.coreLogger.info(`[egg-schedule] send message to random worker: ${this.key}`);
this.agent.messenger.sendRandom(this.key);
this.agent.messenger.sendRandom('egg-schedule', { key: this.key });
}

sendAll() {
Expand All @@ -24,6 +24,6 @@ module.exports = class BaseStrategy {
return;
}
this.agent.coreLogger.info(`[egg-schedule] send message to all worker: ${this.key}`);
this.agent.messenger.send(this.key);
this.agent.messenger.send('egg-schedule', { key: this.key });
}
};
6 changes: 0 additions & 6 deletions test/fixtures/customType/agent.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
'use strict';

const SCHEDULE_HANDLER = Symbol.for('egg#scheduleHandler');

module.exports = function(agent) {
agent[SCHEDULE_HANDLER].custom = function(schedule, sender) {
setInterval(sender.one, schedule.interval);
};

class ClusterStrategy extends agent.ScheduleStrategy {
constructor(...args) {
super(...args);
Expand Down
9 changes: 9 additions & 0 deletions test/fixtures/customTypeOld/agent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict';

const SCHEDULE_HANDLER = Symbol.for('egg#scheduleHandler');

module.exports = function(agent) {
agent[SCHEDULE_HANDLER].custom = function(schedule, sender) {
setInterval(sender.one, schedule.interval);
};
};
3 changes: 3 additions & 0 deletions test/fixtures/customTypeOld/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"name": "customTypeOld"
}
9 changes: 9 additions & 0 deletions test/fixtures/unknown/agent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict';

module.exports = agent => {
agent.messenger.once('egg-ready', () => {
setTimeout(() => {
agent.messenger.sendRandom('egg-schedule', { key: 'no-exist' });
}, 100);
});
};
3 changes: 3 additions & 0 deletions test/fixtures/unknown/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"name": "unknown"
}
30 changes: 27 additions & 3 deletions test/schedule.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ describe('test/schedule.test.js', () => {
describe('schedule type worker', () => {
it('should support interval and cron', function* () {
app = mm.cluster({ baseDir: 'worker', workers: 2, cache: false });
// app.debug();
yield app.ready();
yield sleep(5000);
const log = getLogContent('worker');
Expand All @@ -28,8 +29,7 @@ describe('test/schedule.test.js', () => {
const log = getLogContent('cronOptions');
const agentLog = getAgentLogContent('cronOptions');
// console.log(log);
const count = contains(log, 'cron-options');
assert(count >= 1 && count <= 2);
assert(contains(log, 'cron-options') >= 1);
assert(/cron-options.js reach endDate, will stop/.test(agentLog));
});

Expand Down Expand Up @@ -105,9 +105,18 @@ describe('test/schedule.test.js', () => {
yield sleep(5000);
const log = getLogContent('customType');
// console.log(log);
assert(contains(log, 'custom_log') === 1);
assert(contains(log, 'cluster_log') === 1);
});

it('should support old way', function* () {
app = mm.cluster({ baseDir: 'customTypeOld', workers: 2 });
// app.debug();
yield app.ready();
yield sleep(5000);
const log = getLogContent('customTypeOld');
// console.log(log);
assert(contains(log, 'custom_log') === 1);
});
});

describe('schedule config error', () => {
Expand Down Expand Up @@ -138,6 +147,16 @@ describe('test/schedule.test.js', () => {
});
});

describe('schedule unknown task', () => {
it('should skip', function* () {
app = mm.cluster({ baseDir: 'unknown', workers: 2 });
// app.debug();
yield app.ready();
yield sleep(3000);
assert(getWebLogContent('unknown').match(/\[egg-schedule] unknown task: no-exist/));
});
});

describe('schedule excute error', () => {
it('should thrown', function* () {
app = mm.cluster({ baseDir: 'excuteError', workers: 2 });
Expand Down Expand Up @@ -263,6 +282,11 @@ function getLogContent(name) {
return fs.readFileSync(logPath, 'utf8');
}

function getWebLogContent(name) {
const logPath = path.join(__dirname, 'fixtures', name, 'logs', name, 'egg-web.log');
return fs.readFileSync(logPath, 'utf8');
}

function getErrorLogContent(name) {
const logPath = path.join(__dirname, 'fixtures', name, 'logs', name, 'common-error.log');
return fs.readFileSync(logPath, 'utf8');
Expand Down

0 comments on commit 4fd1f03

Please sign in to comment.