From 066617e90c015f0c5a2d9d3c9318c9ace89fd8e9 Mon Sep 17 00:00:00 2001 From: SujitMBRDI Date: Mon, 12 Aug 2024 16:31:11 +0530 Subject: [PATCH] fix(bpdm-orchestrator): introduced pagination for handling pending and retention timeouts fix(bpdm-orchestrator): updated changelog fix(bpdm-orchestrator): introduced pagination for handling pending and retention timeouts fix(bpdm-orchestrator): introduced pagination for handling pending and retention timeouts --- CHANGELOG.md | 4 +- .../repository/GoldenRecordTaskRepository.kt | 4 +- .../service/GoldenRecordTaskService.kt | 92 ++++++++++++------- .../service/TimeoutProcessBatchService.kt | 61 ++++++++++++ 4 files changed, 123 insertions(+), 38 deletions(-) create mode 100644 bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/TimeoutProcessBatchService.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d1f5ece6..4073681e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,10 +20,10 @@ For changes to the BPDM Helm charts please consult the [changelog](charts/bpdm/C - BPDM Gate: Fixed Gate not resending business partner data to the golden record process on error sharing state when member sends the exact same business partner again -### Changed - - BPDM Pool & Gate: Reduce standard batch size for golden record task processing ([#1032](https://github.com/eclipse-tractusx/bpdm/pull/1032)) +- BPDM Orchestrator: Fix possible out-of-memory exception during the execution of large volumes of tasks ([#1029](https://github.com/eclipse-tractusx/bpdm/pull/1029)) + ## [6.1.0] - [2024-07-15] ### Added diff --git a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/repository/GoldenRecordTaskRepository.kt b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/repository/GoldenRecordTaskRepository.kt index df86b6ec5..caf66c2f7 100644 --- a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/repository/GoldenRecordTaskRepository.kt +++ b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/repository/GoldenRecordTaskRepository.kt @@ -39,7 +39,7 @@ interface GoldenRecordTaskRepository : CrudRepository, @Query("SELECT task from GoldenRecordTaskDb task WHERE task.processingState.step = :step AND task.processingState.stepState = :stepState") fun findByStepAndStepState(step: TaskStep, stepState: StepState, pageable: Pageable): Page - fun findByProcessingStatePendingTimeoutBefore(time: DbTimestamp): Set + fun findByProcessingStatePendingTimeoutBefore(time: DbTimestamp, pageable: Pageable): Page - fun findByProcessingStateRetentionTimeoutBefore(time: DbTimestamp): Set + fun findByProcessingStateRetentionTimeoutBefore(time: DbTimestamp, pageable: Pageable): Page } \ No newline at end of file diff --git a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskService.kt b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskService.kt index f55ee8782..c7da6abf3 100644 --- a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskService.kt +++ b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/GoldenRecordTaskService.kt @@ -21,16 +21,17 @@ package org.eclipse.tractusx.bpdm.orchestrator.service import mu.KotlinLogging import org.eclipse.tractusx.bpdm.orchestrator.config.TaskConfigProperties -import org.eclipse.tractusx.bpdm.orchestrator.entity.GateRecordDb import org.eclipse.tractusx.bpdm.orchestrator.entity.DbTimestamp +import org.eclipse.tractusx.bpdm.orchestrator.entity.GateRecordDb import org.eclipse.tractusx.bpdm.orchestrator.entity.GoldenRecordTaskDb import org.eclipse.tractusx.bpdm.orchestrator.exception.BpdmRecordNotFoundException import org.eclipse.tractusx.bpdm.orchestrator.exception.BpdmTaskNotFoundException import org.eclipse.tractusx.bpdm.orchestrator.repository.GateRecordRepository import org.eclipse.tractusx.bpdm.orchestrator.repository.GoldenRecordTaskRepository import org.eclipse.tractusx.orchestrator.api.model.* +import org.springframework.data.domain.Page +import org.springframework.data.domain.PageRequest import org.springframework.data.domain.Pageable -import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.time.Instant @@ -78,7 +79,13 @@ class GoldenRecordTaskService( val pendingTimeout = reservedTasks.minOfOrNull { calculateTaskPendingTimeout(it) } ?: now return reservedTasks - .map { task -> TaskStepReservationEntryDto(task.uuid.toString(), task.gateRecord.publicId.toString(), responseMapper.toBusinessPartnerResult(task.businessPartner)) } + .map { task -> + TaskStepReservationEntryDto( + task.uuid.toString(), + task.gateRecord.publicId.toString(), + responseMapper.toBusinessPartnerResult(task.businessPartner) + ) + } .let { reservations -> TaskStepReservationResponse(reservations, pendingTimeout) } } @@ -105,40 +112,49 @@ class GoldenRecordTaskService( } } - @Scheduled(cron = "\${bpdm.task.timeoutCheckCron}") @Transactional - fun checkForTimeouts() { - try { - logger.debug { "Checking for timeouts" } - checkForPendingTimeouts() - checkForRetentionTimeouts() - } catch (err: RuntimeException) { - logger.error(err) { "Error checking for timeouts" } - } + fun processPendingTimeouts(pageSize: Int): PaginationInfo { + return batchProcessTasks(pageSize, + fetchPage = { pageable -> taskRepository.findByProcessingStatePendingTimeoutBefore(DbTimestamp.now(), pageable) }, + processTask = { task -> + logger.info { "Setting timeout for task ${task.uuid} after reaching pending timeout" } + goldenRecordTaskStateMachine.doResolveTaskToTimeout(task) + } + ) } - private fun checkForPendingTimeouts() { - taskRepository.findByProcessingStatePendingTimeoutBefore(DbTimestamp.now()) - .forEach { - try { - logger.info { "Setting timeout for task ${it.uuid} after reaching pending timeout" } - goldenRecordTaskStateMachine.doResolveTaskToTimeout(it) - } catch (err: RuntimeException) { - logger.error(err) { "Error handling pending timeout for task ${it.uuid}" } - } + @Transactional + fun processRetentionTimeouts(pageSize: Int): PaginationInfo { + return batchProcessTasks(pageSize, + fetchPage = { pageable -> taskRepository.findByProcessingStateRetentionTimeoutBefore(DbTimestamp.now(), pageable) }, + processTask = { task -> + logger.info { "Removing task ${task.uuid} after reaching retention timeout" } + taskRepository.delete(task) } + ) } - private fun checkForRetentionTimeouts() { - taskRepository.findByProcessingStateRetentionTimeoutBefore(DbTimestamp.now()) - .forEach { - try { - logger.info { "Removing task ${it.uuid} after reaching retention timeout" } - taskRepository.delete(it) - } catch (err: RuntimeException) { - logger.error(err) { "Error handling retention timeout for task ${it.uuid}" } - } + private fun batchProcessTasks( + pageSize: Int, + fetchPage: (Pageable) -> Page, + processTask: (GoldenRecordTaskDb) -> Unit + ): PaginationInfo { + val pageable: Pageable = PageRequest.of(0, pageSize) + val page = fetchPage(pageable) + var hasProcessedTasks = false + var processedTaskCount = 0 + + page.forEach { task -> + try { + processTask(task) + hasProcessedTasks = true + processedTaskCount++ // Increment on successful processing + } catch (err: RuntimeException) { + logger.error(err) { "Error processing timeout for task ${task.uuid}" } } + } + + return PaginationInfo(hasProcessedTasks, page.hasNext(), processedTaskCount) } private fun calculateTaskPendingTimeout(task: GoldenRecordTaskDb) = @@ -154,20 +170,28 @@ class GoldenRecordTaskService( throw BpdmTaskNotFoundException(uuidString) } - private fun getOrCreateGateRecords(requests: List): List{ - val privateIds = requests.map { request -> request.recordId?.let { toUUID(it) }} + private fun getOrCreateGateRecords(requests: List): List { + val privateIds = requests.map { request -> request.recordId?.let { toUUID(it) } } val notNullPrivateIds = privateIds.filterNotNull() val foundRecords = gateRecordRepository.findByPrivateIdIn(notNullPrivateIds.toSet()) val foundRecordsByPrivateId = foundRecords.associateBy { it.privateId } val requestedNotFoundRecords = notNullPrivateIds.minus(foundRecordsByPrivateId.keys) - if(requestedNotFoundRecords.isNotEmpty()) + if (requestedNotFoundRecords.isNotEmpty()) throw BpdmRecordNotFoundException(requestedNotFoundRecords) - return privateIds.map { privateId -> + return privateIds.map { privateId -> val gateRecord = privateId?.let { foundRecordsByPrivateId[it] } ?: GateRecordDb(publicId = UUID.randomUUID(), privateId = UUID.randomUUID()) gateRecordRepository.save(gateRecord) } } } + +data class PaginationInfo( + val hasProcessedTasks: Boolean, + val hasNextPage: Boolean, + val processedTaskCount: Int +) { + fun countProcessedTasks(): Int = processedTaskCount +} diff --git a/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/TimeoutProcessBatchService.kt b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/TimeoutProcessBatchService.kt new file mode 100644 index 000000000..490423a6f --- /dev/null +++ b/bpdm-orchestrator/src/main/kotlin/org/eclipse/tractusx/bpdm/orchestrator/service/TimeoutProcessBatchService.kt @@ -0,0 +1,61 @@ +/******************************************************************************* + * Copyright (c) 2021,2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ******************************************************************************/ + +package org.eclipse.tractusx.bpdm.orchestrator.service + +import jakarta.persistence.EntityManager +import mu.KotlinLogging +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Service + +@Service +class TimeoutProcessBatchService( + private val goldenRecordTaskService: GoldenRecordTaskService, + private val entityManager: EntityManager +) { + private val logger = KotlinLogging.logger { } + + @Scheduled(cron = "\${bpdm.task.timeoutCheckCron}") + fun processForTimeouts() { + try { + // Track the total number of processed tasks + var totalProcessedTasks = 0 + logger.debug { "Checking for timeouts" } + // Process pending timeouts + totalProcessedTasks += processTimeouts(goldenRecordTaskService::processPendingTimeouts) + // Process retention timeouts + totalProcessedTasks += processTimeouts(goldenRecordTaskService::processRetentionTimeouts) + logger.info { "Finished processing timeouts. Total processed tasks: $totalProcessedTasks" } + } catch (err: RuntimeException) { + logger.error(err) { "Error checking for timeouts" } + } + } + + + private fun processTimeouts(processFunction: (Int) -> PaginationInfo): Int { + val pageSize = 1000 // Adjust the page size based on memory constraints + var processedTasks = 0 + do { + val paginationInfo = processFunction(pageSize) + processedTasks += paginationInfo.countProcessedTasks() + entityManager.clear() // Clear the persistence context to free memory + } while (paginationInfo.hasNextPage) + return processedTasks + } +}