Skip to content

Commit

Permalink
feat(gate): add transition logic to sharing state service
Browse files Browse the repository at this point in the history
 - moved service logic for handling state transitions from other services to the sharing state service
 - added validation for logic when passing transition requests
 - added and adapted tests
  • Loading branch information
nicoprow committed Oct 24, 2023
1 parent 7b8b4be commit b39f15e
Show file tree
Hide file tree
Showing 9 changed files with 660 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class BusinessPartnerController(
if (businessPartners.size > apiConfigProperties.upsertLimit || businessPartners.map { it.externalId }.containsDuplicates()) {
return ResponseEntity(HttpStatus.BAD_REQUEST)
}
val result = businessPartnerService.upsertBusinessPartnersInput(businessPartners)
val result = businessPartnerService.upsertBusinessPartnersInput(businessPartners.toList())
return ResponseEntity.ok(result)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*******************************************************************************
* Copyright (c) 2021,2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/

package org.eclipse.tractusx.bpdm.gate.exception

import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.ResponseStatus

@ResponseStatus(HttpStatus.BAD_REQUEST)
class BpdmInvalidStateRequestException(
msg: String
) : RuntimeException(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ interface SharingStateRepository : PagingAndSortingRepository<SharingState, Long
}
}

fun findByExternalIdAndBusinessPartnerType(externalId: String, businessPartnerType: BusinessPartnerType): SharingState?
fun findByExternalIdInAndBusinessPartnerType(externalId: Collection<String>, businessPartnerType: BusinessPartnerType): Collection<SharingState>

fun findBySharingStateType(sharingStateType: SharingStateType): Set<SharingState>

fun findBySharingStateTypeAndTaskIdNotNull(sharingStateType: SharingStateType): Set<SharingState>
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.eclipse.tractusx.bpdm.common.dto.BusinessPartnerType
import org.eclipse.tractusx.bpdm.common.model.StageType
import org.eclipse.tractusx.bpdm.common.util.replace
import org.eclipse.tractusx.bpdm.gate.api.model.ChangelogType
import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType
import org.eclipse.tractusx.bpdm.gate.api.model.request.AddressGateInputRequest
import org.eclipse.tractusx.bpdm.gate.api.model.request.AddressGateOutputRequest
import org.eclipse.tractusx.bpdm.gate.entity.ChangelogEntry
Expand Down Expand Up @@ -75,10 +74,12 @@ class AddressPersistenceService(
}
?: run {
gateAddressRepository.save(fullAddress)
sharingStateService.upsertSharingState(address.toSharingStateDTO())
saveChangelog(address.externalId, ChangelogType.CREATE, dataType)
}
}

val initRequests = addresses.map { SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.ADDRESS ) }
sharingStateService.setInitial(initRequests)
}

private fun updateAddress(address: LogisticAddress, changeAddress: AddressGateInputRequest, legalEntityRecord: LegalEntity?, siteRecord: Site?) {
Expand Down Expand Up @@ -127,8 +128,15 @@ class AddressPersistenceService(
saveChangelog(address.externalId, ChangelogType.CREATE, dataType)
}
}
sharingStateService.upsertSharingState(address.toSharingStateDTO(SharingStateType.Success))
}

val successRequests = addresses.map {
SharingStateService.SuccessRequest(
SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.ADDRESS),
it.bpn
)
}
sharingStateService.setSuccess(successRequests)
}

private fun updateAddressOutput(address: LogisticAddress, changeAddress: AddressGateOutputRequest, legalEntityRecord: LegalEntity?, siteRecord: Site?) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import org.eclipse.tractusx.bpdm.gate.api.model.request.BusinessPartnerInputRequ
import org.eclipse.tractusx.bpdm.gate.api.model.request.BusinessPartnerOutputRequest
import org.eclipse.tractusx.bpdm.gate.api.model.response.BusinessPartnerInputDto
import org.eclipse.tractusx.bpdm.gate.api.model.response.BusinessPartnerOutputDto
import org.eclipse.tractusx.bpdm.gate.api.model.response.SharingStateDto
import org.eclipse.tractusx.bpdm.gate.entity.ChangelogEntry
import org.eclipse.tractusx.bpdm.gate.entity.SharingState
import org.eclipse.tractusx.bpdm.gate.entity.generic.*
import org.eclipse.tractusx.bpdm.gate.exception.BpdmMissingStageException
import org.eclipse.tractusx.bpdm.gate.repository.ChangelogRepository
Expand All @@ -59,15 +59,15 @@ class BusinessPartnerService(
) {

@Transactional
fun upsertBusinessPartnersInput(dtos: Collection<BusinessPartnerInputRequest>): Collection<BusinessPartnerInputDto> {
fun upsertBusinessPartnersInput(dtos: List<BusinessPartnerInputRequest>): List<BusinessPartnerInputDto> {
val entities = dtos.map { dto -> businessPartnerMappings.toBusinessPartnerInput(dto) }
return upsertBusinessPartnersInput(entities).map(businessPartnerMappings::toBusinessPartnerInputDto)
return upsertBusinessPartnersInputFromCandidates(entities).map(businessPartnerMappings::toBusinessPartnerInputDto)
}

@Transactional
fun upsertBusinessPartnersOutput(dtos: Collection<BusinessPartnerOutputRequest>): Collection<BusinessPartnerOutputDto> {
val entities = dtos.map { dto -> businessPartnerMappings.toBusinessPartnerOutput(dto) }
return upsertBusinessPartnersOutput(entities).map(businessPartnerMappings::toBusinessPartnerOutputDto)
return upsertBusinessPartnersOutputFromCandidates(entities).map(businessPartnerMappings::toBusinessPartnerOutputDto)
}

fun getBusinessPartnersInput(pageRequest: PageRequest, externalIds: Collection<String>?): PageDto<BusinessPartnerInputDto> {
Expand All @@ -82,35 +82,38 @@ class BusinessPartnerService(
.toPageDto(businessPartnerMappings::toBusinessPartnerOutputDto)
}

private fun upsertBusinessPartnersInput(entityCandidates: List<BusinessPartner>): List<BusinessPartner> {

private fun upsertBusinessPartnersInputFromCandidates(entityCandidates: List<BusinessPartner>): List<BusinessPartner> {
val resolutionResults = resolveCandidatesForStage(entityCandidates, StageType.Input)

saveChangelog(resolutionResults)

val partners = resolutionResults.map { it.businessPartner }
val orchestratorBusinessPartnersDto = resolutionResults.map { orchestratorMappings.toBusinessPartnerGenericDto(it.businessPartner) }
partners.forEach { entity ->
initSharingState(entity)
}

val taskCreateResponse = createGoldenRecordTasks(orchestratorBusinessPartnersDto)
val taskIds = createGoldenRecordTasks(orchestratorBusinessPartnersDto).createdTasks.map { it.taskId }

for (i in partners.indices) {
updateSharingState(partners[i].externalId, taskCreateResponse.createdTasks[i])
}
val pendingRequests = partners.zip(taskIds)
.map { (partner, taskId) -> SharingStateService.PendingRequest(SharingStateService.SharingStateIdentifierDto(partner.externalId, BusinessPartnerType.ADDRESS), taskId) }
sharingStateService.setPending(pendingRequests)

return businessPartnerRepository.saveAll(partners)
}

private fun upsertBusinessPartnersOutput(entityCandidates: List<BusinessPartner>): List<BusinessPartner> {
private fun upsertBusinessPartnersOutputFromCandidates(entityCandidates: List<BusinessPartner>): List<BusinessPartner> {
val externalIds = entityCandidates.map { it.externalId }
assertInputStageExists(externalIds)

val resolutionResults = resolveCandidatesForStage(entityCandidates, StageType.Output)

saveChangelog(resolutionResults)

return businessPartnerRepository.saveAll(resolutionResults.map { it.businessPartner })
val partners = resolutionResults.map { it.businessPartner }

val successRequests = partners.map { SharingStateService.SuccessRequest(SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.ADDRESS), it.bpnA!!) }
sharingStateService.setSuccess(successRequests)

return businessPartnerRepository.saveAll(partners)
}

private fun getBusinessPartners(pageRequest: PageRequest, externalIds: Collection<String>?, stage: StageType): Page<BusinessPartner> {
Expand All @@ -120,11 +123,6 @@ class BusinessPartnerService(
}
}

private fun initSharingState(entity: BusinessPartner) {
// TODO make businessPartnerType optional
sharingStateService.upsertSharingState(SharingStateDto(BusinessPartnerType.GENERIC, entity.externalId))
}

private fun saveChangelog(resolutionResults: Collection<ResolutionResult>) {
resolutionResults.forEach { result ->
if (result.wasResolved)
Expand Down Expand Up @@ -233,59 +231,33 @@ class BusinessPartnerService(
)
}

private fun updateSharingState(externalId: String, stateDto: TaskClientStateDto) {

val errorMessage = if (stateDto.processingState.errors.isNotEmpty()) stateDto.processingState.errors.joinToString(" // ") { it.description } else null
val errorCode = if (stateDto.processingState.errors.isNotEmpty()) BusinessPartnerSharingError.SharingProcessError else null

sharingStateService.upsertSharingState(
SharingStateDto(
BusinessPartnerType.ADDRESS,
externalId,
sharingStateType = orchestratorMappings.toSharingStateType(stateDto.processingState.resultState),
sharingErrorCode = errorCode,
sharingErrorMessage = errorMessage,
taskId = stateDto.taskId
)
)
}

@Scheduled(cron = "\${cleaningService.pollingCron:-}", zone = "UTC")
@Transactional
fun finishCleaningTask() {

var validBusinessPartner: List<BusinessPartner> = emptyList()

val sharingStates = sharingStateRepository.findBySharingStateType(SharingStateType.Pending)
val nonNullTaskIds = sharingStates.mapNotNull { it.taskId }

val taskStates = orchestrationApiClient.goldenRecordTasks.searchTaskStates(TaskStateRequest(nonNullTaskIds))
val sharingStates = sharingStateRepository.findBySharingStateTypeAndTaskIdNotNull(SharingStateType.Pending)
val tasks = orchestrationApiClient.goldenRecordTasks.searchTaskStates(TaskStateRequest(sharingStates.map { it.taskId!! })).tasks

val sharingStateMap = sharingStates.associateBy { it.taskId }

//Task verification and map
taskStates.tasks.forEach { task ->
val taskStatesByResult = tasks
.map { Pair(it, sharingStateMap[it.taskId]!!) }
.groupBy { (task, _) -> task.processingState.resultState }

val relatedSharingState = sharingStateMap[task.taskId]
val businessPartnersToUpsert = taskStatesByResult[ResultState.Success]?.map { (task, sharingState) ->
orchestratorMappings.toBusinessPartner(task.businessPartnerResult!!, sharingState.externalId)
} ?: emptyList()
upsertBusinessPartnersOutputFromCandidates(businessPartnersToUpsert)

// Check if relatedSharingState exists
if (relatedSharingState != null) {
if (task.processingState.resultState == ResultState.Success) {
val businessPartner = orchestratorMappings.toBusinessPartner(task.businessPartnerResult!!, relatedSharingState.externalId)
validBusinessPartner = validBusinessPartner.plus(businessPartner)

// Set Sharing State to Success
updateSharingState(businessPartner.externalId, task)
} else if (task.processingState.resultState == ResultState.Error) {
// Set related Sharing State Type as Error
updateSharingState(relatedSharingState.externalId, task)
}
}
}
val errorRequests = taskStatesByResult[ResultState.Error]?.map { (task, sharingState) ->
SharingStateService.ErrorRequest(
SharingStateService.SharingStateIdentifierDto(sharingState.externalId, sharingState.businessPartnerType),
BusinessPartnerSharingError.SharingProcessError,
if (task.processingState.errors.isNotEmpty()) task.processingState.errors.joinToString(" // ") { it.description } else null
)
} ?: emptyList()
sharingStateService.setError(errorRequests)

//If it is cleaned, upsert Output
if (validBusinessPartner.isNotEmpty()) {
upsertBusinessPartnersOutput(validBusinessPartner)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.eclipse.tractusx.bpdm.common.exception.BpdmNotFoundException
import org.eclipse.tractusx.bpdm.common.model.StageType
import org.eclipse.tractusx.bpdm.common.util.replace
import org.eclipse.tractusx.bpdm.gate.api.model.ChangelogType
import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType
import org.eclipse.tractusx.bpdm.gate.api.model.request.LegalEntityGateInputRequest
import org.eclipse.tractusx.bpdm.gate.api.model.request.LegalEntityGateOutputRequest
import org.eclipse.tractusx.bpdm.gate.entity.AddressState
Expand Down Expand Up @@ -75,9 +74,10 @@ class LegalEntityPersistenceService(
?: run {
gateLegalEntityRepository.save(fullLegalEntity)
saveChangelog(legalEntity.externalId, ChangelogType.CREATE, datatype)
sharingStateService.upsertSharingState(legalEntity.toSharingStateDTO())
}
}
val initRequests = legalEntities.map { SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.LEGAL_ENTITY ) }
sharingStateService.setInitial(initRequests)
}

//Creates Changelog for both Legal Entity and Logistic Address when they are created or updated
Expand Down Expand Up @@ -156,8 +156,15 @@ class LegalEntityPersistenceService(
saveChangelog(legalEntity.externalId, ChangelogType.CREATE, datatype)
}
}
sharingStateService.upsertSharingState(legalEntity.toSharingStateDTO(SharingStateType.Success))
}

val successRequests = legalEntities.map {
SharingStateService.SuccessRequest(
SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.LEGAL_ENTITY),
it.bpn
)
}
sharingStateService.setSuccess(successRequests)
}

private fun updateLegalEntityOutput(
Expand Down
Loading

0 comments on commit b39f15e

Please sign in to comment.