-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: classify #23
Changes from 5 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,3 +3,4 @@ npm-debug.log | |
node_modules/ | ||
coverage/ | ||
run/ | ||
.vscode |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,6 @@ sudo: false | |
language: node_js | ||
node_js: | ||
- '6' | ||
- '7' | ||
- '8' | ||
install: | ||
- npm i npminstall && npminstall | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,121 +1,43 @@ | ||
'use strict'; | ||
|
||
const loadSchedule = require('./lib/load_schedule'); | ||
const parser = require('cron-parser'); | ||
const ms = require('humanize-ms'); | ||
const safetimers = require('safe-timers'); | ||
const SCHEDULE_HANDLER = Symbol.for('egg#scheduleHandler'); | ||
const WorkerStrategy = require('./lib/strategy/worker'); | ||
const AllStrategy = require('./lib/strategy/all'); | ||
|
||
module.exports = agent => { | ||
// add handler into `agent[SCHEDULE_HANDLER]` for extend other kind of schedule type. | ||
// worker: will excute in one random worker when schedule excuted. | ||
// all: will excute in all workers when schedule excuted. | ||
const handlers = agent[SCHEDULE_HANDLER] = { | ||
worker: workerHandler, | ||
all: allHander, | ||
}; | ||
|
||
agent.messenger.once('egg-ready', startSchedule); | ||
|
||
function startSchedule() { | ||
agent.disableSchedule = false; | ||
const schedules = loadSchedule(agent); | ||
for (const s in schedules) { | ||
const schedule = schedules[s]; | ||
if (schedule.schedule.disable) continue; | ||
|
||
const type = schedule.schedule.type; | ||
const handler = handlers[type]; | ||
if (!handler) { | ||
const err = new Error(`schedule type [${type}] is not defined`); | ||
err.name = 'EggScheduleError'; | ||
throw err; | ||
} | ||
handler(schedule.schedule, { | ||
one() { | ||
sendMessage(agent, 'sendRandom', schedule.key); | ||
}, | ||
all() { | ||
sendMessage(agent, 'send', schedule.key); | ||
}, | ||
}); | ||
} | ||
} | ||
|
||
agent.on('close', () => { | ||
agent.disableSchedule = true; | ||
return; | ||
// register built-in strategy | ||
agent.schedule.use('worker', WorkerStrategy); | ||
agent.schedule.use('all', AllStrategy); | ||
|
||
// TODO: compatible, will remove at next major | ||
const handlers = {}; | ||
Object.defineProperty(agent, SCHEDULE_HANDLER, { | ||
get() { | ||
agent.deprecate('should use `agent.schedule.use()` instead of `agent[Symbol.for(\'egg#scheduleHandler\')]` to register handler.'); | ||
return handlers; | ||
}, | ||
}); | ||
}; | ||
|
||
function sendMessage(agent, method, key) { | ||
if (agent.disableSchedule) { | ||
agent.coreLogger.info(`[egg-schedule] message ${key} did not sent`); | ||
return; | ||
} | ||
agent.coreLogger.info(`[egg-schedule] send message: ${method} ${key}`); | ||
agent.messenger[method](key); | ||
} | ||
|
||
function workerHandler(schedule, sender) { | ||
baseHander(schedule, sender.one); | ||
} | ||
|
||
function allHander(schedule, sender) { | ||
baseHander(schedule, sender.all); | ||
} | ||
|
||
function baseHander(schedule, send) { | ||
if (!schedule.interval && !schedule.cron) { | ||
throw new Error('[egg-schedule] schedule.interval or schedule.cron must be present'); | ||
} | ||
|
||
if (schedule.interval) { | ||
const interval = ms(schedule.interval); | ||
safeInterval(send, interval); | ||
} | ||
|
||
if (schedule.cron) { | ||
let interval; | ||
try { | ||
interval = parser.parseExpression(schedule.cron); | ||
} catch (err) { | ||
err.message = `[egg-schedule] parse cron instruction(${schedule.cron}) error: ${err.message}`; | ||
throw err; | ||
agent.messenger.once('egg-ready', () => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 加一下注释,需要等插件注册 strategy There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 加了,在下一行 |
||
const keys = Object.keys(handlers); | ||
for (const type of keys) { | ||
agent.schedule.use(type, handler2Class(type, handlers[type])); | ||
} | ||
startCron(interval, send); | ||
} | ||
|
||
if (schedule.immediate) { | ||
setImmediate(send); | ||
} | ||
} | ||
|
||
function startCron(interval, listener) { | ||
const now = Date.now(); | ||
let nextTick; | ||
do { | ||
nextTick = interval.next().getTime(); | ||
} while (now >= nextTick); | ||
|
||
safeTimeout(() => { | ||
listener(); | ||
startCron(interval, listener); | ||
}, nextTick - now); | ||
} | ||
|
||
function safeTimeout(fn, delay, ...args) { | ||
if (delay < safetimers.maxInterval) { | ||
setTimeout(fn, delay, ...args); | ||
} else { | ||
safetimers.setTimeout(fn, delay, ...args); | ||
} | ||
} | ||
// start schedule | ||
agent.schedule.start(); | ||
}); | ||
|
||
function safeInterval(fn, delay, ...args) { | ||
if (delay < safetimers.maxInterval) { | ||
setInterval(fn, delay, ...args); | ||
} else { | ||
safetimers.setInterval(fn, delay, ...args); | ||
function handler2Class(type, fn) { | ||
return class CustomStrategy extends agent.ScheduleStrategy { | ||
start() { | ||
fn(this.schedule, { | ||
one: this.sendOne.bind(this), | ||
all: this.sendAll.bind(this), | ||
}); | ||
} | ||
close() { | ||
this.agent.logger.warn(`schedule type [${type}] stop is not implemented yet`); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 没有 close 就不要实现了。 |
||
} | ||
}; | ||
} | ||
} | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
'use strict'; | ||
|
||
const Strategy = require('../../lib/strategy/base'); | ||
const Schedule = require('../../lib/schedule'); | ||
const SCHEDULE = Symbol('agent#schedule'); | ||
|
||
module.exports = { | ||
/** | ||
* @member agent#ScheduleStrategy | ||
*/ | ||
ScheduleStrategy: Strategy, | ||
|
||
/** | ||
* @member agent#schedule | ||
*/ | ||
get schedule() { | ||
if (!this[SCHEDULE]) { | ||
this[SCHEDULE] = new Schedule(this); | ||
this.beforeClose(() => { | ||
return this[SCHEDULE].close(); | ||
}); | ||
} | ||
return this[SCHEDULE]; | ||
}, | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
'use strict'; | ||
|
||
const STRATEGY = Symbol('strategy'); | ||
const HANDLER = Symbol('handler'); | ||
const assert = require('assert'); | ||
const loadSchedule = require('./load_schedule'); | ||
|
||
module.exports = class Schedule { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 用 sdk-base?实现 _init?这样不需要手动调用 start 了。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 感觉继承 |
||
constructor(agent) { | ||
this.agent = agent; | ||
this[STRATEGY] = new Map(); | ||
this[HANDLER] = new Map(); | ||
this.closed = false; | ||
} | ||
|
||
use(type, clz) { | ||
assert(typeof clz.prototype.start === 'function', `schedule type [${type}] should implement \`start\` method`); | ||
this[STRATEGY].set(type, clz); | ||
} | ||
|
||
start() { | ||
this.closed = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这里是不是不用设了 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 如果手动调用一次 close,然后再调用 start 这种场景呢? |
||
const scheduleItems = loadSchedule(this.agent); | ||
|
||
for (const k of Object.keys(scheduleItems)) { | ||
const { key, schedule } = scheduleItems[k]; | ||
const type = schedule.type; | ||
if (schedule.disable) continue; | ||
|
||
const Strategy = this[STRATEGY].get(type); | ||
if (!Strategy) { | ||
const err = new Error(`schedule type [${type}] is not defined`); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is not supported? |
||
err.name = 'EggScheduleError'; | ||
throw err; | ||
} | ||
|
||
const handler = new Strategy(schedule, this.agent, key); | ||
this[HANDLER].set(key, handler); | ||
handler.start(); | ||
} | ||
} | ||
|
||
close() { | ||
this.closed = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 实际化设 false ? |
||
for (const [ key, handler ] of this[HANDLER].entries()) { | ||
this.agent.coreLogger.info(`[egg-schedule] close tasks: ${key}`); | ||
handler.close(); | ||
} | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dead-horse 这里原来就可以设置多个的吗
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
啥?