diff --git a/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/GateSharingStateApi.kt b/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/GateSharingStateApi.kt index 6ff2abcb4..57f1ec8fb 100644 --- a/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/GateSharingStateApi.kt +++ b/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/GateSharingStateApi.kt @@ -28,12 +28,14 @@ import jakarta.validation.Valid import org.eclipse.tractusx.bpdm.common.dto.BusinessPartnerType import org.eclipse.tractusx.bpdm.common.dto.PageDto import org.eclipse.tractusx.bpdm.common.dto.PaginationRequest +import org.eclipse.tractusx.bpdm.gate.api.model.request.PostSharingStateReadyRequest import org.eclipse.tractusx.bpdm.gate.api.model.response.SharingStateDto import org.springdoc.core.annotations.ParameterObject import org.springframework.http.MediaType import org.springframework.web.bind.annotation.* import org.springframework.web.service.annotation.GetExchange import org.springframework.web.service.annotation.HttpExchange +import org.springframework.web.service.annotation.PostExchange import org.springframework.web.service.annotation.PutExchange @RequestMapping("/api/catena/sharing-state", produces = [MediaType.APPLICATION_JSON_VALUE]) @@ -56,6 +58,22 @@ interface GateSharingStateApi { @Parameter(description = "External IDs") @RequestParam(required = false) externalIds: Collection? ): PageDto + @Operation( + summary = "Sets the given business partners into ready to be shared state", + description = "The business partners to set the ready state for are identified by their external-id. Only business partners in an initial or error state can be set to ready. If any given business partner could not be set into ready state for any reason (for example, it has not been found or it is in the wrong state) the whole request fails (all or nothing approach)." + ) + @ApiResponses( + value = [ + ApiResponse(responseCode = "204", description = "All business partners put in ready to be shared state"), + ApiResponse(responseCode = "404", description = "Business partners can't be put into ready state (e.g. external-ID not found, wrong sharing state)") + ] + ) + @PostMapping + @PostExchange + fun postSharingStateReady(@RequestBody request: PostSharingStateReadyRequest) + + + @Operation( summary = "Creates or updates a sharing state of a business partner", deprecated = true @@ -69,4 +87,6 @@ interface GateSharingStateApi { @PutMapping @PutExchange fun upsertSharingState(@RequestBody request: SharingStateDto) + + } diff --git a/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/model/SharingStateType.kt b/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/model/SharingStateType.kt index c19639214..6213cd43f 100644 --- a/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/model/SharingStateType.kt +++ b/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/model/SharingStateType.kt @@ -23,5 +23,6 @@ enum class SharingStateType { Pending, Success, Error, - Initial + Initial, + Ready } \ No newline at end of file diff --git a/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/model/request/PostSharingStateReadyRequest.kt b/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/model/request/PostSharingStateReadyRequest.kt new file mode 100644 index 000000000..3386fda39 --- /dev/null +++ b/bpdm-gate-api/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/api/model/request/PostSharingStateReadyRequest.kt @@ -0,0 +1,27 @@ +/******************************************************************************* + * Copyright (c) 2021,2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ******************************************************************************/ + +package org.eclipse.tractusx.bpdm.gate.api.model.request + +import io.swagger.v3.oas.annotations.media.Schema + +@Schema(description = "Request for setting business partners into ready to be shared to golden record state") +data class PostSharingStateReadyRequest( + val externalIds: List +) diff --git a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/config/GoldenRecordProcessConfigProperties.kt b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/config/GoldenRecordProcessConfigProperties.kt new file mode 100644 index 000000000..c3b61d0c9 --- /dev/null +++ b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/config/GoldenRecordProcessConfigProperties.kt @@ -0,0 +1,40 @@ +/******************************************************************************* + * Copyright (c) 2021,2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ******************************************************************************/ + +package org.eclipse.tractusx.bpdm.gate.config + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.stereotype.Component + +@Component +@ConfigurationProperties(prefix = "bpdm.tasks") +data class GoldenRecordTaskConfigProperties( + val creation: CreationProperties = CreationProperties(), + val check: TaskProcessProperties = TaskProcessProperties() +) { + data class CreationProperties( + val fromSharingMember: TaskProcessProperties = TaskProcessProperties(), + val fromPool: TaskProcessProperties = TaskProcessProperties() + ) + + data class TaskProcessProperties( + var batchSize: Int = 100, + var cron: String = "-", + ) +} \ No newline at end of file diff --git a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/controller/SharingStateController.kt b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/controller/SharingStateController.kt index 8782d6576..502559622 100644 --- a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/controller/SharingStateController.kt +++ b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/controller/SharingStateController.kt @@ -24,6 +24,7 @@ import org.eclipse.tractusx.bpdm.common.dto.BusinessPartnerType import org.eclipse.tractusx.bpdm.common.dto.PageDto import org.eclipse.tractusx.bpdm.common.dto.PaginationRequest import org.eclipse.tractusx.bpdm.gate.api.GateSharingStateApi +import org.eclipse.tractusx.bpdm.gate.api.model.request.PostSharingStateReadyRequest import org.eclipse.tractusx.bpdm.gate.api.model.response.SharingStateDto import org.eclipse.tractusx.bpdm.gate.service.SharingStateService import org.springframework.security.access.prepost.PreAuthorize @@ -44,6 +45,10 @@ class SharingStateController( return sharingStateService.findSharingStates(paginationRequest, businessPartnerType, externalIds) } + override fun postSharingStateReady(request: PostSharingStateReadyRequest) { + sharingStateService.setReady(request.externalIds) + } + @PreAuthorize("hasAuthority(@gateSecurityConfigProperties.getChangeCompanyOutputDataAsRole())") override fun upsertSharingState(request: SharingStateDto) { logger.info { "upsertSharingState() called with $request" } diff --git a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/exception/BpdmInvalidStateException.kt b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/exception/BpdmInvalidStateException.kt new file mode 100644 index 000000000..899fc40af --- /dev/null +++ b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/exception/BpdmInvalidStateException.kt @@ -0,0 +1,34 @@ +/******************************************************************************* + * Copyright (c) 2021,2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ******************************************************************************/ + +package org.eclipse.tractusx.bpdm.gate.exception + +import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType +import org.springframework.http.HttpStatus +import org.springframework.web.bind.annotation.ResponseStatus + +@ResponseStatus(HttpStatus.BAD_REQUEST) +class BpdmInvalidStateException( + invalidStates: List +) : RuntimeException("The following business partners are in an invalid state: ${invalidStates.joinToString { it.toString() }}") { + data class InvalidState( + val externalId: String, + val stateType: SharingStateType + ) +} \ No newline at end of file diff --git a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/exception/BpdmMissingPartnerException.kt b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/exception/BpdmMissingPartnerException.kt new file mode 100644 index 000000000..28abdec78 --- /dev/null +++ b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/exception/BpdmMissingPartnerException.kt @@ -0,0 +1,28 @@ +/******************************************************************************* + * Copyright (c) 2021,2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ******************************************************************************/ + +package org.eclipse.tractusx.bpdm.gate.exception + +import org.springframework.http.HttpStatus +import org.springframework.web.bind.annotation.ResponseStatus + +@ResponseStatus(HttpStatus.BAD_REQUEST) +class BpdmMissingPartnerException( + externalIds: List +) : RuntimeException("Business partners with the following external ids not found: ${externalIds.joinToString()}") \ No newline at end of file diff --git a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/repository/SharingStateRepository.kt b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/repository/SharingStateRepository.kt index fb8079b05..26ce1ac74 100644 --- a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/repository/SharingStateRepository.kt +++ b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/repository/SharingStateRepository.kt @@ -22,6 +22,8 @@ package org.eclipse.tractusx.bpdm.gate.repository import org.eclipse.tractusx.bpdm.common.dto.BusinessPartnerType import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType import org.eclipse.tractusx.bpdm.gate.entity.SharingState +import org.springframework.data.domain.Page +import org.springframework.data.domain.Pageable import org.springframework.data.jpa.domain.Specification import org.springframework.data.jpa.repository.JpaSpecificationExecutor import org.springframework.data.repository.CrudRepository @@ -55,5 +57,7 @@ interface SharingStateRepository : PagingAndSortingRepository - fun findBySharingStateTypeAndTaskIdNotNull(sharingStateType: SharingStateType): Set + fun findBySharingStateType(sharingStateType: SharingStateType, pageable: Pageable): Page + + fun findBySharingStateTypeAndTaskIdNotNull(sharingStateType: SharingStateType, pageable: Pageable): Page } \ No newline at end of file diff --git a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/BusinessPartnerService.kt b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/BusinessPartnerService.kt index 1f41c50d7..b74896cf9 100644 --- a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/BusinessPartnerService.kt +++ b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/BusinessPartnerService.kt @@ -22,12 +22,10 @@ package org.eclipse.tractusx.bpdm.gate.service import mu.KotlinLogging import org.eclipse.tractusx.bpdm.common.dto.BusinessPartnerType import org.eclipse.tractusx.bpdm.common.dto.PageDto -import org.eclipse.tractusx.bpdm.common.dto.PaginationRequest import org.eclipse.tractusx.bpdm.common.model.StageType import org.eclipse.tractusx.bpdm.common.service.toPageDto import org.eclipse.tractusx.bpdm.common.util.copyAndSync import org.eclipse.tractusx.bpdm.common.util.replace -import org.eclipse.tractusx.bpdm.gate.api.exception.BusinessPartnerSharingError import org.eclipse.tractusx.bpdm.gate.api.model.ChangelogType import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType import org.eclipse.tractusx.bpdm.gate.api.model.request.BusinessPartnerInputRequest @@ -35,22 +33,21 @@ import org.eclipse.tractusx.bpdm.gate.api.model.response.BusinessPartnerInputDto import org.eclipse.tractusx.bpdm.gate.api.model.response.BusinessPartnerOutputDto import org.eclipse.tractusx.bpdm.gate.config.PoolConfigProperties import org.eclipse.tractusx.bpdm.gate.entity.ChangelogEntry -import org.eclipse.tractusx.bpdm.gate.entity.SyncType import org.eclipse.tractusx.bpdm.gate.entity.generic.* import org.eclipse.tractusx.bpdm.gate.exception.BpdmMissingStageException import org.eclipse.tractusx.bpdm.gate.repository.ChangelogRepository import org.eclipse.tractusx.bpdm.gate.repository.SharingStateRepository import org.eclipse.tractusx.bpdm.gate.repository.generic.BusinessPartnerRepository import org.eclipse.tractusx.bpdm.pool.api.client.PoolApiClient -import org.eclipse.tractusx.bpdm.pool.api.model.request.ChangelogSearchRequest import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient -import org.eclipse.tractusx.orchestrator.api.model.* +import org.eclipse.tractusx.orchestrator.api.model.BusinessPartnerGenericDto +import org.eclipse.tractusx.orchestrator.api.model.TaskCreateRequest +import org.eclipse.tractusx.orchestrator.api.model.TaskCreateResponse +import org.eclipse.tractusx.orchestrator.api.model.TaskMode import org.springframework.data.domain.Page import org.springframework.data.domain.PageRequest -import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional -import org.eclipse.tractusx.bpdm.pool.api.model.ChangelogType as PoolChangeLogType @Service class BusinessPartnerService( @@ -97,25 +94,12 @@ class BusinessPartnerService( saveChangelog(resolutionResults) val partners = resolutionResults.map { it.businessPartner } - val orchestratorBusinessPartnersDto = resolutionResults.map { orchestratorMappings.toBusinessPartnerGenericDto(it.businessPartner) } - - val taskIds = createGoldenRecordTasks(TaskMode.UpdateFromSharingMember, orchestratorBusinessPartnersDto).createdTasks.map { it.taskId } - - val pendingRequests = partners.zip(taskIds) - .map { (partner, taskId) -> - SharingStateService.PendingRequest( - SharingStateService.SharingStateIdentifierDto( - partner.externalId, - BusinessPartnerType.GENERIC - ), taskId - ) - } - sharingStateService.setPending(pendingRequests) + sharingStateService.setInitial(partners.map { SharingStateService.SharingStateIdentifierDto(it.externalId, BusinessPartnerType.GENERIC) }) return businessPartnerRepository.saveAll(partners) } - private fun upsertBusinessPartnersOutputFromCandidates(entityCandidates: List): List { + fun upsertBusinessPartnersOutputFromCandidates(entityCandidates: List): List { val externalIds = entityCandidates.map { it.externalId } assertInputStageExists(externalIds) @@ -305,90 +289,4 @@ class BusinessPartnerService( ) } - @Scheduled(cron = "\${bpdm.cleaningService.pollingCron:-}", zone = "UTC") - @Transactional - fun finishCleaningTask() { - val sharingStates = sharingStateRepository.findBySharingStateTypeAndTaskIdNotNull(SharingStateType.Pending) - val tasks = orchestrationApiClient.goldenRecordTasks.searchTaskStates(TaskStateRequest(sharingStates.map { it.taskId!! })).tasks - - val sharingStateMap = sharingStates.associateBy { it.taskId } - - val taskStatesByResult = tasks - .map { Pair(it, sharingStateMap[it.taskId]!!) } - .groupBy { (task, _) -> task.processingState.resultState } - - val sharingStatesWithoutTasks = sharingStates.filter { it.taskId !in tasks.map { task -> task.taskId } } - - val businessPartnersToUpsert = taskStatesByResult[ResultState.Success]?.map { (task, sharingState) -> - orchestratorMappings.toBusinessPartner(task.businessPartnerResult!!, sharingState.externalId) - } ?: emptyList() - upsertBusinessPartnersOutputFromCandidates(businessPartnersToUpsert) - - val errorRequests = (taskStatesByResult[ResultState.Error]?.map { (task, sharingState) -> - SharingStateService.ErrorRequest( - SharingStateService.SharingStateIdentifierDto(sharingState.externalId, sharingState.businessPartnerType), - BusinessPartnerSharingError.SharingProcessError, - if (task.processingState.errors.isNotEmpty()) task.processingState.errors.joinToString(" // ") { it.description } else null - ) - } ?: emptyList()).toMutableList() - - errorRequests.addAll(sharingStatesWithoutTasks.map { sharingState -> - SharingStateService.ErrorRequest( - SharingStateService.SharingStateIdentifierDto(sharingState.externalId, sharingState.businessPartnerType), - BusinessPartnerSharingError.MissingTaskID, - errorMessage = "Missing Task in Orchestrator" - ) - }) - - sharingStateService.setError(errorRequests) - - } - - @Scheduled(cron = "\${cleaningService.pollingCron:-}", zone = "UTC") - @Transactional - fun requestCleaningTaskForGateOutputIfPoolBpnHasChanges() { - - val poolPaginationRequest = PaginationRequest(0, poolConfigProperties.searchChangelogPageSize) - - val syncRecord = syncRecordService.getOrCreateRecord(SyncType.POOL_TO_GATE_OUTPUT) - - val changelogSearchRequest = ChangelogSearchRequest(syncRecord.finishedAt) - - - val poolChangelogEntries = poolClient.changelogs.getChangelogEntries(changelogSearchRequest, poolPaginationRequest) - val poolUpdatedEntries = poolChangelogEntries.content.filter { it.changelogType == PoolChangeLogType.UPDATE } - - - val bpnA = - poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.ADDRESS }.map { it.bpn } - val bpnL = poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.LEGAL_ENTITY } - .map { it.bpn } - val bpnS = - poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.SITE }.map { it.bpn } - - val gateOutputEntries = businessPartnerRepository.findByStageAndBpnLInOrBpnSInOrBpnAIn(StageType.Output, bpnL, bpnS, bpnA) - - val businessPartnerGenericDtoList = gateOutputEntries.map { bp -> - orchestratorMappings.toBusinessPartnerGenericDto(bp) - } - - val taskIds = createGoldenRecordTasks(TaskMode.UpdateFromPool, businessPartnerGenericDtoList).createdTasks.map { it.taskId } - - val pendingRequests = gateOutputEntries.zip(taskIds) - .map { (partner, taskId) -> - SharingStateService.PendingRequest( - SharingStateService.SharingStateIdentifierDto( - partner.externalId, - partner.parentType ?: BusinessPartnerType.GENERIC - ), taskId - ) - } - sharingStateService.setPending(pendingRequests) - - if (poolUpdatedEntries.isNotEmpty()) { - syncRecordService.setSynchronizationStart(SyncType.POOL_TO_GATE_OUTPUT) - syncRecordService.setSynchronizationSuccess(SyncType.POOL_TO_GATE_OUTPUT, poolUpdatedEntries.last().timestamp) - } - } - } diff --git a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/GoldenRecordTaskService.kt b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/GoldenRecordTaskService.kt new file mode 100644 index 000000000..6799a455c --- /dev/null +++ b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/GoldenRecordTaskService.kt @@ -0,0 +1,185 @@ +/******************************************************************************* + * Copyright (c) 2021,2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ******************************************************************************/ + +package org.eclipse.tractusx.bpdm.gate.service + +import mu.KotlinLogging +import org.eclipse.tractusx.bpdm.common.dto.BusinessPartnerType +import org.eclipse.tractusx.bpdm.common.dto.PaginationRequest +import org.eclipse.tractusx.bpdm.common.model.StageType +import org.eclipse.tractusx.bpdm.gate.api.exception.BusinessPartnerSharingError +import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType +import org.eclipse.tractusx.bpdm.gate.config.GoldenRecordTaskConfigProperties +import org.eclipse.tractusx.bpdm.gate.entity.SyncType +import org.eclipse.tractusx.bpdm.gate.repository.SharingStateRepository +import org.eclipse.tractusx.bpdm.gate.repository.generic.BusinessPartnerRepository +import org.eclipse.tractusx.bpdm.pool.api.client.PoolApiClient +import org.eclipse.tractusx.bpdm.pool.api.model.ChangelogType +import org.eclipse.tractusx.bpdm.pool.api.model.request.ChangelogSearchRequest +import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient +import org.eclipse.tractusx.orchestrator.api.model.* +import org.springframework.data.domain.Pageable +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional + +@Service +class GoldenRecordTaskService( + private val sharingStateRepository: SharingStateRepository, + private val sharingStateService: SharingStateService, + private val businessPartnerRepository: BusinessPartnerRepository, + private val businessPartnerService: BusinessPartnerService, + private val orchestratorMappings: OrchestratorMappings, + private val orchestrationApiClient: OrchestrationApiClient, + private val properties: GoldenRecordTaskConfigProperties, + private val syncRecordService: SyncRecordService, + private val poolClient: PoolApiClient +) { + private val logger = KotlinLogging.logger { } + + @Scheduled(cron = "#{@goldenRecordTaskConfigProperties.getCreation().getFromSharingMember().getCron()}") + @Transactional + fun createTasksForReadyBusinessPartners() { + logger.info { "Started scheduled task to create golden record tasks from ready business partners" } + + val pageRequest = Pageable.ofSize(properties.creation.fromSharingMember.batchSize) + val foundStates = sharingStateRepository.findBySharingStateType(SharingStateType.Ready, pageRequest).content + + logger.debug { "Found ${foundStates.size} business partners in ready state" } + + val partners = businessPartnerRepository.findByStageAndExternalIdIn(StageType.Input, foundStates.map { it.externalId }) + + val orchestratorBusinessPartnersDto = partners.map { orchestratorMappings.toBusinessPartnerGenericDto(it) } + + val createdTasks = createGoldenRecordTasks(TaskMode.UpdateFromSharingMember, orchestratorBusinessPartnersDto) + + val pendingRequests = partners.zip(createdTasks) + .map { (partner, task) -> + SharingStateService.PendingRequest( + SharingStateService.SharingStateIdentifierDto( + partner.externalId, + BusinessPartnerType.GENERIC + ), task.taskId + ) + } + + sharingStateService.setPending(pendingRequests) + + logger.info { "Created ${createdTasks.size} new golden record tasks from ready business partners" } + } + + @Scheduled(cron = "#{@goldenRecordTaskConfigProperties.getCheck().getCron()}") + @Transactional + fun resolvePendingTasks() { + logger.info { "Started scheduled task to resolve pending golden record tasks" } + + val pageRequest = Pageable.ofSize(properties.check.batchSize) + val sharingStates = sharingStateRepository.findBySharingStateTypeAndTaskIdNotNull(SharingStateType.Pending, pageRequest).content + + logger.debug { "Found ${sharingStates.size} business partners in pending state" } + + val tasks = orchestrationApiClient.goldenRecordTasks.searchTaskStates(TaskStateRequest(sharingStates.map { it.taskId!! })).tasks + val sharingStateMap = sharingStates.associateBy { it.taskId } + + val taskStatesByResult = tasks + .map { Pair(it, sharingStateMap[it.taskId]!!) } + .groupBy { (task, _) -> task.processingState.resultState } + + val sharingStatesWithoutTasks = sharingStates.filter { it.taskId !in tasks.map { task -> task.taskId } } + + val businessPartnersToUpsert = taskStatesByResult[ResultState.Success]?.map { (task, sharingState) -> + orchestratorMappings.toBusinessPartner(task.businessPartnerResult!!, sharingState.externalId) + } ?: emptyList() + businessPartnerService.upsertBusinessPartnersOutputFromCandidates(businessPartnersToUpsert) + + val errorRequests = (taskStatesByResult[ResultState.Error]?.map { (task, sharingState) -> + SharingStateService.ErrorRequest( + SharingStateService.SharingStateIdentifierDto(sharingState.externalId, sharingState.businessPartnerType), + BusinessPartnerSharingError.SharingProcessError, + if (task.processingState.errors.isNotEmpty()) task.processingState.errors.joinToString(" // ") { it.description } else null + ) + } ?: emptyList()).toMutableList() + + errorRequests.addAll(sharingStatesWithoutTasks.map { sharingState -> + SharingStateService.ErrorRequest( + SharingStateService.SharingStateIdentifierDto(sharingState.externalId, sharingState.businessPartnerType), + BusinessPartnerSharingError.MissingTaskID, + errorMessage = "Missing Task in Orchestrator" + ) + }) + + sharingStateService.setError(errorRequests) + + logger.info { "Resolved ${businessPartnersToUpsert.size} tasks as successful and ${errorRequests.size} as errors" } + } + + @Scheduled(cron = "#{@goldenRecordTaskConfigProperties.getCreation().getFromPool().getCron()}") + @Transactional + fun createTasksForGoldenRecordUpdates() { + logger.info { "Started scheduled task to create golden record tasks from Pool updates" } + + val syncRecord = syncRecordService.getOrCreateRecord(SyncType.POOL_TO_GATE_OUTPUT) + + val pageRequest = PaginationRequest(0, properties.creation.fromPool.batchSize) + val changelogSearchRequest = ChangelogSearchRequest(syncRecord.finishedAt) + val poolChangelogEntries = poolClient.changelogs.getChangelogEntries(changelogSearchRequest, pageRequest) + + val poolUpdatedEntries = poolChangelogEntries.content.filter { it.changelogType == ChangelogType.UPDATE } + + val bpnA = + poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.ADDRESS }.map { it.bpn } + val bpnL = poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.LEGAL_ENTITY } + .map { it.bpn } + val bpnS = + poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.SITE }.map { it.bpn } + + val gateOutputEntries = businessPartnerRepository.findByStageAndBpnLInOrBpnSInOrBpnAIn(StageType.Output, bpnL, bpnS, bpnA) + + val businessPartnerGenericDtoList = gateOutputEntries.map { bp -> + orchestratorMappings.toBusinessPartnerGenericDto(bp) + } + + val tasks = createGoldenRecordTasks(TaskMode.UpdateFromPool, businessPartnerGenericDtoList) + + val pendingRequests = gateOutputEntries.zip(tasks) + .map { (partner, task) -> + SharingStateService.PendingRequest( + SharingStateService.SharingStateIdentifierDto( + partner.externalId, + partner.parentType ?: BusinessPartnerType.GENERIC + ), task.taskId + ) + } + sharingStateService.setPending(pendingRequests) + + if (poolUpdatedEntries.isNotEmpty()) { + syncRecordService.setSynchronizationStart(SyncType.POOL_TO_GATE_OUTPUT) + syncRecordService.setSynchronizationSuccess(SyncType.POOL_TO_GATE_OUTPUT, poolUpdatedEntries.last().timestamp) + } + + logger.info { "Created ${tasks.size} new golden record tasks from pool updates" } + } + + private fun createGoldenRecordTasks(mode: TaskMode, orchestratorBusinessPartnersDto: List): List { + if (orchestratorBusinessPartnersDto.isEmpty()) + return emptyList() + + return orchestrationApiClient.goldenRecordTasks.createTasks(TaskCreateRequest(mode, orchestratorBusinessPartnersDto)).createdTasks + } +} \ No newline at end of file diff --git a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/SharingStateService.kt b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/SharingStateService.kt index ac3c52a36..74091712c 100644 --- a/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/SharingStateService.kt +++ b/bpdm-gate/src/main/kotlin/org/eclipse/tractusx/bpdm/gate/service/SharingStateService.kt @@ -28,7 +28,9 @@ import org.eclipse.tractusx.bpdm.gate.api.exception.BusinessPartnerSharingError import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType import org.eclipse.tractusx.bpdm.gate.api.model.response.SharingStateDto import org.eclipse.tractusx.bpdm.gate.entity.SharingState +import org.eclipse.tractusx.bpdm.gate.exception.BpdmInvalidStateException import org.eclipse.tractusx.bpdm.gate.exception.BpdmInvalidStateRequestException +import org.eclipse.tractusx.bpdm.gate.exception.BpdmMissingPartnerException import org.eclipse.tractusx.bpdm.gate.repository.SharingStateRepository import org.eclipse.tractusx.bpdm.gate.repository.SharingStateRepository.Specs.byBusinessPartnerType import org.eclipse.tractusx.bpdm.gate.repository.SharingStateRepository.Specs.byExternalIdsIn @@ -74,6 +76,8 @@ class SharingStateService(private val stateRepository: SharingStateRepository) { ) SharingStateType.Initial -> setInitial(sharingState) + + SharingStateType.Ready -> setReady(sharingState) } } @@ -130,6 +134,26 @@ class SharingStateService(private val stateRepository: SharingStateRepository) { .map { (sharingState, request) -> setError(sharingState, request.errorCode, request.errorMessage, request.startTimeOverwrite) } } + fun setReady(externalIds: List): List { + val existingSharingStates = stateRepository.findByExternalIdInAndBusinessPartnerType(externalIds, BusinessPartnerType.GENERIC) + val existingIds = existingSharingStates.map { it.externalId }.toSet() + val missingIds = externalIds.minus(existingIds) + + if (missingIds.isNotEmpty()) + throw BpdmMissingPartnerException(missingIds) + + + val (correctStates, incorrectStates) = existingSharingStates.partition { + it.sharingStateType == SharingStateType.Initial + || it.sharingStateType == SharingStateType.Error + } + + if (incorrectStates.isNotEmpty()) + throw BpdmInvalidStateException(incorrectStates.map { BpdmInvalidStateException.InvalidState(it.externalId, it.sharingStateType) }) + + return correctStates.map { setReady(it) } + } + private fun setInitial(sharingState: SharingState): SharingState { sharingState.sharingStateType = SharingStateType.Initial sharingState.sharingErrorCode = null @@ -175,6 +199,20 @@ class SharingStateService(private val stateRepository: SharingStateRepository) { return stateRepository.save(sharingState) } + private fun setReady( + sharingState: SharingState + ): SharingState { + sharingState.sharingStateType = SharingStateType.Ready + sharingState.sharingErrorCode = null + sharingState.sharingErrorMessage = null + sharingState.sharingProcessStarted = null + sharingState.taskId = null + + return stateRepository.save(sharingState) + } + + + private fun getOrCreate(sharingStateIdentifiers: List): List{ val identifiersByType = sharingStateIdentifiers.groupBy { it.businessPartnerType } diff --git a/bpdm-gate/src/main/resources/application.yml b/bpdm-gate/src/main/resources/application.yml index 190272a80..a93dc798b 100644 --- a/bpdm-gate/src/main/resources/application.yml +++ b/bpdm-gate/src/main/resources/application.yml @@ -35,9 +35,18 @@ bpdm: api: upsert-limit: 100 - # Cleaning Task Job Configurations - goldenRecordTask: - pollingCron: '-' + # Configuration for golden record tasks + tasks: + creation: + fromSharingMember: + batchSize: 100 + cron: '-' + fromPool: + batchSize: 100 + cron: '-' + check: + batchSize: 100 + cron: '-' # Connection to the pool and orchestrator [No auth on default] client: diff --git a/bpdm-gate/src/test/kotlin/org/eclipse/tractusx/bpdm/gate/controller/BusinessPartnerControllerIT.kt b/bpdm-gate/src/test/kotlin/org/eclipse/tractusx/bpdm/gate/controller/BusinessPartnerControllerIT.kt index 86f170367..da18bafc8 100644 --- a/bpdm-gate/src/test/kotlin/org/eclipse/tractusx/bpdm/gate/controller/BusinessPartnerControllerIT.kt +++ b/bpdm-gate/src/test/kotlin/org/eclipse/tractusx/bpdm/gate/controller/BusinessPartnerControllerIT.kt @@ -34,10 +34,11 @@ import org.eclipse.tractusx.bpdm.gate.api.model.BusinessPartnerIdentifierDto import org.eclipse.tractusx.bpdm.gate.api.model.BusinessPartnerStateDto import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType import org.eclipse.tractusx.bpdm.gate.api.model.request.BusinessPartnerInputRequest +import org.eclipse.tractusx.bpdm.gate.api.model.request.PostSharingStateReadyRequest import org.eclipse.tractusx.bpdm.gate.api.model.response.BusinessPartnerInputDto import org.eclipse.tractusx.bpdm.gate.api.model.response.BusinessPartnerOutputDto import org.eclipse.tractusx.bpdm.gate.api.model.response.SharingStateDto -import org.eclipse.tractusx.bpdm.gate.service.BusinessPartnerService +import org.eclipse.tractusx.bpdm.gate.service.GoldenRecordTaskService import org.eclipse.tractusx.bpdm.gate.util.* import org.eclipse.tractusx.bpdm.pool.api.model.ChangelogType import org.eclipse.tractusx.bpdm.pool.api.model.response.ChangelogEntryVerboseDto @@ -66,7 +67,7 @@ class BusinessPartnerControllerIT @Autowired constructor( val testHelpers: DbTestHelpers, val gateClient: GateClient, val objectMapper: ObjectMapper, - val businessPartnerService: BusinessPartnerService + val goldenRecordTaskService: GoldenRecordTaskService ) { companion object { const val ORCHESTRATOR_CREATE_TASKS_URL = "/api/golden-record-tasks" @@ -101,14 +102,11 @@ class BusinessPartnerControllerIT @Autowired constructor( @Test fun `insert minimal business partner`() { - val upsertRequests = listOf(BusinessPartnerNonVerboseValues.bpInputRequestMinimal) val upsertResponses = gateClient.businessParters.upsertBusinessPartnersInput(upsertRequests).body!! assertUpsertResponsesMatchRequests(upsertResponses, upsertRequests) - val searchResponsePage = gateClient.businessParters.getBusinessPartnersInput(null) - assertEquals(1, searchResponsePage.totalElements) - testHelpers.assertRecursively(searchResponsePage.content).isEqualTo(upsertResponses) + assertBusinessPartnersUpsertedCorrectly(upsertResponses) } @Test @@ -119,11 +117,8 @@ class BusinessPartnerControllerIT @Autowired constructor( BusinessPartnerNonVerboseValues.bpInputRequestChina ) val upsertResponses = gateClient.businessParters.upsertBusinessPartnersInput(upsertRequests).body!! - assertUpsertResponsesMatchRequests(upsertResponses, upsertRequests) - val searchResponsePage = gateClient.businessParters.getBusinessPartnersInput(null) - assertEquals(3, searchResponsePage.totalElements) - testHelpers.assertRecursively(searchResponsePage.content).isEqualTo(upsertResponses) + assertBusinessPartnersUpsertedCorrectly(upsertResponses) } @Test @@ -133,7 +128,7 @@ class BusinessPartnerControllerIT @Autowired constructor( BusinessPartnerNonVerboseValues.bpInputRequestMinimal, BusinessPartnerNonVerboseValues.bpInputRequestChina ) - gateClient.businessParters.upsertBusinessPartnersInput(upsertRequests).body!! + upsertBusinessPartnersAndShare(upsertRequests) val externalId1 = BusinessPartnerNonVerboseValues.bpInputRequestFull.externalId val externalId2 = BusinessPartnerNonVerboseValues.bpInputRequestMinimal.externalId @@ -518,7 +513,7 @@ class BusinessPartnerControllerIT @Autowired constructor( BusinessPartnerNonVerboseValues.bpInputRequestCleaned, BusinessPartnerNonVerboseValues.bpInputRequestError ) - gateClient.businessParters.upsertBusinessPartnersInput(upsertRequests).body!! + upsertBusinessPartnersAndShare(upsertRequests) val externalId4 = BusinessPartnerNonVerboseValues.bpInputRequestCleaned.externalId val externalId5 = BusinessPartnerNonVerboseValues.bpInputRequestError.externalId @@ -555,7 +550,7 @@ class BusinessPartnerControllerIT @Autowired constructor( .isEqualTo(createdSharingState) // Call Finish Cleaning Method - businessPartnerService.finishCleaningTask() + goldenRecordTaskService.resolvePendingTasks() val cleanedSharingState = listOf( SharingStateDto( @@ -600,7 +595,7 @@ class BusinessPartnerControllerIT @Autowired constructor( BusinessPartnerNonVerboseValues.bpInputRequestError, BusinessPartnerNonVerboseValues.bpInputRequestChina, ) - gateClient.businessParters.upsertBusinessPartnersInput(upsertRequests).body!! + upsertBusinessPartnersAndShare(upsertRequests) val externalId3 = BusinessPartnerNonVerboseValues.bpInputRequestChina.externalId @@ -626,7 +621,7 @@ class BusinessPartnerControllerIT @Autowired constructor( .isEqualTo(createdSharingState) // Call Finish Cleaning Method - businessPartnerService.finishCleaningTask() + goldenRecordTaskService.resolvePendingTasks() val cleanedSharingState = listOf( SharingStateDto( @@ -661,13 +656,13 @@ class BusinessPartnerControllerIT @Autowired constructor( val upsertRequests = listOf( BusinessPartnerNonVerboseValues.bpInputRequestCleaned ) - gateClient.businessParters.upsertBusinessPartnersInput(upsertRequests).body!! + upsertBusinessPartnersAndShare(upsertRequests) val externalId4 = BusinessPartnerNonVerboseValues.bpInputRequestCleaned.externalId - businessPartnerService.finishCleaningTask() + goldenRecordTaskService.resolvePendingTasks() - businessPartnerService.requestCleaningTaskForGateOutputIfPoolBpnHasChanges() + goldenRecordTaskService.createTasksForGoldenRecordUpdates() val upsertSharingStatesRequests = listOf( SharingStateDto( @@ -703,4 +698,28 @@ class BusinessPartnerControllerIT @Autowired constructor( private fun BusinessPartnerInputRequest.fastCopy(externalId: String, shortName: String) = copy(externalId = externalId, legalEntity = legalEntity.copy(shortName = shortName)) + + private fun upsertBusinessPartnersAndShare(partners: List) { + gateClient.businessParters.upsertBusinessPartnersInput(partners) + gateClient.sharingState.postSharingStateReady(PostSharingStateReadyRequest(partners.map { it.externalId })) + goldenRecordTaskService.createTasksForReadyBusinessPartners() + } + + private fun assertBusinessPartnersUpsertedCorrectly(upsertedBusinessPartners: Collection) { + val searchResponsePage = gateClient.businessParters.getBusinessPartnersInput(null) + assertEquals(upsertedBusinessPartners.size.toLong(), searchResponsePage.totalElements) + testHelpers.assertRecursively(searchResponsePage.content).isEqualTo(upsertedBusinessPartners) + + val sharingStateResponse = gateClient.sharingState.getSharingStates(PaginationRequest(), businessPartnerType = null, externalIds = null) + assertEquals(upsertedBusinessPartners.size.toLong(), sharingStateResponse.totalElements) + Assertions.assertThat(sharingStateResponse.content).isEqualTo( + upsertedBusinessPartners.map { + SharingStateDto( + businessPartnerType = BusinessPartnerType.GENERIC, + externalId = it.externalId, + sharingStateType = SharingStateType.Initial + ) + } + ) + } } diff --git a/bpdm-gate/src/test/kotlin/org/eclipse/tractusx/bpdm/gate/controller/SharingStateControllerIT.kt b/bpdm-gate/src/test/kotlin/org/eclipse/tractusx/bpdm/gate/controller/SharingStateControllerIT.kt index 4653dc1ad..9481c6293 100644 --- a/bpdm-gate/src/test/kotlin/org/eclipse/tractusx/bpdm/gate/controller/SharingStateControllerIT.kt +++ b/bpdm-gate/src/test/kotlin/org/eclipse/tractusx/bpdm/gate/controller/SharingStateControllerIT.kt @@ -27,15 +27,19 @@ import org.eclipse.tractusx.bpdm.common.dto.PaginationRequest import org.eclipse.tractusx.bpdm.gate.api.client.GateClient import org.eclipse.tractusx.bpdm.gate.api.exception.BusinessPartnerSharingError.SharingProcessError import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType +import org.eclipse.tractusx.bpdm.gate.api.model.request.PostSharingStateReadyRequest import org.eclipse.tractusx.bpdm.gate.api.model.response.SharingStateDto +import org.eclipse.tractusx.bpdm.gate.util.BusinessPartnerNonVerboseValues import org.eclipse.tractusx.bpdm.gate.util.DbTestHelpers import org.eclipse.tractusx.bpdm.gate.util.PostgreSQLContextInitializer +import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.EnumSource import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest +import org.springframework.http.HttpStatus import org.springframework.test.context.ActiveProfiles import org.springframework.test.context.ContextConfiguration import org.springframework.web.reactive.function.client.WebClientResponseException @@ -498,6 +502,103 @@ class SharingStateControllerIT @Autowired constructor( } + /* + GIVEN business partners in state Initial + WHEN set as ready + THEN business partners in state ready + */ + @Test + fun `set initial business partners ready`() { + val givenBusinessPartners = listOf( + BusinessPartnerNonVerboseValues.bpInputRequestChina, + BusinessPartnerNonVerboseValues.bpInputRequestFull, + BusinessPartnerNonVerboseValues.bpInputRequestMinimal, + ) + gateClient.businessParters.upsertBusinessPartnersInput(givenBusinessPartners) + + gateClient.sharingState.postSharingStateReady(PostSharingStateReadyRequest(givenBusinessPartners.map { it.externalId })) + + + val sharingStateResponse = + gateClient.sharingState.getSharingStates(PaginationRequest(), businessPartnerType = null, externalIds = givenBusinessPartners.map { it.externalId }) + + assertThat(sharingStateResponse.content).isEqualTo(givenBusinessPartners.map { + SharingStateDto( + businessPartnerType = BusinessPartnerType.GENERIC, + externalId = it.externalId, + sharingStateType = SharingStateType.Ready + ) + }) + } + + /* + GIVEN business partners in state Error + WHEN set as ready + THEN business partners in state ready + */ + @Test + fun `set error business partners ready`() { + val givenBusinessPartners = listOf( + BusinessPartnerNonVerboseValues.bpInputRequestChina, + BusinessPartnerNonVerboseValues.bpInputRequestFull, + BusinessPartnerNonVerboseValues.bpInputRequestMinimal, + ) + gateClient.businessParters.upsertBusinessPartnersInput(givenBusinessPartners) + + val givenErrorStates = givenBusinessPartners.map { + SharingStateDto( + businessPartnerType = BusinessPartnerType.GENERIC, + externalId = it.externalId, + sharingStateType = SharingStateType.Error, + sharingErrorCode = SharingProcessError, + sharingErrorMessage = "message" + ) + } + + givenErrorStates.forEach { gateClient.sharingState.upsertSharingState(it) } + + + gateClient.sharingState.postSharingStateReady(PostSharingStateReadyRequest(givenBusinessPartners.map { it.externalId })) + + + val sharingStateResponse = + gateClient.sharingState.getSharingStates(PaginationRequest(), businessPartnerType = null, externalIds = givenBusinessPartners.map { it.externalId }) + + assertThat(sharingStateResponse.content).isEqualTo(givenBusinessPartners.map { + SharingStateDto( + businessPartnerType = BusinessPartnerType.GENERIC, + externalId = it.externalId, + sharingStateType = SharingStateType.Ready + ) + }) + } + + /* + GIVEN business partners in invalid state to be shared + WHEN set as ready + THEN return error response + */ + @Test + fun `throw error response on business partners ready in invalid state`() { + val givenBusinessPartner = BusinessPartnerNonVerboseValues.bpInputRequestChina + gateClient.businessParters.upsertBusinessPartnersInput(listOf(givenBusinessPartner)) + + val givenInvalidState = SharingStateDto( + businessPartnerType = BusinessPartnerType.GENERIC, + externalId = givenBusinessPartner.externalId, + sharingStateType = SharingStateType.Ready + ) + + gateClient.sharingState.upsertSharingState(givenInvalidState) + + try { + gateClient.sharingState.postSharingStateReady(PostSharingStateReadyRequest(listOf(givenBusinessPartner.externalId))) + + } catch (e: WebClientResponseException) { + Assertions.assertEquals(HttpStatus.BAD_REQUEST, e.statusCode) + } + } + /** * Insert Sharing State only with required fields filled */