diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt index 21e19dd62d6..b51891a74d9 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt @@ -1389,16 +1389,13 @@ fun ListMetricsRequest.toListMetricsPageToken(): ListMetricsPageToken { } val cmmsMeasurementConsumerId = parentKey.measurementConsumerId - val isValidPageSize = - source.pageSize != 0 && source.pageSize >= MIN_PAGE_SIZE && source.pageSize <= MAX_PAGE_SIZE - return if (pageToken.isNotBlank()) { ListMetricsPageToken.parseFrom(source.pageToken.base64UrlDecode()).copy { grpcRequire(this.cmmsMeasurementConsumerId == cmmsMeasurementConsumerId) { "Arguments must be kept the same when using a page token." } - if (isValidPageSize) { + if (source.pageSize in MIN_PAGE_SIZE..MAX_PAGE_SIZE) { pageSize = source.pageSize } } diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ProtoConversions.kt b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ProtoConversions.kt index 23935b86ea3..8192e6a16d5 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ProtoConversions.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ProtoConversions.kt @@ -39,17 +39,21 @@ import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequest import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequestKt import org.wfanet.measurement.internal.reporting.v2.StreamReportingSetsRequest import org.wfanet.measurement.internal.reporting.v2.StreamReportingSetsRequestKt +import org.wfanet.measurement.internal.reporting.v2.StreamReportsRequest +import org.wfanet.measurement.internal.reporting.v2.StreamReportsRequestKt import org.wfanet.measurement.internal.reporting.v2.TimeInterval as InternalTimeInterval import org.wfanet.measurement.internal.reporting.v2.TimeIntervals as InternalTimeIntervals import org.wfanet.measurement.internal.reporting.v2.metricSpec as internalMetricSpec import org.wfanet.measurement.internal.reporting.v2.periodicTimeInterval as internalPeriodicTimeInterval import org.wfanet.measurement.internal.reporting.v2.streamMetricsRequest import org.wfanet.measurement.internal.reporting.v2.streamReportingSetsRequest +import org.wfanet.measurement.internal.reporting.v2.streamReportsRequest import org.wfanet.measurement.internal.reporting.v2.timeInterval as internalTimeInterval import org.wfanet.measurement.internal.reporting.v2.timeIntervals as internalTimeIntervals import org.wfanet.measurement.reporting.v2alpha.CreateMetricRequest import org.wfanet.measurement.reporting.v2alpha.ListMetricsPageToken import org.wfanet.measurement.reporting.v2alpha.ListReportingSetsPageToken +import org.wfanet.measurement.reporting.v2alpha.ListReportsPageToken import org.wfanet.measurement.reporting.v2alpha.MetricSpec import org.wfanet.measurement.reporting.v2alpha.MetricSpecKt import org.wfanet.measurement.reporting.v2alpha.PeriodicTimeInterval @@ -658,3 +662,17 @@ fun Map.Entry.toReportingMe } } } + +/** Converts a [ListReportsPageToken] to an internal [StreamReportsRequest]. */ +fun ListReportsPageToken.toStreamReportsRequest(): StreamReportsRequest { + val source = this + return streamReportsRequest { + // get one more than the actual page size for deciding whether to set page token + limit = pageSize + 1 + filter = + StreamReportsRequestKt.filter { + cmmsMeasurementConsumerId = source.cmmsMeasurementConsumerId + externalReportIdAfter = source.lastReport.externalReportId + } + } +} diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsService.kt b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsService.kt index 87bd0a21667..3bc7c0ebf52 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsService.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsService.kt @@ -19,8 +19,11 @@ package org.wfanet.measurement.reporting.service.api.v2alpha import io.grpc.Status import io.grpc.StatusException import kotlin.math.min +import kotlinx.coroutines.flow.toList import org.wfanet.measurement.api.v2alpha.MeasurementConsumerKey import org.wfanet.measurement.api.withAuthenticationKey +import org.wfanet.measurement.common.base64UrlDecode +import org.wfanet.measurement.common.base64UrlEncode import org.wfanet.measurement.common.grpc.failGrpc import org.wfanet.measurement.common.grpc.grpcRequire import org.wfanet.measurement.common.grpc.grpcRequireNotNull @@ -32,6 +35,7 @@ import org.wfanet.measurement.internal.reporting.v2.MetricSpec as InternalMetric import org.wfanet.measurement.internal.reporting.v2.Report as InternalReport import org.wfanet.measurement.internal.reporting.v2.ReportKt as InternalReportKt import org.wfanet.measurement.internal.reporting.v2.ReportsGrpcKt.ReportsCoroutineStub +import org.wfanet.measurement.internal.reporting.v2.StreamReportsRequest import org.wfanet.measurement.internal.reporting.v2.TimeInterval as InternalTimeInterval import org.wfanet.measurement.internal.reporting.v2.copy import org.wfanet.measurement.internal.reporting.v2.createReportRequest as internalCreateReportRequest @@ -42,6 +46,10 @@ import org.wfanet.measurement.reporting.v2alpha.BatchGetMetricsRequest import org.wfanet.measurement.reporting.v2alpha.CreateMetricRequest import org.wfanet.measurement.reporting.v2alpha.CreateReportRequest import org.wfanet.measurement.reporting.v2alpha.GetReportRequest +import org.wfanet.measurement.reporting.v2alpha.ListReportsPageToken +import org.wfanet.measurement.reporting.v2alpha.ListReportsPageTokenKt +import org.wfanet.measurement.reporting.v2alpha.ListReportsRequest +import org.wfanet.measurement.reporting.v2alpha.ListReportsResponse import org.wfanet.measurement.reporting.v2alpha.Metric import org.wfanet.measurement.reporting.v2alpha.MetricsGrpcKt.MetricsCoroutineStub import org.wfanet.measurement.reporting.v2alpha.Report @@ -49,8 +57,15 @@ import org.wfanet.measurement.reporting.v2alpha.ReportKt import org.wfanet.measurement.reporting.v2alpha.ReportsGrpcKt.ReportsCoroutineImplBase import org.wfanet.measurement.reporting.v2alpha.batchCreateMetricsRequest import org.wfanet.measurement.reporting.v2alpha.batchGetMetricsRequest +import org.wfanet.measurement.reporting.v2alpha.copy +import org.wfanet.measurement.reporting.v2alpha.listReportsPageToken +import org.wfanet.measurement.reporting.v2alpha.listReportsResponse import org.wfanet.measurement.reporting.v2alpha.report +private const val MIN_PAGE_SIZE = 1 +private const val DEFAULT_PAGE_SIZE = 50 +private const val MAX_PAGE_SIZE = 1000 + private const val MAX_BATCH_SIZE_FOR_BATCH_CREATE_METRICS = 1000 private const val MAX_BATCH_SIZE_FOR_BATCH_GET_METRICS = 100 @@ -75,6 +90,78 @@ class ReportsService( val internalTimeRange: InternalTimeRange, ) + override suspend fun listReports(request: ListReportsRequest): ListReportsResponse { + val listReportsPageToken = request.toListReportsPageToken() + + val principal: ReportingPrincipal = principalFromCurrentContext + when (principal) { + is MeasurementConsumerPrincipal -> { + if (request.parent != principal.resourceKey.toName()) { + failGrpc(Status.PERMISSION_DENIED) { + "Cannot list Reports belonging to other MeasurementConsumers." + } + } + } + } + + val streamInternalReportsRequest: StreamReportsRequest = + listReportsPageToken.toStreamReportsRequest() + + val results: List = + try { + internalReportsStub.streamReports(streamInternalReportsRequest).toList() + } catch (e: StatusException) { + throw Exception("Unable to list reports from the reporting database.", e) + } + + if (results.isEmpty()) { + return ListReportsResponse.getDefaultInstance() + } + + val nextPageToken: ListReportsPageToken? = + if (results.size > listReportsPageToken.pageSize) { + listReportsPageToken.copy { + lastReport = + ListReportsPageTokenKt.previousPageEnd { + cmmsMeasurementConsumerId = results[results.lastIndex - 1].cmmsMeasurementConsumerId + externalReportId = results[results.lastIndex - 1].externalReportId + } + } + } else { + null + } + + val subResults: List = + results.subList(0, min(results.size, listReportsPageToken.pageSize)) + + // Get metrics. + val metricNames: List = + subResults.flatMap { internalReport -> + internalReport.externalMetricIds.map { externalMetricId -> + MetricKey( + principal.resourceKey.measurementConsumerId, + externalIdToApiId(externalMetricId) + ) + .toName() + } + } + + val externalIdToMetricMap: Map = + batchGetMetrics(principal.resourceKey.toName(), principal.config.apiKey, metricNames) + .associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) } + + return listReportsResponse { + reports += + subResults.map { internalReport -> + convertInternalReportToPublic(internalReport, externalIdToMetricMap) + } + + if (nextPageToken != null) { + this.nextPageToken = nextPageToken.toByteString().base64UrlEncode() + } + } + } + override suspend fun getReport(request: GetReportRequest): Report { val reportKey = grpcRequireNotNull(ReportKey.fromName(request.name)) { @@ -104,24 +191,18 @@ class ReportsService( throw Exception("Unable to get the report from the reporting database.", e) } - // Create metrics. + // Get metrics. val metricNames: List = - internalReport.reportingMetricEntriesMap.flatMap { (_, reportingMetricCalculationSpec) -> - reportingMetricCalculationSpec.metricCalculationSpecsList.flatMap { metricCalculationSpec -> - metricCalculationSpec.reportingMetricsList.map { reportingMetric -> - MetricKey( - principal.resourceKey.measurementConsumerId, - externalIdToApiId(reportingMetric.externalMetricId) - ) - .toName() - } - } + internalReport.externalMetricIds.map { externalMetricId -> + MetricKey(principal.resourceKey.measurementConsumerId, externalIdToApiId(externalMetricId)) + .toName() } - val metrics: List = + val externalIdToMetricMap: Map = batchGetMetrics(principal.resourceKey.toName(), principal.config.apiKey, metricNames) + .associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) } // Convert the internal report to public and return. - return convertInternalReportToPublic(internalReport, metrics) + return convertInternalReportToPublic(internalReport, externalIdToMetricMap) } private suspend fun batchGetMetrics( @@ -209,8 +290,9 @@ class ReportsService( } } .map { it.toCreateMetricRequest(principal.resourceKey) } - val metrics: List = + val externalIdToMetricMap: Map = batchCreateMetrics(request.parent, principal.config.apiKey, createMetricRequests) + .associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) } // Once all metrics are created, get the updated internal report with the metric IDs filled. val updatedInternalReport = @@ -226,13 +308,13 @@ class ReportsService( } // Convert the internal report to public and return. - return convertInternalReportToPublic(updatedInternalReport, metrics) + return convertInternalReportToPublic(updatedInternalReport, externalIdToMetricMap) } /** Converts an internal [InternalReport] to a public [Report]. */ private fun convertInternalReportToPublic( internalReport: InternalReport, - metrics: List, + externalIdToMetricMap: Map, ): Report { return report { name = @@ -259,6 +341,11 @@ class ReportsService( error("The time in the internal report should've been set.") } + val metrics: List = + internalReport.externalMetricIds.map { externalMetricId -> + externalIdToMetricMap.getValue(externalMetricId) + } + state = inferReportState(metrics) createTime = internalReport.createTime @@ -267,7 +354,7 @@ class ReportsService( buildMetricCalculationResults( internalReport.cmmsMeasurementConsumerId, internalReport.reportingMetricEntriesMap, - metrics + externalIdToMetricMap ) } } @@ -277,11 +364,8 @@ class ReportsService( private fun buildMetricCalculationResults( cmmsMeasurementConsumerId: String, internalReportingMetricEntries: InternalReportingMetricEntries, - metrics: List, + externalIdToMetricMap: Map, ): List { - val externalIdToMetricMap: Map = - metrics.associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) } - return internalReportingMetricEntries.flatMap { (externalReportingSetId, reportingMetricCalculationSpec) -> val reportingSetName = @@ -607,3 +691,48 @@ private fun inferReportState(metrics: Collection): Report.State { Report.State.RUNNING } } + +/** Gets all external metric IDs used in the [InternalReport]. */ +private val InternalReport.externalMetricIds: List + get() = + reportingMetricEntriesMap.flatMap { (_, reportingMetricCalculationSpec) -> + reportingMetricCalculationSpec.metricCalculationSpecsList.flatMap { metricCalculationSpec -> + metricCalculationSpec.reportingMetricsList.map { reportingMetric -> + reportingMetric.externalMetricId + } + } + } + +/** Converts a public [ListReportsRequest] to a [ListReportsPageToken]. */ +private fun ListReportsRequest.toListReportsPageToken(): ListReportsPageToken { + grpcRequire(pageSize >= 0) { "Page size cannot be less than 0" } + + val source = this + val parentKey: MeasurementConsumerKey = + grpcRequireNotNull(MeasurementConsumerKey.fromName(parent)) { + "Parent is either unspecified or invalid." + } + val cmmsMeasurementConsumerId = parentKey.measurementConsumerId + + return if (pageToken.isNotBlank()) { + ListReportsPageToken.parseFrom(pageToken.base64UrlDecode()).copy { + grpcRequire(this.cmmsMeasurementConsumerId == cmmsMeasurementConsumerId) { + "Arguments must be kept the same when using a page token" + } + + if (source.pageSize in MIN_PAGE_SIZE..MAX_PAGE_SIZE) { + pageSize = source.pageSize + } + } + } else { + listReportsPageToken { + pageSize = + when { + source.pageSize < MIN_PAGE_SIZE -> DEFAULT_PAGE_SIZE + source.pageSize > MAX_PAGE_SIZE -> MAX_PAGE_SIZE + else -> source.pageSize + } + this.cmmsMeasurementConsumerId = cmmsMeasurementConsumerId + } + } +} diff --git a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsServiceTest.kt b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsServiceTest.kt index cc79e538acb..3d3d3ef5403 100644 --- a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsServiceTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsServiceTest.kt @@ -25,6 +25,7 @@ import io.grpc.StatusRuntimeException import java.time.Duration import java.time.Instant import kotlin.test.assertFailsWith +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.runBlocking import org.junit.Before import org.junit.Rule @@ -41,13 +42,13 @@ import org.mockito.kotlin.whenever import org.wfanet.measurement.api.v2alpha.DataProviderKey import org.wfanet.measurement.api.v2alpha.MeasurementConsumerKey import org.wfanet.measurement.api.v2alpha.withDataProviderPrincipal +import org.wfanet.measurement.common.base64UrlEncode import org.wfanet.measurement.common.grpc.testing.GrpcTestServerRule import org.wfanet.measurement.common.grpc.testing.mockService import org.wfanet.measurement.common.identity.ExternalId import org.wfanet.measurement.common.identity.apiIdToExternalId import org.wfanet.measurement.common.identity.externalIdToApiId import org.wfanet.measurement.common.testing.verifyProtoArgument -import org.wfanet.measurement.common.toProtoDuration import org.wfanet.measurement.common.toProtoTime import org.wfanet.measurement.config.reporting.MetricSpecConfigKt import org.wfanet.measurement.config.reporting.measurementConsumerConfig @@ -58,6 +59,7 @@ import org.wfanet.measurement.internal.reporting.v2.Report as InternalReport import org.wfanet.measurement.internal.reporting.v2.ReportKt as InternalReportKt import org.wfanet.measurement.internal.reporting.v2.ReportsGrpcKt.ReportsCoroutineImplBase import org.wfanet.measurement.internal.reporting.v2.ReportsGrpcKt.ReportsCoroutineStub as InternalReportsCoroutineStub +import org.wfanet.measurement.internal.reporting.v2.StreamReportsRequestKt import org.wfanet.measurement.internal.reporting.v2.TimeInterval as InternalTimeInterval import org.wfanet.measurement.internal.reporting.v2.copy import org.wfanet.measurement.internal.reporting.v2.createReportRequest as internalCreateReportRequest @@ -65,12 +67,16 @@ import org.wfanet.measurement.internal.reporting.v2.getReportRequest as internal import org.wfanet.measurement.internal.reporting.v2.metricSpec as internalMetricSpec import org.wfanet.measurement.internal.reporting.v2.periodicTimeInterval as internalPeriodicTimeInterval import org.wfanet.measurement.internal.reporting.v2.report as internalReport +import org.wfanet.measurement.internal.reporting.v2.streamReportsRequest import org.wfanet.measurement.internal.reporting.v2.timeInterval as internalTimeInterval import org.wfanet.measurement.internal.reporting.v2.timeIntervals as internalTimeIntervals import org.wfanet.measurement.reporting.v2alpha.BatchCreateMetricsRequest import org.wfanet.measurement.reporting.v2alpha.BatchGetMetricsRequest +import org.wfanet.measurement.reporting.v2alpha.ListReportsPageTokenKt +import org.wfanet.measurement.reporting.v2alpha.ListReportsRequest import org.wfanet.measurement.reporting.v2alpha.Metric import org.wfanet.measurement.reporting.v2alpha.MetricResultKt.reachResult +import org.wfanet.measurement.reporting.v2alpha.MetricResultKt.watchDurationResult import org.wfanet.measurement.reporting.v2alpha.MetricSpec import org.wfanet.measurement.reporting.v2alpha.MetricSpecKt import org.wfanet.measurement.reporting.v2alpha.MetricsGrpcKt @@ -87,6 +93,9 @@ import org.wfanet.measurement.reporting.v2alpha.copy import org.wfanet.measurement.reporting.v2alpha.createMetricRequest import org.wfanet.measurement.reporting.v2alpha.createReportRequest import org.wfanet.measurement.reporting.v2alpha.getReportRequest +import org.wfanet.measurement.reporting.v2alpha.listReportsPageToken +import org.wfanet.measurement.reporting.v2alpha.listReportsRequest +import org.wfanet.measurement.reporting.v2alpha.listReportsResponse import org.wfanet.measurement.reporting.v2alpha.metric import org.wfanet.measurement.reporting.v2alpha.metricResult import org.wfanet.measurement.reporting.v2alpha.metricSpec @@ -96,6 +105,9 @@ import org.wfanet.measurement.reporting.v2alpha.reportingSet import org.wfanet.measurement.reporting.v2alpha.timeInterval import org.wfanet.measurement.reporting.v2alpha.timeIntervals +private const val DEFAULT_PAGE_SIZE = 50 +private const val MAX_PAGE_SIZE = 1000 + // Authentication key private const val API_AUTHENTICATION_KEY = "nR5QPN7ptx" @@ -137,25 +149,48 @@ class ReportsServiceTest { private val internalReportsMock: ReportsCoroutineImplBase = mockService { onBlocking { - createReport(eq(internalCreateReportRequest { report = INTERNAL_REPORTS.requestingReport })) + createReport( + eq(internalCreateReportRequest { report = INTERNAL_REACH_REPORTS.requestingReport }) + ) } - .thenReturn(INTERNAL_REPORTS.initialReport) + .thenReturn(INTERNAL_REACH_REPORTS.initialReport) onBlocking { getReport( eq( internalGetReportRequest { - cmmsMeasurementConsumerId = INTERNAL_REPORTS.initialReport.cmmsMeasurementConsumerId - externalReportId = INTERNAL_REPORTS.initialReport.externalReportId + cmmsMeasurementConsumerId = + INTERNAL_REACH_REPORTS.initialReport.cmmsMeasurementConsumerId + externalReportId = INTERNAL_REACH_REPORTS.initialReport.externalReportId } ) ) } - .thenReturn(INTERNAL_REPORTS.pendingReport) + .thenReturn(INTERNAL_REACH_REPORTS.pendingReport) + onBlocking { streamReports(any()) } + .thenReturn( + flowOf( + INTERNAL_REACH_REPORTS.pendingReport, + INTERNAL_WATCH_DURATION_REPORTS.pendingReport, + ) + ) } private val metricsMock: MetricsGrpcKt.MetricsCoroutineImplBase = mockService { onBlocking { batchCreateMetrics(any()) } .thenReturn(batchCreateMetricsResponse { metrics += RUNNING_REACH_METRIC }) + + onBlocking { batchGetMetrics(any()) } + .thenAnswer { + val request = it.arguments[0] as BatchGetMetricsRequest + val metricsMap = + mapOf( + RUNNING_REACH_METRIC.name to RUNNING_REACH_METRIC, + RUNNING_WATCH_DURATION_METRIC.name to RUNNING_WATCH_DURATION_METRIC + ) + batchGetMetricsResponse { + metrics += request.namesList.map { metricName -> metricsMap.getValue(metricName) } + } + } } @get:Rule @@ -327,7 +362,7 @@ class ReportsServiceTest { name = MetricKey( MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, - ExternalId(EXTERNAL_METRIC_ID_BASE + index).apiId.value + ExternalId(EXTERNAL_REACH_METRIC_ID_BASE + index).apiId.value ) .toName() state = Metric.State.RUNNING @@ -470,7 +505,7 @@ class ReportsServiceTest { name = MetricKey( MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, - ExternalId(EXTERNAL_METRIC_ID_BASE + index).apiId.value + ExternalId(EXTERNAL_REACH_METRIC_ID_BASE + index).apiId.value ) .toName() state = Metric.State.RUNNING @@ -609,7 +644,7 @@ class ReportsServiceTest { name = MetricKey( MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, - ExternalId(EXTERNAL_METRIC_ID_BASE + index).apiId.value + ExternalId(EXTERNAL_REACH_METRIC_ID_BASE + index).apiId.value ) .toName() state = Metric.State.RUNNING @@ -814,7 +849,7 @@ class ReportsServiceTest { name = MetricKey( MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, - ExternalId(EXTERNAL_METRIC_ID_BASE + index).apiId.value + ExternalId(EXTERNAL_REACH_METRIC_ID_BASE + index).apiId.value ) .toName() state = Metric.State.RUNNING @@ -950,7 +985,7 @@ class ReportsServiceTest { listOf( createMetricRequest.copy { this.createMetricRequestId = requestId.toString() - externalMetricId = EXTERNAL_METRIC_ID_BASE + requestId + externalMetricId = EXTERNAL_REACH_METRIC_ID_BASE + requestId } ) requestId++ @@ -1003,7 +1038,7 @@ class ReportsServiceTest { name = MetricKey( MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, - ExternalId(EXTERNAL_METRIC_ID_BASE + index).apiId.value + ExternalId(EXTERNAL_REACH_METRIC_ID_BASE + index).apiId.value ) .toName() state = Metric.State.RUNNING @@ -1148,7 +1183,7 @@ class ReportsServiceTest { reportingMetrics.map { reportingMetric -> reportingMetric.copy { this.createMetricRequestId = requestId.toString() - externalMetricId = EXTERNAL_METRIC_ID_BASE + requestId + externalMetricId = EXTERNAL_REACH_METRIC_ID_BASE + requestId } } this.reportingMetrics.clear() @@ -1202,7 +1237,7 @@ class ReportsServiceTest { name = MetricKey( MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, - ExternalId(EXTERNAL_METRIC_ID_BASE + index).apiId.value + ExternalId(EXTERNAL_REACH_METRIC_ID_BASE + index).apiId.value ) .toName() state = Metric.State.RUNNING @@ -1344,7 +1379,7 @@ class ReportsServiceTest { reportingCreateMetricRequests.mapIndexed { requestId, request -> request.copy { this.createMetricRequestId = requestId.toString() - externalMetricId = EXTERNAL_METRIC_ID_BASE + requestId + externalMetricId = EXTERNAL_REACH_METRIC_ID_BASE + requestId } } @@ -1387,7 +1422,7 @@ class ReportsServiceTest { name = MetricKey( MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, - ExternalId(EXTERNAL_METRIC_ID_BASE + index).apiId.value + ExternalId(EXTERNAL_REACH_METRIC_ID_BASE + index).apiId.value ) .toName() reportingSet = PRIMITIVE_REPORTING_SETS.first().name @@ -2196,7 +2231,7 @@ class ReportsServiceTest { reportingCreateMetricRequests.mapIndexed { requestId, request -> request.copy { this.createMetricRequestId = requestId.toString() - externalMetricId = EXTERNAL_METRIC_ID_BASE + requestId + externalMetricId = EXTERNAL_REACH_METRIC_ID_BASE + requestId } } @@ -2232,7 +2267,7 @@ class ReportsServiceTest { name = MetricKey( MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, - ExternalId(EXTERNAL_METRIC_ID_BASE + index).apiId.value + ExternalId(EXTERNAL_REACH_METRIC_ID_BASE + index).apiId.value ) .toName() reportingSet = PRIMITIVE_REPORTING_SETS.first().name @@ -2317,6 +2352,435 @@ class ReportsServiceTest { assertThat(exception.status.code).isEqualTo(Status.Code.UNAUTHENTICATED) } + @Test + fun `listReports returns without a next page token when there is no previous page token`() { + val request = listReportsRequest { parent = MEASUREMENT_CONSUMER_KEYS.first().toName() } + + val result = + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + + val expected = listReportsResponse { + reports += PENDING_REACH_REPORT + reports += PENDING_WATCH_DURATION_REPORT + } + + verifyProtoArgument(internalReportsMock, ReportsCoroutineImplBase::streamReports) + .isEqualTo( + streamReportsRequest { + limit = DEFAULT_PAGE_SIZE + 1 + filter = + StreamReportsRequestKt.filter { + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + } + } + ) + + assertThat(result).ignoringRepeatedFieldOrder().isEqualTo(expected) + } + + @Test + fun `listReports returns with a next page token when there is no previous page token`() { + val pageSize = 1 + val request = listReportsRequest { + parent = MEASUREMENT_CONSUMER_KEYS.first().toName() + this.pageSize = pageSize + } + + val result = + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + + val expected = listReportsResponse { + reports.add(PENDING_REACH_REPORT) + + nextPageToken = + listReportsPageToken { + this.pageSize = pageSize + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + lastReport = + ListReportsPageTokenKt.previousPageEnd { + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + externalReportId = PENDING_REACH_REPORT.externalId + } + } + .toByteString() + .base64UrlEncode() + } + + verifyProtoArgument(internalReportsMock, ReportsCoroutineImplBase::streamReports) + .isEqualTo( + streamReportsRequest { + limit = pageSize + 1 + this.filter = + StreamReportsRequestKt.filter { + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + } + } + ) + + assertThat(result).ignoringRepeatedFieldOrder().isEqualTo(expected) + } + + @Test + fun `listReports returns without a next page token when there is a previous page token`() = + runBlocking { + whenever(internalReportsMock.streamReports(any())) + .thenReturn(flowOf(INTERNAL_WATCH_DURATION_REPORTS.pendingReport)) + + val pageSize = 1 + val request = listReportsRequest { + parent = MEASUREMENT_CONSUMER_KEYS.first().toName() + this.pageSize = pageSize + pageToken = + listReportsPageToken { + this.pageSize = pageSize + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + lastReport = + ListReportsPageTokenKt.previousPageEnd { + cmmsMeasurementConsumerId = + MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + externalReportId = PENDING_REACH_REPORT.externalId + } + } + .toByteString() + .base64UrlEncode() + } + + val result = + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + + val expected = listReportsResponse { reports.add(PENDING_WATCH_DURATION_REPORT) } + + verifyProtoArgument(internalReportsMock, ReportsCoroutineImplBase::streamReports) + .isEqualTo( + streamReportsRequest { + limit = pageSize + 1 + this.filter = + StreamReportsRequestKt.filter { + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + externalReportIdAfter = PENDING_REACH_REPORT.externalId + } + } + ) + + assertThat(result).ignoringRepeatedFieldOrder().isEqualTo(expected) + } + + @Test + fun `listReports with page size replaced with a valid value and no previous page token`() { + val invalidPageSize = MAX_PAGE_SIZE * 2 + val request = listReportsRequest { + parent = MEASUREMENT_CONSUMER_KEYS.first().toName() + pageSize = invalidPageSize + } + + val result = + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + + val expected = listReportsResponse { + reports += PENDING_REACH_REPORT + reports += PENDING_WATCH_DURATION_REPORT + } + + verifyProtoArgument(internalReportsMock, ReportsCoroutineImplBase::streamReports) + .isEqualTo( + streamReportsRequest { + limit = MAX_PAGE_SIZE + 1 + this.filter = + StreamReportsRequestKt.filter { + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + } + } + ) + + assertThat(result).ignoringRepeatedFieldOrder().isEqualTo(expected) + } + + @Test + fun `listReports with invalid page size replaced with the one in previous page token`() = + runBlocking { + whenever(internalReportsMock.streamReports(any())) + .thenReturn(flowOf(INTERNAL_WATCH_DURATION_REPORTS.pendingReport)) + + val invalidPageSize = MAX_PAGE_SIZE * 2 + val previousPageSize = 1 + + val request = listReportsRequest { + parent = MEASUREMENT_CONSUMER_KEYS.first().toName() + pageSize = invalidPageSize + pageToken = + listReportsPageToken { + pageSize = previousPageSize + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + lastReport = + ListReportsPageTokenKt.previousPageEnd { + cmmsMeasurementConsumerId = + MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + externalReportId = PENDING_REACH_REPORT.externalId + } + } + .toByteString() + .base64UrlEncode() + } + + val result = + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + + val expected = listReportsResponse { reports += PENDING_WATCH_DURATION_REPORT } + + verifyProtoArgument(internalReportsMock, ReportsCoroutineImplBase::streamReports) + .isEqualTo( + streamReportsRequest { + limit = previousPageSize + 1 + this.filter = + StreamReportsRequestKt.filter { + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + externalReportIdAfter = PENDING_REACH_REPORT.externalId + } + } + ) + + assertThat(result).ignoringRepeatedFieldOrder().isEqualTo(expected) + } + + @Test + fun `listReports with page size replacing the one in previous page token`() = runBlocking { + whenever(internalReportsMock.streamReports(any())) + .thenReturn(flowOf(INTERNAL_WATCH_DURATION_REPORTS.pendingReport)) + + val newPageSize = 10 + val previousPageSize = 1 + val request = listReportsRequest { + parent = MEASUREMENT_CONSUMER_KEYS.first().toName() + pageSize = newPageSize + pageToken = + listReportsPageToken { + pageSize = previousPageSize + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + lastReport = + ListReportsPageTokenKt.previousPageEnd { + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + externalReportId = PENDING_REACH_REPORT.externalId + } + } + .toByteString() + .base64UrlEncode() + } + + val result = + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + + val expected = listReportsResponse { reports += PENDING_WATCH_DURATION_REPORT } + + verifyProtoArgument(internalReportsMock, ReportsCoroutineImplBase::streamReports) + .isEqualTo( + streamReportsRequest { + limit = newPageSize + 1 + this.filter = + StreamReportsRequestKt.filter { + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + externalReportIdAfter = PENDING_REACH_REPORT.externalId + } + } + ) + + assertThat(result).ignoringRepeatedFieldOrder().isEqualTo(expected) + } + + @Test + fun `listReports returns reports with SUCCEEDED states when metrics are SUCCEEDED`() = + runBlocking { + whenever(metricsMock.batchGetMetrics(any())).thenAnswer { + val request = it.arguments[0] as BatchGetMetricsRequest + val metricsMap = + mapOf( + SUCCEEDED_REACH_METRIC.name to SUCCEEDED_REACH_METRIC, + SUCCEEDED_WATCH_DURATION_METRIC.name to SUCCEEDED_WATCH_DURATION_METRIC + ) + batchGetMetricsResponse { + metrics += request.namesList.map { metricName -> metricsMap.getValue(metricName) } + } + } + + val request = listReportsRequest { parent = MEASUREMENT_CONSUMER_KEYS.first().toName() } + + val result = + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + + val expected = listReportsResponse { + reports += SUCCEEDED_REACH_REPORT + reports += + PENDING_WATCH_DURATION_REPORT.copy { + state = Report.State.SUCCEEDED + metricCalculationResults += + ReportKt.metricCalculationResult { + displayName = DISPLAY_NAME + reportingSet = SUCCEEDED_WATCH_DURATION_METRIC.reportingSet + cumulative = false + resultAttributes += + ReportKt.MetricCalculationResultKt.resultAttribute { + metricSpec = SUCCEEDED_WATCH_DURATION_METRIC.metricSpec + timeInterval = SUCCEEDED_WATCH_DURATION_METRIC.timeInterval + metricResult = SUCCEEDED_WATCH_DURATION_METRIC.result + } + } + } + } + + verifyProtoArgument(internalReportsMock, ReportsCoroutineImplBase::streamReports) + .isEqualTo( + streamReportsRequest { + limit = DEFAULT_PAGE_SIZE + 1 + this.filter = + StreamReportsRequestKt.filter { + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + } + } + ) + assertThat(result).ignoringRepeatedFieldOrder().isEqualTo(expected) + } + + @Test + fun `listReports returns reports with FAILED states when metrics are FAILED`() = runBlocking { + val failedReachMetric = RUNNING_REACH_METRIC.copy { state = Metric.State.FAILED } + val failedWatchDurationMetric = + RUNNING_WATCH_DURATION_METRIC.copy { state = Metric.State.FAILED } + + whenever(metricsMock.batchGetMetrics(any())).thenAnswer { + val request = it.arguments[0] as BatchGetMetricsRequest + val metricsMap = + mapOf( + failedReachMetric.name to failedReachMetric, + failedWatchDurationMetric.name to failedWatchDurationMetric + ) + batchGetMetricsResponse { + metrics += request.namesList.map { metricName -> metricsMap.getValue(metricName) } + } + } + + val request = listReportsRequest { parent = MEASUREMENT_CONSUMER_KEYS.first().toName() } + + val result = + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + + val expected = listReportsResponse { + reports += PENDING_REACH_REPORT.copy { state = Report.State.FAILED } + reports += PENDING_WATCH_DURATION_REPORT.copy { state = Report.State.FAILED } + } + + verifyProtoArgument(internalReportsMock, ReportsCoroutineImplBase::streamReports) + .isEqualTo( + streamReportsRequest { + limit = DEFAULT_PAGE_SIZE + 1 + this.filter = + StreamReportsRequestKt.filter { + cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId + } + } + ) + assertThat(result).ignoringRepeatedFieldOrder().isEqualTo(expected) + } + + @Test + fun `listReports throws UNAUTHENTICATED when no principal is found`() { + val request = listReportsRequest { parent = MEASUREMENT_CONSUMER_KEYS.first().toName() } + val exception = + assertFailsWith { runBlocking { service.listReports(request) } } + assertThat(exception.status.code).isEqualTo(Status.Code.UNAUTHENTICATED) + } + + @Test + fun `listReports throws PERMISSION_DENIED when MeasurementConsumer caller doesn't match`() { + val request = listReportsRequest { parent = MEASUREMENT_CONSUMER_KEYS.first().toName() } + val exception = + assertFailsWith { + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.last().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + } + assertThat(exception.status.code).isEqualTo(Status.Code.PERMISSION_DENIED) + } + + @Test + fun `listReports throws UNAUTHENTICATED when the caller is not MeasurementConsumer`() { + val request = listReportsRequest { parent = MEASUREMENT_CONSUMER_KEYS.first().toName() } + val exception = + assertFailsWith { + withDataProviderPrincipal(DataProviderKey(ExternalId(550L).apiId.value).toName()) { + runBlocking { service.listReports(request) } + } + } + assertThat(exception.status.code).isEqualTo(Status.Code.UNAUTHENTICATED) + } + + @Test + fun `listReports throws INVALID_ARGUMENT when page size is less than 0`() { + val request = listReportsRequest { + parent = MEASUREMENT_CONSUMER_KEYS.first().toName() + pageSize = -1 + } + val exception = + assertFailsWith { + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + } + assertThat(exception.status.code).isEqualTo(Status.Code.INVALID_ARGUMENT) + } + + @Test + fun `listReports throws INVALID_ARGUMENT when parent is unspecified`() { + val exception = + assertFailsWith { + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(ListReportsRequest.getDefaultInstance()) } + } + } + assertThat(exception.status.code).isEqualTo(Status.Code.INVALID_ARGUMENT) + } + + @Test + fun `listReports throws INVALID_ARGUMENT when mc id doesn't match one in page token`() { + val cmmsMeasurementConsumerId = MEASUREMENT_CONSUMER_KEYS.last().measurementConsumerId + val request = listReportsRequest { + parent = MEASUREMENT_CONSUMER_KEYS.first().toName() + pageToken = + listReportsPageToken { + this.cmmsMeasurementConsumerId = cmmsMeasurementConsumerId + lastReport = + ListReportsPageTokenKt.previousPageEnd { + this.cmmsMeasurementConsumerId = cmmsMeasurementConsumerId + externalReportId = 220L + } + } + .toByteString() + .base64UrlEncode() + } + + val exception = + assertFailsWith { + withMeasurementConsumerPrincipal(MEASUREMENT_CONSUMER_KEYS.first().toName(), CONFIG) { + runBlocking { service.listReports(request) } + } + } + assertThat(exception.status.code).isEqualTo(Status.Code.INVALID_ARGUMENT) + } + companion object { private fun buildInitialReportingMetric( externalReportingSetId: Long, @@ -2366,6 +2830,7 @@ class ReportsServiceTest { externalReportingSetId: Long, reportingMetrics: List, groupings: List, + externalMetricIdBase: Long = EXTERNAL_REACH_METRIC_ID_BASE ): InternalReports { // Internal reports of reach val internalRequestingReport = internalReport { @@ -2384,7 +2849,7 @@ class ReportsServiceTest { } val internalInitialReport = internalRequestingReport.copy { - externalReportId = 330L + externalReportId = externalMetricIdBase + 330L createTime = Instant.now().toProtoTime() val reportingCreateMetricRequests = @@ -2408,7 +2873,7 @@ class ReportsServiceTest { reportingMetrics.mapIndexed { requestId, request -> request.copy { this.createMetricRequestId = requestId.toString() - externalMetricId = EXTERNAL_METRIC_ID_BASE + requestId + externalMetricId = externalMetricIdBase + requestId } } @@ -2452,8 +2917,6 @@ class ReportsServiceTest { private val END_INSTANT = START_INSTANT.plus(Duration.ofDays(1)) private val START_TIME = START_INSTANT.toProtoTime() - private val TIME_INTERVAL_INCREMENT = Duration.ofDays(1).toProtoDuration() - private const val INTERVAL_COUNT = 2 private val END_TIME = END_INSTANT.toProtoTime() // Metric Specs @@ -2612,9 +3075,26 @@ class ReportsServiceTest { width = WATCH_DURATION_VID_SAMPLING_WIDTH } } + private val INTERNAL_WATCH_DURATION_METRIC_SPEC: InternalMetricSpec = internalMetricSpec { + watchDuration = + InternalMetricSpecKt.watchDurationParams { + privacyParams = + InternalMetricSpecKt.differentialPrivacyParams { + epsilon = WATCH_DURATION_EPSILON + delta = DIFFERENTIAL_PRIVACY_DELTA + } + maximumWatchDurationPerUser = MAXIMUM_WATCH_DURATION_PER_USER + } + vidSamplingInterval = + InternalMetricSpecKt.vidSamplingInterval { + start = WATCH_DURATION_VID_SAMPLING_START + width = WATCH_DURATION_VID_SAMPLING_WIDTH + } + } // Metrics - private const val EXTERNAL_METRIC_ID_BASE = 220L + private const val EXTERNAL_REACH_METRIC_ID_BASE = 220L + private const val EXTERNAL_WATCH_DURATION_METRIC_ID_BASE = 250L private val REQUESTING_REACH_METRIC = metric { reportingSet = PRIMITIVE_REPORTING_SETS.first().name timeInterval = timeInterval { @@ -2629,7 +3109,7 @@ class ReportsServiceTest { name = MetricKey( MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, - ExternalId(EXTERNAL_METRIC_ID_BASE).apiId.value + ExternalId(EXTERNAL_REACH_METRIC_ID_BASE).apiId.value ) .toName() state = Metric.State.RUNNING @@ -2642,10 +3122,33 @@ class ReportsServiceTest { result = metricResult { reach = reachResult { value = 123 } } } + private val RUNNING_WATCH_DURATION_METRIC = metric { + name = + MetricKey( + MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, + ExternalId(EXTERNAL_WATCH_DURATION_METRIC_ID_BASE).apiId.value + ) + .toName() + reportingSet = PRIMITIVE_REPORTING_SETS.first().name + timeInterval = timeInterval { + startTime = START_TIME + endTime = END_TIME + } + metricSpec = WATCH_DURATION_METRIC_SPEC + state = Metric.State.RUNNING + createTime = Instant.now().toProtoTime() + } + + private val SUCCEEDED_WATCH_DURATION_METRIC = + RUNNING_WATCH_DURATION_METRIC.copy { + state = Metric.State.SUCCEEDED + result = metricResult { watchDuration = watchDurationResult { value = 123.0 } } + } + // Reports private const val DISPLAY_NAME = "DISPLAY_NAME" // Internal reports - private val INITIAL_REPORTING_CREATE_METRIC_REQUEST = + private val INITIAL_REACH_REPORTING_METRIC = buildInitialReportingMetric( PRIMITIVE_REPORTING_SETS.first().externalId, internalTimeInterval { @@ -2656,7 +3159,31 @@ class ReportsServiceTest { listOf() ) - private val INTERNAL_REPORTS = + private val INTERNAL_REACH_REPORTS = + buildInternalReports( + MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, + listOf( + internalTimeInterval { + startTime = START_TIME + endTime = END_TIME + } + ), + PRIMITIVE_REPORTING_SETS.first().externalId, + listOf(INITIAL_REACH_REPORTING_METRIC), + listOf(), + ) + + private val INITIAL_WATCH_DURATION_REPORTING_METRIC = + buildInitialReportingMetric( + PRIMITIVE_REPORTING_SETS.first().externalId, + internalTimeInterval { + startTime = START_TIME + endTime = END_TIME + }, + INTERNAL_WATCH_DURATION_METRIC_SPEC, + listOf() + ) + private val INTERNAL_WATCH_DURATION_REPORTS = buildInternalReports( MEASUREMENT_CONSUMER_KEYS.first().measurementConsumerId, listOf( @@ -2666,13 +3193,14 @@ class ReportsServiceTest { } ), PRIMITIVE_REPORTING_SETS.first().externalId, - listOf(INITIAL_REPORTING_CREATE_METRIC_REQUEST), + listOf(INITIAL_WATCH_DURATION_REPORTING_METRIC), listOf(), + EXTERNAL_WATCH_DURATION_METRIC_ID_BASE ) // Public reports private val PENDING_REACH_REPORT: Report = report { - name = INTERNAL_REPORTS.pendingReport.resourceName + name = INTERNAL_REACH_REPORTS.pendingReport.resourceName reportingMetricEntries += ReportKt.reportingMetricEntry { key = PRIMITIVE_REPORTING_SETS.first().name @@ -2693,7 +3221,7 @@ class ReportsServiceTest { } } state = Report.State.RUNNING - createTime = INTERNAL_REPORTS.pendingReport.createTime + createTime = INTERNAL_REACH_REPORTS.pendingReport.createTime } private val SUCCEEDED_REACH_REPORT = @@ -2712,6 +3240,31 @@ class ReportsServiceTest { } } } + + private val PENDING_WATCH_DURATION_REPORT: Report = report { + name = INTERNAL_WATCH_DURATION_REPORTS.pendingReport.resourceName + reportingMetricEntries += + ReportKt.reportingMetricEntry { + key = PRIMITIVE_REPORTING_SETS.first().name + value = + ReportKt.reportingMetricCalculationSpec { + metricCalculationSpecs += + ReportKt.metricCalculationSpec { + displayName = DISPLAY_NAME + metricSpecs += WATCH_DURATION_METRIC_SPEC + cumulative = false + } + } + } + timeIntervals = timeIntervals { + timeIntervals += timeInterval { + startTime = START_TIME + endTime = END_TIME + } + } + state = Report.State.RUNNING + createTime = INTERNAL_WATCH_DURATION_REPORTS.pendingReport.createTime + } } }