diff --git a/build/repositories.bzl b/build/repositories.bzl index bde87c8fb92..2f7c55ec16b 100644 --- a/build/repositories.bzl +++ b/build/repositories.bzl @@ -42,9 +42,10 @@ def wfa_measurement_system_repositories(): # switch to a release version before submitting the PR. wfa_repo_archive( name = "wfa_measurement_proto", + # DO_NOT_SUBMIT(world-federation-of-advertisers/cross-media-measurement-api#167): Use version. + commit = "61bd4cc58b0384acaf1158f27b17bfe19433f4e7", repo = "cross-media-measurement-api", - sha256 = "d8f13212e6f56e3bee74263800a8710a99616bedf96be5347bf2e290f83865cd", - version = "0.40.0", + sha256 = "f626a69aa3695eb7671125da2cb7348699fbf4b38070545c7ea3699f973804c1", ) wfa_repo_archive( diff --git a/src/main/k8s/local/metric_spec_config.textproto b/src/main/k8s/local/metric_spec_config.textproto index 8314de06cbe..5f880d967e4 100644 --- a/src/main/k8s/local/metric_spec_config.textproto +++ b/src/main/k8s/local/metric_spec_config.textproto @@ -29,7 +29,9 @@ watch_duration_params { epsilon: 0.001 delta: 1.0E-12 } - maximum_watch_duration_per_user: 4000 + maximum_watch_duration_per_user { + seconds: 4000 + } } reach_vid_sampling_interval { width: 0.01 diff --git a/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/BUILD.bazel index 7b99b0129e4..976fbe56da0 100644 --- a/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/BUILD.bazel +++ b/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/BUILD.bazel @@ -33,6 +33,7 @@ kt_jvm_library( "//src/main/proto/wfa/measurement/api/v2alpha:date_interval_kt_jvm_proto", "//src/main/proto/wfa/measurement/api/v2alpha:measurement_spec_kt_jvm_proto", "@wfa_common_jvm//imports/java/picocli", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", ], ) @@ -96,7 +97,7 @@ java_binary( ) kt_jvm_library( - name = "benchmark_library", + name = "benchmark", srcs = [ "Benchmark.kt", ], @@ -114,6 +115,7 @@ kt_jvm_library( "//src/main/proto/wfa/measurement/api/v2alpha:measurement_kt_jvm_proto", "//src/main/proto/wfa/measurement/api/v2alpha:measurements_service_kt_jvm_grpc_proto", "//src/main/proto/wfa/measurement/api/v2alpha:model_line_kt_jvm_proto", + "@wfa_common_jvm//imports/java/org/jetbrains/annotations", "@wfa_common_jvm//imports/java/picocli", "@wfa_common_jvm//imports/kotlin/com/google/protobuf/kotlin", "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", @@ -128,7 +130,6 @@ kt_jvm_library( java_binary( name = "Benchmark", - main_class = "org.wfanet.measurement.api.v2alpha.tools.BenchmarkKt", - tags = ["manual"], - runtime_deps = [":benchmark_library"], + main_class = "org.wfanet.measurement.api.v2alpha.tools.BenchmarkReport", + runtime_deps = [":benchmark"], ) diff --git a/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/Benchmark.kt b/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/Benchmark.kt index 735eab1d32d..72d0d277bcd 100644 --- a/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/Benchmark.kt +++ b/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/Benchmark.kt @@ -63,6 +63,7 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import org.jetbrains.annotations.VisibleForTesting import org.wfanet.measurement.api.v2alpha.DataProvidersGrpcKt.DataProvidersCoroutineStub import org.wfanet.measurement.api.v2alpha.EncryptionPublicKey import org.wfanet.measurement.api.v2alpha.Measurement @@ -591,7 +592,7 @@ class Benchmark( description = ["Benchmark report from Kingdom"], sortOptions = false, ) -class BenchmarkReport(val clock: Clock = Clock.systemUTC()) : Runnable { +class BenchmarkReport private constructor(val clock: Clock = Clock.systemUTC()) : Runnable { @CommandLine.Mixin private lateinit var tlsFlags: TlsFlags @CommandLine.Mixin private lateinit var apiFlags: ApiFlags @CommandLine.Mixin private lateinit var baseFlags: BaseFlags @@ -620,11 +621,17 @@ class BenchmarkReport(val clock: Clock = Clock.systemUTC()) : Runnable { Benchmark(baseFlags, createMeasurementFlags, channel, apiAuthenticationKey, clock) benchmark.generateBenchmarkReport() } -} -/** - * Create a benchmarking report of the public Kingdom API. - * - * Use the `help` command to see usage details. - */ -fun main(args: Array) = commandLineMain(BenchmarkReport(), args) + companion object { + /** + * Create a benchmarking report of the public Kingdom API. + * + * Use the `help` command to see usage details. + */ + fun main(args: Array) = commandLineMain(BenchmarkReport(), args) + + @VisibleForTesting + internal fun main(args: Array, clock: Clock) = + commandLineMain(BenchmarkReport(clock), args) + } +} diff --git a/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/CreateMeasurementFlags.kt b/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/CreateMeasurementFlags.kt index 3fb6b7d5a48..5dd4dd75967 100644 --- a/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/CreateMeasurementFlags.kt +++ b/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/CreateMeasurementFlags.kt @@ -15,11 +15,13 @@ package org.wfanet.measurement.api.v2alpha.tools import java.io.File +import java.time.Duration import java.time.Instant import kotlin.properties.Delegates import org.wfanet.measurement.api.v2alpha.MeasurementSpec import org.wfanet.measurement.api.v2alpha.MeasurementSpecKt import org.wfanet.measurement.api.v2alpha.differentialPrivacyParams +import org.wfanet.measurement.common.toProtoDuration import picocli.CommandLine.ArgGroup import picocli.CommandLine.Option @@ -266,12 +268,13 @@ class CreateMeasurementFlags { var privacyDelta by Delegates.notNull() private set - @set:Option( + @Option( names = ["--max-duration"], - description = ["Maximum watch duration per user"], + description = + ["Maximum watch duration per user as a human-readable string, e.g. 5m20s"], required = true, ) - var maximumWatchDurationPerUser by Delegates.notNull() + lateinit var maximumWatchDurationPerUser: Duration private set } @@ -392,6 +395,7 @@ class CreateMeasurementFlags { maximumWatchDurationPerUser = measurementParams.eventMeasurementParams.eventMeasurementTypeParams.duration .maximumWatchDurationPerUser + .toProtoDuration() } } diff --git a/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/MeasurementSystem.kt b/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/MeasurementSystem.kt index ce0bebc8dc3..211fa0dc5de 100644 --- a/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/MeasurementSystem.kt +++ b/src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools/MeasurementSystem.kt @@ -28,7 +28,7 @@ import java.security.SignatureException import java.security.cert.CertPathValidatorException import java.security.cert.X509Certificate import java.time.Clock -import java.time.Duration as systemDuration +import java.time.Duration as JavaDuration import java.time.Instant import java.time.LocalDate import kotlinx.coroutines.CoroutineDispatcher @@ -157,7 +157,7 @@ import picocli.CommandLine.Parameters import picocli.CommandLine.ParentCommand import picocli.CommandLine.Spec -private val CHANNEL_SHUTDOWN_TIMEOUT = systemDuration.ofSeconds(30) +private val CHANNEL_SHUTDOWN_TIMEOUT = JavaDuration.ofSeconds(30) @Command( name = "MeasurementSystem", diff --git a/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/v2/InProcessReportingServer.kt b/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/v2/InProcessReportingServer.kt index 4beb7c24866..82721be1ad4 100644 --- a/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/v2/InProcessReportingServer.kt +++ b/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/v2/InProcessReportingServer.kt @@ -17,6 +17,7 @@ package org.wfanet.measurement.integration.common.reporting.v2 import com.google.protobuf.ByteString +import com.google.protobuf.util.Durations import io.grpc.Channel import io.grpc.Status import io.grpc.StatusException @@ -294,7 +295,7 @@ class InProcessReportingServer( epsilon = 0.001 delta = 1e-12 } - maximumWatchDurationPerUser = 4000 + maximumWatchDurationPerUser = Durations.fromSeconds(4000) } watchDurationVidSamplingInterval = MetricSpecConfigKt.vidSamplingInterval { diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha/MeasurementsService.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha/MeasurementsService.kt index 812a9c27dc2..cc9a105aa9a 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha/MeasurementsService.kt +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha/MeasurementsService.kt @@ -316,7 +316,7 @@ private fun MeasurementSpec.validate() { "Duration privacy params are invalid" } - grpcRequire(duration.maximumWatchDurationPerUser > 0) { + grpcRequire(duration.hasMaximumWatchDurationPerUser()) { "Maximum watch duration per user is unspecified" } } diff --git a/src/main/kotlin/org/wfanet/measurement/loadtest/measurementconsumer/MeasurementConsumerSimulator.kt b/src/main/kotlin/org/wfanet/measurement/loadtest/measurementconsumer/MeasurementConsumerSimulator.kt index 47137391edc..80e5443c2d8 100644 --- a/src/main/kotlin/org/wfanet/measurement/loadtest/measurementconsumer/MeasurementConsumerSimulator.kt +++ b/src/main/kotlin/org/wfanet/measurement/loadtest/measurementconsumer/MeasurementConsumerSimulator.kt @@ -17,6 +17,7 @@ package org.wfanet.measurement.loadtest.measurementconsumer import com.google.common.truth.Truth.assertThat import com.google.protobuf.ByteString import com.google.protobuf.Message +import com.google.protobuf.util.Durations import com.google.type.interval import io.grpc.StatusException import java.security.SignatureException @@ -612,7 +613,7 @@ class MeasurementConsumerSimulator( measurementPublicKey = serializedMeasurementPublicKey duration = duration { privacyParams = outputDpParams - maximumWatchDurationPerUser = 1 + maximumWatchDurationPerUser = Durations.fromMinutes(1) } this.nonceHashes += nonceHashes } diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/readers/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/readers/BUILD.bazel index 05808cd0025..d4581a4f6fd 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/readers/BUILD.bazel +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/readers/BUILD.bazel @@ -14,6 +14,7 @@ kt_jvm_library( "//src/main/proto/wfa/measurement/internal/reporting/v2:reporting_sets_service_kt_jvm_grpc_proto", "//src/main/proto/wfa/measurement/internal/reporting/v2:reports_service_kt_jvm_grpc_proto", "@wfa_common_jvm//imports/java/io/r2dbc", + "@wfa_common_jvm//imports/java/org/postgresql:r2dbc", "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/db/r2dbc", diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/readers/MetricReader.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/readers/MetricReader.kt index 43a027f809c..d3ea47179e6 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/readers/MetricReader.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/readers/MetricReader.kt @@ -19,6 +19,7 @@ package org.wfanet.measurement.reporting.deploy.v2.postgres.readers import com.google.protobuf.Timestamp import com.google.type.Interval import com.google.type.interval +import io.r2dbc.postgresql.codec.Interval as PostgresInterval import java.time.Instant import java.util.UUID import kotlinx.coroutines.flow.Flow @@ -29,6 +30,7 @@ import org.wfanet.measurement.common.db.r2dbc.ReadContext import org.wfanet.measurement.common.db.r2dbc.ResultRow import org.wfanet.measurement.common.db.r2dbc.boundStatement import org.wfanet.measurement.common.identity.InternalId +import org.wfanet.measurement.common.toProtoDuration import org.wfanet.measurement.common.toProtoTime import org.wfanet.measurement.internal.reporting.v2.BatchGetMetricsRequest import org.wfanet.measurement.internal.reporting.v2.Measurement @@ -370,7 +372,7 @@ class MetricReader(private val readContext: ReadContext) { val frequencyDifferentialPrivacyDelta: Double? = row["FrequencyDifferentialPrivacyDelta"] val maximumFrequency: Int? = row["MaximumFrequency"] val maximumFrequencyPerUser: Int? = row["MaximumFrequencyPerUser"] - val maximumWatchDurationPerUser: Int? = row["MaximumWatchDurationPerUser"] + val maximumWatchDurationPerUser: PostgresInterval? = row["MaximumWatchDurationPerUser"] val vidSamplingStart: Float = row["VidSamplingIntervalStart"] val vidSamplingWidth: Float = row["VidSamplingIntervalWidth"] val createTime: Instant = row["CreateTime"] @@ -453,10 +455,6 @@ class MetricReader(private val readContext: ReadContext) { } } MetricSpec.TypeCase.WATCH_DURATION -> { - if (maximumWatchDurationPerUser == null) { - throw IllegalStateException() - } - watchDuration = MetricSpecKt.watchDurationParams { privacyParams = @@ -464,7 +462,8 @@ class MetricReader(private val readContext: ReadContext) { epsilon = differentialPrivacyEpsilon delta = differentialPrivacyDelta } - this.maximumWatchDurationPerUser = maximumWatchDurationPerUser + this.maximumWatchDurationPerUser = + checkNotNull(maximumWatchDurationPerUser).duration.toProtoDuration() } } MetricSpec.TypeCase.TYPE_NOT_SET -> throw IllegalStateException() diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/writers/CreateMetrics.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/writers/CreateMetrics.kt index bc30caa5050..f06d3a68fa2 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/writers/CreateMetrics.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/writers/CreateMetrics.kt @@ -16,6 +16,7 @@ package org.wfanet.measurement.reporting.deploy.v2.postgres.writers +import io.r2dbc.postgresql.codec.Interval as PostgresInterval import java.time.Instant import java.time.ZoneOffset import java.util.UUID @@ -24,6 +25,7 @@ import org.wfanet.measurement.common.db.r2dbc.BoundStatement import org.wfanet.measurement.common.db.r2dbc.boundStatement import org.wfanet.measurement.common.db.r2dbc.postgres.PostgresWriter import org.wfanet.measurement.common.identity.InternalId +import org.wfanet.measurement.common.toDuration import org.wfanet.measurement.common.toInstant import org.wfanet.measurement.common.toJson import org.wfanet.measurement.common.toProtoTime @@ -209,7 +211,7 @@ class CreateMetrics(private val requests: List) : bind("$11", frequencyHistogram.frequencyPrivacyParams.epsilon) bind("$12", frequencyHistogram.reachPrivacyParams.delta) bind("$13", null) - bind("$14", null) + bind("$14", null) bind("$20", frequencyHistogram.maximumFrequency) } MetricSpec.TypeCase.REACH -> { @@ -219,7 +221,7 @@ class CreateMetrics(private val requests: List) : bind("$11", null) bind("$12", null) bind("$13", null) - bind("$14", null) + bind("$14", null) bind("$20", null) } MetricSpec.TypeCase.IMPRESSION_COUNT -> { @@ -229,7 +231,7 @@ class CreateMetrics(private val requests: List) : bind("$11", null) bind("$12", null) bind("$13", impressionCount.maximumFrequencyPerUser) - bind("$14", null) + bind("$14", null) bind("$20", null) } MetricSpec.TypeCase.WATCH_DURATION -> { @@ -239,7 +241,10 @@ class CreateMetrics(private val requests: List) : bind("$11", null) bind("$12", null) bind("$13", null) - bind("$14", watchDuration.maximumWatchDurationPerUser) + bind( + "$14", + PostgresInterval.of(watchDuration.maximumWatchDurationPerUser.toDuration()) + ) bind("$20", null) } MetricSpec.TypeCase.TYPE_NOT_SET -> {} diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/ReportsService.kt b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/ReportsService.kt index 48c5a4d321f..f3dd0f3732d 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/ReportsService.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/ReportsService.kt @@ -17,8 +17,7 @@ package org.wfanet.measurement.reporting.service.api.v1alpha import com.google.protobuf.ByteString -import com.google.protobuf.Duration -import com.google.protobuf.duration +import com.google.protobuf.Duration as ProtoDuration import com.google.protobuf.util.Durations import com.google.protobuf.util.Timestamps import com.google.type.Interval @@ -1256,7 +1255,9 @@ class ReportsService( InternalMetricTypeCase.WATCH_DURATION -> { duration = buildDurationMeasurementSpec( - internalMetricDetails.watchDuration.maximumWatchDurationPerUser + Durations.fromSeconds( + internalMetricDetails.watchDuration.maximumWatchDurationPerUserSeconds.toLong() + ) ) vidSamplingInterval = buildDurationVidSamplingInterval(secureRandom) } @@ -1695,7 +1696,7 @@ private fun buildImpressionMeasurementSpec( /** Builds a [MeasurementSpec.ReachAndFrequency] for watch duration. */ private fun buildDurationMeasurementSpec( - maximumWatchDurationPerUser: Int + maximumWatchDurationPerUser: ProtoDuration ): MeasurementSpec.Duration { return MeasurementSpecKt.duration { privacyParams = differentialPrivacyParams { @@ -1721,7 +1722,7 @@ private fun SetOperation.Type.toInternal(): InternalSetOperation.Type { private fun WatchDurationParams.toInternal(): InternalWatchDurationParams { val source = this return InternalMetricKt.watchDurationParams { - maximumWatchDurationPerUser = source.maximumWatchDurationPerUser + maximumWatchDurationPerUserSeconds = source.maximumWatchDurationPerUser } } @@ -1795,7 +1796,7 @@ private fun buildRowHeader(timeInterval: TimeInterval): String { return "$startTimeInstant-$endTimeInstant" } -private operator fun Duration.plus(other: Duration): Duration { +private operator fun ProtoDuration.plus(other: ProtoDuration): ProtoDuration { return Durations.add(this, other) } @@ -1832,10 +1833,7 @@ private fun aggregateResults( var reachValue = 0L var impressionValue = 0L val frequencyDistribution = mutableMapOf() - var watchDurationValue = duration { - seconds = 0 - nanos = 0 - } + var watchDurationValue = ProtoDuration.getDefaultInstance() // Aggregation for (result in internalResultsList) { @@ -2078,7 +2076,9 @@ private fun InternalSetOperation.Type.toType(): SetOperation.Type { /** Converts an internal [InternalWatchDurationParams] to a public [WatchDurationParams]. */ private fun InternalWatchDurationParams.toWatchDuration(): WatchDurationParams { val source = this - return watchDurationParams { maximumWatchDurationPerUser = source.maximumWatchDurationPerUser } + return watchDurationParams { + maximumWatchDurationPerUser = source.maximumWatchDurationPerUserSeconds + } } /** Converts an internal [InternalImpressionCountParams] to a public [ImpressionCountParams]. */ diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/internal/testing/MeasurementsServiceTest.kt b/src/main/kotlin/org/wfanet/measurement/reporting/service/internal/testing/MeasurementsServiceTest.kt index da7edf796e8..df044a7d3b7 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/internal/testing/MeasurementsServiceTest.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/internal/testing/MeasurementsServiceTest.kt @@ -442,7 +442,7 @@ abstract class MeasurementsServiceTest { fun `setMeasurementResult succeeds in setting the result for report with duration metric`() { val metricDetails = MetricKt.details { - watchDuration = MetricKt.watchDurationParams { maximumWatchDurationPerUser = 100 } + watchDuration = MetricKt.watchDurationParams { maximumWatchDurationPerUserSeconds = 100 } } val createdReport = runBlocking { reportsService.createReport( diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/internal/testing/v2/MetricsServiceTest.kt b/src/main/kotlin/org/wfanet/measurement/reporting/service/internal/testing/v2/MetricsServiceTest.kt index 7a18249e9f6..94a8a0b061a 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/internal/testing/v2/MetricsServiceTest.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/internal/testing/v2/MetricsServiceTest.kt @@ -19,6 +19,7 @@ package org.wfanet.measurement.reporting.service.internal.testing.v2 import com.google.common.truth.Truth.assertThat import com.google.common.truth.extensions.proto.ProtoTruth.assertThat import com.google.protobuf.timestamp +import com.google.protobuf.util.Durations import com.google.type.interval import io.grpc.Status import io.grpc.StatusRuntimeException @@ -396,7 +397,7 @@ abstract class MetricsServiceTest { epsilon = 1.0 delta = 2.0 } - maximumWatchDurationPerUser = 100 + maximumWatchDurationPerUser = Durations.fromSeconds(100) } vidSamplingInterval = MetricSpecKt.vidSamplingInterval { @@ -492,7 +493,7 @@ abstract class MetricsServiceTest { epsilon = 1.0 delta = 2.0 } - maximumWatchDurationPerUser = 100 + maximumWatchDurationPerUser = Durations.fromSeconds(100) } vidSamplingInterval = MetricSpecKt.vidSamplingInterval { @@ -2262,7 +2263,7 @@ abstract class MetricsServiceTest { epsilon = 1.0 delta = 2.0 } - maximumWatchDurationPerUser = 100 + maximumWatchDurationPerUser = Durations.fromSeconds(100) } vidSamplingInterval = MetricSpecKt.vidSamplingInterval { diff --git a/src/main/proto/wfa/measurement/config/reporting/BUILD.bazel b/src/main/proto/wfa/measurement/config/reporting/BUILD.bazel index 551b9d780b9..68c62fa4ff2 100644 --- a/src/main/proto/wfa/measurement/config/reporting/BUILD.bazel +++ b/src/main/proto/wfa/measurement/config/reporting/BUILD.bazel @@ -44,6 +44,7 @@ proto_library( name = "metric_spec_config_proto", srcs = ["metric_spec_config.proto"], strip_import_prefix = IMPORT_PREFIX, + deps = ["@com_google_protobuf//:duration_proto"], ) java_proto_library( diff --git a/src/main/proto/wfa/measurement/config/reporting/metric_spec_config.proto b/src/main/proto/wfa/measurement/config/reporting/metric_spec_config.proto index c672a2e93c5..2a430ddfacb 100644 --- a/src/main/proto/wfa/measurement/config/reporting/metric_spec_config.proto +++ b/src/main/proto/wfa/measurement/config/reporting/metric_spec_config.proto @@ -18,6 +18,8 @@ syntax = "proto3"; package wfa.measurement.config; +import "google/protobuf/duration.proto"; + option java_package = "org.wfanet.measurement.config.reporting"; option java_multiple_files = true; @@ -68,9 +70,9 @@ message MetricSpecConfig { message WatchDurationParams { // Differential privacy parameters. DifferentialPrivacyParams privacy_params = 1; - // Maximum watch duration per user in second that will be included in this - // metric. Enforced on a per EDP basis. - int32 maximum_watch_duration_per_user = 2; + // Maximum watch duration per user that will be included in this metric. + // Enforced on a per EDP basis. + google.protobuf.Duration maximum_watch_duration_per_user = 2; } // Parameters for generating the count of unique audiences reached given a set diff --git a/src/main/proto/wfa/measurement/internal/reporting/metric.proto b/src/main/proto/wfa/measurement/internal/reporting/metric.proto index 8782fc8ec21..81f53818fa9 100644 --- a/src/main/proto/wfa/measurement/internal/reporting/metric.proto +++ b/src/main/proto/wfa/measurement/internal/reporting/metric.proto @@ -32,8 +32,7 @@ message Metric { } message WatchDurationParams { reserved 1; - // In seconds. - int32 maximum_watch_duration_per_user = 2; + int32 maximum_watch_duration_per_user_seconds = 2; } message Details { diff --git a/src/main/proto/wfa/measurement/internal/reporting/v2/BUILD.bazel b/src/main/proto/wfa/measurement/internal/reporting/v2/BUILD.bazel index c48fa07d6f5..95bcb17370b 100644 --- a/src/main/proto/wfa/measurement/internal/reporting/v2/BUILD.bazel +++ b/src/main/proto/wfa/measurement/internal/reporting/v2/BUILD.bazel @@ -77,6 +77,7 @@ proto_and_java_proto_library( deps = [ ":measurement_proto", "@com_google_googleapis//google/type:interval_proto", + "@com_google_protobuf//:duration_proto", "@com_google_protobuf//:timestamp_proto", ], ) diff --git a/src/main/proto/wfa/measurement/internal/reporting/v2/metric.proto b/src/main/proto/wfa/measurement/internal/reporting/v2/metric.proto index a0333c8b638..43f62371168 100644 --- a/src/main/proto/wfa/measurement/internal/reporting/v2/metric.proto +++ b/src/main/proto/wfa/measurement/internal/reporting/v2/metric.proto @@ -16,6 +16,7 @@ syntax = "proto3"; package wfa.measurement.internal.reporting.v2; +import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; import "google/type/interval.proto"; import "wfa/measurement/internal/reporting/v2/measurement.proto"; @@ -43,8 +44,7 @@ message MetricSpec { } message WatchDurationParams { DifferentialPrivacyParams privacy_params = 1; - // In seconds. - int32 maximum_watch_duration_per_user = 2; + google.protobuf.Duration maximum_watch_duration_per_user = 2; } oneof type { diff --git a/src/main/proto/wfa/measurement/reporting/v2alpha/BUILD.bazel b/src/main/proto/wfa/measurement/reporting/v2alpha/BUILD.bazel index c799d1c5b71..bab31d20282 100644 --- a/src/main/proto/wfa/measurement/reporting/v2alpha/BUILD.bazel +++ b/src/main/proto/wfa/measurement/reporting/v2alpha/BUILD.bazel @@ -39,6 +39,7 @@ proto_library( "@com_google_googleapis//google/api:field_behavior_proto", "@com_google_googleapis//google/api:resource_proto", "@com_google_googleapis//google/type:interval_proto", + "@com_google_protobuf//:duration_proto", "@com_google_protobuf//:timestamp_proto", ], ) diff --git a/src/main/proto/wfa/measurement/reporting/v2alpha/metric.proto b/src/main/proto/wfa/measurement/reporting/v2alpha/metric.proto index d379081a129..6162b015032 100644 --- a/src/main/proto/wfa/measurement/reporting/v2alpha/metric.proto +++ b/src/main/proto/wfa/measurement/reporting/v2alpha/metric.proto @@ -18,6 +18,7 @@ package wfa.measurement.reporting.v2alpha; import "google/api/field_behavior.proto"; import "google/api/resource.proto"; +import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; import "google/type/interval.proto"; @@ -83,14 +84,13 @@ message MetricSpec { // Differential privacy parameters. DifferentialPrivacyParams privacy_params = 1 [(google.api.field_behavior) = REQUIRED]; - // Maximum watch duration per user in second that will be included in this - // metric. + // Maximum watch duration per user that will be included in this metric. // // Recommended maximum_watch_duration_per_user = cap on the total watch // duration of all the impressions of a user = 4000 sec for the case with // over 1M audience size. Enforced on a per EDP basis. // If not set, the default value will be used and outputted here. - optional int32 maximum_watch_duration_per_user = 2; + google.protobuf.Duration maximum_watch_duration_per_user = 2; } // Types of metric with parameters. diff --git a/src/main/resources/reporting/postgres/create-v2-reporting-schema.sql b/src/main/resources/reporting/postgres/create-v2-reporting-schema.sql index a492db13eb9..f0fedb7b82d 100644 --- a/src/main/resources/reporting/postgres/create-v2-reporting-schema.sql +++ b/src/main/resources/reporting/postgres/create-v2-reporting-schema.sql @@ -243,7 +243,7 @@ CREATE TABLE Metrics ( -- Must not be NULL if MetricType is IMPRESSION_COUNT MaximumFrequencyPerUser bigint, -- Must not be NULL if MetricType is WATCH_DURATION - MaximumWatchDurationPerUser bigint, + MaximumWatchDurationPerUser interval, VidSamplingIntervalStart DOUBLE PRECISION NOT NULL, VidSamplingIntervalWidth DOUBLE PRECISION NOT NULL, diff --git a/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/BUILD.bazel b/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/BUILD.bazel index 4fa6abf079d..776a2c08dc5 100644 --- a/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/BUILD.bazel +++ b/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/BUILD.bazel @@ -56,13 +56,13 @@ kt_jvm_test( name = "BenchmarkTest", size = "small", srcs = ["BenchmarkTest.kt"], + associates = ["//src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools:benchmark"], data = [ "//src/main/k8s/testing/secretfiles:root_certs", "//src/main/k8s/testing/secretfiles:secret_files", ], test_class = "org.wfanet.measurement.api.v2alpha.tools.BenchmarkTest", deps = [ - "//src/main/kotlin/org/wfanet/measurement/api/v2alpha/tools:benchmark_library", "//src/main/proto/wfa/measurement/api/v2alpha:certificate_kt_jvm_proto", "//src/main/proto/wfa/measurement/api/v2alpha:certificates_service_kt_jvm_grpc_proto", "@wfa_common_jvm//imports/java/com/google/common/truth", diff --git a/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/BenchmarkTest.kt b/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/BenchmarkTest.kt index 2729169babe..0899c279b51 100644 --- a/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/BenchmarkTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/BenchmarkTest.kt @@ -18,10 +18,7 @@ import com.google.common.truth.Truth.assertThat import com.google.common.truth.extensions.proto.ProtoTruth.assertThat import com.google.protobuf.ByteString import com.google.protobuf.duration as protoDuration -import io.grpc.Metadata -import io.grpc.ServerCall -import io.grpc.ServerCallHandler -import io.grpc.ServerInterceptor +import com.google.protobuf.util.Durations import io.grpc.ServerInterceptors import io.grpc.ServerServiceDefinition import io.netty.handler.ssl.ClientAuth @@ -44,7 +41,6 @@ import org.wfanet.measurement.api.v2alpha.DataProvidersGrpcKt.DataProvidersCorou import org.wfanet.measurement.api.v2alpha.EncryptionPublicKey import org.wfanet.measurement.api.v2alpha.Measurement import org.wfanet.measurement.api.v2alpha.MeasurementConsumersGrpcKt.MeasurementConsumersCoroutineImplBase -import org.wfanet.measurement.api.v2alpha.MeasurementKt import org.wfanet.measurement.api.v2alpha.MeasurementKt.ResultKt import org.wfanet.measurement.api.v2alpha.MeasurementKt.result import org.wfanet.measurement.api.v2alpha.MeasurementKt.resultPair @@ -73,11 +69,11 @@ import org.wfanet.measurement.common.getRuntimePath import org.wfanet.measurement.common.grpc.CommonServer import org.wfanet.measurement.common.grpc.testing.mockService import org.wfanet.measurement.common.readByteString +import org.wfanet.measurement.common.testing.ExitInterceptingSecurityManager import org.wfanet.measurement.common.testing.captureFirst import org.wfanet.measurement.consent.client.common.toEncryptionPublicKey import org.wfanet.measurement.consent.client.duchy.encryptResult import org.wfanet.measurement.consent.client.duchy.signResult -import picocli.CommandLine private const val HOST = "localhost" private val SECRETS_DIR: Path = @@ -152,7 +148,7 @@ private val SUCCEEDED_REACH_AND_FREQUENCY_MEASUREMENT = measurement { val result = result { reach = ResultKt.reach { value = 4096 } frequency = - MeasurementKt.ResultKt.frequency { + ResultKt.frequency { relativeFrequencyDistribution.put(1, 1.0 / 6) relativeFrequencyDistribution.put(2, 3.0 / 6) relativeFrequencyDistribution.put(3, 2.0 / 6) @@ -221,21 +217,6 @@ private fun getEncryptedResult( return encryptResult(signedResult, publicKey) } -private class HeaderCapturingInterceptor : ServerInterceptor { - override fun interceptCall( - call: ServerCall, - headers: Metadata, - next: ServerCallHandler, - ): ServerCall.Listener { - _capturedHeaders.add(headers) - return next.startCall(call, headers) - } - - private val _capturedHeaders = mutableListOf() - val capturedHeaders: List - get() = _capturedHeaders -} - @RunWith(JUnit4::class) class BenchmarkTest { private val measurementConsumersServiceMock: MeasurementConsumersCoroutineImplBase = @@ -246,8 +227,6 @@ class BenchmarkTest { private val certificatesServiceMock: CertificatesGrpcKt.CertificatesCoroutineImplBase = mockService() { onBlocking { getCertificate(any()) }.thenReturn(AGGREGATOR_CERTIFICATE) } - private val headerInterceptor = HeaderCapturingInterceptor() - private val port: Int get() = server.port @@ -256,7 +235,7 @@ class BenchmarkTest { fun initServer() { val services: List = listOf( - ServerInterceptors.intercept(measurementsServiceMock, headerInterceptor), + ServerInterceptors.intercept(measurementsServiceMock), measurementConsumersServiceMock.bindService(), dataProvidersServiceMock.bindService(), certificatesServiceMock.bindService(), @@ -323,7 +302,7 @@ class BenchmarkTest { "--output-file=$tempFile", ) - CommandLine(BenchmarkReport(clock)).execute(*args) + BenchmarkReport.main(args, clock) val request = captureFirst { @@ -401,7 +380,7 @@ class BenchmarkTest { "--event-end-time=$TIME_STRING_2", "--output-file=$tempFile", ) - CommandLine(BenchmarkReport(clock)).execute(*args) + BenchmarkReport.main(args, clock) val request = captureFirst { @@ -459,7 +438,7 @@ class BenchmarkTest { "--duration", "--duration-privacy-epsilon=0.015", "--duration-privacy-delta=0.0", - "--max-duration=1000", + "--max-duration=5m20s", "--vid-sampling-start=0.1", "--vid-sampling-width=0.2", "--private-key-der-file=$SECRETS_DIR/mc_cs_private.der", @@ -471,7 +450,7 @@ class BenchmarkTest { "--event-end-time=$TIME_STRING_2", "--output-file=$tempFile", ) - CommandLine(BenchmarkReport(clock)).execute(*args) + BenchmarkReport.main(args, clock) val request = captureFirst { @@ -489,7 +468,8 @@ class BenchmarkTest { epsilon = 0.015 delta = 0.0 } - maximumWatchDurationPerUser = 1000 + maximumWatchDurationPerUser = + Durations.add(Durations.fromMinutes(5), Durations.fromSeconds(20)) } vidSamplingInterval = MeasurementSpecKt.vidSamplingInterval { @@ -536,7 +516,7 @@ class BenchmarkTest { "--population-end-time=$TIME_STRING_2", "--output-file=$tempFile", ) - CommandLine(BenchmarkReport(clock)).execute(*args) + BenchmarkReport.main(args, clock) val request = captureFirst { @@ -556,4 +536,10 @@ class BenchmarkTest { .isEqualTo("replica,startTime,ackTime,computeTime,endTime,status,msg,population") assertThat(result[1]).isEqualTo("1,0.0,0.0,0.0,0.0,success,,100") } + + companion object { + init { + System.setSecurityManager(ExitInterceptingSecurityManager) + } + } } diff --git a/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/MeasurementSystemTest.kt b/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/MeasurementSystemTest.kt index 3603fa21cba..045376629ad 100644 --- a/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/MeasurementSystemTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/api/v2alpha/tools/MeasurementSystemTest.kt @@ -23,6 +23,7 @@ import com.google.protobuf.Descriptors import com.google.protobuf.duration import com.google.protobuf.empty import com.google.protobuf.timestamp +import com.google.protobuf.util.Durations import com.google.protobuf.value import com.google.type.date import com.google.type.interval @@ -1048,7 +1049,7 @@ class MeasurementSystemTest { "--duration", "--duration-privacy-epsilon=0.015", "--duration-privacy-delta=0.0", - "--max-duration=1000", + "--max-duration=5m20s", "--vid-sampling-start=0.1", "--vid-sampling-width=0.2", "--private-key-der-file=$SECRETS_DIR/mc_cs_private.der", @@ -1077,7 +1078,8 @@ class MeasurementSystemTest { epsilon = 0.015 delta = 0.0 } - maximumWatchDurationPerUser = 1000 + maximumWatchDurationPerUser = + Durations.add(Durations.fromMinutes(5), Durations.fromSeconds(20)) } vidSamplingInterval = MeasurementSpecKt.vidSamplingInterval { diff --git a/src/test/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha/MeasurementsServiceTest.kt b/src/test/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha/MeasurementsServiceTest.kt index 3579afe472a..f4b7d9af681 100644 --- a/src/test/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha/MeasurementsServiceTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha/MeasurementsServiceTest.kt @@ -20,6 +20,7 @@ import com.google.common.truth.Truth.assertThat import com.google.common.truth.extensions.proto.ProtoTruth.assertThat import com.google.protobuf.ByteString import com.google.protobuf.Timestamp +import com.google.protobuf.util.Durations import io.grpc.Status import io.grpc.StatusRuntimeException import java.time.Instant @@ -1904,7 +1905,7 @@ class MeasurementsServiceTest { epsilon = 1.0 delta = 0.0 } - maximumWatchDurationPerUser = 1 + maximumWatchDurationPerUser = Durations.fromMinutes(5) } } diff --git a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/ReportsServiceTest.kt b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/ReportsServiceTest.kt index 8eaf3d9028f..944eded5b92 100644 --- a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/ReportsServiceTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/ReportsServiceTest.kt @@ -22,6 +22,7 @@ import com.google.protobuf.duration import com.google.protobuf.kotlin.toByteString import com.google.protobuf.kotlin.toByteStringUtf8 import com.google.protobuf.timestamp +import com.google.protobuf.util.Durations import com.google.protobuf.util.Timestamps import com.google.type.interval import io.grpc.Status @@ -851,7 +852,7 @@ private val WATCH_DURATION_MEASUREMENT_SPEC = measurementSpec { epsilon = WATCH_DURATION_EPSILON delta = DIFFERENTIAL_PRIVACY_DELTA } - maximumWatchDurationPerUser = MAXIMUM_WATCH_DURATION_PER_USER + maximumWatchDurationPerUser = Durations.fromSeconds(MAXIMUM_WATCH_DURATION_PER_USER.toLong()) } vidSamplingInterval = vidSamplingInterval { start = WATCH_DURATION_VID_SAMPLING_START_LIST[SECURE_RANDOM_OUTPUT_INT] @@ -1060,7 +1061,7 @@ private val INTERNAL_WATCH_DURATION_METRIC = internalMetric { InternalMetricKt.details { watchDuration = InternalMetricKt.watchDurationParams { - maximumWatchDurationPerUser = MAXIMUM_WATCH_DURATION_PER_USER + maximumWatchDurationPerUserSeconds = MAXIMUM_WATCH_DURATION_PER_USER } cumulative = false } diff --git a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricSpecDefaultsTest.kt b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricSpecDefaultsTest.kt index 34eb140e4d7..8a94292eb2e 100644 --- a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricSpecDefaultsTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricSpecDefaultsTest.kt @@ -17,6 +17,7 @@ package org.wfanet.measurement.reporting.service.api.v2alpha import com.google.common.truth.Truth.assertThat +import com.google.protobuf.util.Durations import org.junit.Assert.assertThrows import org.junit.Test import org.junit.runner.RunWith @@ -51,7 +52,7 @@ private const val IMPRESSION_MAXIMUM_FREQUENCY_PER_USER = 60 private const val WATCH_DURATION_VID_SAMPLING_WIDTH = 95.0f / NUMBER_VID_BUCKETS private const val WATCH_DURATION_VID_SAMPLING_START = 205.0f / NUMBER_VID_BUCKETS private const val WATCH_DURATION_EPSILON = 0.001 -private const val MAXIMUM_WATCH_DURATION_PER_USER = 4000 +private val MAXIMUM_WATCH_DURATION_PER_USER = Durations.fromMinutes(5) private const val DIFFERENTIAL_PRIVACY_DELTA = 1e-12 @@ -227,7 +228,10 @@ class MetricSpecDefaultsTest { delta = METRIC_SPEC_CONFIG.watchDurationParams.privacyParams.delta * 2 } maximumWatchDurationPerUser = - METRIC_SPEC_CONFIG.watchDurationParams.maximumWatchDurationPerUser * 2 + Durations.add( + METRIC_SPEC_CONFIG.watchDurationParams.maximumWatchDurationPerUser, + METRIC_SPEC_CONFIG.watchDurationParams.maximumWatchDurationPerUser + ) } vidSamplingInterval = MetricSpecKt.vidSamplingInterval { diff --git a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsServiceTest.kt b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsServiceTest.kt index e536e9c8d07..6ad7fbc5b52 100644 --- a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsServiceTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsServiceTest.kt @@ -22,6 +22,7 @@ import com.google.protobuf.duration import com.google.protobuf.kotlin.toByteString import com.google.protobuf.kotlin.toByteStringUtf8 import com.google.protobuf.timestamp +import com.google.protobuf.util.Durations import com.google.type.Interval import com.google.type.interval import io.grpc.Status @@ -220,7 +221,7 @@ private const val IMPRESSION_MAXIMUM_FREQUENCY_PER_USER = 60 private const val WATCH_DURATION_VID_SAMPLING_WIDTH = 95.0f / NUMBER_VID_BUCKETS private const val WATCH_DURATION_VID_SAMPLING_START = 205.0f / NUMBER_VID_BUCKETS private const val WATCH_DURATION_EPSILON = 0.001 -private const val MAXIMUM_WATCH_DURATION_PER_USER = 4000 +private val MAXIMUM_WATCH_DURATION_PER_USER = Durations.fromSeconds(4000) private const val DIFFERENTIAL_PRIVACY_DELTA = 1e-12 diff --git a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsServiceTest.kt b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsServiceTest.kt index 961c5794f14..2714d9105c6 100644 --- a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsServiceTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsServiceTest.kt @@ -20,6 +20,7 @@ import com.google.common.truth.Truth.assertThat import com.google.common.truth.extensions.proto.ProtoTruth.assertThat import com.google.protobuf.duration import com.google.protobuf.timestamp +import com.google.protobuf.util.Durations import com.google.type.Interval import com.google.type.interval import io.grpc.Status @@ -129,7 +130,7 @@ private const val IMPRESSION_MAXIMUM_FREQUENCY_PER_USER = 60 private const val WATCH_DURATION_VID_SAMPLING_WIDTH = 95.0f / NUMBER_VID_BUCKETS private const val WATCH_DURATION_VID_SAMPLING_START = 205.0f / NUMBER_VID_BUCKETS private const val WATCH_DURATION_EPSILON = 0.001 -private const val MAXIMUM_WATCH_DURATION_PER_USER = 4000 +private val MAXIMUM_WATCH_DURATION_PER_USER = Durations.fromSeconds(4000) private const val DIFFERENTIAL_PRIVACY_DELTA = 1e-12