diff --git a/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImpl.kt b/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImpl.kt index cf02c912..b94d8270 100644 --- a/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImpl.kt +++ b/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImpl.kt @@ -1,7 +1,10 @@ package org.nxcloudce.server.persistence.gateway +import io.smallrye.mutiny.coroutines.asFlow import io.smallrye.mutiny.coroutines.awaitSuspending import jakarta.enterprise.context.ApplicationScoped +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map import org.bson.types.ObjectId import org.nxcloudce.server.domain.run.gateway.RunRepository import org.nxcloudce.server.domain.run.model.Run @@ -25,10 +28,8 @@ class RunRepositoryImpl( return runPanacheRepository.persist(entity).awaitSuspending().run { entity.toDomain() } } - override suspend fun findAllByCreationDateOlderThan(date: LocalDateTime): Collection = - runPanacheRepository.findAllByEndTimeLowerThan(date).awaitSuspending().map { - it.toDomain() - } + override fun findAllByCreationDateOlderThan(date: LocalDateTime): Flow = + runPanacheRepository.findAllByEndTimeLowerThan(date).asFlow().map { it.toDomain() } override suspend fun delete(run: Run) = runPanacheRepository.deleteById(ObjectId(run.id.value)).awaitSuspending() } diff --git a/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImplTest.kt b/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImplTest.kt index 9c8ec76d..48cd840a 100644 --- a/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImplTest.kt +++ b/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImplTest.kt @@ -7,8 +7,10 @@ import io.mockk.every import io.mockk.verify import io.quarkiverse.test.junit.mockk.InjectMock import io.quarkus.test.junit.QuarkusTest +import io.smallrye.mutiny.Multi import io.smallrye.mutiny.Uni import jakarta.inject.Inject +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runTest import org.bson.types.ObjectId import org.junit.jupiter.api.Test @@ -120,10 +122,14 @@ class RunRepositoryImplTest { // Given val dummyRuns = listOf(buildRunEntity(), buildRunEntity()) val thresholdDate = LocalDateTime.now() - every { mockRunPanacheRepository.findAllByEndTimeLowerThan(thresholdDate) } returns Uni.createFrom().item(dummyRuns) + every { + mockRunPanacheRepository.findAllByEndTimeLowerThan( + thresholdDate, + ) + } returns Multi.createFrom().items(*dummyRuns.toTypedArray()) // When - val result = runRepository.findAllByCreationDateOlderThan(thresholdDate) + val result = runRepository.findAllByCreationDateOlderThan(thresholdDate).toList() // Then expect(result.size).toEqual(2) diff --git a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/RunRepository.kt b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/RunRepository.kt index 591f3ddc..1e3ee6ce 100644 --- a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/RunRepository.kt +++ b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/RunRepository.kt @@ -1,5 +1,6 @@ package org.nxcloudce.server.domain.run.gateway +import kotlinx.coroutines.flow.Flow import org.nxcloudce.server.domain.run.model.Run import org.nxcloudce.server.domain.run.model.RunStatus import org.nxcloudce.server.domain.run.usecase.EndRunRequest @@ -13,7 +14,7 @@ interface RunRepository { workspaceId: WorkspaceId, ): Run - suspend fun findAllByCreationDateOlderThan(date: LocalDateTime): Collection + fun findAllByCreationDateOlderThan(date: LocalDateTime): Flow suspend fun delete(run: Run): Boolean } diff --git a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImpl.kt b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImpl.kt index 9807377e..53748722 100644 --- a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImpl.kt +++ b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImpl.kt @@ -27,10 +27,10 @@ class CleanupRunImpl( request: CleanupRunRequest, presenter: (CleanupRunResponse) -> T, ): T { - val runs = runRepository.findAllByCreationDateOlderThan(request.creationDateThreshold) - logger.info("Found run(s) to delete: ${runs.size}") + var deletedRunCount = 0 - runs.forEach { run -> + runRepository.findAllByCreationDateOlderThan(request.creationDateThreshold).collect { run -> + logger.info("Removing run ${run.id}") val tasks = taskRepository.findAllByRunId(run.id) val artifacts = artifactRepository.findByHash( @@ -50,14 +50,15 @@ class CleanupRunImpl( } } .awaitAll() - logger.info("Artifacts and their files have deleted") + logger.info("Artifacts and their files have been deleted") coroutineScope { launch { taskRepository.deleteAllByRunId(run.id) } launch { runRepository.delete(run) } } logger.info("Tasks and run have been deleted") + deletedRunCount++ } - return presenter(CleanupRunResponse(deletedCount = runs.size)) + return presenter(CleanupRunResponse(deletedCount = deletedRunCount)) } } diff --git a/libs/server/domain/src/test/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImplTest.kt b/libs/server/domain/src/test/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImplTest.kt index 07ebc5c5..2a3a8641 100644 --- a/libs/server/domain/src/test/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImplTest.kt +++ b/libs/server/domain/src/test/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImplTest.kt @@ -3,6 +3,7 @@ package org.nxcloudce.server.domain.run.interactor import ch.tutteli.atrium.api.fluent.en_GB.toEqual import ch.tutteli.atrium.api.verbs.expect import io.mockk.* +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test import org.nxcloudce.server.domain.run.gateway.ArtifactRepository @@ -35,7 +36,7 @@ class CleanupRunImplTest { buildArtifact(dummyTasks[1].hash, dummyTasks[1].workspaceId), ) - coEvery { mockRunRepository.findAllByCreationDateOlderThan(thresholdDate) } returns dummyRuns + coEvery { mockRunRepository.findAllByCreationDateOlderThan(thresholdDate) } returns flowOf(*dummyRuns.toTypedArray()) coEvery { mockTaskRepository.findAllByRunId(any()) } returns dummyTasks coEvery { mockArtifactRepository.findByHash(any(), dummyRuns[0].workspaceId) } returns dummyArtifacts diff --git a/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepository.kt b/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepository.kt index b82da88f..0850c545 100644 --- a/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepository.kt +++ b/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepository.kt @@ -1,14 +1,14 @@ package org.nxcloudce.server.persistence.repository import io.quarkus.mongodb.panache.kotlin.reactive.ReactivePanacheMongoRepository -import io.smallrye.mutiny.Uni +import io.smallrye.mutiny.Multi import jakarta.enterprise.context.ApplicationScoped import org.nxcloudce.server.persistence.entity.RunEntity import java.time.LocalDateTime @ApplicationScoped class RunPanacheRepository : ReactivePanacheMongoRepository { - fun findAllByEndTimeLowerThan(creationDate: LocalDateTime): Uni> { - return find("${RunEntity::endTime.name} < ?1", creationDate).list() + fun findAllByEndTimeLowerThan(creationDate: LocalDateTime): Multi { + return find("${RunEntity::endTime.name} < ?1", creationDate).stream() } } diff --git a/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepositoryTest.kt b/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepositoryTest.kt index f440e7f5..bd502042 100644 --- a/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepositoryTest.kt +++ b/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepositoryTest.kt @@ -3,8 +3,10 @@ package org.nxcloudce.server.persistence.repository import ch.tutteli.atrium.api.fluent.en_GB.toEqual import ch.tutteli.atrium.api.verbs.expect import io.quarkus.test.junit.QuarkusTest +import io.smallrye.mutiny.coroutines.asFlow import io.smallrye.mutiny.coroutines.awaitSuspending import jakarta.inject.Inject +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import org.bson.types.ObjectId @@ -43,7 +45,7 @@ class RunPanacheRepositoryTest { runPanacheRepository.persist(listOf(buildRunEntity())).awaitSuspending() runPanacheRepository.persist(listOf(buildRunEntity(olderDate))).awaitSuspending() - val result = runPanacheRepository.findAllByEndTimeLowerThan(LocalDateTime.now().minusDays(1)).awaitSuspending() + val result = runPanacheRepository.findAllByEndTimeLowerThan(LocalDateTime.now().minusDays(1)).asFlow().toList() val totalCount = runPanacheRepository.count().awaitSuspending() expect(result.size).toEqual(1)