Skip to content

Commit

Permalink
Circulate direct computation methodology from protocol config to meas…
Browse files Browse the repository at this point in the history
…urement result.
  • Loading branch information
riemanli committed Sep 1, 2023
1 parent bcb46ae commit aac627d
Show file tree
Hide file tree
Showing 27 changed files with 1,508 additions and 225 deletions.
6 changes: 4 additions & 2 deletions build/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ def wfa_measurement_system_repositories():
version = "0.10.0",
)

# DO_NOT_SUBMIT(world-federation-of-advertisers/cross-media-measurement-api/#163):
# switch to a release version before submitting the PR.
wfa_repo_archive(
name = "wfa_measurement_proto",
repo = "cross-media-measurement-api",
sha256 = "1d829e7d95e6dedea1a4ea746e5613915dd60ca095b7b35bdcf19fa067697f2a",
version = "0.39.2",
sha256 = "e9f24d78e06f5ec78fe4c4ed42d73ad5440f2af9ed7b5e560df2040ce75b592f",
version = "0.39.3",
)

wfa_repo_archive(
Expand Down
8 changes: 7 additions & 1 deletion src/main/k8s/kingdom.cue
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ import ("strings")

_open_id_redirect_uri_flag: "--open-id-redirect-uri=https://localhost:2048"

_directNoiseMechanismFlags: [
"--direct-noise-mechanism=NONE",
"--direct-noise-mechanism=CONTINUOUS_LAPLACE",
"--direct-noise-mechanism=CONTINUOUS_GAUSSIAN",
]

_kingdomCompletedMeasurementsTimeToLiveFlag: "--time-to-live=\(_completedMeasurementsTimeToLive)"
_kingdomCompletedMeasurementsDryRunRetentionPolicyFlag: "--dry-run=\(_completedMeasurementsDryRun)"
_kingdomPendingMeasurementsTimeToLiveFlag: "--time-to-live=\(_pendingMeasurementsTimeToLive)"
Expand Down Expand Up @@ -169,7 +175,7 @@ import ("strings")
_akid_to_principal_map_file_flag,
_open_id_redirect_uri_flag,
_duchy_info_config_flag,
] + Container._commonServerFlags
] + _directNoiseMechanismFlags + Container._commonServerFlags
}
spec: template: spec: {
_mounts: "config-files": #ConfigMapMount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,12 @@ package org.wfanet.measurement.eventdataprovider.noiser
/** Internal Differential Privacy(DP) parameters. */
data class DpParams(val epsilon: Double, val delta: Double)

/**
* Noise mechanism for generating publisher noise for direct measurements.
*
* TODO(@iverson52000): Move this to public API if EDP needs to report back the direct noise
* mechanism for PBM tracking. NONE mechanism is testing only and should not move to public API.
*/
/** Noise mechanism for generating publisher noise for direct measurements. */
enum class DirectNoiseMechanism {
/** NONE mechanism is testing only. */
NONE,
LAPLACE,
GAUSSIAN,
CONTINUOUS_LAPLACE,
CONTINUOUS_GAUSSIAN,
}

/** A base Noiser interface for direct measurements. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ kt_jvm_library(
"//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/service:data_services",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:api_key_authentication_server_interceptor",
"//src/main/kotlin/org/wfanet/measurement/loadtest/panelmatchresourcesetup",
"//src/main/proto/wfa/measurement/api/v2alpha:protocol_config_kt_jvm_proto",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc/testing",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/testing",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import org.wfanet.measurement.api.v2alpha.RequisitionsGrpcKt.RequisitionsCorouti
import org.wfanet.measurement.api.v2alpha.event_group_metadata.testing.SyntheticEventGroupSpec
import org.wfanet.measurement.common.identity.withPrincipalName
import org.wfanet.measurement.common.throttler.MinimumIntervalThrottler
import org.wfanet.measurement.eventdataprovider.noiser.DirectNoiseMechanism
import org.wfanet.measurement.eventdataprovider.privacybudgetmanagement.CompositionMechanism
import org.wfanet.measurement.eventdataprovider.privacybudgetmanagement.InMemoryBackingStore
import org.wfanet.measurement.eventdataprovider.privacybudgetmanagement.PrivacyBucketFilter
Expand Down Expand Up @@ -102,7 +101,6 @@ class InProcessEdpSimulator(
100.0f
),
trustedCertificates = trustedCertificates,
DIRECT_NOISE_MECHANISM,
random = random,
compositionMechanism = COMPOSITION_MECHANISM,
)
Expand Down Expand Up @@ -133,7 +131,6 @@ class InProcessEdpSimulator(
private val logger: Logger = Logger.getLogger(this::class.java.name)
private const val RANDOM_SEED: Long = 1
private val random = Random(RANDOM_SEED)
private val DIRECT_NOISE_MECHANISM = DirectNoiseMechanism.LAPLACE
private val COMPOSITION_MECHANISM = CompositionMechanism.DP_ADVANCED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.util.logging.Logger
import org.junit.rules.TestRule
import org.junit.runner.Description
import org.junit.runners.model.Statement
import org.wfanet.measurement.api.v2alpha.ProtocolConfig
import org.wfanet.measurement.api.v2alpha.testing.withMetadataPrincipalIdentities
import org.wfanet.measurement.common.grpc.testing.GrpcTestServerRule
import org.wfanet.measurement.common.grpc.withDefaultDeadline
Expand Down Expand Up @@ -150,7 +151,7 @@ class InProcessKingdom(
EventGroupMetadataDescriptorsService(internalEventGroupMetadataDescriptorsClient)
.withMetadataPrincipalIdentities()
.withApiKeyAuthenticationServerInterceptor(internalApiKeysClient),
MeasurementsService(internalMeasurementsClient)
MeasurementsService(internalMeasurementsClient, MEASUREMENT_NOISE_MECHANISMS)
.withMetadataPrincipalIdentities()
.withApiKeyAuthenticationServerInterceptor(internalApiKeysClient),
PublicKeysService(internalPublicKeysClient)
Expand Down Expand Up @@ -206,5 +207,11 @@ class InProcessKingdom(

/** Default deadline for RPCs to internal server in milliseconds. */
private const val DEFAULT_INTERNAL_DEADLINE_MILLIS = 30_000L
private val MEASUREMENT_NOISE_MECHANISMS: List<ProtocolConfig.NoiseMechanism> =
listOf(
ProtocolConfig.NoiseMechanism.NONE,
ProtocolConfig.NoiseMechanism.CONTINUOUS_LAPLACE,
ProtocolConfig.NoiseMechanism.CONTINUOUS_GAUSSIAN,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package org.wfanet.measurement.kingdom.deploy.common.server
import io.grpc.ServerServiceDefinition
import java.io.File
import org.wfanet.measurement.api.v2alpha.AkidPrincipalLookup
import org.wfanet.measurement.api.v2alpha.ProtocolConfig.NoiseMechanism
import org.wfanet.measurement.api.v2alpha.withPrincipalsFromX509AuthorityKeyIdentifiers
import org.wfanet.measurement.common.commandLineMain
import org.wfanet.measurement.common.crypto.SigningCerts
Expand Down Expand Up @@ -143,6 +144,7 @@ private fun run(
.withApiKeyAuthenticationServerInterceptor(internalApiKeysCoroutineStub),
MeasurementsService(
InternalMeasurementsCoroutineStub(channel),
v2alphaFlags.directNoiseMechanisms
)
.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup)
.withApiKeyAuthenticationServerInterceptor(internalApiKeysCoroutineStub),
Expand Down Expand Up @@ -192,6 +194,7 @@ fun main(args: Array<String>) = commandLineMain(::run, args)

/** Flags specific to the V2alpha API version. */
private class V2alphaFlags {

@CommandLine.Option(
names = ["--authority-key-identifier-to-principal-map-file"],
description = ["File path to a AuthorityKeyToPrincipalMap textproto"],
Expand All @@ -207,4 +210,51 @@ private class V2alphaFlags {
)
lateinit var redirectUri: String
private set

lateinit var directNoiseMechanisms: List<NoiseMechanism>
private set

@CommandLine.Spec
lateinit var spec: CommandLine.Model.CommandSpec // injected by picocli
private set

@CommandLine.Option(
names = ["--direct-noise-mechanism"],
description =
[
"Noise mechanisms that can be used in direct computation. It can be specified multiple " +
"times."
],
required = true
)
fun setDirectNoiseMechanisms(noiseMechanisms: List<NoiseMechanism>) {
for (noiseMechanism in noiseMechanisms) {
when (noiseMechanism) {
NoiseMechanism.NONE,
NoiseMechanism.CONTINUOUS_LAPLACE,
NoiseMechanism.CONTINUOUS_GAUSSIAN -> {}
NoiseMechanism.GEOMETRIC,
// TODO(@riemanli): support DISCRETE_GAUSSIAN after having a clear definition of it.
NoiseMechanism.DISCRETE_GAUSSIAN -> {
throw CommandLine.ParameterException(
spec.commandLine(),
String.format(
"Invalid noise mechanism $noiseMechanism for option '--direct-noise-mechanism'. " +
"Discrete mechanisms are not supported for direct computations."
)
)
}
NoiseMechanism.NOISE_MECHANISM_UNSPECIFIED,
NoiseMechanism.UNRECOGNIZED -> {
throw CommandLine.ParameterException(
spec.commandLine(),
String.format(
"Invalid noise mechanism $noiseMechanism for option '--direct-noise-mechanism'."
)
)
}
}
}
directNoiseMechanisms = noiseMechanisms
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ class CreateMeasurement(private val request: CreateMeasurementRequest) :
ProtocolConfig.ProtocolCase.REACH_ONLY_LIQUID_LEGIONS_V2 -> {
createComputedMeasurement(request.measurement, measurementConsumerId)
}
ProtocolConfig.ProtocolCase.PROTOCOL_NOT_SET ->
ProtocolConfig.ProtocolCase.DIRECT ->
createDirectMeasurement(request.measurement, measurementConsumerId)
ProtocolConfig.ProtocolCase.PROTOCOL_NOT_SET -> error("Protocol is not set.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,7 @@ class ExchangeStepsService(private val internalExchangeSteps: InternalExchangeSt
try {
it.toV2Alpha()
} catch (e: Throwable) {
failGrpc(Status.INVALID_ARGUMENT) {
e.message ?: "Failed to convert ProtocolConfig ExchangeStep"
}
failGrpc(Status.INVALID_ARGUMENT) { e.message ?: "Failed to convert ExchangeStep" }
}
}
nextPageToken = results.last().updateTime.toByteArray().base64UrlEncode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.wfanet.measurement.api.v2alpha.MeasurementKey
import org.wfanet.measurement.api.v2alpha.MeasurementPrincipal
import org.wfanet.measurement.api.v2alpha.MeasurementSpec
import org.wfanet.measurement.api.v2alpha.MeasurementsGrpcKt.MeasurementsCoroutineImplBase
import org.wfanet.measurement.api.v2alpha.ProtocolConfig.NoiseMechanism
import org.wfanet.measurement.api.v2alpha.copy
import org.wfanet.measurement.api.v2alpha.listMeasurementsPageToken
import org.wfanet.measurement.api.v2alpha.listMeasurementsResponse
Expand Down Expand Up @@ -72,6 +73,7 @@ private const val MISSING_RESOURCE_NAME_ERROR = "Resource name is either unspeci

class MeasurementsService(
private val internalMeasurementsStub: MeasurementsCoroutineStub,
private val noiseMechanisms: List<NoiseMechanism>
) : MeasurementsCoroutineImplBase() {

override suspend fun getMeasurement(request: GetMeasurementRequest): Measurement {
Expand Down Expand Up @@ -167,7 +169,8 @@ class MeasurementsService(
request.measurement.toInternal(
measurementConsumerCertificateKey,
dataProvidersMap,
parsedMeasurementSpec
parsedMeasurementSpec,
noiseMechanisms.map { it.toInternal() }
)
requestId = request.requestId
}
Expand Down
Loading

0 comments on commit aac627d

Please sign in to comment.