Skip to content

Commit

Permalink
feat(gate):Request Cleaning for BPN Update
Browse files Browse the repository at this point in the history
  • Loading branch information
cezaralexandremorais committed Dec 7, 2023
1 parent 265ef60 commit 1864194
Show file tree
Hide file tree
Showing 15 changed files with 426 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ abstract class BaseSyncRecordService<SYNC_TYPE : Enum<*>, SYNC_RECORD : BaseSync
}

@Transactional(isolation = Isolation.SERIALIZABLE)
open fun setSynchronizationSuccess(type: SYNC_TYPE): SYNC_RECORD {
open fun setSynchronizationSuccess(type: SYNC_TYPE, finishedAt: Instant? = null): SYNC_RECORD {
val record = getOrCreateRecord(type)
if (record.status != SyncStatus.RUNNING)
throw BpdmSyncStateException("Synchronization of type ${record.type} can't switch from state ${record.status} to ${SyncStatus.SUCCESS}.")

logger.debug { "Set sync of type ${record.type} to status ${SyncStatus.SUCCESS}" }

record.finishedAt = Instant.now().truncatedTo(ChronoUnit.MICROS)
record.finishedAt = finishedAt ?: Instant.now().truncatedTo(ChronoUnit.MICROS)
record.progress = 1f
record.status = SyncStatus.SUCCESS
record.errorDetails = null
Expand Down
4 changes: 4 additions & 0 deletions bpdm-gate/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>bpdm-gate-api</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bpdm-pool-api</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.eclipse.tractusx.bpdm.gate.config

import org.eclipse.tractusx.bpdm.pool.api.client.PoolApiClient
import org.eclipse.tractusx.bpdm.pool.api.client.PoolClientImpl
import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient
import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClientImpl
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
Expand All @@ -37,7 +39,7 @@ import java.util.function.Consumer


@Configuration
class OrchestratorClientConfig {
class ClientsConfig {

// Orchestrator-Client without authentication
@Bean
Expand All @@ -51,6 +53,22 @@ class OrchestratorClientConfig {
return OrchestrationApiClientImpl { webClientBuilder(url).build() }
}

@Bean
@ConditionalOnProperty(
value = ["bpdm.gate-security.pool-security-enabled"],
havingValue = "false",
matchIfMissing = true
)
fun poolClientNoAuth(poolConfigProperties: PoolConfigProperties): PoolApiClient {
val url = poolConfigProperties.baseUrl
return PoolClientImpl {
WebClient.builder()
.baseUrl(url)
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).build()
}
}



@Bean
@ConditionalOnProperty(
Expand All @@ -72,6 +90,25 @@ class OrchestratorClientConfig {
}
}

@Bean
@ConditionalOnProperty(
value = ["bpdm.pool.security-enabled"],
havingValue = "true"
)
fun poolClientWithAuth(
poolConfigProperties: PoolConfigProperties,
clientRegistrationRepository: ClientRegistrationRepository,
authorizedClientService: OAuth2AuthorizedClientService
): PoolApiClient {
val url = poolConfigProperties.baseUrl
val clientRegistrationId = poolConfigProperties.oauth2ClientRegistration
?: throw IllegalArgumentException("bpdm.pool.oauth2-client-registration is required if bpdm.pool.security-enabled is set")
return PoolClientImpl {
webClientBuilder(url)
.apply(oauth2Configuration(clientRegistrationRepository, authorizedClientService, clientRegistrationId))
.build()
}
}

private fun webClientBuilder(url: String) =
WebClient.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties

@ConfigurationProperties(prefix = "bpdm.client.pool")
data class PoolConfigProperties(
val baseUrl: String = "http://localhost:8080/api/catena",
val baseUrl: String = "http://localhost:8080",
val searchChangelogPageSize: Int = 100,
val securityEnabled: Boolean = false,
val oauth2ClientRegistration: String?
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*******************************************************************************
* Copyright (c) 2021,2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/

package org.eclipse.tractusx.bpdm.gate.entity

import jakarta.persistence.*
import org.eclipse.tractusx.bpdm.common.model.BaseEntity
import org.eclipse.tractusx.bpdm.common.model.BaseSyncRecord
import org.eclipse.tractusx.bpdm.common.model.SyncStatus
import java.time.Instant

@Entity
@Table(name = "sync_records")
class SyncRecord(
@Enumerated(EnumType.STRING)
@Column(name = "type", nullable = false, unique = true)
override var type: SyncType,

@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
override var status: SyncStatus,

@Column(name = "from_time", nullable = false)
override var fromTime: Instant,

@Column(name = "progress", nullable = false)
override var progress: Float = 0f,

@Column(name = "count", nullable = false)
override var count: Int = 0,

@Column(name = "status_details")
override var errorDetails: String? = null,

@Column(name = "save_state")
override var errorSave: String? = null,

@Column(name = "started_at")
override var startedAt: Instant? = null,

@Column(name = "finished_at")
override var finishedAt: Instant? = null,

) : BaseEntity(), BaseSyncRecord<SyncType>
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*******************************************************************************
* Copyright (c) 2021,2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/

package org.eclipse.tractusx.bpdm.gate.entity

enum class SyncType {
POOL_TO_GATE_OUTPUT
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*******************************************************************************
* Copyright (c) 2021,2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/

package org.eclipse.tractusx.bpdm.gate.repository

import org.eclipse.tractusx.bpdm.gate.entity.SyncRecord
import org.eclipse.tractusx.bpdm.gate.entity.SyncType
import org.springframework.data.repository.CrudRepository

interface SyncRecordRepository : CrudRepository<SyncRecord, Long> {

fun findByType(type: SyncType): SyncRecord?
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ import org.eclipse.tractusx.bpdm.gate.entity.generic.BusinessPartner
import org.springframework.data.domain.Page
import org.springframework.data.domain.Pageable
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.jpa.repository.Query
import org.springframework.data.repository.CrudRepository
import org.springframework.data.repository.query.Param
import org.springframework.stereotype.Repository


@Repository
interface BusinessPartnerRepository : JpaRepository<BusinessPartner, Long>, CrudRepository<BusinessPartner, Long> {

Expand All @@ -35,4 +38,13 @@ interface BusinessPartnerRepository : JpaRepository<BusinessPartner, Long>, Crud
fun findByStageAndExternalIdIn(stage: StageType, externalId: Collection<String>, pageable: Pageable): Page<BusinessPartner>

fun findByStage(stage: StageType, pageable: Pageable): Page<BusinessPartner>

@Query("SELECT e FROM BusinessPartner e WHERE e.stage = :stage AND (e.bpnL IN :bpnL OR e.bpnS IN :bpnS OR e.bpnA IN :bpnA)")
fun findByStageAndBpnLInOrBpnSInOrBpnAIn(
@Param("stage") stage: StageType?,
@Param("bpnL") bpnLList: List<String?>?,
@Param("bpnS") bpnSList: List<String?>?,
@Param("bpnA") bpnAList: List<String?>?
): Set<BusinessPartner>

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.eclipse.tractusx.bpdm.gate.service

import org.eclipse.tractusx.bpdm.common.dto.BusinessPartnerType
import org.eclipse.tractusx.bpdm.common.dto.GeoCoordinateDto
import org.eclipse.tractusx.bpdm.common.exception.BpdmNullMappingException
import org.eclipse.tractusx.bpdm.common.model.StageType
Expand Down Expand Up @@ -121,7 +122,7 @@ class BusinessPartnerMappings {
bpnS = dto.siteBpn,
bpnA = dto.addressBpn,
parentId = null,
parentType = null,
parentType = BusinessPartnerType.GENERIC,
postalAddress = toPostalAddress(dto.postalAddress)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.eclipse.tractusx.bpdm.gate.service

import mu.KotlinLogging
import org.eclipse.tractusx.bpdm.common.dto.BusinessPartnerType
import org.eclipse.tractusx.bpdm.common.dto.request.PaginationRequest
import org.eclipse.tractusx.bpdm.common.dto.response.PageDto
import org.eclipse.tractusx.bpdm.common.model.StageType
import org.eclipse.tractusx.bpdm.common.service.toPageDto
Expand All @@ -33,19 +34,24 @@ import org.eclipse.tractusx.bpdm.gate.api.model.request.BusinessPartnerInputRequ
import org.eclipse.tractusx.bpdm.gate.api.model.request.BusinessPartnerOutputRequest
import org.eclipse.tractusx.bpdm.gate.api.model.response.BusinessPartnerInputDto
import org.eclipse.tractusx.bpdm.gate.api.model.response.BusinessPartnerOutputDto
import org.eclipse.tractusx.bpdm.gate.config.PoolConfigProperties
import org.eclipse.tractusx.bpdm.gate.entity.ChangelogEntry
import org.eclipse.tractusx.bpdm.gate.entity.SyncType
import org.eclipse.tractusx.bpdm.gate.entity.generic.*
import org.eclipse.tractusx.bpdm.gate.exception.BpdmMissingStageException
import org.eclipse.tractusx.bpdm.gate.repository.ChangelogRepository
import org.eclipse.tractusx.bpdm.gate.repository.SharingStateRepository
import org.eclipse.tractusx.bpdm.gate.repository.generic.BusinessPartnerRepository
import org.eclipse.tractusx.bpdm.pool.api.client.PoolApiClient
import org.eclipse.tractusx.bpdm.pool.api.model.request.ChangelogSearchRequest
import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient
import org.eclipse.tractusx.orchestrator.api.model.*
import org.springframework.data.domain.Page
import org.springframework.data.domain.PageRequest
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import org.eclipse.tractusx.bpdm.pool.api.model.ChangelogType as PoolChangeLogType

@Service
class BusinessPartnerService(
Expand All @@ -55,7 +61,10 @@ class BusinessPartnerService(
private val changelogRepository: ChangelogRepository,
private val orchestrationApiClient: OrchestrationApiClient,
private val orchestratorMappings: OrchestratorMappings,
private val sharingStateRepository: SharingStateRepository
private val sharingStateRepository: SharingStateRepository,
private val poolClient: PoolApiClient,
private val syncRecordService: SyncRecordService,
private val poolConfigProperties: PoolConfigProperties
) {
private val logger = KotlinLogging.logger { }

Expand Down Expand Up @@ -98,7 +107,7 @@ class BusinessPartnerService(
val partners = resolutionResults.map { it.businessPartner }
val orchestratorBusinessPartnersDto = resolutionResults.map { orchestratorMappings.toBusinessPartnerGenericDto(it.businessPartner) }

val taskIds = createGoldenRecordTasks(orchestratorBusinessPartnersDto).createdTasks.map { it.taskId }
val taskIds = createGoldenRecordTasks(TaskMode.UpdateFromSharingMember, orchestratorBusinessPartnersDto).createdTasks.map { it.taskId }

val pendingRequests = partners.zip(taskIds)
.map { (partner, taskId) ->
Expand Down Expand Up @@ -293,10 +302,10 @@ class BusinessPartnerService(
}


private fun createGoldenRecordTasks(orchestratorBusinessPartnersDto: List<BusinessPartnerGenericDto>): TaskCreateResponse {
private fun createGoldenRecordTasks(mode: TaskMode, orchestratorBusinessPartnersDto: List<BusinessPartnerGenericDto>): TaskCreateResponse {
return orchestrationApiClient.goldenRecordTasks.createTasks(
TaskCreateRequest(
TaskMode.UpdateFromSharingMember, orchestratorBusinessPartnersDto
mode, orchestratorBusinessPartnersDto
)
)
}
Expand Down Expand Up @@ -340,4 +349,51 @@ class BusinessPartnerService(

}

@Scheduled(cron = "\${cleaningService.pollingCron:-}", zone = "UTC")
@Transactional
fun requestCleaningTaskForGateOutputIfPoolBpnHasChanges() {

val poolPaginationRequest = PaginationRequest(0, poolConfigProperties.searchChangelogPageSize)

val syncRecord = syncRecordService.getOrCreateRecord(SyncType.POOL_TO_GATE_OUTPUT)

val changelogSearchRequest = ChangelogSearchRequest(syncRecord.finishedAt)


val poolChangelogEntries = poolClient.changelogs.getChangelogEntries(changelogSearchRequest, poolPaginationRequest)
val poolUpdatedEntries = poolChangelogEntries.content.filter { it.changelogType == PoolChangeLogType.UPDATE }


val bpnA =
poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.ADDRESS }.map { it.bpn }
val bpnL = poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.LEGAL_ENTITY }
.map { it.bpn }
val bpnS =
poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.SITE }.map { it.bpn }

val gateOutputEntries = businessPartnerRepository.findByStageAndBpnLInOrBpnSInOrBpnAIn(StageType.Output, bpnL, bpnS, bpnA)

val businessPartnerGenericDtoList = gateOutputEntries.map { bp ->
orchestratorMappings.toBusinessPartnerGenericDto(bp)
}

val taskIds = createGoldenRecordTasks(TaskMode.UpdateFromPool, businessPartnerGenericDtoList).createdTasks.map { it.taskId }

val pendingRequests = gateOutputEntries.zip(taskIds)
.map { (partner, taskId) ->
SharingStateService.PendingRequest(
SharingStateService.SharingStateIdentifierDto(
partner.externalId,
partner.parentType ?: BusinessPartnerType.GENERIC
), taskId
)
}
sharingStateService.setPending(pendingRequests)

if (poolUpdatedEntries.isNotEmpty()) {
syncRecordService.setSynchronizationStart(SyncType.POOL_TO_GATE_OUTPUT)
syncRecordService.setSynchronizationSuccess(SyncType.POOL_TO_GATE_OUTPUT, poolUpdatedEntries.last().timestamp)
}
}

}
Loading

0 comments on commit 1864194

Please sign in to comment.