Skip to content

Commit

Permalink
Merge pull request #1029 from eclipse-tractusx/fix/orchestrator-timeouts
Browse files Browse the repository at this point in the history
fix(bpdm-orchestrator): introduced pagination for handling pending and retention timeouts
  • Loading branch information
SujitMBRDI authored Aug 22, 2024
2 parents b03c769 + 066617e commit 25e4c9c
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 38 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ For changes to the BPDM Helm charts please consult the [changelog](charts/bpdm/C

- BPDM Gate: Fixed Gate not resending business partner data to the golden record process on error sharing state when member sends the exact same business partner again

### Changed

- BPDM Pool & Gate: Reduce standard batch size for golden record task processing ([#1032](https://github.com/eclipse-tractusx/bpdm/pull/1032))

- BPDM Orchestrator: Fix possible out-of-memory exception during the execution of large volumes of tasks ([#1029](https://github.com/eclipse-tractusx/bpdm/pull/1029))

## [6.1.0] - [2024-07-15]

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface GoldenRecordTaskRepository : CrudRepository<GoldenRecordTaskDb, Long>,
@Query("SELECT task from GoldenRecordTaskDb task WHERE task.processingState.step = :step AND task.processingState.stepState = :stepState")
fun findByStepAndStepState(step: TaskStep, stepState: StepState, pageable: Pageable): Page<GoldenRecordTaskDb>

fun findByProcessingStatePendingTimeoutBefore(time: DbTimestamp): Set<GoldenRecordTaskDb>
fun findByProcessingStatePendingTimeoutBefore(time: DbTimestamp, pageable: Pageable): Page<GoldenRecordTaskDb>

fun findByProcessingStateRetentionTimeoutBefore(time: DbTimestamp): Set<GoldenRecordTaskDb>
fun findByProcessingStateRetentionTimeoutBefore(time: DbTimestamp, pageable: Pageable): Page<GoldenRecordTaskDb>
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ package org.eclipse.tractusx.bpdm.orchestrator.service

import mu.KotlinLogging
import org.eclipse.tractusx.bpdm.orchestrator.config.TaskConfigProperties
import org.eclipse.tractusx.bpdm.orchestrator.entity.GateRecordDb
import org.eclipse.tractusx.bpdm.orchestrator.entity.DbTimestamp
import org.eclipse.tractusx.bpdm.orchestrator.entity.GateRecordDb
import org.eclipse.tractusx.bpdm.orchestrator.entity.GoldenRecordTaskDb
import org.eclipse.tractusx.bpdm.orchestrator.exception.BpdmRecordNotFoundException
import org.eclipse.tractusx.bpdm.orchestrator.exception.BpdmTaskNotFoundException
import org.eclipse.tractusx.bpdm.orchestrator.repository.GateRecordRepository
import org.eclipse.tractusx.bpdm.orchestrator.repository.GoldenRecordTaskRepository
import org.eclipse.tractusx.orchestrator.api.model.*
import org.springframework.data.domain.Page
import org.springframework.data.domain.PageRequest
import org.springframework.data.domain.Pageable
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.Instant
Expand Down Expand Up @@ -78,7 +79,13 @@ class GoldenRecordTaskService(
val pendingTimeout = reservedTasks.minOfOrNull { calculateTaskPendingTimeout(it) } ?: now

return reservedTasks
.map { task -> TaskStepReservationEntryDto(task.uuid.toString(), task.gateRecord.publicId.toString(), responseMapper.toBusinessPartnerResult(task.businessPartner)) }
.map { task ->
TaskStepReservationEntryDto(
task.uuid.toString(),
task.gateRecord.publicId.toString(),
responseMapper.toBusinessPartnerResult(task.businessPartner)
)
}
.let { reservations -> TaskStepReservationResponse(reservations, pendingTimeout) }
}

Expand All @@ -105,40 +112,49 @@ class GoldenRecordTaskService(
}
}

@Scheduled(cron = "\${bpdm.task.timeoutCheckCron}")
@Transactional
fun checkForTimeouts() {
try {
logger.debug { "Checking for timeouts" }
checkForPendingTimeouts()
checkForRetentionTimeouts()
} catch (err: RuntimeException) {
logger.error(err) { "Error checking for timeouts" }
}
fun processPendingTimeouts(pageSize: Int): PaginationInfo {
return batchProcessTasks(pageSize,
fetchPage = { pageable -> taskRepository.findByProcessingStatePendingTimeoutBefore(DbTimestamp.now(), pageable) },
processTask = { task ->
logger.info { "Setting timeout for task ${task.uuid} after reaching pending timeout" }
goldenRecordTaskStateMachine.doResolveTaskToTimeout(task)
}
)
}

private fun checkForPendingTimeouts() {
taskRepository.findByProcessingStatePendingTimeoutBefore(DbTimestamp.now())
.forEach {
try {
logger.info { "Setting timeout for task ${it.uuid} after reaching pending timeout" }
goldenRecordTaskStateMachine.doResolveTaskToTimeout(it)
} catch (err: RuntimeException) {
logger.error(err) { "Error handling pending timeout for task ${it.uuid}" }
}
@Transactional
fun processRetentionTimeouts(pageSize: Int): PaginationInfo {
return batchProcessTasks(pageSize,
fetchPage = { pageable -> taskRepository.findByProcessingStateRetentionTimeoutBefore(DbTimestamp.now(), pageable) },
processTask = { task ->
logger.info { "Removing task ${task.uuid} after reaching retention timeout" }
taskRepository.delete(task)
}
)
}

private fun checkForRetentionTimeouts() {
taskRepository.findByProcessingStateRetentionTimeoutBefore(DbTimestamp.now())
.forEach {
try {
logger.info { "Removing task ${it.uuid} after reaching retention timeout" }
taskRepository.delete(it)
} catch (err: RuntimeException) {
logger.error(err) { "Error handling retention timeout for task ${it.uuid}" }
}
private fun batchProcessTasks(
pageSize: Int,
fetchPage: (Pageable) -> Page<GoldenRecordTaskDb>,
processTask: (GoldenRecordTaskDb) -> Unit
): PaginationInfo {
val pageable: Pageable = PageRequest.of(0, pageSize)
val page = fetchPage(pageable)
var hasProcessedTasks = false
var processedTaskCount = 0

page.forEach { task ->
try {
processTask(task)
hasProcessedTasks = true
processedTaskCount++ // Increment on successful processing
} catch (err: RuntimeException) {
logger.error(err) { "Error processing timeout for task ${task.uuid}" }
}
}

return PaginationInfo(hasProcessedTasks, page.hasNext(), processedTaskCount)
}

private fun calculateTaskPendingTimeout(task: GoldenRecordTaskDb) =
Expand All @@ -154,20 +170,28 @@ class GoldenRecordTaskService(
throw BpdmTaskNotFoundException(uuidString)
}

private fun getOrCreateGateRecords(requests: List<TaskCreateRequestEntry>): List<GateRecordDb>{
val privateIds = requests.map { request -> request.recordId?.let { toUUID(it) }}
private fun getOrCreateGateRecords(requests: List<TaskCreateRequestEntry>): List<GateRecordDb> {
val privateIds = requests.map { request -> request.recordId?.let { toUUID(it) } }
val notNullPrivateIds = privateIds.filterNotNull()

val foundRecords = gateRecordRepository.findByPrivateIdIn(notNullPrivateIds.toSet())
val foundRecordsByPrivateId = foundRecords.associateBy { it.privateId }
val requestedNotFoundRecords = notNullPrivateIds.minus(foundRecordsByPrivateId.keys)

if(requestedNotFoundRecords.isNotEmpty())
if (requestedNotFoundRecords.isNotEmpty())
throw BpdmRecordNotFoundException(requestedNotFoundRecords)

return privateIds.map { privateId ->
return privateIds.map { privateId ->
val gateRecord = privateId?.let { foundRecordsByPrivateId[it] } ?: GateRecordDb(publicId = UUID.randomUUID(), privateId = UUID.randomUUID())
gateRecordRepository.save(gateRecord)
}
}
}

data class PaginationInfo(
val hasProcessedTasks: Boolean,
val hasNextPage: Boolean,
val processedTaskCount: Int
) {
fun countProcessedTasks(): Int = processedTaskCount
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*******************************************************************************
* Copyright (c) 2021,2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/

package org.eclipse.tractusx.bpdm.orchestrator.service

import jakarta.persistence.EntityManager
import mu.KotlinLogging
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service

@Service
class TimeoutProcessBatchService(
private val goldenRecordTaskService: GoldenRecordTaskService,
private val entityManager: EntityManager
) {
private val logger = KotlinLogging.logger { }

@Scheduled(cron = "\${bpdm.task.timeoutCheckCron}")
fun processForTimeouts() {
try {
// Track the total number of processed tasks
var totalProcessedTasks = 0
logger.debug { "Checking for timeouts" }
// Process pending timeouts
totalProcessedTasks += processTimeouts(goldenRecordTaskService::processPendingTimeouts)
// Process retention timeouts
totalProcessedTasks += processTimeouts(goldenRecordTaskService::processRetentionTimeouts)
logger.info { "Finished processing timeouts. Total processed tasks: $totalProcessedTasks" }
} catch (err: RuntimeException) {
logger.error(err) { "Error checking for timeouts" }
}
}


private fun processTimeouts(processFunction: (Int) -> PaginationInfo): Int {
val pageSize = 1000 // Adjust the page size based on memory constraints
var processedTasks = 0
do {
val paginationInfo = processFunction(pageSize)
processedTasks += paginationInfo.countProcessedTasks()
entityManager.clear() // Clear the persistence context to free memory
} while (paginationInfo.hasNextPage)
return processedTasks
}
}

0 comments on commit 25e4c9c

Please sign in to comment.