Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Herald supports reach-only llv2 protocol. #1135

Merged
merged 11 commits into from
Aug 3, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ liquid_legions_v2 {
role: AGGREGATOR
external_aggregator_duchy_id: "aggregator"
}
reach_only_liquid_legions_v2 {
role: AGGREGATOR
external_aggregator_duchy_id: "aggregator"
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ liquid_legions_v2 {
role: NON_AGGREGATOR
external_aggregator_duchy_id: "aggregator"
}
reach_only_liquid_legions_v2 {
role: NON_AGGREGATOR
external_aggregator_duchy_id: "aggregator"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ kt_jvm_library(
srcs = [
"Herald.kt",
"LiquidLegionsV2Starter.kt",
"ReachOnlyLiquidLegionsV2Starter.kt",
],
runtime_deps = ["@wfa_common_jvm//imports/java/io/grpc/netty"],
deps = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import org.wfanet.measurement.common.grpc.grpcStatusCode
import org.wfanet.measurement.common.protoTimestamp
import org.wfanet.measurement.duchy.daemon.utils.MeasurementType
import org.wfanet.measurement.duchy.daemon.utils.key
import org.wfanet.measurement.duchy.daemon.utils.toMeasurementType
import org.wfanet.measurement.duchy.service.internal.computations.toGetTokenRequest
import org.wfanet.measurement.internal.duchy.ComputationDetails
import org.wfanet.measurement.internal.duchy.ComputationsGrpcKt.ComputationsCoroutineStub
Expand Down Expand Up @@ -229,16 +227,22 @@ class Herald(
val globalId: String = systemComputation.key.computationId
logger.info("[id=$globalId] Creating Computation...")
try {
when (systemComputation.toMeasurementType()) {
MeasurementType.REACH,
MeasurementType.REACH_AND_FREQUENCY -> {
when (systemComputation.mpcProtocolConfig.protocolCase) {
Computation.MpcProtocolConfig.ProtocolCase.LIQUID_LEGIONS_V2 ->
LiquidLegionsV2Starter.createComputation(
internalComputationsClient,
systemComputation,
protocolsSetupConfig.liquidLegionsV2,
blobStorageBucket
)
}
Computation.MpcProtocolConfig.ProtocolCase.REACH_ONLY_LIQUID_LEGIONS_V2 ->
ReachOnlyLiquidLegionsV2Starter.createComputation(
internalComputationsClient,
systemComputation,
protocolsSetupConfig.reachOnlyLiquidLegionsV2,
blobStorageBucket
)
else -> error("Unknown or unsupported protocol for creation.")
}
logger.info("[id=$globalId]: Created Computation")
} catch (e: StatusException) {
Expand Down Expand Up @@ -302,6 +306,13 @@ class Herald(
systemComputation,
protocolsSetupConfig.liquidLegionsV2.externalAggregatorDuchyId
)
ComputationDetails.ProtocolCase.REACH_ONLY_LIQUID_LEGIONS_V2 ->
ReachOnlyLiquidLegionsV2Starter.updateRequisitionsAndKeySets(
token,
internalComputationsClient,
systemComputation,
protocolsSetupConfig.reachOnlyLiquidLegionsV2.externalAggregatorDuchyId
)
else -> error("Unknown or unsupported protocol.")
}
logger.info("[id=$globalId]: Confirmed Computation")
Expand All @@ -317,6 +328,8 @@ class Herald(
when (token.computationDetails.protocolCase) {
ComputationDetails.ProtocolCase.LIQUID_LEGIONS_V2 ->
LiquidLegionsV2Starter.startComputation(token, internalComputationsClient)
ComputationDetails.ProtocolCase.REACH_ONLY_LIQUID_LEGIONS_V2 ->
ReachOnlyLiquidLegionsV2Starter.startComputation(token, internalComputationsClient)
else -> error("Unknown or unsupported protocol.")
}
logger.info("[id=$globalId]: Started Computation")
Expand Down Expand Up @@ -365,8 +378,10 @@ class Herald(
} ?: return

if (
token.computationDetails.hasLiquidLegionsV2() &&
token.computationStage == LiquidLegionsV2Starter.TERMINAL_STAGE
(token.computationDetails.hasLiquidLegionsV2() &&
token.computationStage == LiquidLegionsV2Starter.TERMINAL_STAGE) ||
(token.computationDetails.hasReachOnlyLiquidLegionsV2() &&
token.computationStage == ReachOnlyLiquidLegionsV2Starter.TERMINAL_STAGE)
) {
return
}
Expand All @@ -376,6 +391,8 @@ class Herald(
endingComputationStage =
when (token.computationDetails.protocolCase) {
ComputationDetails.ProtocolCase.LIQUID_LEGIONS_V2 -> LiquidLegionsV2Starter.TERMINAL_STAGE
ComputationDetails.ProtocolCase.REACH_ONLY_LIQUID_LEGIONS_V2 ->
ReachOnlyLiquidLegionsV2Starter.TERMINAL_STAGE
else -> error { "Unknown or unsupported protocol." }
}
reason = ComputationDetails.CompletedReason.FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ object LiquidLegionsV2Starter {
Stage.WAIT_EXECUTION_PHASE_THREE_INPUTS,
Stage.EXECUTION_PHASE_THREE,
Stage.COMPLETE -> {
logger.info(
logger.warning(
"[id=${token.globalComputationId}]: not updating," +
" stage '$stage' is after WAIT_REQUISITIONS_AND_KEY_SET"
)
Expand Down Expand Up @@ -245,7 +245,7 @@ object LiquidLegionsV2Starter {
Stage.WAIT_EXECUTION_PHASE_THREE_INPUTS,
Stage.EXECUTION_PHASE_THREE,
Stage.COMPLETE -> {
logger.info(
logger.warning(
"[id=${token.globalComputationId}]: not starting," +
" stage '$stage' is after WAIT_TO_START"
)
Expand Down
Loading