Skip to content

Commit

Permalink
Merge pull request #230 from clementguillot/229-cleanup-command-is-fetching-excessive-data-per-query
Browse files Browse the repository at this point in the history

fix(server): fetch all runs and tasks to delete as streams/flows instead of bulk collections
  • Loading branch information
clementguillot authored Oct 22, 2024
2 parents 7750e9e + 59b13ab commit a1d341c
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class ArtifactRepositoryImpl(
return entities.map { it.toDomain() }
}

override suspend fun delete(artifact: Artifact.Exist): Boolean =
artifactPanacheRepository.deleteByArtifactId(artifact.id.value).awaitSuspending().let {
override suspend fun delete(artifactId: ArtifactId): Boolean =
artifactPanacheRepository.deleteByArtifactId(artifactId.value).awaitSuspending().let {
when (it) {
0L -> false
1L -> true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.nxcloudce.server.persistence.gateway

import io.smallrye.mutiny.coroutines.asFlow
import io.smallrye.mutiny.coroutines.awaitSuspending
import jakarta.enterprise.context.ApplicationScoped
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.bson.types.ObjectId
import org.nxcloudce.server.domain.run.gateway.RunRepository
import org.nxcloudce.server.domain.run.model.Run
Expand All @@ -25,10 +28,8 @@ class RunRepositoryImpl(
return runPanacheRepository.persist(entity).awaitSuspending().run { entity.toDomain() }
}

override suspend fun findAllByCreationDateOlderThan(date: LocalDateTime): Collection<Run> =
runPanacheRepository.findAllByEndTimeLowerThan(date).awaitSuspending().map {
it.toDomain()
}
/**
 * Returns the runs matched by [date] as a [Flow] instead of a fully materialised collection.
 *
 * The lookup is delegated to `runPanacheRepository.findAllByEndTimeLowerThan`, so despite this
 * method's name the filter is applied to the run's end time. Each emitted entity is converted
 * to its domain model.
 */
override fun findAllByCreationDateOlderThan(date: LocalDateTime): Flow<Run> {
  val entities = runPanacheRepository.findAllByEndTimeLowerThan(date).asFlow()
  return entities.map { entity -> entity.toDomain() }
}

/**
 * Deletes the persisted document whose id is [run]'s id.
 *
 * @return the result of the repository's `deleteById` call once it completes.
 */
override suspend fun delete(run: Run): Boolean {
  val documentId = ObjectId(run.id.value)
  return runPanacheRepository.deleteById(documentId).awaitSuspending()
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.nxcloudce.server.persistence.gateway

import io.smallrye.mutiny.coroutines.asFlow
import io.smallrye.mutiny.coroutines.awaitSuspending
import jakarta.enterprise.context.ApplicationScoped
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.bson.types.ObjectId
import org.nxcloudce.server.domain.run.gateway.TaskRepository
import org.nxcloudce.server.domain.run.model.RunId
Expand All @@ -24,8 +27,8 @@ class TaskRepositoryImpl(private val taskPanacheRepository: TaskPanacheRepositor
return entities.map { it.toDomain() }
}

override suspend fun findAllByRunId(runId: RunId): Collection<Task> =
taskPanacheRepository.findAllByRunId(ObjectId(runId.value)).awaitSuspending().map {
/**
 * Returns the tasks stored for [runId] as a [Flow], converting each persisted entity to its domain model.
 *
 * The string value of [runId] is wrapped in an [ObjectId] before the Panache repository is queried.
 */
override fun findAllByRunId(runId: RunId): Flow<Task> =
taskPanacheRepository.findAllByRunId(ObjectId(runId.value)).asFlow().map {
it.toDomain()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ class ArtifactRepositoryImplTest {
} returns Uni.createFrom().item(0)

// When
val validResult = artifactRepository.delete(validArtifact)
val invalidResult = artifactRepository.delete(invalidArtifact)
val validResult = artifactRepository.delete(validArtifact.id)
val invalidResult = artifactRepository.delete(invalidArtifact.id)

// Then
expect(validResult).toEqual(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import io.mockk.every
import io.mockk.verify
import io.quarkiverse.test.junit.mockk.InjectMock
import io.quarkus.test.junit.QuarkusTest
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.Uni
import jakarta.inject.Inject
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runTest
import org.bson.types.ObjectId
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -120,10 +122,14 @@ class RunRepositoryImplTest {
// Given
val dummyRuns = listOf(buildRunEntity(), buildRunEntity())
val thresholdDate = LocalDateTime.now()
every { mockRunPanacheRepository.findAllByEndTimeLowerThan(thresholdDate) } returns Uni.createFrom().item(dummyRuns)
every {
mockRunPanacheRepository.findAllByEndTimeLowerThan(
thresholdDate,
)
} returns Multi.createFrom().items(*dummyRuns.toTypedArray())

// When
val result = runRepository.findAllByCreationDateOlderThan(thresholdDate)
val result = runRepository.findAllByCreationDateOlderThan(thresholdDate).toList()

// Then
expect(result.size).toEqual(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import ch.tutteli.atrium.api.verbs.expect
import io.mockk.every
import io.quarkiverse.test.junit.mockk.InjectMock
import io.quarkus.test.junit.QuarkusTest
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.Uni
import jakarta.inject.Inject
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runTest
import org.bson.types.ObjectId
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -103,10 +105,10 @@ class TaskRepositoryImplTest {
// Given
val dummyRunId = ObjectId()
val dummyTaskEntities = listOf(buildTaskEntity(dummyRunId))
every { mockTaskPanacheRepository.findAllByRunId(dummyRunId) } returns Uni.createFrom().item(dummyTaskEntities)
every { mockTaskPanacheRepository.findAllByRunId(dummyRunId) } returns Multi.createFrom().items(*dummyTaskEntities.toTypedArray())

// When
val result = taskRepository.findAllByRunId(RunId(dummyRunId.toString()))
val result = taskRepository.findAllByRunId(RunId(dummyRunId.toString())).toList()

// Then
expect(result.size).toEqual(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ interface ArtifactRepository {
workspaceId: WorkspaceId,
): Collection<Artifact.Exist>

suspend fun delete(artifact: Artifact.Exist): Boolean
suspend fun delete(artifactId: ArtifactId): Boolean
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.nxcloudce.server.domain.run.gateway

import kotlinx.coroutines.flow.Flow
import org.nxcloudce.server.domain.run.model.Run
import org.nxcloudce.server.domain.run.model.RunStatus
import org.nxcloudce.server.domain.run.usecase.EndRunRequest
Expand All @@ -13,7 +14,7 @@ interface RunRepository {
workspaceId: WorkspaceId,
): Run

suspend fun findAllByCreationDateOlderThan(date: LocalDateTime): Collection<Run>
fun findAllByCreationDateOlderThan(date: LocalDateTime): Flow<Run>

suspend fun delete(run: Run): Boolean
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.nxcloudce.server.domain.run.gateway

import kotlinx.coroutines.flow.Flow
import org.nxcloudce.server.domain.run.model.RunId
import org.nxcloudce.server.domain.run.model.Task
import org.nxcloudce.server.domain.run.usecase.EndRunRequest
Expand All @@ -12,7 +13,7 @@ interface TaskRepository {
workspaceId: WorkspaceId,
): Collection<Task>

suspend fun findAllByRunId(runId: RunId): Collection<Task>
fun findAllByRunId(runId: RunId): Flow<Task>

suspend fun deleteAllByRunId(runId: RunId): Long
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.nxcloudce.server.domain.run.interactor

import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import org.jboss.logging.Logger
Expand All @@ -27,37 +25,31 @@ class CleanupRunImpl(
request: CleanupRunRequest,
presenter: (CleanupRunResponse) -> T,
): T {
val runs = runRepository.findAllByCreationDateOlderThan(request.creationDateThreshold)
logger.info("Found run(s) to delete: ${runs.size}")

runs.forEach { run ->
val tasks = taskRepository.findAllByRunId(run.id)
val artifacts =
artifactRepository.findByHash(
tasks.filter { it.artifactId != null }.map { it.hash },
run.workspaceId,
)
logger.info("Run contains ${tasks.size} task(s) and ${artifacts.size} artifact(s)")
var deletedRunCount = 0

coroutineScope {
artifacts.map { artifact ->
async {
Pair(
storageService.deleteArtifact(artifact.id, run.workspaceId),
artifactRepository.delete(artifact),
)
runRepository.findAllByCreationDateOlderThan(request.creationDateThreshold).collect { run ->
logger.info("Removing run ${run.id}")

taskRepository.findAllByRunId(run.id).collect { task ->
task.artifactId?.let { artifactId ->
coroutineScope {
launch { storageService.deleteArtifact(artifactId, run.workspaceId) }
launch { artifactRepository.delete(artifactId) }
}
}
}
.awaitAll()
logger.info("Artifacts and their files have deleted")

logger.info("Artifacts and their files have been deleted")

coroutineScope {
launch { taskRepository.deleteAllByRunId(run.id) }
launch { runRepository.delete(run) }
}

logger.info("Tasks and run have been deleted")
deletedRunCount++
}

return presenter(CleanupRunResponse(deletedCount = runs.size))
return presenter(CleanupRunResponse(deletedCount = deletedRunCount))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.nxcloudce.server.domain.run.interactor
import ch.tutteli.atrium.api.fluent.en_GB.toEqual
import ch.tutteli.atrium.api.verbs.expect
import io.mockk.*
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import org.nxcloudce.server.domain.run.gateway.ArtifactRepository
Expand Down Expand Up @@ -31,22 +32,16 @@ class CleanupRunImplTest {
val dummyTasks = listOf(buildTask(dummyRuns[0].id, dummyRuns[0].workspaceId), buildTask(dummyRuns[0].id, dummyRuns[0].workspaceId))
val dummyArtifacts =
listOf(
buildArtifact(dummyTasks[0].hash, dummyTasks[0].workspaceId),
buildArtifact(dummyTasks[1].hash, dummyTasks[1].workspaceId),
buildArtifact(dummyTasks[0].hash, dummyTasks[0].workspaceId, dummyTasks[0].artifactId!!),
buildArtifact(dummyTasks[1].hash, dummyTasks[1].workspaceId, dummyTasks[1].artifactId!!),
)

coEvery { mockRunRepository.findAllByCreationDateOlderThan(thresholdDate) } returns dummyRuns
coEvery { mockTaskRepository.findAllByRunId(any()) } returns dummyTasks
coEvery { mockRunRepository.findAllByCreationDateOlderThan(thresholdDate) } returns flowOf(*dummyRuns.toTypedArray())
coEvery { mockTaskRepository.findAllByRunId(any()) } returns flowOf(*dummyTasks.toTypedArray())
coEvery { mockArtifactRepository.findByHash(any(), dummyRuns[0].workspaceId) } returns dummyArtifacts

coEvery { mockStorageService.deleteArtifact(any(), dummyRuns[0].workspaceId) } just runs
coEvery {
mockArtifactRepository.delete(
match {
dummyArtifacts.contains(it)
},
)
} returns true
coEvery { mockArtifactRepository.delete(any()) } returns true
coEvery { mockTaskRepository.deleteAllByRunId(any()) } returns 2L
coEvery { mockRunRepository.delete(match { dummyRuns.contains(it) }) } returns true

Expand All @@ -59,7 +54,7 @@ class CleanupRunImplTest {

coVerify(exactly = 1) { mockStorageService.deleteArtifact(dummyArtifacts[0].id, dummyRuns[0].workspaceId) }
coVerify(exactly = 1) { mockStorageService.deleteArtifact(dummyArtifacts[1].id, dummyRuns[0].workspaceId) }
coVerify(exactly = 2) { mockArtifactRepository.delete(match { dummyArtifacts.contains(it) }) }
coVerify(exactly = 2) { mockArtifactRepository.delete(any()) }
coVerify(exactly = 1) { mockTaskRepository.deleteAllByRunId(dummyRuns[0].id) }
coVerify(exactly = 1) { mockRunRepository.delete(dummyRuns[0]) }
}
Expand Down Expand Up @@ -108,9 +103,10 @@ class CleanupRunImplTest {
private fun buildArtifact(
hash: Hash,
workspaceId: WorkspaceId,
id: ArtifactId = ArtifactId(),
): Artifact.Exist =
Artifact.Exist(
id = ArtifactId(),
id = id,
hash = hash,
workspaceId = workspaceId,
put = "put-url",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package org.nxcloudce.server.persistence.repository

import io.quarkus.mongodb.panache.kotlin.reactive.ReactivePanacheMongoRepository
import io.smallrye.mutiny.Uni
import io.smallrye.mutiny.Multi
import jakarta.enterprise.context.ApplicationScoped
import org.nxcloudce.server.persistence.entity.RunEntity
import java.time.LocalDateTime

/**
 * Reactive Panache repository for [RunEntity] documents.
 */
@ApplicationScoped
class RunPanacheRepository : ReactivePanacheMongoRepository<RunEntity> {
fun findAllByEndTimeLowerThan(creationDate: LocalDateTime): Uni<List<RunEntity>> {
return find("${RunEntity::endTime.name} < ?1", creationDate).list()
// Finds the entities whose `endTime` field is strictly lower than the argument and exposes them
// as a Multi stream rather than a single list.
// NOTE(review): the parameter is named `creationDate` but is compared against `endTime` — confirm the naming is intentional.
fun findAllByEndTimeLowerThan(creationDate: LocalDateTime): Multi<RunEntity> {
return find("${RunEntity::endTime.name} < ?1", creationDate).stream()
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package org.nxcloudce.server.persistence.repository

import io.quarkus.mongodb.panache.kotlin.reactive.ReactivePanacheMongoRepository
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.Uni
import jakarta.enterprise.context.ApplicationScoped
import org.bson.types.ObjectId
import org.nxcloudce.server.persistence.entity.TaskEntity

/**
 * Reactive Panache repository for [TaskEntity] documents.
 */
@ApplicationScoped
class TaskPanacheRepository : ReactivePanacheMongoRepository<TaskEntity> {
fun findAllByRunId(runId: ObjectId): Uni<List<TaskEntity>> = find(TaskEntity::runId.name, runId).list()
// Finds the task entities whose `runId` field equals the given id and exposes them as a Multi stream.
fun findAllByRunId(runId: ObjectId): Multi<TaskEntity> = find(TaskEntity::runId.name, runId).stream()

// Deletes the task entities whose `runId` field equals the given id.
// The Uni's Long is presumably the number of deleted documents — confirm against the Panache `delete` contract.
fun deleteAllByRunId(runId: ObjectId): Uni<Long> = delete(TaskEntity::runId.name, runId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package org.nxcloudce.server.persistence.repository
import ch.tutteli.atrium.api.fluent.en_GB.toEqual
import ch.tutteli.atrium.api.verbs.expect
import io.quarkus.test.junit.QuarkusTest
import io.smallrye.mutiny.coroutines.asFlow
import io.smallrye.mutiny.coroutines.awaitSuspending
import jakarta.inject.Inject
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.bson.types.ObjectId
Expand Down Expand Up @@ -43,7 +45,7 @@ class RunPanacheRepositoryTest {
runPanacheRepository.persist(listOf(buildRunEntity())).awaitSuspending()
runPanacheRepository.persist(listOf(buildRunEntity(olderDate))).awaitSuspending()

val result = runPanacheRepository.findAllByEndTimeLowerThan(LocalDateTime.now().minusDays(1)).awaitSuspending()
val result = runPanacheRepository.findAllByEndTimeLowerThan(LocalDateTime.now().minusDays(1)).asFlow().toList()
val totalCount = runPanacheRepository.count().awaitSuspending()

expect(result.size).toEqual(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import ch.tutteli.atrium.api.fluent.en_GB.toContainExactly
import ch.tutteli.atrium.api.fluent.en_GB.toEqual
import ch.tutteli.atrium.api.verbs.expect
import io.quarkus.test.junit.QuarkusTest
import io.smallrye.mutiny.coroutines.asFlow
import io.smallrye.mutiny.coroutines.awaitSuspending
import jakarta.inject.Inject
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.bson.types.ObjectId
Expand Down Expand Up @@ -43,7 +45,7 @@ class TaskPanacheRepositoryTest {
val runId = ObjectId()
taskPanacheRepository.persist(listOf(buildTaskEntity(runId), buildTaskEntity(runId), buildTaskEntity())).awaitSuspending()

val result = taskPanacheRepository.findAllByRunId(runId).awaitSuspending()
val result = taskPanacheRepository.findAllByRunId(runId).asFlow().toList()
val totalCount = taskPanacheRepository.count().awaitSuspending()

expect(totalCount).toEqual(3)
Expand Down

0 comments on commit a1d341c

Please sign in to comment.