Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Execute exchange tasks as new coroutines. #1962

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@
package org.wfanet.panelmatch.client.deploy

import java.time.Clock
import kotlinx.coroutines.Job
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.wfanet.measurement.common.logAndSuppressExceptionSuspend
import org.wfanet.measurement.common.throttler.Throttler
import org.wfanet.panelmatch.client.common.Identity
import org.wfanet.panelmatch.client.exchangetasks.ExchangeTaskMapper
import org.wfanet.panelmatch.client.launcher.ApiClient
import org.wfanet.panelmatch.client.launcher.ExchangeStepLauncher
import org.wfanet.panelmatch.client.launcher.ExchangeStepExecutor
import org.wfanet.panelmatch.client.launcher.ExchangeStepValidatorImpl
import org.wfanet.panelmatch.client.launcher.ExchangeTaskExecutor
import org.wfanet.panelmatch.client.storage.PrivateStorageSelector
Expand All @@ -36,6 +35,7 @@ import org.wfanet.panelmatch.client.storage.StorageDetailsProvider
import org.wfanet.panelmatch.common.ExchangeDateKey
import org.wfanet.panelmatch.common.Timeout
import org.wfanet.panelmatch.common.certificates.CertificateManager
import org.wfanet.panelmatch.common.loggerFor
import org.wfanet.panelmatch.common.secrets.SecretMap
import org.wfanet.panelmatch.common.storage.StorageFactory

Expand Down Expand Up @@ -99,7 +99,8 @@ abstract class ExchangeWorkflowDaemon : Runnable {
protected val sharedStorageSelector: SharedStorageSelector by lazy {
SharedStorageSelector(certificateManager, sharedStorageFactories, sharedStorageInfo)
}
protected open val stepExecutor by lazy {

protected open val stepExecutor: ExchangeStepExecutor by lazy {
ExchangeTaskExecutor(
apiClient = apiClient,
timeout = taskTimeout,
Expand All @@ -112,35 +113,70 @@ abstract class ExchangeWorkflowDaemon : Runnable {
override fun run() = runBlocking { runSuspending() }

suspend fun runSuspending() {
val exchangeStepLauncher =
ExchangeStepLauncher(apiClient = apiClient, taskLauncher = stepExecutor)
when (runMode) {
RunMode.DAEMON -> runDaemon(exchangeStepLauncher)
RunMode.CRON_JOB -> runCronJob(exchangeStepLauncher)
RunMode.DAEMON -> runDaemon()
RunMode.CRON_JOB -> runCronJob()
}
}

/** Runs [exchangeStepLauncher] in an infinite loop. */
protected open suspend fun runDaemon(exchangeStepLauncher: ExchangeStepLauncher) {
throttler.loopOnReady {
// All errors thrown inside the loop should be suppressed such that the daemon doesn't crash.
logAndSuppressExceptionSuspend { exchangeStepLauncher.findAndRunExchangeStep() }
/**
 * Claims and executes exchange steps in an infinite loop. Claimed steps are launched as child
 * coroutines to allow multiple steps to execute concurrently.
 *
 * Failures to claim or to execute a step are logged and suppressed so that the daemon keeps
 * running. Cancellation is never suppressed: a [kotlinx.coroutines.CancellationException] is
 * rethrown so that a shutdown is not reported as a failure and propagates normally.
 */
protected open suspend fun runDaemon() = coroutineScope {
  while (coroutineContext.isActive) {
    val step =
      throttler.onReady {
        try {
          apiClient.claimExchangeStep()
        } catch (e: kotlinx.coroutines.CancellationException) {
          // Cancellation is a shutdown signal, not a claim failure; do not log or swallow it.
          throw e
        } catch (e: Exception) {
          logger.severe("Failed to claim exchange step: $e")
          null
        }
      }

    if (step != null) {
      // Run the step as a child coroutine so the loop can go on to claim the next step.
      launch {
        try {
          stepExecutor.execute(step)
        } catch (e: kotlinx.coroutines.CancellationException) {
          // Let cancellation of this child propagate instead of reporting it as an error.
          throw e
        } catch (e: Exception) {
          // A failing step must not cancel its siblings or the daemon loop.
          logger.severe("Failed to execute exchange step: $e")
        }
      }
    }
  }
}

/**
* Runs [exchangeStepLauncher] in a loop until there are no remaining tasks and all launched tasks
* have completed.
* Claims and executes exchange steps until all claimed steps have completed and no further steps
* are available. Claimed steps are launched as child coroutines to allow multiple steps to
* execute concurrently.
*/
protected open suspend fun runCronJob(exchangeStepLauncher: ExchangeStepLauncher) {
val activeJobs = mutableListOf<Job>()
do {
activeJobs.removeIf { !it.isActive }
val job = logAndSuppressExceptionSuspend { exchangeStepLauncher.findAndRunExchangeStep() }
if (job != null) {
activeJobs += job
protected open suspend fun runCronJob() = coroutineScope {
  while (coroutineContext.isActive) {
    val step =
      throttler.onReady {
        try {
          apiClient.claimExchangeStep()
        } catch (e: kotlinx.coroutines.CancellationException) {
          // Cancellation is a shutdown signal, not a claim failure; do not log or swallow it.
          throw e
        } catch (e: Exception) {
          logger.severe("Failed to claim exchange step: $e")
          null
        }
      }

    if (step != null) {
      // Run the step as a child coroutine so the loop can go on to claim the next step.
      launch {
        try {
          stepExecutor.execute(step)
        } catch (e: kotlinx.coroutines.CancellationException) {
          // Let cancellation of this child propagate instead of reporting it as an error.
          throw e
        } catch (e: Exception) {
          // A failing step must not cancel its siblings or the claim loop.
          logger.severe("Failed to execute exchange step: $e")
        }
      }
    } else if (coroutineContext.job.children.none { it.isActive }) {
      // Nothing was claimed and no previously launched step is still running: the job is done.
      logger.info("All available steps executed; shutting down.")
      break
    }
  }
}

enum class RunMode {
Expand All @@ -153,4 +189,8 @@ abstract class ExchangeWorkflowDaemon : Runnable {
*/
CRON_JOB,
}

companion object {
private val logger by loggerFor()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@

package org.wfanet.panelmatch.client.launcher

import kotlinx.coroutines.Job

/** Executes [ApiClient.ClaimedExchangeStep]s. */
interface ExchangeStepExecutor {
/** Executes [exchangeStep] in a new coroutine and returns the running [Job]. */
suspend fun execute(exchangeStep: ApiClient.ClaimedExchangeStep): Job
/** Executes [exchangeStep]. */
suspend fun execute(exchangeStep: ApiClient.ClaimedExchangeStep)
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@ package org.wfanet.panelmatch.client.launcher
import com.google.protobuf.ByteString
import java.util.logging.Level
import java.util.logging.Logger
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.wfanet.measurement.storage.StorageClient
import org.wfanet.measurement.storage.StorageClient.Blob
import org.wfanet.panelmatch.client.common.ExchangeContext
Expand Down Expand Up @@ -54,12 +49,11 @@ class ExchangeTaskExecutor(
private val privateStorageSelector: PrivateStorageSelector,
private val exchangeTaskMapper: ExchangeTaskMapper,
private val validator: ExchangeStepValidator,
private val dispatcher: CoroutineDispatcher = Dispatchers.Default,
) : ExchangeStepExecutor {

override suspend fun execute(exchangeStep: ApiClient.ClaimedExchangeStep): Job = coroutineScope {
override suspend fun execute(exchangeStep: ApiClient.ClaimedExchangeStep) {
val attemptKey = exchangeStep.attemptKey
launch(dispatcher + CoroutineName(attemptKey.toString()) + TaskLog(attemptKey.toString())) {
withContext(CoroutineName(attemptKey.toString()) + TaskLog(attemptKey.toString())) {
try {
val validatedStep = validator.validate(exchangeStep)
val context =
Expand All @@ -79,7 +73,6 @@ class ExchangeTaskExecutor(
else -> ExchangeStepAttempt.State.FAILED
}
markAsFinished(attemptKey, attemptState)
cancel("Task failed and reported back to Kingdom. Cancelling task scope.", e)
}
}
}
Expand Down
33 changes: 33 additions & 0 deletions src/test/kotlin/org/wfanet/panelmatch/client/deploy/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,5 +1,38 @@
load("@wfa_rules_kotlin_jvm//kotlin:defs.bzl", "kt_jvm_test")

# JVM test target that runs the test class
# org.wfanet.panelmatch.client.deploy.ExchangeWorkflowDaemonTest, compiled from
# ExchangeWorkflowDaemonTest.kt, using Bazel's "short" test timeout.
kt_jvm_test(
name = "ExchangeWorkflowDaemonTest",
timeout = "short",
srcs = ["ExchangeWorkflowDaemonTest.kt"],
test_class = "org.wfanet.panelmatch.client.deploy.ExchangeWorkflowDaemonTest",
deps = [
"//src/main/kotlin/org/wfanet/panelmatch/client/deploy",
"//src/main/kotlin/org/wfanet/panelmatch/client/deploy/testing",
"//src/main/kotlin/org/wfanet/panelmatch/client/eventpreprocessing/testing",
"//src/main/kotlin/org/wfanet/panelmatch/client/exchangetasks",
"//src/main/kotlin/org/wfanet/panelmatch/client/exchangetasks/testing",
"//src/main/kotlin/org/wfanet/panelmatch/client/launcher",
"//src/main/kotlin/org/wfanet/panelmatch/client/launcher/testing",
"//src/main/kotlin/org/wfanet/panelmatch/client/privatemembership/testing",
"//src/main/kotlin/org/wfanet/panelmatch/client/storage",
"//src/main/kotlin/org/wfanet/panelmatch/client/storage/testing",
"//src/main/kotlin/org/wfanet/panelmatch/common/crypto/testing",
"//src/main/kotlin/org/wfanet/panelmatch/common/secrets/testing",
"//src/main/kotlin/org/wfanet/panelmatch/common/testing",
"//src/main/proto/wfa/panelmatch/client/internal:exchange_workflow_kt_jvm_proto",
"//src/main/proto/wfa/panelmatch/client/storage:storage_details_kt_jvm_proto",
"@wfa_common_jvm//imports/java/com/google/common/truth",
"@wfa_common_jvm//imports/java/com/google/common/truth/extensions/proto",
"@wfa_common_jvm//imports/java/org/junit",
"@wfa_common_jvm//imports/kotlin/kotlin/test",
"@wfa_common_jvm//imports/kotlin/org/mockito/kotlin",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto:security_provider",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto/testing",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/throttler/testing",
],
)

kt_jvm_test(
name = "ProductionExchangeTaskMapperTest",
timeout = "short",
Expand Down
Loading