Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Orchestrator): now aborts outdated tasks #1036

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ For changes to the BPDM Helm charts please consult the [changelog](charts/bpdm/C
### Changed

- BPDM Gate: Fix possible out of memory exception when handling large golden record process requests

- BPDM Pool: Fix not resolving golden record tasks on exceptions

- BPDM Gate: Fixed Gate not resending business partner data to the golden record process on error sharing state when member sends the exact same business partner again
- BPDM Orchestrator: Now aborts tasks that are outdated (that is when a Gate will send newer business partner data for the same record to the golden record process)

- BPDM Pool & Gate: Reduce standard batch size for golden record task processing ([#1032](https://github.com/eclipse-tractusx/bpdm/pull/1032))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,21 @@ class GoldenRecordTaskDb(
@Column(name = "site_has_changed")
var siteHasChanged: Boolean?
)

enum class ResultState{
Pending,
Success,
Error,
Aborted
}

enum class StepState{
Queued,
Reserved,
Success,
Error,
Aborted
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.eclipse.tractusx.bpdm.orchestrator.repository

import org.eclipse.tractusx.bpdm.orchestrator.entity.DbTimestamp
import org.eclipse.tractusx.bpdm.orchestrator.entity.GateRecordDb
import org.eclipse.tractusx.bpdm.orchestrator.entity.GoldenRecordTaskDb
import org.eclipse.tractusx.orchestrator.api.model.StepState
import org.eclipse.tractusx.orchestrator.api.model.TaskStep
import org.springframework.data.domain.Page
import org.springframework.data.domain.Pageable
Expand All @@ -37,9 +37,11 @@ interface GoldenRecordTaskRepository : CrudRepository<GoldenRecordTaskDb, Long>,
fun findByUuidIn(uuids: Set<UUID>): Set<GoldenRecordTaskDb>

@Query("SELECT task from GoldenRecordTaskDb task WHERE task.processingState.step = :step AND task.processingState.stepState = :stepState")
fun findByStepAndStepState(step: TaskStep, stepState: StepState, pageable: Pageable): Page<GoldenRecordTaskDb>
fun findByStepAndStepState(step: TaskStep, stepState: GoldenRecordTaskDb.StepState, pageable: Pageable): Page<GoldenRecordTaskDb>

fun findByProcessingStatePendingTimeoutBefore(time: DbTimestamp, pageable: Pageable): Page<GoldenRecordTaskDb>

fun findByProcessingStateRetentionTimeoutBefore(time: DbTimestamp, pageable: Pageable): Page<GoldenRecordTaskDb>

fun findTasksByGateRecordAndProcessingStateResultState(record: GateRecordDb, resultState: GoldenRecordTaskDb.ResultState) : Set<GoldenRecordTaskDb>
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class GoldenRecordTaskService(
logger.debug { "Creation of new golden record tasks: executing createTasks() with parameters $createRequest" }

val gateRecords = getOrCreateGateRecords(createRequest.requests)
gateRecords.forEach { abortOutdatedTasks(it) }

return createRequest.requests.zip(gateRecords)
.map { (request, record) -> goldenRecordTaskStateMachine.initTask(createRequest.mode, request.businessPartner, record) }
Expand All @@ -74,7 +75,7 @@ class GoldenRecordTaskService(
logger.debug { "Reservation of next golden record tasks: executing reserveTasksForStep() with parameters $reservationRequest" }
val now = Instant.now()

val foundTasks = taskRepository.findByStepAndStepState(reservationRequest.step, StepState.Queued, Pageable.ofSize(reservationRequest.amount)).content
val foundTasks = taskRepository.findByStepAndStepState(reservationRequest.step, GoldenRecordTaskDb.StepState.Queued, Pageable.ofSize(reservationRequest.amount)).content
val reservedTasks = foundTasks.map { task -> goldenRecordTaskStateMachine.doReserve(task) }
val pendingTimeout = reservedTasks.minOfOrNull { calculateTaskPendingTimeout(it) } ?: now

Expand All @@ -97,17 +98,16 @@ class GoldenRecordTaskService(
val foundTasksByUuid = foundTasks.associateBy { it.uuid.toString() }

resultRequest.results
.forEach { resultEntry ->
val task = foundTasksByUuid[resultEntry.taskId]
?: throw BpdmTaskNotFoundException(resultEntry.taskId)
.map { resultEntry -> Pair(foundTasksByUuid[resultEntry.taskId] ?: throw BpdmTaskNotFoundException(resultEntry.taskId), resultEntry) }
.filterNot { (task, _) -> task.processingState.resultState == GoldenRecordTaskDb.ResultState.Aborted }
.forEach { (task, resultEntry) ->
val step = resultRequest.step
val errors = resultEntry.errors
val resultBusinessPartner = resultEntry.businessPartner

if (errors.isNotEmpty()) {
goldenRecordTaskStateMachine.doResolveTaskToError(task, step, errors)
} else {
goldenRecordTaskStateMachine.resolveTaskStepToSuccess(task, step, resultBusinessPartner)
when{
errors.isNotEmpty() -> goldenRecordTaskStateMachine.doResolveTaskToError(task, step, errors)
else -> goldenRecordTaskStateMachine.resolveTaskStepToSuccess(task, step, resultBusinessPartner)
}
}
}
Expand Down Expand Up @@ -186,6 +186,11 @@ class GoldenRecordTaskService(
gateRecordRepository.save(gateRecord)
}
}

private fun abortOutdatedTasks(record: GateRecordDb){
return taskRepository.findTasksByGateRecordAndProcessingStateResultState(record, GoldenRecordTaskDb.ResultState.Pending)
.forEach { task -> goldenRecordTaskStateMachine.doAbortTask(task) }
}
}

data class PaginationInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ class GoldenRecordTaskStateMachine(
val initialStep = getInitialStep(mode)
val initProcessingState = GoldenRecordTaskDb.ProcessingState(
mode = mode,
resultState = ResultState.Pending,
resultState = GoldenRecordTaskDb.ResultState.Pending,
step = initialStep,
errors = mutableListOf(),
stepState = StepState.Queued,
stepState = GoldenRecordTaskDb.StepState.Queued,
pendingTimeout = Instant.now().plus(taskConfigProperties.taskPendingTimeout).toTimestamp(),
retentionTimeout = null
)
Expand All @@ -65,12 +65,12 @@ class GoldenRecordTaskStateMachine(
logger.debug { "Executing doReserve() with parameters $task" }
val state = task.processingState

if (state.resultState != ResultState.Pending || state.stepState != StepState.Queued) {
if (state.resultState != GoldenRecordTaskDb.ResultState.Pending || state.stepState != GoldenRecordTaskDb.StepState.Queued) {
throw BpdmIllegalStateException(task.uuid, state)
}

// reserved for current step
task.processingState.stepState = StepState.Reserved
task.processingState.stepState = GoldenRecordTaskDb.StepState.Reserved
task.updatedAt = DbTimestamp(Instant.now())

return taskRepository.save(task)
Expand All @@ -84,7 +84,7 @@ class GoldenRecordTaskStateMachine(
logger.debug { "Executing doResolveTaskToSuccess() with parameters $task // $step and $resultBusinessPartner" }
val state = task.processingState

if (state.resultState != ResultState.Pending || state.stepState != StepState.Reserved || state.step != step) {
if (isResolvableForStep(state, step)) {
throw BpdmIllegalStateException(task.uuid, state)
}

Expand All @@ -108,9 +108,10 @@ class GoldenRecordTaskStateMachine(
logger.debug { "Executing doResolveTaskToError() with parameters $task // $step and $errors" }
val state = task.processingState

if (state.resultState != ResultState.Pending || state.stepState != StepState.Reserved || state.step != step) {
if (isResolvableForStep(state, step)) {
throw BpdmIllegalStateException(task.uuid, state)
}

task.processingState.toError(errors.map { requestMapper.toTaskError(it) })
task.updatedAt = DbTimestamp(Instant.now())

Expand All @@ -120,7 +121,7 @@ class GoldenRecordTaskStateMachine(
fun doResolveTaskToTimeout(task: GoldenRecordTaskDb): GoldenRecordTaskDb {
val state = task.processingState

if (state.resultState != ResultState.Pending) {
if (state.resultState != GoldenRecordTaskDb.ResultState.Pending) {
throw BpdmIllegalStateException(task.uuid, state)
}

Expand All @@ -131,27 +132,44 @@ class GoldenRecordTaskStateMachine(
return taskRepository.save(task)
}

fun doAbortTask(task: GoldenRecordTaskDb): GoldenRecordTaskDb{
if(task.processingState.resultState != GoldenRecordTaskDb.ResultState.Pending)
throw BpdmIllegalStateException(task.uuid, task.processingState)

task.processingState.toAborted()
task.updatedAt = DbTimestamp(Instant.now())

return taskRepository.save(task)
}

private fun GoldenRecordTaskDb.ProcessingState.toStep(nextStep: TaskStep) {
step = nextStep
stepState = StepState.Queued
stepState = GoldenRecordTaskDb.StepState.Queued
}

private fun GoldenRecordTaskDb.ProcessingState.toSuccess() {
resultState = ResultState.Success
stepState = StepState.Success
resultState = GoldenRecordTaskDb.ResultState.Success
stepState = GoldenRecordTaskDb.StepState.Success
pendingTimeout = null
retentionTimeout = Instant.now().plus(taskConfigProperties.taskRetentionTimeout).toTimestamp()

}

private fun GoldenRecordTaskDb.ProcessingState.toError(newErrors: List<TaskErrorDb>) {
resultState = ResultState.Error
stepState = StepState.Error
resultState = GoldenRecordTaskDb.ResultState.Error
stepState = GoldenRecordTaskDb.StepState.Error
errors.replace(newErrors)
pendingTimeout = null
retentionTimeout = Instant.now().plus(taskConfigProperties.taskRetentionTimeout).toTimestamp()
}

private fun GoldenRecordTaskDb.ProcessingState.toAborted() {
resultState = GoldenRecordTaskDb.ResultState.Aborted
stepState = GoldenRecordTaskDb.StepState.Aborted
pendingTimeout = null
retentionTimeout = Instant.now().plus(taskConfigProperties.taskRetentionTimeout).toTimestamp()
}

private fun getInitialStep(mode: TaskMode): TaskStep {
return getStepsForMode(mode).first()
}
Expand All @@ -168,4 +186,10 @@ class GoldenRecordTaskStateMachine(
TaskMode.UpdateFromSharingMember -> listOf(TaskStep.CleanAndSync, TaskStep.PoolSync)
TaskMode.UpdateFromPool -> listOf(TaskStep.Clean)
}

private fun isResolvableForStep(state: GoldenRecordTaskDb.ProcessingState, step: TaskStep): Boolean{
return state.resultState != GoldenRecordTaskDb.ResultState.Pending
|| state.stepState != GoldenRecordTaskDb.StepState.Reserved
|| state.step != step
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class ResponseMapper {
fun toProcessingState(task: GoldenRecordTaskDb, timeout: Instant) =
with(task.processingState) {
TaskProcessingStateDto(
resultState = resultState,
resultState = toResultState(resultState),
step = step,
stepState = stepState,
stepState = toStepState(stepState),
errors = errors.map { toTaskError(it) },
createdAt = task.createdAt.instant,
modifiedAt = task.updatedAt.instant,
Expand Down Expand Up @@ -220,4 +220,21 @@ class ResponseMapper {
)
}

fun toResultState(resultState: GoldenRecordTaskDb.ResultState) =
when(resultState){
GoldenRecordTaskDb.ResultState.Pending -> ResultState.Pending
GoldenRecordTaskDb.ResultState.Success -> ResultState.Success
GoldenRecordTaskDb.ResultState.Error -> ResultState.Error
GoldenRecordTaskDb.ResultState.Aborted -> ResultState.Error
}

fun toStepState(stepState: GoldenRecordTaskDb.StepState) =
when(stepState){
GoldenRecordTaskDb.StepState.Queued -> StepState.Queued
GoldenRecordTaskDb.StepState.Reserved -> StepState.Reserved
GoldenRecordTaskDb.StepState.Success -> StepState.Success
GoldenRecordTaskDb.StepState.Error -> StepState.Error
GoldenRecordTaskDb.StepState.Aborted -> StepState.Error
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
ALTER TABLE golden_record_tasks
DROP CONSTRAINT golden_record_tasks_task_step_state_check;

ALTER TABLE golden_record_tasks
ADD CONSTRAINT golden_record_tasks_task_step_state_check
check ( task_step_state in ('Queued', 'Reserved', 'Success', 'Error', 'Aborted'));

ALTER TABLE golden_record_tasks
DROP CONSTRAINT golden_record_tasks_task_result_state_check;

ALTER TABLE golden_record_tasks
ADD CONSTRAINT golden_record_tasks_task_result_state_check
check ( task_result_state in ('Pending', 'Success', 'Error', 'Aborted'));
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient
import org.eclipse.tractusx.orchestrator.api.model.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.http.HttpStatus
Expand Down Expand Up @@ -547,6 +548,41 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
}
}

@Test
fun `abort task when outdated`(){
val createdTasks = createTasks(entries = listOf(defaultBusinessPartner1, defaultBusinessPartner2).map { TaskCreateRequestEntry(null, it) }).createdTasks
val createdTaskIds = createdTasks.map { it.taskId }
val createdRecordIds = createdTasks.map { it.recordId }

//Create newer tasks for the given records
createTasks(entries = createdRecordIds
.zip(listOf(defaultBusinessPartner1, defaultBusinessPartner2))
.map { (recordId, bp) -> TaskCreateRequestEntry(recordId, bp) }
).createdTasks

val olderTasks = searchTaskStates(createdTaskIds).tasks

olderTasks.forEach { assertThat(it.processingState.resultState).isEqualTo(ResultState.Error) }
}

@Test
fun `aborted task throws no error when trying to resolve`(){
val createdTasks = createTasks(entries = listOf(defaultBusinessPartner1, defaultBusinessPartner2).map { TaskCreateRequestEntry(null, it) }).createdTasks
val createdRecordIds = createdTasks.map { it.recordId }

val reservedTasks = reserveTasks(TaskStep.CleanAndSync, amount = 2).reservedTasks

//Create newer tasks for the given records
createTasks(entries = createdRecordIds
.zip(listOf(defaultBusinessPartner1, defaultBusinessPartner2))
.map { (recordId, bp) -> TaskCreateRequestEntry(recordId, bp) }
).createdTasks

assertDoesNotThrow {
resolveTasks(TaskStep.CleanAndSync, reservedTasks.map { TaskStepResultEntryDto(it.taskId, it.businessPartner) })
}
}

private fun createTasks(mode: TaskMode = TaskMode.UpdateFromSharingMember,
entries: List<TaskCreateRequestEntry>? = null
): TaskCreateResponse{
Expand Down
Loading