Skip to content

Commit

Permalink
Merge pull request #535 from catenax-ng/feat/orchestrator_API_resolve…
Browse files Browse the repository at this point in the history
…_step

feat: Extend TaskStepResultRequest in orchestrator-api with step
  • Loading branch information
nicoprow authored Oct 18, 2023
2 parents 599e98c + 8828ceb commit b462219
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,27 @@ class CleaningServiceDummy(
fun pollForCleaningTasks() {
try {
logger.info { "Starting polling for cleaning tasks from Orchestrator..." }
val step = TaskStep.CleanAndSync

// Step 1: Fetch and reserve the next cleaning request
val cleaningRequest = orchestrationApiClient.goldenRecordTasks
.reserveTasksForStep(TaskStepReservationRequest(amount = 10, TaskStep.CleanAndSync))
.reserveTasksForStep(TaskStepReservationRequest(amount = 10, step))

val cleaningTasks = cleaningRequest.reservedTasks

logger.info { "${cleaningTasks.size} tasks found for cleaning. Proceeding with cleaning..." }

if (cleaningTasks.isNotEmpty()) {

val cleaningResults = cleaningTasks.map { reservedTask ->
// Step 2: Generate dummy cleaning results
processCleaningTask(reservedTask)
}

// Step 3: Send the cleaning result back to the Orchestrator
orchestrationApiClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(cleaningResults))
orchestrationApiClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(step, cleaningResults))
logger.info { "Cleaning tasks processing completed for this iteration." }
}

} catch (e: Exception) {
logger.error(e) { "Error while processing cleaning task" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class CleaningServiceApiCallsTest @Autowired constructor(

@Test
fun `pollForCleaningTasks should reserve and resolve tasks from orchestrator`() {

// Call the method under test
cleaningServiceDummy.pollForCleaningTasks()

Expand All @@ -91,11 +90,8 @@ class CleaningServiceApiCallsTest @Autowired constructor(

@Test
fun `reserveTasksForStep should return expected response`() {


val expectedResponse = jacksonObjectMapper.writeValueAsString(createSampleTaskStepReservationResponse(businessPartnerWithBpnA))


val result = orchestrationApiClient.goldenRecordTasks.reserveTasksForStep(
TaskStepReservationRequest(amount = 10, TaskStep.Clean)
)
Expand All @@ -105,14 +101,12 @@ class CleaningServiceApiCallsTest @Autowired constructor(
assertEquals(expectedResult, result)

orchestrationApiClient.goldenRecordTasks.resolveStepResults(
TaskStepResultRequest(emptyList())
TaskStepResultRequest(TaskStep.Clean, emptyList())
)

}


fun mockOrchestratorReserveApi() {

// Orchestrator reserve
orchestratorMockApi.stubFor(
post(urlPathEqualTo(ORCHESTRATOR_RESERVE_TASKS_URL))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ interface GoldenRecordTaskApi {

@Operation(
summary = "Post step results for reserved golden record tasks in the given step queue",
description = "Post business partner step results for the given tasks. " +
description = "Post business partner step results for the given tasks in the given step queue. " +
"In order to post a result for a task it needs to be reserved first, has to currently be in the given step queue and the time limit is not exceeded. " +
"The number of results you can post at a time does not need to match the original number of reserved tasks. " +
"Results are accepted via strategy 'all or nothing'. " +
Expand All @@ -114,7 +114,11 @@ interface GoldenRecordTaskApi {
responseCode = "204",
description = "If the results could be processed"
),
ApiResponse(responseCode = "400", description = "On malformed task create requests or reaching upsert limit", content = [Content()]),
ApiResponse(
responseCode = "400",
description = "On malformed requests, reaching upsert limit or posting results for tasks which are missing or in the wrong step queue",
content = [Content()]
),
]
)
@Tag(name = TagWorker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@ import io.swagger.v3.oas.annotations.media.Schema
@Schema(description = "Request object for posting step results of previously reserved tasks")
data class TaskStepResultRequest(

@get:Schema(description = "The step queue containing the tasks for which results are posted", required = true)
val step: TaskStep,

val results: List<TaskStepResultEntryDto>
)
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,14 @@ class GoldenRecordTaskService(
.forEach { resultEntry ->
val task = taskStorage.getTask(resultEntry.taskId)
?: throw BpdmTaskNotFoundException(resultEntry.taskId)

val step = resultRequest.step
val errors = resultEntry.errors
val resultBusinessPartner = resultEntry.businessPartner

if (errors.isNotEmpty()) {
goldenRecordTaskStateMachine.doResolveFailed(task, errors)
goldenRecordTaskStateMachine.doResolveFailed(task, step, errors)
} else if (resultBusinessPartner != null) {
goldenRecordTaskStateMachine.doResolveSuccessful(task, resultBusinessPartner)
goldenRecordTaskStateMachine.doResolveSuccessful(task, step, resultBusinessPartner)
} else {
throw BpdmEmptyResultException(resultEntry.taskId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ class GoldenRecordTaskStateMachine(
state.taskModifiedAt = now
}

fun doResolveSuccessful(task: GoldenRecordTask, resultBusinessPartner: BusinessPartnerFullDto) {
fun doResolveSuccessful(task: GoldenRecordTask, step: TaskStep, resultBusinessPartner: BusinessPartnerFullDto) {
val state = task.processingState
val now = Instant.now()

if (state.resultState != ResultState.Pending || state.stepState != StepState.Reserved) {
if (state.resultState != ResultState.Pending || state.stepState != StepState.Reserved || state.step != step) {
throw BpdmIllegalStateException(task.taskId, state)
}

Expand All @@ -92,11 +92,11 @@ class GoldenRecordTaskStateMachine(
task.businessPartner = resultBusinessPartner
}

fun doResolveFailed(task: GoldenRecordTask, errors: List<TaskErrorDto>) {
fun doResolveFailed(task: GoldenRecordTask, step: TaskStep, errors: List<TaskErrorDto>) {
val state = task.processingState
val now = Instant.now()

if (state.resultState != ResultState.Pending || state.stepState != StepState.Reserved) {
if (state.resultState != ResultState.Pending || state.stepState != StepState.Reserved || state.step != step) {
throw BpdmIllegalStateException(task.taskId, state)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
taskId = taskId,
businessPartner = businessPartnerFull1
)
resolveTasks(listOf(resultEntry1))
resolveTasks(TaskStep.CleanAndSync, listOf(resultEntry1))

// now in next step and stepState==Queued
assertProcessingStateDto(
Expand Down Expand Up @@ -235,7 +235,7 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
taskId = taskId,
businessPartner = businessPartnerFull2
)
resolveTasks(listOf(resultEntry2))
resolveTasks(TaskStep.PoolSync, listOf(resultEntry2))

// final step -> now in stepState==Success
val finalStateDto = searchTaskStates(listOf(taskId)).tasks.single()
Expand Down Expand Up @@ -270,7 +270,7 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
taskId = taskId,
errors = listOf(errorDto)
)
resolveTasks(listOf(resultEntry))
resolveTasks(TaskStep.CleanAndSync, listOf(resultEntry))

// now in error state
val stateDto = searchTaskStates(listOf(taskId)).tasks.single()
Expand Down Expand Up @@ -335,7 +335,7 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
)

assertBadRequestException {
resolveTasks(resultEntries)
resolveTasks(TaskStep.CleanAndSync, resultEntries)
}
}

Expand All @@ -360,6 +360,7 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
// post wrong task ids
assertBadRequestException {
resolveTasks(
TaskStep.CleanAndSync,
listOf(
TaskStepResultEntryDto(
taskId = "WRONG-ID"
Expand All @@ -368,9 +369,10 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
)
}

// post correct task id but neither empty content
// post correct task id but empty content
assertBadRequestException {
resolveTasks(
TaskStep.CleanAndSync,
listOf(
TaskStepResultEntryDto(
taskId = tasksIds[0]
Expand All @@ -379,8 +381,22 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
)
}

// post correct task id but wrong step (Clean)
assertBadRequestException {
resolveTasks(
TaskStep.Clean,
listOf(
TaskStepResultEntryDto(
taskId = tasksIds[0],
businessPartner = BusinessPartnerTestValues.businessPartner1Full
)
)
)
}

// post correct task id with business partner content
resolveTasks(
TaskStep.CleanAndSync,
listOf(
TaskStepResultEntryDto(
taskId = tasksIds[0],
Expand All @@ -392,6 +408,7 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
// post task twice
assertBadRequestException {
resolveTasks(
TaskStep.CleanAndSync,
listOf(
TaskStepResultEntryDto(
taskId = tasksIds[0],
Expand All @@ -404,6 +421,7 @@ class GoldenRecordTaskControllerIT @Autowired constructor(

// post correct task id with error content
resolveTasks(
TaskStep.CleanAndSync,
listOf(
TaskStepResultEntryDto(
tasksIds[1], errors = listOf(
Expand All @@ -430,11 +448,9 @@ class GoldenRecordTaskControllerIT @Autowired constructor(
)
)

private fun resolveTasks(results: List<TaskStepResultEntryDto>) =
private fun resolveTasks(step: TaskStep, results: List<TaskStepResultEntryDto>) =
orchestratorClient.goldenRecordTasks.resolveStepResults(
TaskStepResultRequest(
results = results
)
TaskStepResultRequest(step, results)
)

private fun searchTaskStates(taskIds: List<String>) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,31 @@ class GoldenRecordTaskStateMachineIT @Autowired constructor(
}.isInstanceOf(BpdmIllegalStateException::class.java)

// 1st resolve
goldenRecordTaskStateMachine.doResolveSuccessful(task, BusinessPartnerTestValues.businessPartner1Full)
goldenRecordTaskStateMachine.doResolveSuccessful(task, TaskStep.CleanAndSync, BusinessPartnerTestValues.businessPartner1Full)
assertProcessingStateDto(task.processingState, ResultState.Pending, TaskStep.PoolSync, StepState.Queued)
assertThat(task.processingState.reservationTimeout).isNull()

// Can't resolve again!
assertThatThrownBy {
goldenRecordTaskStateMachine.doResolveSuccessful(task, BusinessPartnerTestValues.businessPartner1Full)
goldenRecordTaskStateMachine.doResolveSuccessful(task, TaskStep.CleanAndSync, BusinessPartnerTestValues.businessPartner1Full)
}.isInstanceOf(BpdmIllegalStateException::class.java)

// 2nd reserve
goldenRecordTaskStateMachine.doReserve(task)
assertProcessingStateDto(task.processingState, ResultState.Pending, TaskStep.PoolSync, StepState.Reserved)

// Can't resolve with wrong step (CleanAndSync)!
assertThatThrownBy {
goldenRecordTaskStateMachine.doResolveSuccessful(task, TaskStep.CleanAndSync, BusinessPartnerTestValues.businessPartner1Full)
}.isInstanceOf(BpdmIllegalStateException::class.java)

// 2nd resolve
goldenRecordTaskStateMachine.doResolveSuccessful(task, BusinessPartnerTestValues.businessPartner1Full)
goldenRecordTaskStateMachine.doResolveSuccessful(task, TaskStep.PoolSync, BusinessPartnerTestValues.businessPartner1Full)
assertProcessingStateDto(task.processingState, ResultState.Success, TaskStep.PoolSync, StepState.Success)

// Can't resolve again!
assertThatThrownBy {
goldenRecordTaskStateMachine.doResolveFailed(task, listOf(TaskErrorDto(TaskErrorType.Unspecified, "error")))
goldenRecordTaskStateMachine.doResolveFailed(task, TaskStep.PoolSync, listOf(TaskErrorDto(TaskErrorType.Unspecified, "error")))
}.isInstanceOf(BpdmIllegalStateException::class.java)
}

Expand Down Expand Up @@ -140,7 +145,7 @@ class GoldenRecordTaskStateMachineIT @Autowired constructor(
Thread.sleep(10)

// resolve
goldenRecordTaskStateMachine.doResolveSuccessful(task, BusinessPartnerTestValues.businessPartner1Full)
goldenRecordTaskStateMachine.doResolveSuccessful(task, TaskStep.Clean, BusinessPartnerTestValues.businessPartner1Full)
assertProcessingStateDto(task.processingState, ResultState.Success, TaskStep.Clean, StepState.Success)
val modified2 = task.processingState.taskModifiedAt
assertThat(modified2).isAfter(modified1)
Expand All @@ -167,7 +172,7 @@ class GoldenRecordTaskStateMachineIT @Autowired constructor(
TaskErrorDto(TaskErrorType.Unspecified, "Unspecific error"),
TaskErrorDto(TaskErrorType.Timeout, "Timeout")
)
goldenRecordTaskStateMachine.doResolveFailed(task, errors)
goldenRecordTaskStateMachine.doResolveFailed(task, TaskStep.Clean, errors)
assertProcessingStateDto(task.processingState, ResultState.Error, TaskStep.Clean, StepState.Error)
assertThat(task.processingState.errors).isEqualTo(errors)

Expand All @@ -176,9 +181,9 @@ class GoldenRecordTaskStateMachineIT @Autowired constructor(
goldenRecordTaskStateMachine.doReserve(task)
}.isInstanceOf(BpdmIllegalStateException::class.java)

// Can't reserve now!
// Can't resolve now!
assertThatThrownBy {
goldenRecordTaskStateMachine.doResolveSuccessful(task, BusinessPartnerTestValues.businessPartner1Full)
goldenRecordTaskStateMachine.doResolveSuccessful(task, TaskStep.Clean, BusinessPartnerTestValues.businessPartner1Full)
}.isInstanceOf(BpdmIllegalStateException::class.java)
}

Expand Down

0 comments on commit b462219

Please sign in to comment.