Skip to content

Commit

Permalink
reafactor: review
Browse files Browse the repository at this point in the history
  • Loading branch information
atian25 committed Sep 28, 2017
1 parent 24fbf8a commit 2eef536
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 146 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ sudo: false
language: node_js
node_js:
- '6'
- '7'
- '8'
install:
- npm i npminstall && npminstall
Expand Down
41 changes: 20 additions & 21 deletions agent.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,44 @@
'use strict';

const SCHEDULE_HANDLER = Symbol.for('egg#scheduleHandler');
const WorkerStrategy = require('./lib/strategy/worker');
const AllStrategy = require('./lib/strategy/all');
const BaseStrategy = require('./lib/strategy/base');
const Schedule = require('./lib/schedule');

module.exports = agent => {
const Schedule = require('./lib/schedule');
const schedule = new Schedule(agent);

const handlers = agent[SCHEDULE_HANDLER] = {};
agent.schedule = new Schedule(agent);
agent.ScheduleStrategy = BaseStrategy;

agent.beforeClose(() => {
return agent.schedule.close();
});

agent.messenger.once('egg-ready', () => {
agent.schedule.use('worker', WorkerStrategy);
agent.schedule.use('all', AllStrategy);
// Compatible
for (const type of Object.keys(handlers)) {
schedule.use(type, handler2Class(type, handlers[type]));
const keys = Object.keys(handlers);
if (keys.length) agent.deprecate('should use `schedule.use()` instead of `agent[Symbol.for(\'egg#scheduleHandler\')]` to register handler.');
for (const type of keys) {
agent.schedule.use(type, handler2Class(type, handlers[type]));
}
schedule.start();
agent.schedule.start();
});

function handler2Class(type, fn) {
return class CustomStrategy extends Schedule.Strategy {
return class CustomStrategy extends BaseStrategy {
constructor(...args) {
super(...args);
this.type = type;
}
start() {
fn(this.scheduleInfo, {
one: function() {
this.agent.coreLogger.info(`[egg-schedule] send message to random worker: ${this.key}`);
this.agent.messenger.sendRandom(this.key);
}.bind(this),

all: function() {
this.agent.coreLogger.info(`[egg-schedule] send message to all worker: ${this.key}`);
this.agent.messenger.send(this.key);
}.bind(this),
one: this.sendOne.bind(this),
all: this.sendAll.bind(this),
});
}
stop() {
const err = new Error(`schedule type [${this.type}] is implement stop handler`);
err.name = 'EggScheduleError';
throw err;
}
};
}
};
1 change: 0 additions & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
environment:
matrix:
- nodejs_version: '6'
- nodejs_version: '7'
- nodejs_version: '8'

install:
Expand Down
32 changes: 18 additions & 14 deletions lib/schedule.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,48 @@
'use strict';

const STRATEGY = Symbol('strategy');
const HANDLER = Symbol('handler');
const loadSchedule = require('./load_schedule');
const WorkerStrategy = require('./strategy/worker');
const AllStrategy = require('./strategy/all');
const Strategy = require('./strategy/base');

module.exports = class Schedule {
constructor(agent) {
this.agent = agent;
this.handler = new Map();
this.strategy = new Map();
this.use('worker', WorkerStrategy);
this.use('all', AllStrategy);
this[STRATEGY] = new Map();
this[HANDLER] = new Map();
}

use(type, clz) {
this.strategy.set(type, clz);
clz.prototype.type = type;
this[STRATEGY].set(type, clz);
}

start() {
this.agent.disableSchedule = false;
this.closed = false;
const scheduleItems = loadSchedule(this.agent);

for (const k of Object.keys(scheduleItems)) {
const { key, schedule } = scheduleItems[k];
const type = schedule.type;
if (schedule.disable) continue;

const Strategy = this.strategy.get(type);
const Strategy = this[STRATEGY].get(type);
if (!Strategy) {
const err = new Error(`schedule type [${type}] is not defined`);
err.name = 'EggScheduleError';
throw err;
}

const handler = new Strategy(this.agent, key, schedule);
this.handler.set(key, handler);
const handler = new Strategy(schedule, this.agent, key);
this[HANDLER].set(key, handler);
handler.start();
}
}
};

module.exports.Strategy = Strategy;
close() {
this.closed = true;
for (const handler of this[HANDLER].values()) {
handler.close();
}
this.agent.coreLogger.info('[egg-schedule] close tasks.');
}
};
5 changes: 2 additions & 3 deletions lib/strategy/all.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const Strategy = require('./base');
const Strategy = require('./timer');

module.exports = class WorkerStrategy extends Strategy {
constructor(...args) {
Expand All @@ -9,7 +9,6 @@ module.exports = class WorkerStrategy extends Strategy {
}

send() {
this.agent.coreLogger.info(`[egg-schedule] send message to all worker: ${this.key}`);
this.agent.messenger.send(this.key);
this.sendAll();
}
};
94 changes: 19 additions & 75 deletions lib/strategy/base.js
Original file line number Diff line number Diff line change
@@ -1,93 +1,37 @@
'use strict';

const parser = require('cron-parser');
const ms = require('humanize-ms');
const safetimers = require('safe-timers');

module.exports = class BaseStrategy {
constructor(agent, key, scheduleInfo) {
constructor(scheduleInfo, agent, key) {
this.agent = agent;
this.key = key;
this.scheduleInfo = scheduleInfo;
this.interval = undefined;
this.timer = undefined;
}

start() {
const { interval, cron, immediate } = this.scheduleInfo;
if (!interval && !cron) {
throw new Error('[egg-schedule] schedule.interval or schedule.cron must be present');
}

const send = () => {
if (this.agent.disableSchedule) {
this.agent.coreLogger.info(`[egg-schedule] message ${this.key} did not sent`);
return;
}
this.send();
};

if (interval) {
this.interval = this.safeInterval(send, ms(interval));
}

if (cron) {
let interval;
try {
// TODO: cronOptions
interval = parser.parseExpression(cron);
} catch (err) {
err.message = `[egg-schedule] parse cron instruction(${cron}) error: ${err.message}`;
throw err;
}
this.startCron(interval, send);
}

if (immediate) {
setImmediate(send);
}
}

stop() {
if (this.interval) {
clearInterval(this.interval);
this.interval = undefined;
}

if (this.timer) {
clearTimeout(this.timer);
this.timer = undefined;
sendOne() {
if (this.agent.schedule.closed) {
this.agent.coreLogger.info(`[egg-schedule] message ${this.key} did not sent`);
return;
}
this.agent.coreLogger.info(`[egg-schedule] send message to random worker: ${this.key}`);
this.agent.messenger.sendRandom(this.key);
}

send() {
}

startCron(interval, listener, done) {
const now = Date.now();
let nextTick;
try {
do {
// TODO: try
nextTick = interval.next().getTime();
} while (now >= nextTick);
} catch (err) {
return done(err);
sendAll() {
if (this.agent.schedule.closed) {
this.agent.coreLogger.info(`[egg-schedule] message ${this.key} did not sent`);
return;
}

this.timer = this.safeTimeout(() => {
listener();
this.timer = this.startCron(interval, listener, done);
}, nextTick - now);
this.agent.coreLogger.info(`[egg-schedule] send message to all worker: ${this.key}`);
this.agent.messenger.send(this.key);
}

safeTimeout(handler, delay, ...args) {
const fn = delay < safetimers.maxInterval ? setTimeout : safetimers.setTimeout;
return fn(handler, delay, ...args);
start() {
const err = new Error(`schedule type [${this.type}] start is not implemented yet`);
err.name = 'EggScheduleError';
throw err;
}

safeInterval(handler, delay, ...args) {
const fn = delay < safetimers.maxInterval ? setInterval : safetimers.setInterval;
return fn(handler, delay, ...args);
close() {
this.agent.logger.warn(`schedule type [${this.type}] stop is not implemented yet`);
}
};
5 changes: 2 additions & 3 deletions lib/strategy/worker.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const Strategy = require('./base');
const Strategy = require('./timer');

module.exports = class WorkerStrategy extends Strategy {
constructor(...args) {
Expand All @@ -9,7 +9,6 @@ module.exports = class WorkerStrategy extends Strategy {
}

send() {
this.agent.coreLogger.info(`[egg-schedule] send message to random worker: ${this.key}`);
this.agent.messenger.sendRandom(this.key);
this.sendOne();
}
};
20 changes: 10 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@
],
"dependencies": {
"co": "^4.6.0",
"cron-parser": "^2.4.0",
"cron-parser": "^2.4.1",
"humanize-ms": "^1.2.1",
"is-type-of": "^1.0.0",
"is-type-of": "^1.2.0",
"safe-timers": "^1.0.1"
},
"devDependencies": {
"autod": "^2.8.0",
"egg": "^1.4.0",
"egg-bin": "^3.4.1",
"egg-ci": "^1.6.0",
"egg-mock": "^3.7.1",
"eslint": "^3.19.0",
"eslint-config-egg": "^4.2.0",
"autod": "^2.9.0",
"egg": "^1.9.0",
"egg-bin": "^4.3.3",
"egg-ci": "^1.8.0",
"egg-mock": "^3.12.2",
"eslint": "^4.7.2",
"eslint-config-egg": "^5.1.1",
"supertest": "^3.0.0"
},
"engines": {
Expand All @@ -51,7 +51,7 @@
"pkgfiles": "egg-bin pkgfiles"
},
"ci": {
"version": "6, 7, 8",
"version": "6, 8",
"license": true
},
"author": "dead_horse",
Expand Down
17 changes: 17 additions & 0 deletions test/fixtures/customType/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,21 @@ module.exports = function(agent) {
agent[SCHEDULE_HANDLER].custom = function(schedule, sender) {
setInterval(sender.one, schedule.interval);
};

class ClusterStrategy extends agent.ScheduleStrategy {
start() {
this.interval = setInterval(() => {
this.sendOne();
}, this.scheduleInfo.interval);
}
close() {
if (this.interval) {
this.clear(this.interval);
this.interval = undefined;
}
}
}
agent.schedule.use('cluster', ClusterStrategy);

agent.schedule.use('error', class ErrorStrategy extends agent.ScheduleStrategy {});
};
10 changes: 10 additions & 0 deletions test/fixtures/customType/app/schedule/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
'use strict';

exports.schedule = {
type: 'cluster',
interval: 4000,
};

exports.task = function* (ctx) {
ctx.logger.info('cluster_log');
};
2 changes: 1 addition & 1 deletion test/fixtures/customType/app/schedule/custom.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ exports.schedule = {
};

exports.task = function* (ctx) {
ctx.logger.info('custom');
ctx.logger.info('custom_log');
};
Loading

0 comments on commit 2eef536

Please sign in to comment.