Skip to content

Commit

Permalink
Update cross-media-measurement-api for max watch duration.
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjayVas committed Sep 6, 2023
1 parent 49d6abe commit 220fcd3
Show file tree
Hide file tree
Showing 31 changed files with 123 additions and 100 deletions.
5 changes: 3 additions & 2 deletions build/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ def wfa_measurement_system_repositories():
# switch to a release version before submitting the PR.
wfa_repo_archive(
name = "wfa_measurement_proto",
# DO_NOT_SUBMIT(world-federation-of-advertisers/cross-media-measurement-api#167): Use version.
commit = "61bd4cc58b0384acaf1158f27b17bfe19433f4e7",
repo = "cross-media-measurement-api",
sha256 = "d8f13212e6f56e3bee74263800a8710a99616bedf96be5347bf2e290f83865cd",
version = "0.40.0",
sha256 = "f626a69aa3695eb7671125da2cb7348699fbf4b38070545c7ea3699f973804c1",
)

wfa_repo_archive(
Expand Down
4 changes: 3 additions & 1 deletion src/main/k8s/local/metric_spec_config.textproto
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ watch_duration_params {
epsilon: 0.001
delta: 1.0E-12
}
maximum_watch_duration_per_user: 4000
maximum_watch_duration_per_user {
seconds: 4000
}
}
reach_vid_sampling_interval {
width: 0.01
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ kt_jvm_library(
"//src/main/proto/wfa/measurement/api/v2alpha:date_interval_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:measurement_spec_kt_jvm_proto",
"@wfa_common_jvm//imports/java/picocli",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
],
)

Expand Down Expand Up @@ -96,7 +97,7 @@ java_binary(
)

kt_jvm_library(
name = "benchmark_library",
name = "benchmark",
srcs = [
"Benchmark.kt",
],
Expand All @@ -114,6 +115,7 @@ kt_jvm_library(
"//src/main/proto/wfa/measurement/api/v2alpha:measurement_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:measurements_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:model_line_kt_jvm_proto",
"@wfa_common_jvm//imports/java/org/jetbrains/annotations",
"@wfa_common_jvm//imports/java/picocli",
"@wfa_common_jvm//imports/kotlin/com/google/protobuf/kotlin",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
Expand All @@ -128,7 +130,6 @@ kt_jvm_library(

java_binary(
name = "Benchmark",
main_class = "org.wfanet.measurement.api.v2alpha.tools.BenchmarkKt",
tags = ["manual"],
runtime_deps = [":benchmark_library"],
main_class = "org.wfanet.measurement.api.v2alpha.tools.BenchmarkReport",
runtime_deps = [":benchmark"],
)
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.jetbrains.annotations.VisibleForTesting
import org.wfanet.measurement.api.v2alpha.DataProvidersGrpcKt.DataProvidersCoroutineStub
import org.wfanet.measurement.api.v2alpha.EncryptionPublicKey
import org.wfanet.measurement.api.v2alpha.Measurement
Expand Down Expand Up @@ -591,7 +592,7 @@ class Benchmark(
description = ["Benchmark report from Kingdom"],
sortOptions = false,
)
class BenchmarkReport(val clock: Clock = Clock.systemUTC()) : Runnable {
class BenchmarkReport private constructor(val clock: Clock = Clock.systemUTC()) : Runnable {
@CommandLine.Mixin private lateinit var tlsFlags: TlsFlags
@CommandLine.Mixin private lateinit var apiFlags: ApiFlags
@CommandLine.Mixin private lateinit var baseFlags: BaseFlags
Expand Down Expand Up @@ -620,11 +621,17 @@ class BenchmarkReport(val clock: Clock = Clock.systemUTC()) : Runnable {
Benchmark(baseFlags, createMeasurementFlags, channel, apiAuthenticationKey, clock)
benchmark.generateBenchmarkReport()
}
}

/**
* Create a benchmarking report of the public Kingdom API.
*
* Use the `help` command to see usage details.
*/
fun main(args: Array<String>) = commandLineMain(BenchmarkReport(), args)
companion object {
/**
* Create a benchmarking report of the public Kingdom API.
*
* Use the `help` command to see usage details.
*/
fun main(args: Array<String>) = commandLineMain(BenchmarkReport(), args)

@VisibleForTesting
internal fun main(args: Array<String>, clock: Clock) =
commandLineMain(BenchmarkReport(clock), args)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package org.wfanet.measurement.api.v2alpha.tools

import java.io.File
import java.time.Duration
import java.time.Instant
import kotlin.properties.Delegates
import org.wfanet.measurement.api.v2alpha.MeasurementSpec
import org.wfanet.measurement.api.v2alpha.MeasurementSpecKt
import org.wfanet.measurement.api.v2alpha.differentialPrivacyParams
import org.wfanet.measurement.common.toProtoDuration
import picocli.CommandLine.ArgGroup
import picocli.CommandLine.Option

Expand Down Expand Up @@ -266,12 +268,13 @@ class CreateMeasurementFlags {
var privacyDelta by Delegates.notNull<Double>()
private set

@set:Option(
@Option(
names = ["--max-duration"],
description = ["Maximum watch duration per user"],
description =
["Maximum watch duration per user as a human-readable string, e.g. 5m20s"],
required = true,
)
var maximumWatchDurationPerUser by Delegates.notNull<Int>()
lateinit var maximumWatchDurationPerUser: Duration
private set
}

Expand Down Expand Up @@ -392,6 +395,7 @@ class CreateMeasurementFlags {
maximumWatchDurationPerUser =
measurementParams.eventMeasurementParams.eventMeasurementTypeParams.duration
.maximumWatchDurationPerUser
.toProtoDuration()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.security.SignatureException
import java.security.cert.CertPathValidatorException
import java.security.cert.X509Certificate
import java.time.Clock
import java.time.Duration as systemDuration
import java.time.Duration as JavaDuration
import java.time.Instant
import java.time.LocalDate
import kotlinx.coroutines.CoroutineDispatcher
Expand Down Expand Up @@ -157,7 +157,7 @@ import picocli.CommandLine.Parameters
import picocli.CommandLine.ParentCommand
import picocli.CommandLine.Spec

private val CHANNEL_SHUTDOWN_TIMEOUT = systemDuration.ofSeconds(30)
private val CHANNEL_SHUTDOWN_TIMEOUT = JavaDuration.ofSeconds(30)

@Command(
name = "MeasurementSystem",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.wfanet.measurement.integration.common.reporting.v2

import com.google.protobuf.ByteString
import com.google.protobuf.util.Durations
import io.grpc.Channel
import io.grpc.Status
import io.grpc.StatusException
Expand Down Expand Up @@ -294,7 +295,7 @@ class InProcessReportingServer(
epsilon = 0.001
delta = 1e-12
}
maximumWatchDurationPerUser = 4000
maximumWatchDurationPerUser = Durations.fromSeconds(4000)
}
watchDurationVidSamplingInterval =
MetricSpecConfigKt.vidSamplingInterval {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private fun MeasurementSpec.validate() {
"Duration privacy params are invalid"
}

grpcRequire(duration.maximumWatchDurationPerUser > 0) {
grpcRequire(duration.hasMaximumWatchDurationPerUser()) {
"Maximum watch duration per user is unspecified"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package org.wfanet.measurement.loadtest.measurementconsumer
import com.google.common.truth.Truth.assertThat
import com.google.protobuf.ByteString
import com.google.protobuf.Message
import com.google.protobuf.util.Durations
import com.google.type.interval
import io.grpc.StatusException
import java.security.SignatureException
Expand Down Expand Up @@ -612,7 +613,7 @@ class MeasurementConsumerSimulator(
measurementPublicKey = serializedMeasurementPublicKey
duration = duration {
privacyParams = outputDpParams
maximumWatchDurationPerUser = 1
maximumWatchDurationPerUser = Durations.fromMinutes(1)
}
this.nonceHashes += nonceHashes
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ kt_jvm_library(
"//src/main/proto/wfa/measurement/internal/reporting/v2:reporting_sets_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/internal/reporting/v2:reports_service_kt_jvm_grpc_proto",
"@wfa_common_jvm//imports/java/io/r2dbc",
"@wfa_common_jvm//imports/java/org/postgresql:r2dbc",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/db/r2dbc",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.wfanet.measurement.reporting.deploy.v2.postgres.readers
import com.google.protobuf.Timestamp
import com.google.type.Interval
import com.google.type.interval
import io.r2dbc.postgresql.codec.Interval as PostgresInterval
import java.time.Instant
import java.util.UUID
import kotlinx.coroutines.flow.Flow
Expand All @@ -29,6 +30,7 @@ import org.wfanet.measurement.common.db.r2dbc.ReadContext
import org.wfanet.measurement.common.db.r2dbc.ResultRow
import org.wfanet.measurement.common.db.r2dbc.boundStatement
import org.wfanet.measurement.common.identity.InternalId
import org.wfanet.measurement.common.toProtoDuration
import org.wfanet.measurement.common.toProtoTime
import org.wfanet.measurement.internal.reporting.v2.BatchGetMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.Measurement
Expand Down Expand Up @@ -370,7 +372,7 @@ class MetricReader(private val readContext: ReadContext) {
val frequencyDifferentialPrivacyDelta: Double? = row["FrequencyDifferentialPrivacyDelta"]
val maximumFrequency: Int? = row["MaximumFrequency"]
val maximumFrequencyPerUser: Int? = row["MaximumFrequencyPerUser"]
val maximumWatchDurationPerUser: Int? = row["MaximumWatchDurationPerUser"]
val maximumWatchDurationPerUser: PostgresInterval? = row["MaximumWatchDurationPerUser"]
val vidSamplingStart: Float = row["VidSamplingIntervalStart"]
val vidSamplingWidth: Float = row["VidSamplingIntervalWidth"]
val createTime: Instant = row["CreateTime"]
Expand Down Expand Up @@ -453,18 +455,15 @@ class MetricReader(private val readContext: ReadContext) {
}
}
MetricSpec.TypeCase.WATCH_DURATION -> {
if (maximumWatchDurationPerUser == null) {
throw IllegalStateException()
}

watchDuration =
MetricSpecKt.watchDurationParams {
privacyParams =
MetricSpecKt.differentialPrivacyParams {
epsilon = differentialPrivacyEpsilon
delta = differentialPrivacyDelta
}
this.maximumWatchDurationPerUser = maximumWatchDurationPerUser
this.maximumWatchDurationPerUser =
checkNotNull(maximumWatchDurationPerUser).duration.toProtoDuration()
}
}
MetricSpec.TypeCase.TYPE_NOT_SET -> throw IllegalStateException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.wfanet.measurement.reporting.deploy.v2.postgres.writers

import io.r2dbc.postgresql.codec.Interval as PostgresInterval
import java.time.Instant
import java.time.ZoneOffset
import java.util.UUID
Expand All @@ -24,6 +25,7 @@ import org.wfanet.measurement.common.db.r2dbc.BoundStatement
import org.wfanet.measurement.common.db.r2dbc.boundStatement
import org.wfanet.measurement.common.db.r2dbc.postgres.PostgresWriter
import org.wfanet.measurement.common.identity.InternalId
import org.wfanet.measurement.common.toDuration
import org.wfanet.measurement.common.toInstant
import org.wfanet.measurement.common.toJson
import org.wfanet.measurement.common.toProtoTime
Expand Down Expand Up @@ -209,7 +211,7 @@ class CreateMetrics(private val requests: List<CreateMetricRequest>) :
bind("$11", frequencyHistogram.frequencyPrivacyParams.epsilon)
bind("$12", frequencyHistogram.reachPrivacyParams.delta)
bind<Long?>("$13", null)
bind<Long?>("$14", null)
bind<PostgresInterval?>("$14", null)
bind("$20", frequencyHistogram.maximumFrequency)
}
MetricSpec.TypeCase.REACH -> {
Expand All @@ -219,7 +221,7 @@ class CreateMetrics(private val requests: List<CreateMetricRequest>) :
bind<Double?>("$11", null)
bind<Double?>("$12", null)
bind<Long?>("$13", null)
bind<Long?>("$14", null)
bind<PostgresInterval?>("$14", null)
bind<Long?>("$20", null)
}
MetricSpec.TypeCase.IMPRESSION_COUNT -> {
Expand All @@ -229,7 +231,7 @@ class CreateMetrics(private val requests: List<CreateMetricRequest>) :
bind<Double?>("$11", null)
bind<Double?>("$12", null)
bind("$13", impressionCount.maximumFrequencyPerUser)
bind<Long?>("$14", null)
bind<PostgresInterval?>("$14", null)
bind<Long?>("$20", null)
}
MetricSpec.TypeCase.WATCH_DURATION -> {
Expand All @@ -239,7 +241,10 @@ class CreateMetrics(private val requests: List<CreateMetricRequest>) :
bind<Double?>("$11", null)
bind<Double?>("$12", null)
bind<Long?>("$13", null)
bind("$14", watchDuration.maximumWatchDurationPerUser)
bind(
"$14",
PostgresInterval.of(watchDuration.maximumWatchDurationPerUser.toDuration())
)
bind<Long?>("$20", null)
}
MetricSpec.TypeCase.TYPE_NOT_SET -> {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package org.wfanet.measurement.reporting.service.api.v1alpha

import com.google.protobuf.ByteString
import com.google.protobuf.Duration
import com.google.protobuf.duration
import com.google.protobuf.Duration as ProtoDuration
import com.google.protobuf.util.Durations
import com.google.protobuf.util.Timestamps
import com.google.type.Interval
Expand Down Expand Up @@ -1256,7 +1255,9 @@ class ReportsService(
InternalMetricTypeCase.WATCH_DURATION -> {
duration =
buildDurationMeasurementSpec(
internalMetricDetails.watchDuration.maximumWatchDurationPerUser
Durations.fromSeconds(
internalMetricDetails.watchDuration.maximumWatchDurationPerUserSeconds.toLong()
)
)
vidSamplingInterval = buildDurationVidSamplingInterval(secureRandom)
}
Expand Down Expand Up @@ -1695,7 +1696,7 @@ private fun buildImpressionMeasurementSpec(

/** Builds a [MeasurementSpec.ReachAndFrequency] for watch duration. */
private fun buildDurationMeasurementSpec(
maximumWatchDurationPerUser: Int
maximumWatchDurationPerUser: ProtoDuration
): MeasurementSpec.Duration {
return MeasurementSpecKt.duration {
privacyParams = differentialPrivacyParams {
Expand All @@ -1721,7 +1722,7 @@ private fun SetOperation.Type.toInternal(): InternalSetOperation.Type {
private fun WatchDurationParams.toInternal(): InternalWatchDurationParams {
val source = this
return InternalMetricKt.watchDurationParams {
maximumWatchDurationPerUser = source.maximumWatchDurationPerUser
maximumWatchDurationPerUserSeconds = source.maximumWatchDurationPerUser
}
}

Expand Down Expand Up @@ -1795,7 +1796,7 @@ private fun buildRowHeader(timeInterval: TimeInterval): String {
return "$startTimeInstant-$endTimeInstant"
}

private operator fun Duration.plus(other: Duration): Duration {
private operator fun ProtoDuration.plus(other: ProtoDuration): ProtoDuration {
return Durations.add(this, other)
}

Expand Down Expand Up @@ -1832,10 +1833,7 @@ private fun aggregateResults(
var reachValue = 0L
var impressionValue = 0L
val frequencyDistribution = mutableMapOf<Long, Double>()
var watchDurationValue = duration {
seconds = 0
nanos = 0
}
var watchDurationValue = ProtoDuration.getDefaultInstance()

// Aggregation
for (result in internalResultsList) {
Expand Down Expand Up @@ -2078,7 +2076,9 @@ private fun InternalSetOperation.Type.toType(): SetOperation.Type {
/** Converts an internal [InternalWatchDurationParams] to a public [WatchDurationParams]. */
private fun InternalWatchDurationParams.toWatchDuration(): WatchDurationParams {
val source = this
return watchDurationParams { maximumWatchDurationPerUser = source.maximumWatchDurationPerUser }
return watchDurationParams {
maximumWatchDurationPerUser = source.maximumWatchDurationPerUserSeconds
}
}

/** Converts an internal [InternalImpressionCountParams] to a public [ImpressionCountParams]. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ abstract class MeasurementsServiceTest<T : MeasurementsCoroutineImplBase> {
fun `setMeasurementResult succeeds in setting the result for report with duration metric`() {
val metricDetails =
MetricKt.details {
watchDuration = MetricKt.watchDurationParams { maximumWatchDurationPerUser = 100 }
watchDuration = MetricKt.watchDurationParams { maximumWatchDurationPerUserSeconds = 100 }
}
val createdReport = runBlocking {
reportsService.createReport(
Expand Down
Loading

0 comments on commit 220fcd3

Please sign in to comment.