diff --git a/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/ArtifactRepositoryImpl.kt b/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/ArtifactRepositoryImpl.kt index b203e373..3d7b1c25 100644 --- a/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/ArtifactRepositoryImpl.kt +++ b/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/ArtifactRepositoryImpl.kt @@ -60,8 +60,8 @@ class ArtifactRepositoryImpl( return entities.map { it.toDomain() } } - override suspend fun delete(artifact: Artifact.Exist): Boolean = - artifactPanacheRepository.deleteByArtifactId(artifact.id.value).awaitSuspending().let { + override suspend fun delete(artifactId: ArtifactId): Boolean = + artifactPanacheRepository.deleteByArtifactId(artifactId.value).awaitSuspending().let { when (it) { 0L -> false 1L -> true diff --git a/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImpl.kt b/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImpl.kt index cf02c912..b94d8270 100644 --- a/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImpl.kt +++ b/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImpl.kt @@ -1,7 +1,10 @@ package org.nxcloudce.server.persistence.gateway +import io.smallrye.mutiny.coroutines.asFlow import io.smallrye.mutiny.coroutines.awaitSuspending import jakarta.enterprise.context.ApplicationScoped +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map import org.bson.types.ObjectId import org.nxcloudce.server.domain.run.gateway.RunRepository import org.nxcloudce.server.domain.run.model.Run @@ -25,10 +28,8 @@ class RunRepositoryImpl( return runPanacheRepository.persist(entity).awaitSuspending().run { entity.toDomain() } } - override suspend fun findAllByCreationDateOlderThan(date: LocalDateTime): Collection = - runPanacheRepository.findAllByEndTimeLowerThan(date).awaitSuspending().map { - it.toDomain() - } + override fun findAllByCreationDateOlderThan(date: LocalDateTime): Flow = + runPanacheRepository.findAllByEndTimeLowerThan(date).asFlow().map { it.toDomain() } override suspend fun delete(run: Run) = runPanacheRepository.deleteById(ObjectId(run.id.value)).awaitSuspending() } diff --git a/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/TaskRepositoryImpl.kt b/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/TaskRepositoryImpl.kt index 02d4061a..1759fdb7 100644 --- a/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/TaskRepositoryImpl.kt +++ b/apps/server/src/main/kotlin/org/nxcloudce/server/persistence/gateway/TaskRepositoryImpl.kt @@ -1,7 +1,10 @@ package org.nxcloudce.server.persistence.gateway +import io.smallrye.mutiny.coroutines.asFlow import io.smallrye.mutiny.coroutines.awaitSuspending import jakarta.enterprise.context.ApplicationScoped +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map import org.bson.types.ObjectId import org.nxcloudce.server.domain.run.gateway.TaskRepository import org.nxcloudce.server.domain.run.model.RunId @@ -24,8 +27,8 @@ class TaskRepositoryImpl(private val taskPanacheRepository: TaskPanacheRepositor return entities.map { it.toDomain() } } - override suspend fun findAllByRunId(runId: RunId): Collection = - taskPanacheRepository.findAllByRunId(ObjectId(runId.value)).awaitSuspending().map { + override fun findAllByRunId(runId: RunId): Flow = + taskPanacheRepository.findAllByRunId(ObjectId(runId.value)).asFlow().map { it.toDomain() } diff --git a/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/ArtifactRepositoryImplTest.kt b/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/ArtifactRepositoryImplTest.kt index 68f722c4..4d411917 100644 --- a/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/ArtifactRepositoryImplTest.kt +++ b/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/ArtifactRepositoryImplTest.kt @@ -142,8 +142,8 @@ class ArtifactRepositoryImplTest { } returns Uni.createFrom().item(0) // When - val validResult = artifactRepository.delete(validArtifact) - val invalidResult = artifactRepository.delete(invalidArtifact) + val validResult = artifactRepository.delete(validArtifact.id) + val invalidResult = artifactRepository.delete(invalidArtifact.id) // Then expect(validResult).toEqual(true) diff --git a/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImplTest.kt b/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImplTest.kt index 9c8ec76d..48cd840a 100644 --- a/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImplTest.kt +++ b/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/RunRepositoryImplTest.kt @@ -7,8 +7,10 @@ import io.mockk.every import io.mockk.verify import io.quarkiverse.test.junit.mockk.InjectMock import io.quarkus.test.junit.QuarkusTest +import io.smallrye.mutiny.Multi import io.smallrye.mutiny.Uni import jakarta.inject.Inject +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runTest import org.bson.types.ObjectId import org.junit.jupiter.api.Test @@ -120,10 +122,14 @@ class RunRepositoryImplTest { // Given val dummyRuns = listOf(buildRunEntity(), buildRunEntity()) val thresholdDate = LocalDateTime.now() - every { mockRunPanacheRepository.findAllByEndTimeLowerThan(thresholdDate) } returns Uni.createFrom().item(dummyRuns) + every { + mockRunPanacheRepository.findAllByEndTimeLowerThan( + thresholdDate, + ) + } returns Multi.createFrom().items(*dummyRuns.toTypedArray()) // When - val result = runRepository.findAllByCreationDateOlderThan(thresholdDate) + val result = runRepository.findAllByCreationDateOlderThan(thresholdDate).toList() // Then expect(result.size).toEqual(2) diff --git a/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/TaskRepositoryImplTest.kt b/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/TaskRepositoryImplTest.kt index b53e13f6..acf73c41 100644 --- a/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/TaskRepositoryImplTest.kt +++ b/apps/server/src/test/kotlin/org/nxcloudce/server/persistence/gateway/TaskRepositoryImplTest.kt @@ -5,8 +5,10 @@ import ch.tutteli.atrium.api.verbs.expect import io.mockk.every import io.quarkiverse.test.junit.mockk.InjectMock import io.quarkus.test.junit.QuarkusTest +import io.smallrye.mutiny.Multi import io.smallrye.mutiny.Uni import jakarta.inject.Inject +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runTest import org.bson.types.ObjectId import org.junit.jupiter.api.Test @@ -103,10 +105,10 @@ class TaskRepositoryImplTest { // Given val dummyRunId = ObjectId() val dummyTaskEntities = listOf(buildTaskEntity(dummyRunId)) - every { mockTaskPanacheRepository.findAllByRunId(dummyRunId) } returns Uni.createFrom().item(dummyTaskEntities) + every { mockTaskPanacheRepository.findAllByRunId(dummyRunId) } returns Multi.createFrom().items(*dummyTaskEntities.toTypedArray()) // When - val result = taskRepository.findAllByRunId(RunId(dummyRunId.toString())) + val result = taskRepository.findAllByRunId(RunId(dummyRunId.toString())).toList() // Then expect(result.size).toEqual(1) diff --git a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/ArtifactRepository.kt b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/ArtifactRepository.kt index 604fe3c8..f38e72fe 100644 --- a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/ArtifactRepository.kt +++ b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/ArtifactRepository.kt @@ -21,5 +21,5 @@ interface ArtifactRepository { workspaceId: WorkspaceId, ): Collection - suspend fun delete(artifact: Artifact.Exist): Boolean + suspend fun delete(artifactId: ArtifactId): Boolean } diff --git a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/RunRepository.kt b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/RunRepository.kt index 591f3ddc..1e3ee6ce 100644 --- a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/RunRepository.kt +++ b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/RunRepository.kt @@ -1,5 +1,6 @@ package org.nxcloudce.server.domain.run.gateway +import kotlinx.coroutines.flow.Flow import org.nxcloudce.server.domain.run.model.Run import org.nxcloudce.server.domain.run.model.RunStatus import org.nxcloudce.server.domain.run.usecase.EndRunRequest @@ -13,7 +14,7 @@ interface RunRepository { workspaceId: WorkspaceId, ): Run - suspend fun findAllByCreationDateOlderThan(date: LocalDateTime): Collection + fun findAllByCreationDateOlderThan(date: LocalDateTime): Flow suspend fun delete(run: Run): Boolean } diff --git a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/TaskRepository.kt b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/TaskRepository.kt index e14a8ea4..a077df63 100644 --- a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/TaskRepository.kt +++ b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/gateway/TaskRepository.kt @@ -1,5 +1,6 @@ package org.nxcloudce.server.domain.run.gateway +import kotlinx.coroutines.flow.Flow import org.nxcloudce.server.domain.run.model.RunId import org.nxcloudce.server.domain.run.model.Task import org.nxcloudce.server.domain.run.usecase.EndRunRequest @@ -12,7 +13,7 @@ interface TaskRepository { workspaceId: WorkspaceId, ): Collection - suspend fun findAllByRunId(runId: RunId): Collection + fun findAllByRunId(runId: RunId): Flow suspend fun deleteAllByRunId(runId: RunId): Long } diff --git a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImpl.kt b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImpl.kt index 9807377e..59729728 100644 --- a/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImpl.kt +++ b/libs/server/domain/src/main/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImpl.kt @@ -1,7 +1,5 @@ package org.nxcloudce.server.domain.run.interactor -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import org.jboss.logging.Logger @@ -27,37 +25,31 @@ class CleanupRunImpl( request: CleanupRunRequest, presenter: (CleanupRunResponse) -> T, ): T { - val runs = runRepository.findAllByCreationDateOlderThan(request.creationDateThreshold) - logger.info("Found run(s) to delete: ${runs.size}") - - runs.forEach { run -> - val tasks = taskRepository.findAllByRunId(run.id) - val artifacts = - artifactRepository.findByHash( - tasks.filter { it.artifactId != null }.map { it.hash }, - run.workspaceId, - ) - logger.info("Run contains ${tasks.size} task(s) and ${artifacts.size} artifact(s)") + var deletedRunCount = 0 - coroutineScope { - artifacts.map { artifact -> - async { - Pair( - storageService.deleteArtifact(artifact.id, run.workspaceId), - artifactRepository.delete(artifact), - ) + runRepository.findAllByCreationDateOlderThan(request.creationDateThreshold).collect { run -> + logger.info("Removing run ${run.id}") + + taskRepository.findAllByRunId(run.id).collect { task -> + task.artifactId?.let { artifactId -> + coroutineScope { + launch { storageService.deleteArtifact(artifactId, run.workspaceId) } + launch { artifactRepository.delete(artifactId) } } } } - .awaitAll() - logger.info("Artifacts and their files have deleted") + + logger.info("Artifacts and their files have been deleted") + coroutineScope { launch { taskRepository.deleteAllByRunId(run.id) } launch { runRepository.delete(run) } } + logger.info("Tasks and run have been deleted") + deletedRunCount++ } - return presenter(CleanupRunResponse(deletedCount = runs.size)) + return presenter(CleanupRunResponse(deletedCount = deletedRunCount)) } } diff --git a/libs/server/domain/src/test/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImplTest.kt b/libs/server/domain/src/test/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImplTest.kt index 07ebc5c5..8a018dbc 100644 --- a/libs/server/domain/src/test/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImplTest.kt +++ b/libs/server/domain/src/test/kotlin/org/nxcloudce/server/domain/run/interactor/CleanupRunImplTest.kt @@ -3,6 +3,7 @@ package org.nxcloudce.server.domain.run.interactor import ch.tutteli.atrium.api.fluent.en_GB.toEqual import ch.tutteli.atrium.api.verbs.expect import io.mockk.* +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test import org.nxcloudce.server.domain.run.gateway.ArtifactRepository @@ -31,22 +32,16 @@ class CleanupRunImplTest { val dummyTasks = listOf(buildTask(dummyRuns[0].id, dummyRuns[0].workspaceId), buildTask(dummyRuns[0].id, dummyRuns[0].workspaceId)) val dummyArtifacts = listOf( - buildArtifact(dummyTasks[0].hash, dummyTasks[0].workspaceId), - buildArtifact(dummyTasks[1].hash, dummyTasks[1].workspaceId), + buildArtifact(dummyTasks[0].hash, dummyTasks[0].workspaceId, dummyTasks[0].artifactId!!), + buildArtifact(dummyTasks[1].hash, dummyTasks[1].workspaceId, dummyTasks[1].artifactId!!), ) - coEvery { mockRunRepository.findAllByCreationDateOlderThan(thresholdDate) } returns dummyRuns - coEvery { mockTaskRepository.findAllByRunId(any()) } returns dummyTasks + coEvery { mockRunRepository.findAllByCreationDateOlderThan(thresholdDate) } returns flowOf(*dummyRuns.toTypedArray()) + coEvery { mockTaskRepository.findAllByRunId(any()) } returns flowOf(*dummyTasks.toTypedArray()) coEvery { mockArtifactRepository.findByHash(any(), dummyRuns[0].workspaceId) } returns dummyArtifacts coEvery { mockStorageService.deleteArtifact(any(), dummyRuns[0].workspaceId) } just runs - coEvery { - mockArtifactRepository.delete( - match { - dummyArtifacts.contains(it) - }, - ) - } returns true + coEvery { mockArtifactRepository.delete(any()) } returns true coEvery { mockTaskRepository.deleteAllByRunId(any()) } returns 2L coEvery { mockRunRepository.delete(match { dummyRuns.contains(it) }) } returns true @@ -59,7 +54,7 @@ class CleanupRunImplTest { coVerify(exactly = 1) { mockStorageService.deleteArtifact(dummyArtifacts[0].id, dummyRuns[0].workspaceId) } coVerify(exactly = 1) { mockStorageService.deleteArtifact(dummyArtifacts[1].id, dummyRuns[0].workspaceId) } - coVerify(exactly = 2) { mockArtifactRepository.delete(match { dummyArtifacts.contains(it) }) } + coVerify(exactly = 2) { mockArtifactRepository.delete(any()) } coVerify(exactly = 1) { mockTaskRepository.deleteAllByRunId(dummyRuns[0].id) } coVerify(exactly = 1) { mockRunRepository.delete(dummyRuns[0]) } } @@ -108,9 +103,10 @@ class CleanupRunImplTest { private fun buildArtifact( hash: Hash, workspaceId: WorkspaceId, + id: ArtifactId = ArtifactId(), ): Artifact.Exist = Artifact.Exist( - id = ArtifactId(), + id = id, hash = hash, workspaceId = workspaceId, put = "put-url", diff --git a/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepository.kt b/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepository.kt index b82da88f..0850c545 100644 --- a/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepository.kt +++ b/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepository.kt @@ -1,14 +1,14 @@ package org.nxcloudce.server.persistence.repository import io.quarkus.mongodb.panache.kotlin.reactive.ReactivePanacheMongoRepository -import io.smallrye.mutiny.Uni +import io.smallrye.mutiny.Multi import jakarta.enterprise.context.ApplicationScoped import org.nxcloudce.server.persistence.entity.RunEntity import java.time.LocalDateTime @ApplicationScoped class RunPanacheRepository : ReactivePanacheMongoRepository { - fun findAllByEndTimeLowerThan(creationDate: LocalDateTime): Uni> { - return find("${RunEntity::endTime.name} < ?1", creationDate).list() + fun findAllByEndTimeLowerThan(creationDate: LocalDateTime): Multi { + return find("${RunEntity::endTime.name} < ?1", creationDate).stream() } } diff --git a/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/TaskPanacheRepository.kt b/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/TaskPanacheRepository.kt index bacb2063..df62a2d5 100644 --- a/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/TaskPanacheRepository.kt +++ b/libs/server/persistence/src/main/kotlin/org/nxcloudce/server/persistence/repository/TaskPanacheRepository.kt @@ -1,6 +1,7 @@ package org.nxcloudce.server.persistence.repository import io.quarkus.mongodb.panache.kotlin.reactive.ReactivePanacheMongoRepository +import io.smallrye.mutiny.Multi import io.smallrye.mutiny.Uni import jakarta.enterprise.context.ApplicationScoped import org.bson.types.ObjectId @@ -8,7 +9,7 @@ import org.nxcloudce.server.persistence.entity.TaskEntity @ApplicationScoped class TaskPanacheRepository : ReactivePanacheMongoRepository { - fun findAllByRunId(runId: ObjectId): Uni> = find(TaskEntity::runId.name, runId).list() + fun findAllByRunId(runId: ObjectId): Multi = find(TaskEntity::runId.name, runId).stream() fun deleteAllByRunId(runId: ObjectId): Uni = delete(TaskEntity::runId.name, runId) } diff --git a/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepositoryTest.kt b/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepositoryTest.kt index f440e7f5..bd502042 100644 --- a/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepositoryTest.kt +++ b/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/RunPanacheRepositoryTest.kt @@ -3,8 +3,10 @@ package org.nxcloudce.server.persistence.repository import ch.tutteli.atrium.api.fluent.en_GB.toEqual import ch.tutteli.atrium.api.verbs.expect import io.quarkus.test.junit.QuarkusTest +import io.smallrye.mutiny.coroutines.asFlow import io.smallrye.mutiny.coroutines.awaitSuspending import jakarta.inject.Inject +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import org.bson.types.ObjectId @@ -43,7 +45,7 @@ class RunPanacheRepositoryTest { runPanacheRepository.persist(listOf(buildRunEntity())).awaitSuspending() runPanacheRepository.persist(listOf(buildRunEntity(olderDate))).awaitSuspending() - val result = runPanacheRepository.findAllByEndTimeLowerThan(LocalDateTime.now().minusDays(1)).awaitSuspending() + val result = runPanacheRepository.findAllByEndTimeLowerThan(LocalDateTime.now().minusDays(1)).asFlow().toList() val totalCount = runPanacheRepository.count().awaitSuspending() expect(result.size).toEqual(1) diff --git a/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/TaskPanacheRepositoryTest.kt b/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/TaskPanacheRepositoryTest.kt index be495b88..c8fb149e 100644 --- a/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/TaskPanacheRepositoryTest.kt +++ b/libs/server/persistence/src/test/kotlin/org/nxcloudce/server/persistence/repository/TaskPanacheRepositoryTest.kt @@ -4,8 +4,10 @@ import ch.tutteli.atrium.api.fluent.en_GB.toContainExactly import ch.tutteli.atrium.api.fluent.en_GB.toEqual import ch.tutteli.atrium.api.verbs.expect import io.quarkus.test.junit.QuarkusTest +import io.smallrye.mutiny.coroutines.asFlow import io.smallrye.mutiny.coroutines.awaitSuspending import jakarta.inject.Inject +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import org.bson.types.ObjectId @@ -43,7 +45,7 @@ class TaskPanacheRepositoryTest { val runId = ObjectId() taskPanacheRepository.persist(listOf(buildTaskEntity(runId), buildTaskEntity(runId), buildTaskEntity())).awaitSuspending() - val result = taskPanacheRepository.findAllByRunId(runId).awaitSuspending() + val result = taskPanacheRepository.findAllByRunId(runId).asFlow().toList() val totalCount = taskPanacheRepository.count().awaitSuspending() expect(totalCount).toEqual(3)