Skip to content

Commit

Permalink
fix(waiting-executions) : concurrent waiting executions doesn't follo…
Browse files Browse the repository at this point in the history
…w FIFO (backport #4415) (#4503)

Co-authored-by: Jalander Ramagiri <[email protected]>
  • Loading branch information
mergify[bot] and rjalander authored Aug 15, 2023
1 parent 977d164 commit c8a1745
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,15 @@ class CompleteExecutionHandler(
}
}
log.debug("Execution ${execution.id} is with ${execution.status} status and Disabled concurrent executions is ${execution.isLimitConcurrent}")
if (execution.isLimitConcurrent) {
if (execution.status != RUNNING) {
execution.pipelineConfigId?.let {
queue.push(StartWaitingExecutions(it, purgeQueue = !execution.isKeepWaitingPipelines))
}
} else {
log.debug("Not starting waiting executions as execution ${execution.id} is currently RUNNING with Disabled concurrent executions.")
}
} else {
log.debug("Execution ${execution.id} is not Disabled for concurrent executions, no need to run waiting executions")
if (execution.status != RUNNING) {
execution.pipelineConfigId?.let {
queue.push(StartWaitingExecutions(it, purgeQueue = !execution.isKeepWaitingPipelines))
}
} else {
log.debug("Not starting waiting executions as execution ${execution.id} is currently RUNNING.")
}
}
}


private fun CompleteExecution.determineFinalStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ object CompleteExecutionHandlerTest : SubjectSpek<CompleteExecutionHandler>({
stage {
refId = "1"
status = stageStatus
isLimitConcurrent = true
}
}
val message = CompleteExecution(pipeline)
Expand All @@ -136,7 +135,7 @@ object CompleteExecutionHandlerTest : SubjectSpek<CompleteExecutionHandler>({
subject.handle(message)
}

it("triggers any waiting pipelines with concurrent execution is disabled") {
it("triggers any waiting pipelines") {
verify(queue).push(StartWaitingExecutions(configId, !pipeline.isKeepWaitingPipelines))
}

Expand Down Expand Up @@ -384,7 +383,7 @@ object CompleteExecutionHandlerTest : SubjectSpek<CompleteExecutionHandler>({
}
}

describe("when a pipeline has branches and is running with ConcurrentExecutions disabled") {
describe("when a pipeline has branches and running with waiting executions") {
val configId = UUID.randomUUID().toString()
val runningPipeline = pipeline {
pipelineConfigId = configId
Expand All @@ -393,12 +392,10 @@ object CompleteExecutionHandlerTest : SubjectSpek<CompleteExecutionHandler>({
stage {
refId = "11"
status = RUNNING
isLimitConcurrent = true
}
stage {
refId = "12"
status = RUNNING
isLimitConcurrent = true
}
}
val waitingPipeline = pipeline {
Expand All @@ -408,12 +405,10 @@ object CompleteExecutionHandlerTest : SubjectSpek<CompleteExecutionHandler>({
stage {
refId = "21"
status = NOT_STARTED
isLimitConcurrent = true
}
stage {
refId = "22"
status = NOT_STARTED
isLimitConcurrent = true
}
}

Expand All @@ -432,11 +427,63 @@ object CompleteExecutionHandlerTest : SubjectSpek<CompleteExecutionHandler>({
subject.handle(message2)
}

it("triggers any waiting pipelines with concurrent execution is disabled, but not the running Pipeline") {
it("triggers only waiting Pipeline, but not the running Pipeline") {
verify(queue).push(message1, retryDelay)
verify(queue).push(message2, retryDelay)
verify(queue, times(1)).push(StartWaitingExecutions(configId, !waitingPipeline.isKeepWaitingPipelines))
}
}

describe("when pipeline has branches and all executions are RUNNING ") {
val configId = UUID.randomUUID().toString()
val runningPipeline1 = pipeline {
pipelineConfigId = configId
isLimitConcurrent = true
status = RUNNING
stage {
refId = "11"
status = RUNNING
}
stage {
refId = "12"
status = RUNNING
}
}
val runningPipeline2 = pipeline {
pipelineConfigId = configId
isLimitConcurrent = true
status = RUNNING
stage {
refId = "21"
status = RUNNING
}
stage {
refId = "22"
status = RUNNING
}
}

val message1 = CompleteExecution(runningPipeline1)
val message2 = CompleteExecution(runningPipeline2)

beforeGroup {
whenever(repository.retrieve(PIPELINE, message1.executionId)) doReturn runningPipeline1
whenever(repository.retrieve(PIPELINE, message2.executionId)) doReturn runningPipeline2
}

afterGroup(::resetMocks)

on("receiving message") {
subject.handle(message1)
subject.handle(message2)
}

it("never triggers RUNNING Pipeline") {
verify(queue).push(message1, retryDelay)
verify(queue).push(message2, retryDelay)
verify(queue, never()).push(StartWaitingExecutions(configId, !runningPipeline1.isKeepWaitingPipelines))
verify(queue, never()).push(StartWaitingExecutions(configId, !runningPipeline2.isKeepWaitingPipelines))
}
}

})

0 comments on commit c8a1745

Please sign in to comment.