diff --git a/CHANGELOG.md b/CHANGELOG.md index fea3c3b0d..6da1cf621 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,10 +15,9 @@ For changes to the BPDM Helm charts please consult the [changelog](charts/bpdm/C ### Changed - BPDM Gate: Fix possible out of memory exception when handling large golden record process requests - - BPDM Pool: Fix not resolving golden record tasks on exceptions - - BPDM Gate: Fixed Gate not resending business partner data to the golden record process on error sharing state when member sends the exact same business partner again +- BPDM Orchestrator: Now aborts tasks that are outdated (that is when a Gate will send newer business partner data for the same record to the golden record process) ## [6.1.0] - [2024-07-15] diff --git a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/entity/GoldenRecordTaskDb.kt b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/entity/GoldenRecordTaskDb.kt index a22b25be7..455c82d95 100644 --- a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/entity/GoldenRecordTaskDb.kt +++ b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/entity/GoldenRecordTaskDb.kt @@ -180,6 +180,21 @@ class GoldenRecordTaskDb( @Column(name = "site_has_changed") var siteHasChanged: Boolean? ) + + enum class ResultState{ + Pending, + Success, + Error, + Aborted + } + + enum class StepState{ + Queued, + Reserved, + Success, + Error, + Aborted + } } diff --git a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/repository/GoldenRecordTaskRepository.kt b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/repository/GoldenRecordTaskRepository.kt index df86b6ec5..6dd56a786 100644 --- a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/repository/GoldenRecordTaskRepository.kt +++ b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/repository/GoldenRecordTaskRepository.kt @@ -20,8 +20,8 @@ package org.eclipse.tractusx.bpdm.orchestrator.repository import org.eclipse.tractusx.bpdm.orchestrator.entity.DbTimestamp +import org.eclipse.tractusx.bpdm.orchestrator.entity.GateRecordDb import org.eclipse.tractusx.bpdm.orchestrator.entity.GoldenRecordTaskDb -import org.eclipse.tractusx.orchestrator.api.model.StepState import org.eclipse.tractusx.orchestrator.api.model.TaskStep import org.springframework.data.domain.Page import org.springframework.data.domain.Pageable @@ -37,9 +37,11 @@ interface GoldenRecordTaskRepository : CrudRepository, fun findByUuidIn(uuids: Set): Set @Query("SELECT task from GoldenRecordTaskDb task WHERE task.processingState.step = :step AND task.processingState.stepState = :stepState") - fun findByStepAndStepState(step: TaskStep, stepState: StepState, pageable: Pageable): Page + fun findByStepAndStepState(step: TaskStep, stepState: GoldenRecordTaskDb.StepState, pageable: Pageable): Page fun findByProcessingStatePendingTimeoutBefore(time: DbTimestamp): Set fun findByProcessingStateRetentionTimeoutBefore(time: DbTimestamp): Set + + fun findTasksByGateRecordAndProcessingStateResultState(record: GateRecordDb, resultState: GoldenRecordTaskDb.ResultState) : Set } \ No newline at end of file diff --git a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskService.kt b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskService.kt index f55ee8782..3da3b12da 100644 --- a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskService.kt +++ b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskService.kt @@ -21,8 +21,8 @@ package org.eclipse.tractusx.bpdm.orchestrator.service import mu.KotlinLogging import org.eclipse.tractusx.bpdm.orchestrator.config.TaskConfigProperties -import org.eclipse.tractusx.bpdm.orchestrator.entity.GateRecordDb import org.eclipse.tractusx.bpdm.orchestrator.entity.DbTimestamp +import org.eclipse.tractusx.bpdm.orchestrator.entity.GateRecordDb import org.eclipse.tractusx.bpdm.orchestrator.entity.GoldenRecordTaskDb import org.eclipse.tractusx.bpdm.orchestrator.exception.BpdmRecordNotFoundException import org.eclipse.tractusx.bpdm.orchestrator.exception.BpdmTaskNotFoundException @@ -52,6 +52,7 @@ class GoldenRecordTaskService( logger.debug { "Creation of new golden record tasks: executing createTasks() with parameters $createRequest" } val gateRecords = getOrCreateGateRecords(createRequest.requests) + gateRecords.forEach { abortOutdatedTasks(it) } return createRequest.requests.zip(gateRecords) .map { (request, record) -> goldenRecordTaskStateMachine.initTask(createRequest.mode, request.businessPartner, record) } @@ -73,7 +74,7 @@ class GoldenRecordTaskService( logger.debug { "Reservation of next golden record tasks: executing reserveTasksForStep() with parameters $reservationRequest" } val now = Instant.now() - val foundTasks = taskRepository.findByStepAndStepState(reservationRequest.step, StepState.Queued, Pageable.ofSize(reservationRequest.amount)).content + val foundTasks = taskRepository.findByStepAndStepState(reservationRequest.step, GoldenRecordTaskDb.StepState.Queued, Pageable.ofSize(reservationRequest.amount)).content val reservedTasks = foundTasks.map { task -> goldenRecordTaskStateMachine.doReserve(task) } val pendingTimeout = reservedTasks.minOfOrNull { calculateTaskPendingTimeout(it) } ?: now @@ -90,17 +91,16 @@ class GoldenRecordTaskService( val foundTasksByUuid = foundTasks.associateBy { it.uuid.toString() } resultRequest.results - .forEach { resultEntry -> - val task = foundTasksByUuid[resultEntry.taskId] - ?: throw BpdmTaskNotFoundException(resultEntry.taskId) + .map { resultEntry -> Pair(foundTasksByUuid[resultEntry.taskId] ?: throw BpdmTaskNotFoundException(resultEntry.taskId), resultEntry) } + .filterNot { (task, _) -> task.processingState.resultState == GoldenRecordTaskDb.ResultState.Aborted } + .forEach { (task, resultEntry) -> val step = resultRequest.step val errors = resultEntry.errors val resultBusinessPartner = resultEntry.businessPartner - if (errors.isNotEmpty()) { - goldenRecordTaskStateMachine.doResolveTaskToError(task, step, errors) - } else { - goldenRecordTaskStateMachine.resolveTaskStepToSuccess(task, step, resultBusinessPartner) + when{ + errors.isNotEmpty() -> goldenRecordTaskStateMachine.doResolveTaskToError(task, step, errors) + else -> goldenRecordTaskStateMachine.resolveTaskStepToSuccess(task, step, resultBusinessPartner) } } } @@ -170,4 +170,9 @@ class GoldenRecordTaskService( gateRecordRepository.save(gateRecord) } } + + private fun abortOutdatedTasks(record: GateRecordDb){ + return taskRepository.findTasksByGateRecordAndProcessingStateResultState(record, GoldenRecordTaskDb.ResultState.Pending) + .forEach { task -> goldenRecordTaskStateMachine.doAbortTask(task) } + } } diff --git a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskStateMachine.kt b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskStateMachine.kt index 327a6fe09..8eafb21ff 100644 --- a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskStateMachine.kt +++ b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskStateMachine.kt @@ -44,10 +44,10 @@ class GoldenRecordTaskStateMachine( val initialStep = getInitialStep(mode) val initProcessingState = GoldenRecordTaskDb.ProcessingState( mode = mode, - resultState = ResultState.Pending, + resultState = GoldenRecordTaskDb.ResultState.Pending, step = initialStep, errors = mutableListOf(), - stepState = StepState.Queued, + stepState = GoldenRecordTaskDb.StepState.Queued, pendingTimeout = Instant.now().plus(taskConfigProperties.taskPendingTimeout).toTimestamp(), retentionTimeout = null ) @@ -65,12 +65,12 @@ class GoldenRecordTaskStateMachine( logger.debug { "Executing doReserve() with parameters $task" } val state = task.processingState - if (state.resultState != ResultState.Pending || state.stepState != StepState.Queued) { + if (state.resultState != GoldenRecordTaskDb.ResultState.Pending || state.stepState != GoldenRecordTaskDb.StepState.Queued) { throw BpdmIllegalStateException(task.uuid, state) } // reserved for current step - task.processingState.stepState = StepState.Reserved + task.processingState.stepState = GoldenRecordTaskDb.StepState.Reserved task.updatedAt = DbTimestamp(Instant.now()) return taskRepository.save(task) @@ -84,7 +84,7 @@ class GoldenRecordTaskStateMachine( logger.debug { "Executing doResolveTaskToSuccess() with parameters $task // $step and $resultBusinessPartner" } val state = task.processingState - if (state.resultState != ResultState.Pending || state.stepState != StepState.Reserved || state.step != step) { + if (isResolvableForStep(state, step)) { throw BpdmIllegalStateException(task.uuid, state) } @@ -108,9 +108,10 @@ class GoldenRecordTaskStateMachine( logger.debug { "Executing doResolveTaskToError() with parameters $task // $step and $errors" } val state = task.processingState - if (state.resultState != ResultState.Pending || state.stepState != StepState.Reserved || state.step != step) { + if (isResolvableForStep(state, step)) { throw BpdmIllegalStateException(task.uuid, state) } + task.processingState.toError(errors.map { requestMapper.toTaskError(it) }) task.updatedAt = DbTimestamp(Instant.now()) @@ -120,7 +121,7 @@ class GoldenRecordTaskStateMachine( fun doResolveTaskToTimeout(task: GoldenRecordTaskDb): GoldenRecordTaskDb { val state = task.processingState - if (state.resultState != ResultState.Pending) { + if (state.resultState != GoldenRecordTaskDb.ResultState.Pending) { throw BpdmIllegalStateException(task.uuid, state) } @@ -131,27 +132,44 @@ class GoldenRecordTaskStateMachine( return taskRepository.save(task) } + fun doAbortTask(task: GoldenRecordTaskDb): GoldenRecordTaskDb{ + if(task.processingState.resultState != GoldenRecordTaskDb.ResultState.Pending) + throw BpdmIllegalStateException(task.uuid, task.processingState) + + task.processingState.toAborted() + task.updatedAt = DbTimestamp(Instant.now()) + + return taskRepository.save(task) + } + private fun GoldenRecordTaskDb.ProcessingState.toStep(nextStep: TaskStep) { step = nextStep - stepState = StepState.Queued + stepState = GoldenRecordTaskDb.StepState.Queued } private fun GoldenRecordTaskDb.ProcessingState.toSuccess() { - resultState = ResultState.Success - stepState = StepState.Success + resultState = GoldenRecordTaskDb.ResultState.Success + stepState = GoldenRecordTaskDb.StepState.Success pendingTimeout = null retentionTimeout = Instant.now().plus(taskConfigProperties.taskRetentionTimeout).toTimestamp() } private fun GoldenRecordTaskDb.ProcessingState.toError(newErrors: List) { - resultState = ResultState.Error - stepState = StepState.Error + resultState = GoldenRecordTaskDb.ResultState.Error + stepState = GoldenRecordTaskDb.StepState.Error errors.replace(newErrors) pendingTimeout = null retentionTimeout = Instant.now().plus(taskConfigProperties.taskRetentionTimeout).toTimestamp() } + private fun GoldenRecordTaskDb.ProcessingState.toAborted() { + resultState = GoldenRecordTaskDb.ResultState.Aborted + stepState = GoldenRecordTaskDb.StepState.Aborted + pendingTimeout = null + retentionTimeout = Instant.now().plus(taskConfigProperties.taskRetentionTimeout).toTimestamp() + } + private fun getInitialStep(mode: TaskMode): TaskStep { return getStepsForMode(mode).first() } @@ -168,4 +186,10 @@ class GoldenRecordTaskStateMachine( TaskMode.UpdateFromSharingMember -> listOf(TaskStep.CleanAndSync, TaskStep.PoolSync) TaskMode.UpdateFromPool -> listOf(TaskStep.Clean) } + + private fun isResolvableForStep(state: GoldenRecordTaskDb.ProcessingState, step: TaskStep): Boolean{ + return state.resultState != GoldenRecordTaskDb.ResultState.Pending + || state.stepState != GoldenRecordTaskDb.StepState.Reserved + || state.step != step + } } diff --git a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/ResponseMapper.kt b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/ResponseMapper.kt index 192751046..7091b128f 100644 --- a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/ResponseMapper.kt +++ b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/ResponseMapper.kt @@ -41,9 +41,9 @@ class ResponseMapper { fun toProcessingState(task: GoldenRecordTaskDb, timeout: Instant) = with(task.processingState) { TaskProcessingStateDto( - resultState = resultState, + resultState = toResultState(resultState), step = step, - stepState = stepState, + stepState = toStepState(stepState), errors = errors.map { toTaskError(it) }, createdAt = task.createdAt.instant, modifiedAt = task.updatedAt.instant, @@ -220,4 +220,21 @@ class ResponseMapper { ) } + fun toResultState(resultState: GoldenRecordTaskDb.ResultState) = + when(resultState){ + GoldenRecordTaskDb.ResultState.Pending -> ResultState.Pending + GoldenRecordTaskDb.ResultState.Success -> ResultState.Success + GoldenRecordTaskDb.ResultState.Error -> ResultState.Error + GoldenRecordTaskDb.ResultState.Aborted -> ResultState.Error + } + + fun toStepState(stepState: GoldenRecordTaskDb.StepState) = + when(stepState){ + GoldenRecordTaskDb.StepState.Queued -> StepState.Queued + GoldenRecordTaskDb.StepState.Reserved -> StepState.Reserved + GoldenRecordTaskDb.StepState.Success -> StepState.Success + GoldenRecordTaskDb.StepState.Error -> StepState.Error + GoldenRecordTaskDb.StepState.Aborted -> StepState.Error + } + } diff --git a/bpdm-orchestrator/src/main/resources/db/migration/V6_2_0_0__add_abort_status_check.sql b/bpdm-orchestrator/src/main/resources/db/migration/V6_2_0_0__add_abort_status_check.sql new file mode 100644 index 000000000..0b3fcedba --- /dev/null +++ b/bpdm-orchestrator/src/main/resources/db/migration/V6_2_0_0__add_abort_status_check.sql @@ -0,0 +1,13 @@ +ALTER TABLE golden_record_tasks +DROP CONSTRAINT golden_record_tasks_task_step_state_check; + +ALTER TABLE golden_record_tasks +ADD CONSTRAINT golden_record_tasks_task_step_state_check +check ( task_step_state in ('Queued', 'Reserved', 'Success', 'Error', 'Aborted')); + +ALTER TABLE golden_record_tasks +DROP CONSTRAINT golden_record_tasks_task_result_state_check; + +ALTER TABLE golden_record_tasks +ADD CONSTRAINT golden_record_tasks_task_result_state_check +check ( task_result_state in ('Pending', 'Success', 'Error', 'Aborted')); diff --git a/bpdm-orchestrator/src/test/kotlin/org/eclipse/tractusx/bpdm/orchestrator/controller/GoldenRecordTaskControllerIT.kt b/bpdm-orchestrator/src/test/kotlin/org/eclipse/tractusx/bpdm/orchestrator/controller/GoldenRecordTaskControllerIT.kt index fbbcba011..84598fef3 100644 --- a/bpdm-orchestrator/src/test/kotlin/org/eclipse/tractusx/bpdm/orchestrator/controller/GoldenRecordTaskControllerIT.kt +++ b/bpdm-orchestrator/src/test/kotlin/org/eclipse/tractusx/bpdm/orchestrator/controller/GoldenRecordTaskControllerIT.kt @@ -30,6 +30,7 @@ import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient import org.eclipse.tractusx.orchestrator.api.model.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.http.HttpStatus @@ -547,6 +548,41 @@ class GoldenRecordTaskControllerIT @Autowired constructor( } } + @Test + fun `abort task when outdated`(){ + val createdTasks = createTasks(entries = listOf(defaultBusinessPartner1, defaultBusinessPartner2).map { TaskCreateRequestEntry(null, it) }).createdTasks + val createdTaskIds = createdTasks.map { it.taskId } + val createdRecordIds = createdTasks.map { it.recordId } + + //Create newer tasks for the given records + createTasks(entries = createdRecordIds + .zip(listOf(defaultBusinessPartner1, defaultBusinessPartner2)) + .map { (recordId, bp) -> TaskCreateRequestEntry(recordId, bp) } + ).createdTasks + + val olderTasks = searchTaskStates(createdTaskIds).tasks + + olderTasks.forEach { assertThat(it.processingState.resultState).isEqualTo(ResultState.Error) } + } + + @Test + fun `aborted task throws no error when trying to resolve`(){ + val createdTasks = createTasks(entries = listOf(defaultBusinessPartner1, defaultBusinessPartner2).map { TaskCreateRequestEntry(null, it) }).createdTasks + val createdRecordIds = createdTasks.map { it.recordId } + + val reservedTasks = reserveTasks(TaskStep.CleanAndSync, amount = 2).reservedTasks + + //Create newer tasks for the given records + createTasks(entries = createdRecordIds + .zip(listOf(defaultBusinessPartner1, defaultBusinessPartner2)) + .map { (recordId, bp) -> TaskCreateRequestEntry(recordId, bp) } + ).createdTasks + + assertDoesNotThrow { + resolveTasks(TaskStep.CleanAndSync, reservedTasks.map { TaskStepResultEntryDto(it.taskId, it.businessPartner) }) + } + } + private fun createTasks(mode: TaskMode = TaskMode.UpdateFromSharingMember, entries: List? = null ): TaskCreateResponse{