Skip to content

Commit

Permalink
Merge pull request #1036 from eclipse-tractusx/feat/orchestrator-abort
Browse files Browse the repository at this point in the history
feat(Orchestrator): now aborts outdated tasks
  • Loading branch information
nicoprow authored Aug 26, 2024
2 parents c6bee9b + 4b36fd3 commit ecd3108
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 40 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ For changes to the BPDM Helm charts please consult the [changelog](charts/bpdm/C
### Changed

- BPDM Gate: Fix possible out of memory exception when handling large golden record process requests

- BPDM Pool: Fix not resolving golden record tasks on exceptions

- BPDM Gate: Fixed Gate not resending business partner data to the golden record process on error sharing state when member sends the exact same business partner again
- BPDM Orchestrator: Now aborts tasks that are outdated (that is when a Gate will send newer business partner data for the same record to the golden record process)

- BPDM Pool & Gate: Reduce standard batch size for golden record task processing ([#1032](https://github.com/eclipse-tractusx/bpdm/pull/1032))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,21 @@ class GoldenRecordTaskDb(
@Column(name = "site_has_changed")
var siteHasChanged: Boolean?
)

enum class ResultState{
Pending,
Success,
Error,
Aborted
}

enum class StepState{
Queued,
Reserved,
Success,
Error,
Aborted
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.eclipse.tractusx.bpdm.orchestrator.repository

import org.eclipse.tractusx.bpdm.orchestrator.entity.DbTimestamp
import org.eclipse.tractusx.bpdm.orchestrator.entity.GateRecordDb
import org.eclipse.tractusx.bpdm.orchestrator.entity.GoldenRecordTaskDb
import org.eclipse.tractusx.orchestrator.api.model.StepState
import org.eclipse.tractusx.orchestrator.api.model.TaskStep
import org.springframework.data.domain.Page
import org.springframework.data.domain.Pageable
Expand All @@ -37,9 +37,11 @@ interface GoldenRecordTaskRepository : CrudRepository<GoldenRecordTaskDb, Long>,
fun findByUuidIn(uuids: Set<UUID>): Set<GoldenRecordTaskDb>

@Query("SELECT task from GoldenRecordTaskDb task WHERE task.processingState.step = :step AND task.processingState.stepState = :stepState")
fun findByStepAndStepState(step: TaskStep, stepState: StepState, pageable: Pageable): Page<GoldenRecordTaskDb>
fun findByStepAndStepState(step: TaskStep, stepState: GoldenRecordTaskDb.StepState, pageable: Pageable): Page<GoldenRecordTaskDb>

fun findByProcessingStatePendingTimeoutBefore(time: DbTimestamp, pageable: Pageable): Page<GoldenRecordTaskDb>

fun findByProcessingStateRetentionTimeoutBefore(time: DbTimestamp, pageable: Pageable): Page<GoldenRecordTaskDb>

fun findTasksByGateRecordAndProcessingStateResultState(record: GateRecordDb, resultState: GoldenRecordTaskDb.ResultState) : Set<GoldenRecordTaskDb>
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class GoldenRecordTaskService(
logger.debug { "Creation of new golden record tasks: executing createTasks() with parameters $createRequest" }

val gateRecords = getOrCreateGateRecords(createRequest.requests)
gateRecords.forEach { abortOutdatedTasks(it) }

return createRequest.requests.zip(gateRecords)
.map { (request, record) -> goldenRecordTaskStateMachine.initTask(createRequest.mode, request.businessPartner, record) }
Expand All @@ -74,7 +75,7 @@ class GoldenRecordTaskService(
logger.debug { "Reservation of next golden record tasks: executing reserveTasksForStep() with parameters $reservationRequest" }
val now = Instant.now()

val foundTasks = taskRepository.findByStepAndStepState(reservationRequest.step, StepState.Queued, Pageable.ofSize(reservationRequest.amount)).content
val foundTasks = taskRepository.findByStepAndStepState(reservationRequest.step, GoldenRecordTaskDb.StepState.Queued, Pageable.ofSize(reservationRequest.amount)).content
val reservedTasks = foundTasks.map { task -> goldenRecordTaskStateMachine.doReserve(task) }
val pendingTimeout = reservedTasks.minOfOrNull { calculateTaskPendingTimeout(it) } ?: now

Expand All @@ -97,17 +98,16 @@ class GoldenRecordTaskService(
val foundTasksByUuid = foundTasks.associateBy { it.uuid.toString() }

resultRequest.results
.forEach { resultEntry ->
val task = foundTasksByUuid[resultEntry.taskId]
?: throw BpdmTaskNotFoundException(resultEntry.taskId)
.map { resultEntry -> Pair(foundTasksByUuid[resultEntry.taskId] ?: throw BpdmTaskNotFoundException(resultEntry.taskId), resultEntry) }
.filterNot { (task, _) -> task.processingState.resultState == GoldenRecordTaskDb.ResultState.Aborted }
.forEach { (task, resultEntry) ->
val step = resultRequest.step
val errors = resultEntry.errors
val resultBusinessPartner = resultEntry.businessPartner

if (errors.isNotEmpty()) {
goldenRecordTaskStateMachine.doResolveTaskToError(task, step, errors)
} else {
goldenRecordTaskStateMachine.resolveTaskStepToSuccess(task, step, resultBusinessPartner)
when{
errors.isNotEmpty() -> goldenRecordTaskStateMachine.doResolveTaskToError(task, step, errors)
else -> goldenRecordTaskStateMachine.resolveTaskStepToSuccess(task, step, resultBusinessPartner)
}
}
}
Expand Down Expand Up @@ -186,6 +186,11 @@ class GoldenRecordTaskService(
gateRecordRepository.save(gateRecord)
}
}

private fun abortOutdatedTasks(record: GateRecordDb){
return taskRepository.findTasksByGateRecordAndProcessingStateResultState(record, GoldenRecordTaskDb.ResultState.Pending)
.forEach { task -> goldenRecordTaskStateMachine.doAbortTask(task) }
}
}

data class PaginationInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ class GoldenRecordTaskStateMachine(
val initialStep = getInitialStep(mode)
val initProcessingState = GoldenRecordTaskDb.ProcessingState(
mode = mode,
resultState = ResultState.Pending,
resultState = GoldenRecordTaskDb.ResultState.Pending,
step = initialStep,
errors = mutableListOf(),
stepState = StepState.Queued,
stepState = GoldenRecordTaskDb.StepState.Queued,
pendingTimeout = Instant.now().plus(taskConfigProperties.taskPendingTimeout).toTimestamp(),
retentionTimeout = null
)
Expand All @@ -65,12 +65,12 @@ class GoldenRecordTaskStateMachine(
logger.debug { "Executing doReserve() with parameters $task" }
val state = task.processingState

if (state.resultState != ResultState.Pending || state.stepState != StepState.Queued) {
if (state.resultState != GoldenRecordTaskDb.ResultState.Pending || state.stepState != GoldenRecordTaskDb.StepState.Queued) {
throw BpdmIllegalStateException(task.uuid, state)
}

// reserved for current step
task.processingState.stepState = StepState.Reserved
task.processingState.stepState = GoldenRecordTaskDb.StepState.Reserved
task.updatedAt = DbTimestamp(Instant.now())

return taskRepository.save(task)
Expand All @@ -84,7 +84,7 @@ class GoldenRecordTaskStateMachine(
logger.debug { "Executing doResolveTaskToSuccess() with parameters $task // $step and $resultBusinessPartner" }
val state = task.processingState

if (state.resultState != ResultState.Pending || state.stepState != StepState.Reserved || state.step != step) {
if (isResolvableForStep(state, step)) {
throw BpdmIllegalStateException(task.uuid, state)
}

Expand All @@ -108,9 +108,10 @@ class GoldenRecordTaskStateMachine(
logger.debug { "Executing doResolveTaskToError() with parameters $task // $step and $errors" }
val state = task.processingState

if (state.resultState != ResultState.Pending || state.stepState != StepState.Reserved || state.step != step) {
if (isResolvableForStep(state, step)) {
throw BpdmIllegalStateException(task.uuid, state)
}

task.processingState.toError(errors.map { requestMapper.toTaskError(it) })
task.updatedAt = DbTimestamp(Instant.now())

Expand All @@ -120,7 +121,7 @@ class GoldenRecordTaskStateMachine(
fun doResolveTaskToTimeout(task: GoldenRecordTaskDb): GoldenRecordTaskDb {
val state = task.processingState

if (state.resultState != ResultState.Pending) {
if (state.resultState != GoldenRecordTaskDb.ResultState.Pending) {
throw BpdmIllegalStateException(task.uuid, state)
}

Expand All @@ -131,27 +132,44 @@ class GoldenRecordTaskStateMachine(
return taskRepository.save(task)
}

fun doAbortTask(task: GoldenRecordTaskDb): GoldenRecordTaskDb{
if(task.processingState.resultState != GoldenRecordTaskDb.ResultState.Pending)
throw BpdmIllegalStateException(task.uuid, task.processingState)

task.processingState.toAborted()
task.updatedAt = DbTimestamp(Instant.now())

return taskRepository.save(task)
}

private fun GoldenRecordTaskDb.ProcessingState.toStep(nextStep: TaskStep) {
step = nextStep
stepState = StepState.Queued
stepState = GoldenRecordTaskDb.StepState.Queued
}

private fun GoldenRecordTaskDb.ProcessingState.toSuccess() {
resultState = ResultState.Success
stepState = StepState.Success
resultState = GoldenRecordTaskDb.ResultState.Success
stepState = GoldenRecordTaskDb.StepState.Success
pendingTimeout = null
retentionTimeout = Instant.now().plus(taskConfigProperties.taskRetentionTimeout).toTimestamp()

}

private fun GoldenRecordTaskDb.ProcessingState.toError(newErrors: List<TaskErrorDb>) {
resultState = ResultState.Error
stepState = StepState.Error
resultState = GoldenRecordTaskDb.ResultState.Error
stepState = GoldenRecordTaskDb.StepState.Error
errors.replace(newErrors)
pendingTimeout = null
retentionTimeout = Instant.now().plus(taskConfigProperties.taskRetentionTimeout).toTimestamp()
}

private fun GoldenRecordTaskDb.ProcessingState.toAborted() {
resultState = GoldenRecordTaskDb.ResultState.Aborted
stepState = GoldenRecordTaskDb.StepState.Aborted
pendingTimeout = null
retentionTimeout = Instant.now().plus(taskConfigProperties.taskRetentionTimeout).toTimestamp()
}

private fun getInitialStep(mode: TaskMode): TaskStep {
return getStepsForMode(mode).first()
}
Expand All @@ -168,4 +186,10 @@ class GoldenRecordTaskStateMachine(
TaskMode.UpdateFromSharingMember -> listOf(TaskStep.CleanAndSync, TaskStep.PoolSync)
TaskMode.UpdateFromPool -> listOf(TaskStep.Clean)
}

private fun isResolvableForStep(state: GoldenRecordTaskDb.ProcessingState, step: TaskStep): Boolean{
return state.resultState != GoldenRecordTaskDb.ResultState.Pending
|| state.stepState != GoldenRecordTaskDb.StepState.Reserved
|| state.step != step
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class ResponseMapper {
fun toProcessingState(task: GoldenRecordTaskDb, timeout: Instant) =
with(task.processingState) {
TaskProcessingStateDto(
resultState = resultState,
resultState = toResultState(resultState),
step = step,
stepState = stepState,
stepState = toStepState(stepState),
errors = errors.map { toTaskError(it) },
createdAt = task.createdAt.instant,
modifiedAt = task.updatedAt.instant,
Expand Down Expand Up @@ -220,4 +220,21 @@ class ResponseMapper {
)
}

fun toResultState(resultState: GoldenRecordTaskDb.ResultState) =
when(resultState){
GoldenRecordTaskDb.ResultState.Pending -> ResultState.Pending
GoldenRecordTaskDb.ResultState.Success -> ResultState.Success
GoldenRecordTaskDb.ResultState.Error -> ResultState.Error
GoldenRecordTaskDb.ResultState.Aborted -> ResultState.Error
}

fun toStepState(stepState: GoldenRecordTaskDb.StepState) =
when(stepState){
GoldenRecordTaskDb.StepState.Queued -> StepState.Queued
GoldenRecordTaskDb.StepState.Reserved -> StepState.Reserved
GoldenRecordTaskDb.StepState.Success -> StepState.Success
GoldenRecordTaskDb.StepState.Error -> StepState.Error
GoldenRecordTaskDb.StepState.Aborted -> StepState.Error
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
ALTER TABLE golden_record_tasks
DROP CONSTRAINT golden_record_tasks_task_step_state_check;

ALTER TABLE golden_record_tasks
ADD CONSTRAINT golden_record_tasks_task_step_state_check
check ( task_step_state in ('Queued', 'Reserved', 'Success', 'Error', 'Aborted'));

ALTER TABLE golden_record_tasks
DROP CONSTRAINT golden_record_tasks_task_result_state_check;

ALTER TABLE golden_record_tasks
ADD CONSTRAINT golden_record_tasks_task_result_state_check
check ( task_result_state in ('Pending', 'Success', 'Error', 'Aborted'));
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient
import org.eclipse.tractusx.orchestrator.api.model.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.http.HttpStatus
Expand Down Expand Up @@ -547,6 +548,41 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
}
}

@Test
fun `abort task when outdated`(){
val createdTasks = createTasks(entries = listOf(defaultBusinessPartner1, defaultBusinessPartner2).map { TaskCreateRequestEntry(null, it) }).createdTasks
val createdTaskIds = createdTasks.map { it.taskId }
val createdRecordIds = createdTasks.map { it.recordId }

//Create newer tasks for the given records
createTasks(entries = createdRecordIds
.zip(listOf(defaultBusinessPartner1, defaultBusinessPartner2))
.map { (recordId, bp) -> TaskCreateRequestEntry(recordId, bp) }
).createdTasks

val olderTasks = searchTaskStates(createdTaskIds).tasks

olderTasks.forEach { assertThat(it.processingState.resultState).isEqualTo(ResultState.Error) }
}

@Test
fun `aborted task throws no error when trying to resolve`(){
val createdTasks = createTasks(entries = listOf(defaultBusinessPartner1, defaultBusinessPartner2).map { TaskCreateRequestEntry(null, it) }).createdTasks
val createdRecordIds = createdTasks.map { it.recordId }

val reservedTasks = reserveTasks(TaskStep.CleanAndSync, amount = 2).reservedTasks

//Create newer tasks for the given records
createTasks(entries = createdRecordIds
.zip(listOf(defaultBusinessPartner1, defaultBusinessPartner2))
.map { (recordId, bp) -> TaskCreateRequestEntry(recordId, bp) }
).createdTasks

assertDoesNotThrow {
resolveTasks(TaskStep.CleanAndSync, reservedTasks.map { TaskStepResultEntryDto(it.taskId, it.businessPartner) })
}
}

private fun createTasks(mode: TaskMode = TaskMode.UpdateFromSharingMember,
entries: List<TaskCreateRequestEntry>? = null
): TaskCreateResponse{
Expand Down
Loading

0 comments on commit ecd3108

Please sign in to comment.