Skip to content

Commit

Permalink
feat: timer
Browse files Browse the repository at this point in the history
  • Loading branch information
atian25 committed Sep 29, 2017
1 parent 60aeba7 commit 1dd935d
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 69 deletions.
27 changes: 13 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ exports.schedule = {
/**
* @property {Object} schedule
* - {String} type - schedule type, `worker` or `all`
* - {String} [cron] - cron expression, [see below](#cron-style-scheduling)
* - {String | Number} [interval] - interval expression in millisecond or express explicitly like '1h'. [see below](#interval-style-scheduling)
* - {String} [cron] - cron expression, see [below](#cron-style-scheduling)
* - {Object} [cronOptions] - cron options, see [cron-parser#options](https://github.com/harrisiirak/cron-parser#options)
* - {String | Number} [interval] - interval expression in millisecond or express explicitly like '1h'. see [below](#interval-style-scheduling)
* - {Boolean} [immediate] - To run a scheduler at startup
* - {Boolean} [disable] - whether to disable a scheduler, usually use in dynamic schedule
*/
Expand Down Expand Up @@ -80,7 +81,7 @@ When the scheduled task runs, the scheduled job information will be logged and w
- ctx.query: `scheule config(type=worker&cron=*%2F5%20*%20*%20*%20*%20*)`


To create a task, it is as simple as write a generator function. For example:
To create a task, it is as simple as write a generator / async function. For example:

```js
// A simple logger example
Expand All @@ -92,14 +93,14 @@ exports.task = function* (ctx) {
```js
// A real world example: wipe out your database.
// Use it with caution. :)
exports.task = function* (ctx) {
yield ctx.service.db.cleandb();
exports.task = async function(ctx) {
await ctx.service.db.cleandb();
};
```

## Scheduling

`schedule` is an object that contains one required property, `type`, four optional properties, `{ cron, interval, immediate, disable }`.
`schedule` is an object that contains one required property, `type`, and optional properties, `{ cron, cronOptions, interval, immediate, disable }`.

### Cron-style Scheduling

Expand Down Expand Up @@ -128,6 +129,9 @@ Example:
exports.schedule = {
type: 'worker',
cron: '0 0 */3 * * *',
cronOptions: {
// tz: 'Europe/Athens',
}
};
```

Expand Down Expand Up @@ -161,23 +165,18 @@ You can schedule the task to be executed by one random worker or all workers wit
// {app_root}/agent.js
module.exports = function(agent) {
class CustomStrategy extends agent.ScheduleStrategy {
start() {
constructor(...args) {
super(...args);
this.interval = setInterval(() => {
this.sendOne();
}, this.schedule.interval);
}
close() {
if (this.interval) {
this.clear(this.interval);
this.interval = undefined;
}
}
}
agent.schedule.use('custsom', CustomStrategy);
};
```

Then you could use defined your job:
Then you could use it to defined your job:

```js
// {app_root}/app/schedule/other.js
Expand Down
6 changes: 2 additions & 4 deletions agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ module.exports = agent => {

function handler2Class(type, fn) {
return class CustomStrategy extends agent.ScheduleStrategy {
start() {
constructor(...args) {
super(...args);
fn(this.schedule, {
one: this.sendOne.bind(this),
all: this.sendAll.bind(this),
});
}
close() {
this.agent.logger.warn(`schedule type [${type}] stop is not implemented yet`);
}
};
}
};
16 changes: 16 additions & 0 deletions app/extend/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

const Strategy = require('../../lib/strategy/base');
const Schedule = require('../../lib/schedule');
const Timer = require('../../lib/timer');

const SCHEDULE = Symbol('agent#schedule');
const TIMER = Symbol('agent#scheduleTimer');

module.exports = {
/**
Expand All @@ -22,4 +25,17 @@ module.exports = {
}
return this[SCHEDULE];
},

/**
* @member agent#scheduleTimer
*/
get scheduleTimer() {
if (!this[TIMER]) {
this[TIMER] = new Timer(this);
this.beforeClose(() => {
return this[TIMER].close();
});
}
return this[TIMER];
},
};
23 changes: 12 additions & 11 deletions lib/schedule.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
'use strict';

const STRATEGY = Symbol('strategy');
const HANDLER = Symbol('handler');
const assert = require('assert');
const loadSchedule = require('./load_schedule');

module.exports = class Schedule {
constructor(agent) {
this.agent = agent;
this[STRATEGY] = new Map();
this[HANDLER] = new Map();
this.closed = false;
}

/**
* register a custom Schedule Strategy
* @param {String} type - strategy type
* @param {Strategy} clz - Strategy class
*/
use(type, clz) {
assert(typeof clz.prototype.start === 'function', `schedule type [${type}] should implement \`start\` method`);
this[STRATEGY].set(type, clz);
}

/**
* start schedule
*/
start() {
this.closed = false;
const scheduleItems = loadSchedule(this.agent);
Expand All @@ -34,17 +38,14 @@ module.exports = class Schedule {
throw err;
}

const handler = new Strategy(schedule, this.agent, key);
this[HANDLER].set(key, handler);
handler.start();
new Strategy(schedule, this.agent, key);
}
}

/**
* close schedule
*/
close() {
this.closed = true;
for (const [ key, handler ] of this[HANDLER].entries()) {
this.agent.coreLogger.info(`[egg-schedule] close tasks: ${key}`);
handler.close();
}
}
};
11 changes: 1 addition & 10 deletions lib/strategy/all.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
'use strict';

const Strategy = require('./base');
const Timer = require('../timer');

module.exports = class AllStrategy extends Strategy {
constructor(...args) {
super(...args);
this.timer = new Timer();
}

start() {
this.timer.start(this.key, this.schedule, () => this.sendAll());
}

close() {
this.timer.stop();
this.agent.scheduleTimer.start(this.key, this.schedule, () => this.sendAll());
}
};
11 changes: 1 addition & 10 deletions lib/strategy/worker.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
'use strict';

const Strategy = require('./base');
const Timer = require('../timer');

module.exports = class WorkerStrategy extends Strategy {
constructor(...args) {
super(...args);
this.timer = new Timer();
}

start() {
this.timer.start(this.key, this.schedule, () => this.sendOne());
}

close() {
this.timer.stop();
this.agent.scheduleTimer.start(this.key, this.schedule, () => this.sendOne());
}
};
40 changes: 27 additions & 13 deletions lib/timer.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,51 @@ const ms = require('humanize-ms');
const safetimers = require('safe-timers');

class Timer {
constructor() {
constructor(agent) {
this.agent = agent;
this.interval = new Map();
this.timer = new Map();
}

start(key, schedule, cb) {
const { interval, cron, immediate } = schedule;
/**
* start a timer
* @param {String} key - schedule key
* @param {Object} schedule - schedule config `{ interval, cron, cronOptions, immediate}`
* @param {Function} listener - sender handler
*/
start(key, schedule, listener) {
const { interval, cron, cronOptions, immediate } = schedule;
if (!interval && !cron) {
throw new Error('[egg-schedule] schedule.interval or schedule.cron must be present');
}

let tid;

if (interval) {
tid = this.safeInterval(cb, ms(interval));
tid = this.safeInterval(listener, ms(interval));
this.interval.set(key, tid);
}

if (cron) {
let interval;
try {
// TODO: cronOptions
interval = parser.parseExpression(cron);
interval = parser.parseExpression(cron, cronOptions);
} catch (err) {
err.message = `[egg-schedule] parse cron instruction(${cron}) error: ${err.message}`;
throw err;
}
this.startCron(key, interval, cb);
this.startCron(key, interval, listener);
}

if (immediate) {
setImmediate(cb);
setImmediate(listener);
}
}

stop() {
/**
* clean all timers
*/
close() {
for (const tid of this.interval.values()) {
clearInterval(tid);
}
Expand All @@ -55,10 +64,15 @@ class Timer {
startCron(key, interval, listener) {
const now = Date.now();
let nextTick;
do {
// TODO: try error when reach endDate
nextTick = interval.next().getTime();
} while (now >= nextTick);
try {
do {
nextTick = interval.next().getTime();
} while (now >= nextTick);
} catch (err) {
// when reach endDate
this.agent.coreLogger.warn(`[egg-schedule] ${key} reach endDate, will stop.`);
return;
}

const tid = this.safeTimeout(() => {
listener();
Expand Down
13 changes: 13 additions & 0 deletions test/fixtures/cronOptions/app/schedule/cron-options.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
'use strict';

exports.schedule = {
type: 'worker',
cron: '*/2 * * * * *',
cronOptions: {
endDate: Date.now() + 4500,
}
};

exports.task = function* (ctx) {
ctx.logger.info('cron-options');
};
3 changes: 3 additions & 0 deletions test/fixtures/cronOptions/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"name": "cronOptions"
}
9 changes: 2 additions & 7 deletions test/fixtures/customType/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,12 @@ module.exports = function(agent) {
};

class ClusterStrategy extends agent.ScheduleStrategy {
start() {
constructor(...args) {
super(...args);
this.interval = setInterval(() => {
this.sendOne();
}, this.schedule.interval);
}
close() {
if (this.interval) {
this.clear(this.interval);
this.interval = undefined;
}
}
}
agent.schedule.use('cluster', ClusterStrategy);
};
13 changes: 13 additions & 0 deletions test/schedule.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ describe('test/schedule.test.js', () => {
assert(contains(log, 'cron') === 1);
});

it('should support cronOptions', function* () {
app = mm.cluster({ baseDir: 'cronOptions', workers: 2 });
// app.debug();
yield app.ready();
yield sleep(8000);
const log = getLogContent('cronOptions');
const agentLog = getAgentLogContent('cronOptions');
// console.log(log);
assert(contains(log, 'cron-options') === 2);
assert(/cron-options.js reach endDate, will stop/.test(agentLog));
});


it('should support context', function* () {
app = mm.cluster({ baseDir: 'context', workers: 2 });
yield app.ready();
Expand Down

0 comments on commit 1dd935d

Please sign in to comment.