-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
13 changed files
with
159 additions
and
175 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,6 @@ sudo: false | |
language: node_js | ||
node_js: | ||
- '6' | ||
- '7' | ||
- '8' | ||
install: | ||
- npm i npminstall && npminstall | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,45 +1,44 @@ | ||
'use strict'; | ||
|
||
const SCHEDULE_HANDLER = Symbol.for('egg#scheduleHandler'); | ||
const WorkerStrategy = require('./lib/strategy/worker'); | ||
const AllStrategy = require('./lib/strategy/all'); | ||
const BaseStrategy = require('./lib/strategy/base'); | ||
const Schedule = require('./lib/schedule'); | ||
|
||
module.exports = agent => { | ||
const Schedule = require('./lib/schedule'); | ||
const schedule = new Schedule(agent); | ||
|
||
const handlers = agent[SCHEDULE_HANDLER] = {}; | ||
agent.schedule = new Schedule(agent); | ||
agent.ScheduleStrategy = BaseStrategy; | ||
|
||
agent.beforeClose(() => { | ||
return agent.schedule.close(); | ||
}); | ||
|
||
agent.messenger.once('egg-ready', () => { | ||
// Compatible | ||
for (const type of Object.keys(handlers)) { | ||
schedule.use(type, handler2Class(type, handlers[type])); | ||
agent.schedule.use('worker', WorkerStrategy); | ||
agent.schedule.use('all', AllStrategy); | ||
//TODO: compatible, will remove at next major | ||
const keys = Object.keys(handlers); | ||
if (keys.length) agent.deprecate('should use `schedule.use()` instead of `agent[Symbol.for(\'egg#scheduleHandler\')]` to register handler.'); | ||
for (const type of keys) { | ||
agent.schedule.use(type, handler2Class(type, handlers[type])); | ||
} | ||
schedule.start(); | ||
agent.schedule.start(); | ||
}); | ||
|
||
function handler2Class(type, fn) { | ||
return class CustomStrategy extends Schedule.Strategy { | ||
return class CustomStrategy extends BaseStrategy { | ||
constructor(...args) { | ||
super(...args); | ||
this.type = type; | ||
} | ||
start() { | ||
fn(this.scheduleInfo, { | ||
one: function() { | ||
this.agent.coreLogger.info(`[egg-schedule] send message to random worker: ${this.key}`); | ||
this.agent.messenger.sendRandom(this.key); | ||
}.bind(this), | ||
|
||
all: function() { | ||
this.agent.coreLogger.info(`[egg-schedule] send message to all worker: ${this.key}`); | ||
this.agent.messenger.send(this.key); | ||
}.bind(this), | ||
fn(this.schedule, { | ||
one: this.sendOne.bind(this), | ||
all: this.sendAll.bind(this), | ||
}); | ||
} | ||
stop() { | ||
const err = new Error(`schedule type [${this.type}] is implement stop handler`); | ||
err.name = 'EggScheduleError'; | ||
throw err; | ||
} | ||
}; | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
environment: | ||
matrix: | ||
- nodejs_version: '6' | ||
- nodejs_version: '7' | ||
- nodejs_version: '8' | ||
|
||
install: | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,44 +1,48 @@ | ||
'use strict'; | ||
|
||
const STRATEGY = Symbol('strategy'); | ||
const HANDLER = Symbol('handler'); | ||
const loadSchedule = require('./load_schedule'); | ||
const WorkerStrategy = require('./strategy/worker'); | ||
const AllStrategy = require('./strategy/all'); | ||
const Strategy = require('./strategy/base'); | ||
|
||
module.exports = class Schedule { | ||
constructor(agent) { | ||
this.agent = agent; | ||
this.handler = new Map(); | ||
this.strategy = new Map(); | ||
this.use('worker', WorkerStrategy); | ||
this.use('all', AllStrategy); | ||
this[STRATEGY] = new Map(); | ||
this[HANDLER] = new Map(); | ||
} | ||
|
||
use(type, clz) { | ||
this.strategy.set(type, clz); | ||
clz.prototype.type = type; | ||
this[STRATEGY].set(type, clz); | ||
} | ||
|
||
start() { | ||
this.agent.disableSchedule = false; | ||
this.closed = false; | ||
const scheduleItems = loadSchedule(this.agent); | ||
|
||
for (const k of Object.keys(scheduleItems)) { | ||
const { key, schedule } = scheduleItems[k]; | ||
const type = schedule.type; | ||
if (schedule.disable) continue; | ||
|
||
const Strategy = this.strategy.get(type); | ||
const Strategy = this[STRATEGY].get(type); | ||
if (!Strategy) { | ||
const err = new Error(`schedule type [${type}] is not defined`); | ||
err.name = 'EggScheduleError'; | ||
throw err; | ||
} | ||
|
||
const handler = new Strategy(this.agent, key, schedule); | ||
this.handler.set(key, handler); | ||
const handler = new Strategy(schedule, this.agent, key); | ||
this[HANDLER].set(key, handler); | ||
handler.start(); | ||
} | ||
} | ||
}; | ||
|
||
module.exports.Strategy = Strategy; | ||
close() { | ||
this.closed = true; | ||
for (const handler of this[HANDLER].values()) { | ||
handler.close(); | ||
} | ||
this.agent.coreLogger.info('[egg-schedule] close tasks.'); | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,9 @@ | ||
'use strict'; | ||
|
||
const Strategy = require('./base'); | ||
const Strategy = require('./timer'); | ||
|
||
module.exports = class WorkerStrategy extends Strategy { | ||
constructor(...args) { | ||
super(...args); | ||
this.type = 'all'; | ||
} | ||
|
||
send() { | ||
this.agent.coreLogger.info(`[egg-schedule] send message to all worker: ${this.key}`); | ||
this.agent.messenger.send(this.key); | ||
this.sendAll(); | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,93 +1,41 @@ | ||
'use strict'; | ||
|
||
const parser = require('cron-parser'); | ||
const ms = require('humanize-ms'); | ||
const safetimers = require('safe-timers'); | ||
|
||
module.exports = class BaseStrategy { | ||
constructor(agent, key, scheduleInfo) { | ||
constructor(schedule, agent, key) { | ||
this.agent = agent; | ||
this.key = key; | ||
this.scheduleInfo = scheduleInfo; | ||
this.interval = undefined; | ||
this.timer = undefined; | ||
} | ||
|
||
start() { | ||
const { interval, cron, immediate } = this.scheduleInfo; | ||
if (!interval && !cron) { | ||
throw new Error('[egg-schedule] schedule.interval or schedule.cron must be present'); | ||
} | ||
|
||
const send = () => { | ||
if (this.agent.disableSchedule) { | ||
this.agent.coreLogger.info(`[egg-schedule] message ${this.key} did not sent`); | ||
return; | ||
} | ||
this.send(); | ||
}; | ||
|
||
if (interval) { | ||
this.interval = this.safeInterval(send, ms(interval)); | ||
} | ||
|
||
if (cron) { | ||
let interval; | ||
try { | ||
// TODO: cronOptions | ||
interval = parser.parseExpression(cron); | ||
} catch (err) { | ||
err.message = `[egg-schedule] parse cron instruction(${cron}) error: ${err.message}`; | ||
throw err; | ||
} | ||
this.startCron(interval, send); | ||
} | ||
|
||
if (immediate) { | ||
setImmediate(send); | ||
} | ||
this.schedule = schedule; | ||
} | ||
|
||
stop() { | ||
if (this.interval) { | ||
clearInterval(this.interval); | ||
this.interval = undefined; | ||
} | ||
|
||
if (this.timer) { | ||
clearTimeout(this.timer); | ||
this.timer = undefined; | ||
sendOne() { | ||
/* istanbul ignore next */ | ||
if (this.agent.schedule.closed) { | ||
this.agent.coreLogger.info(`[egg-schedule] message ${this.key} did not sent`); | ||
return; | ||
} | ||
this.agent.coreLogger.info(`[egg-schedule] send message to random worker: ${this.key}`); | ||
this.agent.messenger.sendRandom(this.key); | ||
} | ||
|
||
send() { | ||
} | ||
|
||
startCron(interval, listener, done) { | ||
const now = Date.now(); | ||
let nextTick; | ||
try { | ||
do { | ||
// TODO: try | ||
nextTick = interval.next().getTime(); | ||
} while (now >= nextTick); | ||
} catch (err) { | ||
return done(err); | ||
sendAll() { | ||
/* istanbul ignore next */ | ||
if (this.agent.schedule.closed) { | ||
this.agent.coreLogger.info(`[egg-schedule] message ${this.key} did not sent`); | ||
return; | ||
} | ||
|
||
this.timer = this.safeTimeout(() => { | ||
listener(); | ||
this.timer = this.startCron(interval, listener, done); | ||
}, nextTick - now); | ||
this.agent.coreLogger.info(`[egg-schedule] send message to all worker: ${this.key}`); | ||
this.agent.messenger.send(this.key); | ||
} | ||
|
||
safeTimeout(handler, delay, ...args) { | ||
const fn = delay < safetimers.maxInterval ? setTimeout : safetimers.setTimeout; | ||
return fn(handler, delay, ...args); | ||
start() { | ||
const err = new Error(`schedule type [${this.type}] start is not implemented yet`); | ||
err.name = 'EggScheduleError'; | ||
throw err; | ||
} | ||
|
||
safeInterval(handler, delay, ...args) { | ||
const fn = delay < safetimers.maxInterval ? setInterval : safetimers.setInterval; | ||
return fn(handler, delay, ...args); | ||
/* istanbul ignore next */ | ||
close() { | ||
/* istanbul ignore next */ | ||
this.agent.logger.warn(`schedule type [${this.type}] stop is not implemented yet`); | ||
} | ||
}; |
Oops, something went wrong.