Skip to content

Commit

Permalink
feat(Gate): when resolving golden record tasks now more error feedbac…
Browse files Browse the repository at this point in the history
…k is created
  • Loading branch information
nicoprow committed May 7, 2024
1 parent 383452f commit 3a02e39
Showing 1 changed file with 47 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,41 +153,65 @@ class GoldenRecordTaskService(
return orchestrationApiClient.goldenRecordTasks.createTasks(TaskCreateRequest(mode, orchestratorBusinessPartnersDto)).createdTasks
}

private fun resolvePendingTasksForOwner(tasks: List<TaskClientStateDto>, sharingStates: List<SharingStateDb>, ownerBpnl: String?){
val sharingStateMap = sharingStates.associateBy { it.taskId }
private fun resolvePendingTasksForOwner(allTasks: List<TaskClientStateDto>, sharingStates: List<SharingStateDb>, ownerBpnl: String?){

val taskStatesByResult = tasks
.mapNotNull { task -> sharingStateMap[task.taskId]?.let { state -> Pair(task, state) } }
.groupBy { (task, _) -> task.processingState.resultState }
val allTasksById = allTasks.associateBy { it.taskId }
val inputsByExternalId = businessPartnerRepository.findByStageAndAssociatedOwnerBpnlAndExternalIdIn(StageType.Input, ownerBpnl, sharingStates.map { it.externalId }, Pageable.unpaged())
.associateBy { it.externalId }

val businessPartnerInputs = businessPartnerRepository.findByStageAndAssociatedOwnerBpnlAndExternalIdIn(StageType.Input, ownerBpnl, sharingStates.map { it.externalId }, Pageable.unpaged())
val inputsByExternalId = businessPartnerInputs.associateBy { it.externalId }
val (statesWithTask, statesWithoutTask) = sharingStates.map { state -> Pair(state, allTasksById[state.taskId])}.partition { (_, task) -> task != null }
val (statesWithTaskAndInput, statesWithoutInput) = statesWithTask.map { (state, task) -> Triple(state, task!!, inputsByExternalId[state.externalId]) }.partition { (_, _, input) -> input != null }
val statesInSuccess = statesWithTaskAndInput.filter { (_,task,_) -> task.processingState.resultState == ResultState.Success }
val statesInError = statesWithTaskAndInput.filter { (_,task,_) -> task.processingState.resultState == ResultState.Error }

val businessPartnersToUpsert = taskStatesByResult[ResultState.Success]?.map { (task, sharingState) ->
val roles = inputsByExternalId[sharingState.externalId]?.roles?.toSortedSet() ?: sortedSetOf()
orchestratorMappings.toBusinessPartner(task.businessPartnerResult, sharingState.externalId, sharingState.associatedOwnerBpnl, roles)
} ?: emptyList()
businessPartnerService.upsertBusinessPartnersOutputFromCandidates(businessPartnersToUpsert, ownerBpnl)
val (statesWithOutput, statesWithoutOutput) = statesInSuccess.map { (state, task, input) ->
val roles = input!!.roles.toSortedSet()
val output = try{
orchestratorMappings.toBusinessPartner(task.businessPartnerResult, state.externalId, state.associatedOwnerBpnl, roles)
}catch (_: Exception){ null }
Pair(state, output)
}.partition { (_, output) -> output != null }

val errorRequests = (taskStatesByResult[ResultState.Error]?.map { (task, sharingState) ->
val upsertedPartners = businessPartnerService.upsertBusinessPartnersOutputFromCandidates(statesWithOutput.map { (_, output) -> output!! }, ownerBpnl)

val statesWithoutTaskErrors = statesWithoutTask.map { (state, _) ->
SharingStateService.ErrorRequest(
state.externalId,
BusinessPartnerSharingError.MissingTaskID,
errorMessage = "Missing Task in Orchestrator"
)
}

val statesWithoutInputErrors = statesWithoutInput.map { (state, _, _) ->
SharingStateService.ErrorRequest(
state.externalId,
BusinessPartnerSharingError.SharingProcessError,
errorMessage = "No input data found for the references business partner"
)
}

val statesInErrorErrors = statesInError.map { (state, task, _) ->
SharingStateService.ErrorRequest(
sharingState.externalId,
state.externalId,
BusinessPartnerSharingError.SharingProcessError,
if (task.processingState.errors.isNotEmpty()) task.processingState.errors.joinToString(" // ") { it.description }.take(255) else null
)
} ?: emptyList()).toMutableList()
}

val sharingStatesWithoutTasks = sharingStates.filter { it.taskId !in tasks.map { task -> task.taskId } }
errorRequests.addAll(sharingStatesWithoutTasks.map { sharingState ->
val statesWithoutOutputError = statesWithoutOutput.map { (state, _) ->
SharingStateService.ErrorRequest(
sharingState.externalId,
BusinessPartnerSharingError.MissingTaskID,
errorMessage = "Missing Task in Orchestrator"
state.externalId,
BusinessPartnerSharingError.SharingProcessError,
"Output could not be created from golden record result"
)
})
}

val allErrors = statesWithoutTaskErrors + statesWithoutInputErrors + statesInErrorErrors + statesWithoutOutputError

sharingStateService.setError(errorRequests, ownerBpnl)
sharingStateService.setError(allErrors, ownerBpnl)

logger.info { "Resolved ${businessPartnersToUpsert.size} tasks as successful and ${errorRequests.size} as errors for owner $ownerBpnl" }
logger.info { "Resolved ${upsertedPartners.size} tasks as successful and ${allErrors.size} as errors for owner $ownerBpnl" }
}


}

0 comments on commit 3a02e39

Please sign in to comment.