diff --git a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsServiceTest.kt b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsServiceTest.kt index 9b15ebb8a36..a660fbdf4aa 100644 --- a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsServiceTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsServiceTest.kt @@ -223,9 +223,6 @@ private const val MAXIMUM_WATCH_DURATION_PER_USER = 4000 private const val DIFFERENTIAL_PRIVACY_DELTA = 1e-12 -private const val MAXIMUM_FREQUENCY_PER_USER = 10 -private const val MAXIMUM_WATCH_DURATION_PER_USER = 300 - private const val SECURE_RANDOM_OUTPUT_INT = 0 private const val SECURE_RANDOM_OUTPUT_LONG = 0L @@ -698,7 +695,7 @@ private val INTERNAL_REQUESTING_UNION_ALL_WATCH_DURATION_MEASUREMENT = internalM private val INTERNAL_PENDING_NOT_CREATED_UNION_ALL_WATCH_DURATION_MEASUREMENT = INTERNAL_REQUESTING_UNION_ALL_WATCH_DURATION_MEASUREMENT.copy { - externalMeasurementId = 414L + cmmsMeasurementId = externalIdToApiId(414L) cmmsCreateMeasurementRequestId = "UNION_ALL_WATCH_DURATION_MEASUREMENT" state = InternalMeasurement.State.PENDING } @@ -711,10 +708,13 @@ private val INTERNAL_PENDING_UNION_ALL_WATCH_DURATION_MEASUREMENT = private val INTERNAL_SUCCEEDED_UNION_ALL_WATCH_DURATION_MEASUREMENT = INTERNAL_PENDING_UNION_ALL_WATCH_DURATION_MEASUREMENT.copy { state = InternalMeasurement.State.SUCCEEDED - result = - InternalMeasurementKt.result { - watchDuration = - InternalMeasurementKt.ResultKt.watchDuration { value = TOTAL_WATCH_DURATION } + details = + InternalMeasurementKt.details { + result = + InternalMeasurementKt.result { + watchDuration = + InternalMeasurementKt.ResultKt.watchDuration { value = TOTAL_WATCH_DURATION } + } } } @@ -874,6 +874,64 @@ private val PENDING_SINGLE_PUBLISHER_IMPRESSION_MEASUREMENT = state = Measurement.State.COMPUTING } +// CMMs cross publisher watch duration measurements +private val UNION_ALL_WATCH_DURATION_MEASUREMENT_SPEC = measurementSpec { + measurementPublicKey = MEASUREMENT_CONSUMER_PUBLIC_KEY.toByteString() + + nonceHashes.addAll( + listOf( + hashSha256(SECURE_RANDOM_OUTPUT_LONG), + hashSha256(SECURE_RANDOM_OUTPUT_LONG), + hashSha256(SECURE_RANDOM_OUTPUT_LONG) + ) + ) + + duration = + MeasurementSpecKt.duration { + privacyParams = differentialPrivacyParams { + epsilon = WATCH_DURATION_EPSILON + delta = DIFFERENTIAL_PRIVACY_DELTA + } + privacyParams = differentialPrivacyParams { + epsilon = WATCH_DURATION_EPSILON + delta = DIFFERENTIAL_PRIVACY_DELTA + } + maximumWatchDurationPerUser = MAXIMUM_WATCH_DURATION_PER_USER + } + vidSamplingInterval = + MeasurementSpecKt.vidSamplingInterval { + start = WATCH_DURATION_VID_SAMPLING_START + width = WATCH_DURATION_VID_SAMPLING_WIDTH + } +} + +private val REQUESTING_UNION_ALL_WATCH_DURATION_MEASUREMENT = + BASE_MEASUREMENT.copy { + dataProviders += DATA_PROVIDERS.keys.map { DATA_PROVIDER_ENTRIES.getValue(it) } + + measurementSpec = + signMeasurementSpec( + UNION_ALL_WATCH_DURATION_MEASUREMENT_SPEC.copy { + nonceHashes += hashSha256(SECURE_RANDOM_OUTPUT_LONG) + }, + MEASUREMENT_CONSUMER_SIGNING_KEY_HANDLE + ) + + measurementReferenceId = + INTERNAL_PENDING_UNION_ALL_WATCH_DURATION_MEASUREMENT.cmmsCreateMeasurementRequestId + } + +private val PENDING_UNION_ALL_WATCH_DURATION_MEASUREMENT = + REQUESTING_UNION_ALL_WATCH_DURATION_MEASUREMENT.copy { + name = + MeasurementKey( + MEASUREMENT_CONSUMERS.keys.first().measurementConsumerId, + INTERNAL_PENDING_UNION_ALL_WATCH_DURATION_MEASUREMENT.cmmsMeasurementId + ) + .toName() + state = Measurement.State.COMPUTING + } + // Metric Specs private val REACH_METRIC_SPEC: MetricSpec = metricSpec { @@ -1045,14 +1103,23 @@ private val INTERNAL_FAILED_SINGLE_PUBLISHER_IMPRESSION_METRIC = // Internal Cross Publisher Watch Duration Metrics private val INTERNAL_REQUESTING_CROSS_PUBLISHER_WATCH_DURATION_METRIC = internalMetric { cmmsMeasurementConsumerId = MEASUREMENT_CONSUMERS.keys.first().measurementConsumerId - metricIdempotencyKey = CROSS_PUBLISHER_WATCH_DURATION_METRIC_IDEMPOTENCY_KEY externalReportingSetId = INTERNAL_UNION_ALL_REPORTING_SET.externalReportingSetId timeInterval = INTERNAL_TIME_INTERVAL metricSpec = internalMetricSpec { watchDuration = InternalMetricSpecKt.watchDurationParams { + privacyParams = + InternalMetricSpecKt.differentialPrivacyParams { + epsilon = WATCH_DURATION_EPSILON + delta = DIFFERENTIAL_PRIVACY_DELTA + } maximumWatchDurationPerUser = MAXIMUM_WATCH_DURATION_PER_USER } + vidSamplingInterval = + InternalMetricSpecKt.vidSamplingInterval { + start = WATCH_DURATION_VID_SAMPLING_START + width = WATCH_DURATION_VID_SAMPLING_WIDTH + } } weightedMeasurements += weightedMeasurement { weight = 1 @@ -1090,7 +1157,9 @@ private val INTERNAL_SUCCEEDED_CROSS_PUBLISHER_WATCH_DURATION_METRIC = filters += this@copy.details.filtersList result = internalMetricResult { watchDuration = - InternalMetricResultKt.doubleResult { value = TOTAL_WATCH_DURATION.seconds.toDouble() } + InternalMetricResultKt.watchDurationResult { + value = TOTAL_WATCH_DURATION.seconds.toDouble() + } } } } @@ -1191,6 +1260,21 @@ private val PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC = externalIdToApiId(INTERNAL_PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC.externalMetricId) ) .toName() + metricSpec = metricSpec { + watchDuration = watchDurationParams { + privacyParams = + MetricSpecKt.differentialPrivacyParams { + epsilon = WATCH_DURATION_EPSILON + delta = DIFFERENTIAL_PRIVACY_DELTA + } + maximumWatchDurationPerUser = MAXIMUM_WATCH_DURATION_PER_USER + } + vidSamplingInterval = + MetricSpecKt.vidSamplingInterval { + start = WATCH_DURATION_VID_SAMPLING_START + width = WATCH_DURATION_VID_SAMPLING_WIDTH + } + } state = Metric.State.RUNNING createTime = INTERNAL_PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC.createTime } @@ -1199,11 +1283,8 @@ private val SUCCEEDED_CROSS_PUBLISHER_WATCH_DURATION_METRIC = PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC.copy { state = Metric.State.SUCCEEDED result = metricResult { - watchDuration = doubleResult { - value = - INTERNAL_SUCCEEDED_CROSS_PUBLISHER_WATCH_DURATION_METRIC.details.result.watchDuration - .value - } + watchDuration = + MetricResultKt.watchDurationResult { value = TOTAL_WATCH_DURATION.seconds.toDouble() } } } @@ -4143,8 +4224,10 @@ class MetricsServiceTest { @Test fun `getMetric returns the metric with SUCCEEDED when the metric is already succeeded`() = runBlocking { - whenever(internalMetricsMock.getMetric(any())) - .thenReturn(INTERNAL_SUCCEEDED_INCREMENTAL_REACH_METRIC) + whenever(internalMetricsMock.batchGetMetrics(any())) + .thenReturn( + internalBatchGetMetricsResponse { metrics += INTERNAL_SUCCEEDED_INCREMENTAL_REACH_METRIC } + ) val request = getMetricRequest { name = SUCCEEDED_INCREMENTAL_REACH_METRIC.name } @@ -4153,16 +4236,19 @@ class MetricsServiceTest { runBlocking { service.getMetric(request) } } - // Verify proto argument of internal MetricsCoroutineImplBase::getMetric - val getInternalMetricCaptor: KArgumentCaptor = argumentCaptor() - verifyBlocking(internalMetricsMock, times(1)) { getMetric(getInternalMetricCaptor.capture()) } - val capturedInternalGetMetricRequests = getInternalMetricCaptor.allValues + // Verify proto argument of internal MetricsCoroutineImplBase::batchGetMetrics + val batchGetInternalMetricsCaptor: KArgumentCaptor = + argumentCaptor() + verifyBlocking(internalMetricsMock, times(1)) { + batchGetMetrics(batchGetInternalMetricsCaptor.capture()) + } + val capturedInternalGetMetricRequests = batchGetInternalMetricsCaptor.allValues assertThat(capturedInternalGetMetricRequests) .containsExactly( - internalGetMetricRequest { + internalBatchGetMetricsRequest { cmmsMeasurementConsumerId = INTERNAL_SUCCEEDED_INCREMENTAL_REACH_METRIC.cmmsMeasurementConsumerId - externalMetricId = INTERNAL_SUCCEEDED_INCREMENTAL_REACH_METRIC.externalMetricId + externalMetricIds += INTERNAL_SUCCEEDED_INCREMENTAL_REACH_METRIC.externalMetricId } ) @@ -4186,8 +4272,12 @@ class MetricsServiceTest { @Test fun `getMetric returns the metric with FAILED when the metric is already failed`() = runBlocking { - whenever(internalMetricsMock.getMetric(any())) - .thenReturn(INTERNAL_FAILED_SINGLE_PUBLISHER_IMPRESSION_METRIC) + whenever(internalMetricsMock.batchGetMetrics(any())) + .thenReturn( + internalBatchGetMetricsResponse { + metrics += INTERNAL_FAILED_SINGLE_PUBLISHER_IMPRESSION_METRIC + } + ) val request = getMetricRequest { name = FAILED_SINGLE_PUBLISHER_IMPRESSION_METRIC.name } @@ -4196,16 +4286,19 @@ class MetricsServiceTest { runBlocking { service.getMetric(request) } } - // Verify proto argument of internal MetricsCoroutineImplBase::getMetric - val getInternalMetricCaptor: KArgumentCaptor = argumentCaptor() - verifyBlocking(internalMetricsMock, times(1)) { getMetric(getInternalMetricCaptor.capture()) } - val capturedInternalGetMetricRequests = getInternalMetricCaptor.allValues + // Verify proto argument of internal MetricsCoroutineImplBase::batchGetMetrics + val batchGetInternalMetricsCaptor: KArgumentCaptor = + argumentCaptor() + verifyBlocking(internalMetricsMock, times(1)) { + batchGetMetrics(batchGetInternalMetricsCaptor.capture()) + } + val capturedInternalGetMetricRequests = batchGetInternalMetricsCaptor.allValues assertThat(capturedInternalGetMetricRequests) .containsExactly( - internalGetMetricRequest { + internalBatchGetMetricsRequest { cmmsMeasurementConsumerId = INTERNAL_FAILED_SINGLE_PUBLISHER_IMPRESSION_METRIC.cmmsMeasurementConsumerId - externalMetricId = INTERNAL_FAILED_SINGLE_PUBLISHER_IMPRESSION_METRIC.externalMetricId + externalMetricIds += INTERNAL_FAILED_SINGLE_PUBLISHER_IMPRESSION_METRIC.externalMetricId } ) @@ -4229,10 +4322,10 @@ class MetricsServiceTest { @Test fun `getMetric returns the metric with RUNNING when measurements are pending`() = runBlocking { - whenever(internalMetricsMock.getMetric(any())) + whenever(internalMetricsMock.batchGetMetrics(any())) .thenReturn( - INTERNAL_PENDING_INCREMENTAL_REACH_METRIC, - INTERNAL_PENDING_INCREMENTAL_REACH_METRIC + internalBatchGetMetricsResponse { metrics += INTERNAL_PENDING_INCREMENTAL_REACH_METRIC }, + internalBatchGetMetricsResponse { metrics += INTERNAL_PENDING_INCREMENTAL_REACH_METRIC } ) val request = getMetricRequest { name = PENDING_INCREMENTAL_REACH_METRIC.name } @@ -4242,21 +4335,24 @@ class MetricsServiceTest { runBlocking { service.getMetric(request) } } - // Verify proto argument of internal MetricsCoroutineImplBase::getMetric - val getInternalMetricCaptor: KArgumentCaptor = argumentCaptor() - verifyBlocking(internalMetricsMock, times(2)) { getMetric(getInternalMetricCaptor.capture()) } - val capturedInternalGetMetricRequests = getInternalMetricCaptor.allValues + // Verify proto argument of internal MetricsCoroutineImplBase::batchGetMetrics + val batchGetInternalMetricsCaptor: KArgumentCaptor = + argumentCaptor() + verifyBlocking(internalMetricsMock, times(2)) { + batchGetMetrics(batchGetInternalMetricsCaptor.capture()) + } + val capturedInternalGetMetricRequests = batchGetInternalMetricsCaptor.allValues assertThat(capturedInternalGetMetricRequests) .containsExactly( - internalGetMetricRequest { + internalBatchGetMetricsRequest { cmmsMeasurementConsumerId = INTERNAL_PENDING_INCREMENTAL_REACH_METRIC.cmmsMeasurementConsumerId - externalMetricId = INTERNAL_PENDING_INCREMENTAL_REACH_METRIC.externalMetricId + externalMetricIds += INTERNAL_PENDING_INCREMENTAL_REACH_METRIC.externalMetricId }, - internalGetMetricRequest { + internalBatchGetMetricsRequest { cmmsMeasurementConsumerId = INTERNAL_PENDING_INCREMENTAL_REACH_METRIC.cmmsMeasurementConsumerId - externalMetricId = INTERNAL_PENDING_INCREMENTAL_REACH_METRIC.externalMetricId + externalMetricIds += INTERNAL_PENDING_INCREMENTAL_REACH_METRIC.externalMetricId } ) @@ -4281,17 +4377,46 @@ class MetricsServiceTest { @Test fun `getMetric returns the metric with SUCCEEDED when measurements are SUCCEEDED`() = runBlocking { - whenever(internalMetricsMock.getMetric(any())) + whenever(internalMetricsMock.batchGetMetrics(any())) .thenReturn( - INTERNAL_PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC, - INTERNAL_SUCCEEDED_CROSS_PUBLISHER_WATCH_DURATION_METRIC + internalBatchGetMetricsResponse { + metrics += INTERNAL_PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC + }, + internalBatchGetMetricsResponse { + metrics += INTERNAL_SUCCEEDED_CROSS_PUBLISHER_WATCH_DURATION_METRIC + }, ) + + val succeededUnionAllWatchDurationMeasurement = + PENDING_UNION_ALL_WATCH_DURATION_MEASUREMENT.copy { + state = Measurement.State.SUCCEEDED + + results += + DATA_PROVIDERS.keys.zip(WATCH_DURATION_LIST).map { (dataProviderKey, watchDuration) -> + val dataProvider = DATA_PROVIDERS.getValue(dataProviderKey) + resultPair { + val result = + MeasurementKt.result { + this.watchDuration = + MeasurementKt.ResultKt.watchDuration { value = watchDuration } + } + encryptedResult = + encryptResult( + signResult(result, DATA_PROVIDER_SIGNING_KEY), + MEASUREMENT_CONSUMER_PUBLIC_KEY + ) + certificate = dataProvider.certificate + } + } + } whenever(measurementsMock.getMeasurement(any())) + .thenReturn(succeededUnionAllWatchDurationMeasurement) + whenever(internalMeasurementsMock.batchSetMeasurementResults(any())) .thenReturn( - SUCCEEDED_UNION_ALL_WATCH_DURATION_MEASUREMENT, + batchSetCmmsMeasurementResultsResponse { + measurements += INTERNAL_SUCCEEDED_UNION_ALL_WATCH_DURATION_MEASUREMENT + } ) - whenever(internalMeasurementsMock.batchSetMeasurementResults(any())) - .thenReturn(flowOf(INTERNAL_SUCCEEDED_UNION_ALL_WATCH_DURATION_MEASUREMENT)) val request = getMetricRequest { name = PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC.name } @@ -4300,22 +4425,25 @@ class MetricsServiceTest { runBlocking { service.getMetric(request) } } - // Verify proto argument of internal MetricsCoroutineImplBase::getMetric - val getInternalMetricCaptor: KArgumentCaptor = argumentCaptor() - verifyBlocking(internalMetricsMock, times(2)) { getMetric(getInternalMetricCaptor.capture()) } - val capturedInternalGetMetricRequests = getInternalMetricCaptor.allValues + // Verify proto argument of internal MetricsCoroutineImplBase::batchGetMetrics + val batchGetInternalMetricsCaptor: KArgumentCaptor = + argumentCaptor() + verifyBlocking(internalMetricsMock, times(2)) { + batchGetMetrics(batchGetInternalMetricsCaptor.capture()) + } + val capturedInternalGetMetricRequests = batchGetInternalMetricsCaptor.allValues assertThat(capturedInternalGetMetricRequests) .containsExactly( - internalGetMetricRequest { + internalBatchGetMetricsRequest { cmmsMeasurementConsumerId = INTERNAL_PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC.cmmsMeasurementConsumerId - externalMetricId = + externalMetricIds += INTERNAL_PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC.externalMetricId }, - internalGetMetricRequest { + internalBatchGetMetricsRequest { cmmsMeasurementConsumerId = INTERNAL_PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC.cmmsMeasurementConsumerId - externalMetricId = + externalMetricIds += INTERNAL_PENDING_CROSS_PUBLISHER_WATCH_DURATION_METRIC.externalMetricId } ) @@ -4332,9 +4460,9 @@ class MetricsServiceTest { cmmsMeasurementConsumerId = INTERNAL_SUCCEEDED_UNION_ALL_WATCH_DURATION_MEASUREMENT.cmmsMeasurementConsumerId measurementResults += measurementResult { - externalMeasurementId = - INTERNAL_SUCCEEDED_UNION_ALL_WATCH_DURATION_MEASUREMENT.externalMeasurementId - this.result = INTERNAL_SUCCEEDED_UNION_ALL_WATCH_DURATION_MEASUREMENT.result + cmmsMeasurementId = + INTERNAL_SUCCEEDED_UNION_ALL_WATCH_DURATION_MEASUREMENT.cmmsMeasurementId + this.result = INTERNAL_SUCCEEDED_UNION_ALL_WATCH_DURATION_MEASUREMENT.details.result } } )