-
Notifications
You must be signed in to change notification settings - Fork 9.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(core): Add basic execution concurrency limits in main mode #8458
Conversation
b3d9a5c
to
11b7b49
Compare
11b7b49
to
f681b8d
Compare
0b27aed
to
0502e77
Compare
0502e77
to
6a11f4a
Compare
5686ad5
to
f350b14
Compare
6a11f4a
to
bc61aa4
Compare
c1c2bbc
to
8f28c36
Compare
7b546d0
to
1e1ea50
Compare
1e1ea50
to
fea3475
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work, this is going to be so useful!
Quick pass, not tested yet.
executionStatus: ExecutionStatus, | ||
) { | ||
// Wait here in-case execution concurrency limit is reached | ||
await this.getQueue(executionData.executionMode).enqueue(executionId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd've expected all this to be encapsulated? As in, a ConcurrencyService
that contains the two queues and is smart enough to identify manual and production executions without being told what kind they are, e.g. this.concurrencyService.enqueue(execution)
@@ -21,6 +24,11 @@ import { Logger } from '@/Logger'; | |||
|
|||
@Service() | |||
export class ActiveExecutions { | |||
private queues = { | |||
manual: new ConcurrencyQueue(config.getEnv('executions.manualConcurrency')), | |||
others: new ConcurrencyQueue(config.getEnv('executions.concurrency')), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or is there a reason to keep it intentionally vague?
others: new ConcurrencyQueue(config.getEnv('executions.concurrency')), | |
production: new ConcurrencyQueue(config.getEnv('executions.concurrency')), |
packages/cli/src/ActiveExecutions.ts
Outdated
getRunningExecutionIds() { | ||
const executions = Object.entries(this.activeExecutions); | ||
return executions.filter(([, value]) => value.status === 'running').map(([id]) => id); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we using this?
packages/cli/src/ConcurrencyQueue.ts
Outdated
@@ -0,0 +1,30 @@ | |||
export class ConcurrencyQueue { | |||
private waiting: Array<[string, () => void]> = []; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
readonly
waiting
is an execution status, is there a better term?paused
?rateLimited
?throttled
?onHold
?- This is a nice use case for labeled tuples:
Array<[executionId: string, resolveFn: () => void]> = [];
packages/cli/src/ConcurrencyQueue.ts
Outdated
|
||
constructor(private capacity: number) {} | ||
|
||
async enqueue(id: string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async enqueue(id: string) { | |
async enqueue(executionId: string) { |
}); | ||
|
||
describe('ConcurrencyQueue', () => { | ||
test('should limit concurrency', async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're testing state at various Ts but we're not testing that the concurrency limit is being enforced?
@@ -518,6 +518,7 @@ | |||
"executionDetails.confirmMessage.headline": "Delete Execution?", | |||
"executionDetails.confirmMessage.message": "Are you sure that you want to delete the current execution?", | |||
"executionDetails.deleteExecution": "Delete this execution", | |||
"executionDetails.enqueuedMessage": "Execution hasn't started yet.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a user, I'd like to know why the execution hasn't started - if there's a problem or if this is behaving as intended, especially if I'm unaware of the concurrency capacity and I might be seeing this message for a while. Execution was rate-limited
? Might need product input.
@@ -558,6 +559,7 @@ | |||
"executionsList.confirmMessage.message": "Are you sure that you want to delete the {numSelected} selected execution(s)?", | |||
"executionsList.clearSelection": "Clear selection", | |||
"executionsList.error": "Failed", | |||
"executionsList.enqueued": "Enqueued", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A user not on queue mode might be confused by the reference to an implementation detail. Rate limited
? Might need product input.
@@ -162,22 +200,18 @@ export class WorkflowRunner { | |||
this.activeExecutions.attachResponsePromise(executionId, responsePromise); | |||
} | |||
|
|||
if (this.executionsMode === 'queue' && data.executionMode !== 'manual') { | |||
const runInMainProcess = this.executionsMode !== 'queue' || data.executionMode === 'manual'; | |||
if (!runInMainProcess) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's flip for readability?
afterAll(() => { | ||
jest.useRealTimers(); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? If we don't do this, fake timers will bleed over to other suites?
jumping back on this now. |
More ideas:
|
767c1ce
to
f4baf87
Compare
5e191f7
to
3406f40
Compare
3406f40
to
0ef7883
Compare
closing in favor of #9453 |
This PR implements a very rudimentary concurrency limit in
main
mode, to be a bit more consistent withqueue
mode, and also to prevent the application from thrashing the event-loop when under high load.Ticket: N8N-7052
Review / Merge checklist