Skip to content

Commit

Permalink
Herald supports reach-only llv2 protocol. (#1135)
Browse files Browse the repository at this point in the history
  • Loading branch information
renjiezh authored Aug 3, 2023
1 parent a35369c commit 05950f8
Show file tree
Hide file tree
Showing 11 changed files with 822 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ liquid_legions_v2 {
role: AGGREGATOR
external_aggregator_duchy_id: "aggregator"
}
reach_only_liquid_legions_v2 {
role: AGGREGATOR
external_aggregator_duchy_id: "aggregator"
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ liquid_legions_v2 {
role: NON_AGGREGATOR
external_aggregator_duchy_id: "aggregator"
}
reach_only_liquid_legions_v2 {
role: NON_AGGREGATOR
external_aggregator_duchy_id: "aggregator"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ kt_jvm_library(
srcs = [
"Herald.kt",
"LiquidLegionsV2Starter.kt",
"ReachOnlyLiquidLegionsV2Starter.kt",
],
runtime_deps = ["@wfa_common_jvm//imports/java/io/grpc/netty"],
deps = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import org.wfanet.measurement.common.grpc.grpcStatusCode
import org.wfanet.measurement.common.protoTimestamp
import org.wfanet.measurement.duchy.daemon.utils.MeasurementType
import org.wfanet.measurement.duchy.daemon.utils.key
import org.wfanet.measurement.duchy.daemon.utils.toMeasurementType
import org.wfanet.measurement.duchy.service.internal.computations.toGetTokenRequest
import org.wfanet.measurement.internal.duchy.ComputationDetails
import org.wfanet.measurement.internal.duchy.ComputationsGrpcKt.ComputationsCoroutineStub
Expand Down Expand Up @@ -229,16 +227,22 @@ class Herald(
val globalId: String = systemComputation.key.computationId
logger.info("[id=$globalId] Creating Computation...")
try {
when (systemComputation.toMeasurementType()) {
MeasurementType.REACH,
MeasurementType.REACH_AND_FREQUENCY -> {
when (systemComputation.mpcProtocolConfig.protocolCase) {
Computation.MpcProtocolConfig.ProtocolCase.LIQUID_LEGIONS_V2 ->
LiquidLegionsV2Starter.createComputation(
internalComputationsClient,
systemComputation,
protocolsSetupConfig.liquidLegionsV2,
blobStorageBucket
)
}
Computation.MpcProtocolConfig.ProtocolCase.REACH_ONLY_LIQUID_LEGIONS_V2 ->
ReachOnlyLiquidLegionsV2Starter.createComputation(
internalComputationsClient,
systemComputation,
protocolsSetupConfig.reachOnlyLiquidLegionsV2,
blobStorageBucket
)
else -> error("Unknown or unsupported protocol for creation.")
}
logger.info("[id=$globalId]: Created Computation")
} catch (e: StatusException) {
Expand Down Expand Up @@ -302,6 +306,13 @@ class Herald(
systemComputation,
protocolsSetupConfig.liquidLegionsV2.externalAggregatorDuchyId
)
ComputationDetails.ProtocolCase.REACH_ONLY_LIQUID_LEGIONS_V2 ->
ReachOnlyLiquidLegionsV2Starter.updateRequisitionsAndKeySets(
token,
internalComputationsClient,
systemComputation,
protocolsSetupConfig.reachOnlyLiquidLegionsV2.externalAggregatorDuchyId
)
else -> error("Unknown or unsupported protocol.")
}
logger.info("[id=$globalId]: Confirmed Computation")
Expand All @@ -317,6 +328,8 @@ class Herald(
when (token.computationDetails.protocolCase) {
ComputationDetails.ProtocolCase.LIQUID_LEGIONS_V2 ->
LiquidLegionsV2Starter.startComputation(token, internalComputationsClient)
ComputationDetails.ProtocolCase.REACH_ONLY_LIQUID_LEGIONS_V2 ->
ReachOnlyLiquidLegionsV2Starter.startComputation(token, internalComputationsClient)
else -> error("Unknown or unsupported protocol.")
}
logger.info("[id=$globalId]: Started Computation")
Expand Down Expand Up @@ -365,8 +378,10 @@ class Herald(
} ?: return

if (
token.computationDetails.hasLiquidLegionsV2() &&
token.computationStage == LiquidLegionsV2Starter.TERMINAL_STAGE
(token.computationDetails.hasLiquidLegionsV2() &&
token.computationStage == LiquidLegionsV2Starter.TERMINAL_STAGE) ||
(token.computationDetails.hasReachOnlyLiquidLegionsV2() &&
token.computationStage == ReachOnlyLiquidLegionsV2Starter.TERMINAL_STAGE)
) {
return
}
Expand All @@ -376,6 +391,8 @@ class Herald(
endingComputationStage =
when (token.computationDetails.protocolCase) {
ComputationDetails.ProtocolCase.LIQUID_LEGIONS_V2 -> LiquidLegionsV2Starter.TERMINAL_STAGE
ComputationDetails.ProtocolCase.REACH_ONLY_LIQUID_LEGIONS_V2 ->
ReachOnlyLiquidLegionsV2Starter.TERMINAL_STAGE
else -> error { "Unknown or unsupported protocol." }
}
reason = ComputationDetails.CompletedReason.FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ object LiquidLegionsV2Starter {
Stage.WAIT_EXECUTION_PHASE_THREE_INPUTS,
Stage.EXECUTION_PHASE_THREE,
Stage.COMPLETE -> {
logger.info(
logger.warning(
"[id=${token.globalComputationId}]: not updating," +
" stage '$stage' is after WAIT_REQUISITIONS_AND_KEY_SET"
)
Expand Down Expand Up @@ -245,7 +245,7 @@ object LiquidLegionsV2Starter {
Stage.WAIT_EXECUTION_PHASE_THREE_INPUTS,
Stage.EXECUTION_PHASE_THREE,
Stage.COMPLETE -> {
logger.info(
logger.warning(
"[id=${token.globalComputationId}]: not starting," +
" stage '$stage' is after WAIT_TO_START"
)
Expand Down
Loading

0 comments on commit 05950f8

Please sign in to comment.