Skip to content

Commit

Permalink
feat(Queuer): Send notifications via delegator
Browse files Browse the repository at this point in the history
  • Loading branch information
nokome committed Dec 5, 2019
1 parent 100f001 commit b0193c7
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
14 changes: 9 additions & 5 deletions src/base/Manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ export class Manager extends Listener {
* places the call on the queue if it is unable to
* be delegated.
*/
public call(method: Method, params: { [key: string]: any }): Promise<any> {
public async call(
method: Method,
params: { [key: string]: any }
): Promise<any> {
try {
return this.delegator.call(method, params)
const result = await this.delegator.call(method, params)
return result
} catch (error) {
if (error instanceof CapabilityError) {
return this.queuer.call(method, params)
return this.queuer.call(method, params, this)
}
throw error
}
Expand All @@ -47,8 +51,8 @@ export class Manager extends Listener {
* @override Override of {@link Listener.start} which
* also starts periodic checking of the queue
*/
async start(): Promise<void> {
await super.start()
async start(servers: Server[] = []): Promise<void> {
await super.start(servers)
await this.queuer.check(this.delegator)
}
}
4 changes: 2 additions & 2 deletions src/base/Queuer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ test('clean', async () => {
await delay(20)

queuer.clean()
await expect(p0).rejects.toThrow(/Request has become stale/)
await expect(p0).rejects.toThrow(/Job has become stale/)
expect(queue.length).toBe(0)

const p1 = queuer.decode('')
Expand All @@ -118,7 +118,7 @@ test('start + stop', async () => {
const p0 = queuer.decode('')
expect(queue.length).toBe(1)

await expect(p0).rejects.toThrow(/Request has become stale/)
await expect(p0).rejects.toThrow(/Job has become stale/)

const p1 = queuer.decode('')
expect(queue.length).toBe(1)
Expand Down
28 changes: 24 additions & 4 deletions src/base/Queuer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const log = getLogger('executa:queuer')
interface Job<Type> {
id: string
call: Call
delegator?: Executor
date: Date
resolve: (result: Type) => void
reject: (error: Error) => void
Expand Down Expand Up @@ -44,10 +45,14 @@ export class Queuer extends Executor {
/**
* @implements Implements {@link Executor.call} by placing
* all requests on the queue.
*
* @param delegator The executor, if any that delegated this call.
* Used to notify that executor's clients on the status of the job.
*/
public async call<Type>(
method: Method,
params: Call['params'] = {}
params: Call['params'] = {},
delegator?: Executor
): Promise<Type> {
const {
queue,
Expand All @@ -59,9 +64,10 @@ export class Queuer extends Executor {

return new Promise<Type>((resolve, reject) => {
const id = `job-${uid()}`
queue.push({
const job = {
id,
call: { method, params },
delegator,
date: new Date(),
resolve: (result: Type) => {
this.remove(id)
Expand All @@ -71,10 +77,24 @@ export class Queuer extends Executor {
this.remove(id)
reject(error)
}
})
}
const position = queue.push(job)
this.notify(
'info',
`Job has been added to queue at position ${position}`,
job
)
})
}

notify(subject: string, message: string, job: Job<any>) {
const { call, delegator } = job
if (delegator !== undefined) {
const { claims: { clients = []} = {}} = call.params
delegator.notify(subject, message, undefined, clients)
}
}

/**
* Check the queue on an ongoing basis and attempt to
* reduce its size by completing jobs.
Expand Down Expand Up @@ -139,7 +159,7 @@ export class Queuer extends Executor {
const now = Date.now()
for (const { date, reject } of [...this.queue]) {
if ((now - date.valueOf()) / 1000 >= this.config.queueStale) {
reject(new Error('Request has become stale'))
reject(new Error('Job has become stale'))
}
}
}
Expand Down

0 comments on commit b0193c7

Please sign in to comment.