Skip to content

Commit

Permalink
Merge pull request #513 from catenax-ng/refactor/gate/sharing-state-s…
Browse files Browse the repository at this point in the history
…ervice

Feat: Create Sharing State Transition Logic for the Sharing State Service
  • Loading branch information
nicoprow authored Oct 25, 2023
2 parents 7b8b4be + 259e740 commit 752dc06
Show file tree
Hide file tree
Showing 10 changed files with 705 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class BusinessPartnerController(
if (businessPartners.size > apiConfigProperties.upsertLimit || businessPartners.map { it.externalId }.containsDuplicates()) {
return ResponseEntity(HttpStatus.BAD_REQUEST)
}
val result = businessPartnerService.upsertBusinessPartnersInput(businessPartners)
val result = businessPartnerService.upsertBusinessPartnersInput(businessPartners.toList())
return ResponseEntity.ok(result)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*******************************************************************************
* Copyright (c) 2021,2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/

package org.eclipse.tractusx.bpdm.gate.exception

import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.ResponseStatus

@ResponseStatus(HttpStatus.BAD_REQUEST)
class BpdmInvalidStateRequestException(
msg: String
) : RuntimeException(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ interface SharingStateRepository : PagingAndSortingRepository<SharingState, Long
}
}

fun findByExternalIdAndBusinessPartnerType(externalId: String, businessPartnerType: BusinessPartnerType): SharingState?
fun findByExternalIdInAndBusinessPartnerType(externalId: Collection<String>, businessPartnerType: BusinessPartnerType): Collection<SharingState>

fun findBySharingStateType(sharingStateType: SharingStateType): Set<SharingState>

fun findBySharingStateTypeAndTaskIdNotNull(sharingStateType: SharingStateType): Set<SharingState>
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.eclipse.tractusx.bpdm.common.dto.BusinessPartnerType
import org.eclipse.tractusx.bpdm.common.model.StageType
import org.eclipse.tractusx.bpdm.common.util.replace
import org.eclipse.tractusx.bpdm.gate.api.model.ChangelogType
import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType
import org.eclipse.tractusx.bpdm.gate.api.model.request.AddressGateInputRequest
import org.eclipse.tractusx.bpdm.gate.api.model.request.AddressGateOutputRequest
import org.eclipse.tractusx.bpdm.gate.entity.ChangelogEntry
Expand Down Expand Up @@ -75,10 +74,12 @@ class AddressPersistenceService(
}
?: run {
gateAddressRepository.save(fullAddress)
sharingStateService.upsertSharingState(address.toSharingStateDTO())
saveChangelog(address.externalId, ChangelogType.CREATE, dataType)
}
}

val initRequests = addresses.map { SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.ADDRESS ) }
sharingStateService.setInitial(initRequests)
}

private fun updateAddress(address: LogisticAddress, changeAddress: AddressGateInputRequest, legalEntityRecord: LegalEntity?, siteRecord: Site?) {
Expand Down Expand Up @@ -127,8 +128,15 @@ class AddressPersistenceService(
saveChangelog(address.externalId, ChangelogType.CREATE, dataType)
}
}
sharingStateService.upsertSharingState(address.toSharingStateDTO(SharingStateType.Success))
}

val successRequests = addresses.map {
SharingStateService.SuccessRequest(
SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.ADDRESS),
it.bpn
)
}
sharingStateService.setSuccess(successRequests)
}

private fun updateAddressOutput(address: LogisticAddress, changeAddress: AddressGateOutputRequest, legalEntityRecord: LegalEntity?, siteRecord: Site?) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import org.eclipse.tractusx.bpdm.gate.api.model.request.BusinessPartnerInputRequ
import org.eclipse.tractusx.bpdm.gate.api.model.request.BusinessPartnerOutputRequest
import org.eclipse.tractusx.bpdm.gate.api.model.response.BusinessPartnerInputDto
import org.eclipse.tractusx.bpdm.gate.api.model.response.BusinessPartnerOutputDto
import org.eclipse.tractusx.bpdm.gate.api.model.response.SharingStateDto
import org.eclipse.tractusx.bpdm.gate.entity.ChangelogEntry
import org.eclipse.tractusx.bpdm.gate.entity.generic.*
import org.eclipse.tractusx.bpdm.gate.exception.BpdmMissingStageException
Expand All @@ -59,15 +58,15 @@ class BusinessPartnerService(
) {

@Transactional
fun upsertBusinessPartnersInput(dtos: Collection<BusinessPartnerInputRequest>): Collection<BusinessPartnerInputDto> {
fun upsertBusinessPartnersInput(dtos: List<BusinessPartnerInputRequest>): List<BusinessPartnerInputDto> {
val entities = dtos.map { dto -> businessPartnerMappings.toBusinessPartnerInput(dto) }
return upsertBusinessPartnersInput(entities).map(businessPartnerMappings::toBusinessPartnerInputDto)
return upsertBusinessPartnersInputFromCandidates(entities).map(businessPartnerMappings::toBusinessPartnerInputDto)
}

@Transactional
fun upsertBusinessPartnersOutput(dtos: Collection<BusinessPartnerOutputRequest>): Collection<BusinessPartnerOutputDto> {
val entities = dtos.map { dto -> businessPartnerMappings.toBusinessPartnerOutput(dto) }
return upsertBusinessPartnersOutput(entities).map(businessPartnerMappings::toBusinessPartnerOutputDto)
return upsertBusinessPartnersOutputFromCandidates(entities).map(businessPartnerMappings::toBusinessPartnerOutputDto)
}

fun getBusinessPartnersInput(pageRequest: PageRequest, externalIds: Collection<String>?): PageDto<BusinessPartnerInputDto> {
Expand All @@ -82,35 +81,50 @@ class BusinessPartnerService(
.toPageDto(businessPartnerMappings::toBusinessPartnerOutputDto)
}

private fun upsertBusinessPartnersInput(entityCandidates: List<BusinessPartner>): List<BusinessPartner> {

private fun upsertBusinessPartnersInputFromCandidates(entityCandidates: List<BusinessPartner>): List<BusinessPartner> {
val resolutionResults = resolveCandidatesForStage(entityCandidates, StageType.Input)

saveChangelog(resolutionResults)

val partners = resolutionResults.map { it.businessPartner }
val orchestratorBusinessPartnersDto = resolutionResults.map { orchestratorMappings.toBusinessPartnerGenericDto(it.businessPartner) }
partners.forEach { entity ->
initSharingState(entity)
}

val taskCreateResponse = createGoldenRecordTasks(orchestratorBusinessPartnersDto)
val taskIds = createGoldenRecordTasks(orchestratorBusinessPartnersDto).createdTasks.map { it.taskId }

for (i in partners.indices) {
updateSharingState(partners[i].externalId, taskCreateResponse.createdTasks[i])
}
val pendingRequests = partners.zip(taskIds)
.map { (partner, taskId) ->
SharingStateService.PendingRequest(
SharingStateService.SharingStateIdentifierDto(
partner.externalId,
BusinessPartnerType.GENERIC
), taskId
)
}
sharingStateService.setPending(pendingRequests)

return businessPartnerRepository.saveAll(partners)
}

private fun upsertBusinessPartnersOutput(entityCandidates: List<BusinessPartner>): List<BusinessPartner> {
private fun upsertBusinessPartnersOutputFromCandidates(entityCandidates: List<BusinessPartner>): List<BusinessPartner> {
val externalIds = entityCandidates.map { it.externalId }
assertInputStageExists(externalIds)

val resolutionResults = resolveCandidatesForStage(entityCandidates, StageType.Output)

saveChangelog(resolutionResults)

return businessPartnerRepository.saveAll(resolutionResults.map { it.businessPartner })
val partners = resolutionResults.map { it.businessPartner }

val successRequests = partners.map {
SharingStateService.SuccessRequest(
SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.GENERIC),
it.bpnA!!
)
}
sharingStateService.setSuccess(successRequests)

return businessPartnerRepository.saveAll(partners)
}

private fun getBusinessPartners(pageRequest: PageRequest, externalIds: Collection<String>?, stage: StageType): Page<BusinessPartner> {
Expand All @@ -120,11 +134,6 @@ class BusinessPartnerService(
}
}

private fun initSharingState(entity: BusinessPartner) {
// TODO make businessPartnerType optional
sharingStateService.upsertSharingState(SharingStateDto(BusinessPartnerType.GENERIC, entity.externalId))
}

private fun saveChangelog(resolutionResults: Collection<ResolutionResult>) {
resolutionResults.forEach { result ->
if (result.wasResolved)
Expand Down Expand Up @@ -233,59 +242,33 @@ class BusinessPartnerService(
)
}

private fun updateSharingState(externalId: String, stateDto: TaskClientStateDto) {

val errorMessage = if (stateDto.processingState.errors.isNotEmpty()) stateDto.processingState.errors.joinToString(" // ") { it.description } else null
val errorCode = if (stateDto.processingState.errors.isNotEmpty()) BusinessPartnerSharingError.SharingProcessError else null

sharingStateService.upsertSharingState(
SharingStateDto(
BusinessPartnerType.ADDRESS,
externalId,
sharingStateType = orchestratorMappings.toSharingStateType(stateDto.processingState.resultState),
sharingErrorCode = errorCode,
sharingErrorMessage = errorMessage,
taskId = stateDto.taskId
)
)
}

@Scheduled(cron = "\${cleaningService.pollingCron:-}", zone = "UTC")
@Transactional
fun finishCleaningTask() {

var validBusinessPartner: List<BusinessPartner> = emptyList()

val sharingStates = sharingStateRepository.findBySharingStateType(SharingStateType.Pending)
val nonNullTaskIds = sharingStates.mapNotNull { it.taskId }

val taskStates = orchestrationApiClient.goldenRecordTasks.searchTaskStates(TaskStateRequest(nonNullTaskIds))
val sharingStates = sharingStateRepository.findBySharingStateTypeAndTaskIdNotNull(SharingStateType.Pending)
val tasks = orchestrationApiClient.goldenRecordTasks.searchTaskStates(TaskStateRequest(sharingStates.map { it.taskId!! })).tasks

val sharingStateMap = sharingStates.associateBy { it.taskId }

//Task verification and map
taskStates.tasks.forEach { task ->
val taskStatesByResult = tasks
.map { Pair(it, sharingStateMap[it.taskId]!!) }
.groupBy { (task, _) -> task.processingState.resultState }

val relatedSharingState = sharingStateMap[task.taskId]
val businessPartnersToUpsert = taskStatesByResult[ResultState.Success]?.map { (task, sharingState) ->
orchestratorMappings.toBusinessPartner(task.businessPartnerResult!!, sharingState.externalId)
} ?: emptyList()
upsertBusinessPartnersOutputFromCandidates(businessPartnersToUpsert)

// Check if relatedSharingState exists
if (relatedSharingState != null) {
if (task.processingState.resultState == ResultState.Success) {
val businessPartner = orchestratorMappings.toBusinessPartner(task.businessPartnerResult!!, relatedSharingState.externalId)
validBusinessPartner = validBusinessPartner.plus(businessPartner)

// Set Sharing State to Success
updateSharingState(businessPartner.externalId, task)
} else if (task.processingState.resultState == ResultState.Error) {
// Set related Sharing State Type as Error
updateSharingState(relatedSharingState.externalId, task)
}
}
}
val errorRequests = taskStatesByResult[ResultState.Error]?.map { (task, sharingState) ->
SharingStateService.ErrorRequest(
SharingStateService.SharingStateIdentifierDto(sharingState.externalId, sharingState.businessPartnerType),
BusinessPartnerSharingError.SharingProcessError,
if (task.processingState.errors.isNotEmpty()) task.processingState.errors.joinToString(" // ") { it.description } else null
)
} ?: emptyList()
sharingStateService.setError(errorRequests)

//If it is cleaned, upsert Output
if (validBusinessPartner.isNotEmpty()) {
upsertBusinessPartnersOutput(validBusinessPartner)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.eclipse.tractusx.bpdm.common.exception.BpdmNotFoundException
import org.eclipse.tractusx.bpdm.common.model.StageType
import org.eclipse.tractusx.bpdm.common.util.replace
import org.eclipse.tractusx.bpdm.gate.api.model.ChangelogType
import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType
import org.eclipse.tractusx.bpdm.gate.api.model.request.LegalEntityGateInputRequest
import org.eclipse.tractusx.bpdm.gate.api.model.request.LegalEntityGateOutputRequest
import org.eclipse.tractusx.bpdm.gate.entity.AddressState
Expand Down Expand Up @@ -75,9 +74,10 @@ class LegalEntityPersistenceService(
?: run {
gateLegalEntityRepository.save(fullLegalEntity)
saveChangelog(legalEntity.externalId, ChangelogType.CREATE, datatype)
sharingStateService.upsertSharingState(legalEntity.toSharingStateDTO())
}
}
val initRequests = legalEntities.map { SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.LEGAL_ENTITY ) }
sharingStateService.setInitial(initRequests)
}

//Creates Changelog for both Legal Entity and Logistic Address when they are created or updated
Expand Down Expand Up @@ -156,8 +156,15 @@ class LegalEntityPersistenceService(
saveChangelog(legalEntity.externalId, ChangelogType.CREATE, datatype)
}
}
sharingStateService.upsertSharingState(legalEntity.toSharingStateDTO(SharingStateType.Success))
}

val successRequests = legalEntities.map {
SharingStateService.SuccessRequest(
SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.LEGAL_ENTITY),
it.bpn
)
}
sharingStateService.setSuccess(successRequests)
}

private fun updateLegalEntityOutput(
Expand Down
Loading

0 comments on commit 752dc06

Please sign in to comment.