Skip to content

Commit

Permalink
Add listReport (#1053)
Browse files Browse the repository at this point in the history
  • Loading branch information
riemanli authored Jun 13, 2023
1 parent 615242e commit 01e99ef
Show file tree
Hide file tree
Showing 4 changed files with 751 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1389,16 +1389,13 @@ fun ListMetricsRequest.toListMetricsPageToken(): ListMetricsPageToken {
}
val cmmsMeasurementConsumerId = parentKey.measurementConsumerId

val isValidPageSize =
source.pageSize != 0 && source.pageSize >= MIN_PAGE_SIZE && source.pageSize <= MAX_PAGE_SIZE

return if (pageToken.isNotBlank()) {
ListMetricsPageToken.parseFrom(source.pageToken.base64UrlDecode()).copy {
grpcRequire(this.cmmsMeasurementConsumerId == cmmsMeasurementConsumerId) {
"Arguments must be kept the same when using a page token."
}

if (isValidPageSize) {
if (source.pageSize in MIN_PAGE_SIZE..MAX_PAGE_SIZE) {
pageSize = source.pageSize
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequestKt
import org.wfanet.measurement.internal.reporting.v2.StreamReportingSetsRequest
import org.wfanet.measurement.internal.reporting.v2.StreamReportingSetsRequestKt
import org.wfanet.measurement.internal.reporting.v2.StreamReportsRequest
import org.wfanet.measurement.internal.reporting.v2.StreamReportsRequestKt
import org.wfanet.measurement.internal.reporting.v2.TimeInterval as InternalTimeInterval
import org.wfanet.measurement.internal.reporting.v2.TimeIntervals as InternalTimeIntervals
import org.wfanet.measurement.internal.reporting.v2.metricSpec as internalMetricSpec
import org.wfanet.measurement.internal.reporting.v2.periodicTimeInterval as internalPeriodicTimeInterval
import org.wfanet.measurement.internal.reporting.v2.streamMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.streamReportingSetsRequest
import org.wfanet.measurement.internal.reporting.v2.streamReportsRequest
import org.wfanet.measurement.internal.reporting.v2.timeInterval as internalTimeInterval
import org.wfanet.measurement.internal.reporting.v2.timeIntervals as internalTimeIntervals
import org.wfanet.measurement.reporting.v2alpha.CreateMetricRequest
import org.wfanet.measurement.reporting.v2alpha.ListMetricsPageToken
import org.wfanet.measurement.reporting.v2alpha.ListReportingSetsPageToken
import org.wfanet.measurement.reporting.v2alpha.ListReportsPageToken
import org.wfanet.measurement.reporting.v2alpha.MetricSpec
import org.wfanet.measurement.reporting.v2alpha.MetricSpecKt
import org.wfanet.measurement.reporting.v2alpha.PeriodicTimeInterval
Expand Down Expand Up @@ -658,3 +662,17 @@ fun Map.Entry<Long, InternalReport.ReportingMetricCalculationSpec>.toReportingMe
}
}
}

/** Converts a [ListReportsPageToken] to an internal [StreamReportsRequest]. */
fun ListReportsPageToken.toStreamReportsRequest(): StreamReportsRequest {
val source = this
return streamReportsRequest {
// get one more than the actual page size for deciding whether to set page token
limit = pageSize + 1
filter =
StreamReportsRequestKt.filter {
cmmsMeasurementConsumerId = source.cmmsMeasurementConsumerId
externalReportIdAfter = source.lastReport.externalReportId
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package org.wfanet.measurement.reporting.service.api.v2alpha
import io.grpc.Status
import io.grpc.StatusException
import kotlin.math.min
import kotlinx.coroutines.flow.toList
import org.wfanet.measurement.api.v2alpha.MeasurementConsumerKey
import org.wfanet.measurement.api.withAuthenticationKey
import org.wfanet.measurement.common.base64UrlDecode
import org.wfanet.measurement.common.base64UrlEncode
import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.common.grpc.grpcRequire
import org.wfanet.measurement.common.grpc.grpcRequireNotNull
Expand All @@ -32,6 +35,7 @@ import org.wfanet.measurement.internal.reporting.v2.MetricSpec as InternalMetric
import org.wfanet.measurement.internal.reporting.v2.Report as InternalReport
import org.wfanet.measurement.internal.reporting.v2.ReportKt as InternalReportKt
import org.wfanet.measurement.internal.reporting.v2.ReportsGrpcKt.ReportsCoroutineStub
import org.wfanet.measurement.internal.reporting.v2.StreamReportsRequest
import org.wfanet.measurement.internal.reporting.v2.TimeInterval as InternalTimeInterval
import org.wfanet.measurement.internal.reporting.v2.copy
import org.wfanet.measurement.internal.reporting.v2.createReportRequest as internalCreateReportRequest
Expand All @@ -42,15 +46,26 @@ import org.wfanet.measurement.reporting.v2alpha.BatchGetMetricsRequest
import org.wfanet.measurement.reporting.v2alpha.CreateMetricRequest
import org.wfanet.measurement.reporting.v2alpha.CreateReportRequest
import org.wfanet.measurement.reporting.v2alpha.GetReportRequest
import org.wfanet.measurement.reporting.v2alpha.ListReportsPageToken
import org.wfanet.measurement.reporting.v2alpha.ListReportsPageTokenKt
import org.wfanet.measurement.reporting.v2alpha.ListReportsRequest
import org.wfanet.measurement.reporting.v2alpha.ListReportsResponse
import org.wfanet.measurement.reporting.v2alpha.Metric
import org.wfanet.measurement.reporting.v2alpha.MetricsGrpcKt.MetricsCoroutineStub
import org.wfanet.measurement.reporting.v2alpha.Report
import org.wfanet.measurement.reporting.v2alpha.ReportKt
import org.wfanet.measurement.reporting.v2alpha.ReportsGrpcKt.ReportsCoroutineImplBase
import org.wfanet.measurement.reporting.v2alpha.batchCreateMetricsRequest
import org.wfanet.measurement.reporting.v2alpha.batchGetMetricsRequest
import org.wfanet.measurement.reporting.v2alpha.copy
import org.wfanet.measurement.reporting.v2alpha.listReportsPageToken
import org.wfanet.measurement.reporting.v2alpha.listReportsResponse
import org.wfanet.measurement.reporting.v2alpha.report

private const val MIN_PAGE_SIZE = 1
private const val DEFAULT_PAGE_SIZE = 50
private const val MAX_PAGE_SIZE = 1000

private const val MAX_BATCH_SIZE_FOR_BATCH_CREATE_METRICS = 1000
private const val MAX_BATCH_SIZE_FOR_BATCH_GET_METRICS = 100

Expand All @@ -75,6 +90,78 @@ class ReportsService(
val internalTimeRange: InternalTimeRange,
)

override suspend fun listReports(request: ListReportsRequest): ListReportsResponse {
val listReportsPageToken = request.toListReportsPageToken()

val principal: ReportingPrincipal = principalFromCurrentContext
when (principal) {
is MeasurementConsumerPrincipal -> {
if (request.parent != principal.resourceKey.toName()) {
failGrpc(Status.PERMISSION_DENIED) {
"Cannot list Reports belonging to other MeasurementConsumers."
}
}
}
}

val streamInternalReportsRequest: StreamReportsRequest =
listReportsPageToken.toStreamReportsRequest()

val results: List<InternalReport> =
try {
internalReportsStub.streamReports(streamInternalReportsRequest).toList()
} catch (e: StatusException) {
throw Exception("Unable to list reports from the reporting database.", e)
}

if (results.isEmpty()) {
return ListReportsResponse.getDefaultInstance()
}

val nextPageToken: ListReportsPageToken? =
if (results.size > listReportsPageToken.pageSize) {
listReportsPageToken.copy {
lastReport =
ListReportsPageTokenKt.previousPageEnd {
cmmsMeasurementConsumerId = results[results.lastIndex - 1].cmmsMeasurementConsumerId
externalReportId = results[results.lastIndex - 1].externalReportId
}
}
} else {
null
}

val subResults: List<InternalReport> =
results.subList(0, min(results.size, listReportsPageToken.pageSize))

// Get metrics.
val metricNames: List<String> =
subResults.flatMap { internalReport ->
internalReport.externalMetricIds.map { externalMetricId ->
MetricKey(
principal.resourceKey.measurementConsumerId,
externalIdToApiId(externalMetricId)
)
.toName()
}
}

val externalIdToMetricMap: Map<Long, Metric> =
batchGetMetrics(principal.resourceKey.toName(), principal.config.apiKey, metricNames)
.associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) }

return listReportsResponse {
reports +=
subResults.map { internalReport ->
convertInternalReportToPublic(internalReport, externalIdToMetricMap)
}

if (nextPageToken != null) {
this.nextPageToken = nextPageToken.toByteString().base64UrlEncode()
}
}
}

override suspend fun getReport(request: GetReportRequest): Report {
val reportKey =
grpcRequireNotNull(ReportKey.fromName(request.name)) {
Expand Down Expand Up @@ -104,24 +191,18 @@ class ReportsService(
throw Exception("Unable to get the report from the reporting database.", e)
}

// Create metrics.
// Get metrics.
val metricNames: List<String> =
internalReport.reportingMetricEntriesMap.flatMap { (_, reportingMetricCalculationSpec) ->
reportingMetricCalculationSpec.metricCalculationSpecsList.flatMap { metricCalculationSpec ->
metricCalculationSpec.reportingMetricsList.map { reportingMetric ->
MetricKey(
principal.resourceKey.measurementConsumerId,
externalIdToApiId(reportingMetric.externalMetricId)
)
.toName()
}
}
internalReport.externalMetricIds.map { externalMetricId ->
MetricKey(principal.resourceKey.measurementConsumerId, externalIdToApiId(externalMetricId))
.toName()
}
val metrics: List<Metric> =
val externalIdToMetricMap: Map<Long, Metric> =
batchGetMetrics(principal.resourceKey.toName(), principal.config.apiKey, metricNames)
.associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) }

// Convert the internal report to public and return.
return convertInternalReportToPublic(internalReport, metrics)
return convertInternalReportToPublic(internalReport, externalIdToMetricMap)
}

private suspend fun batchGetMetrics(
Expand Down Expand Up @@ -209,8 +290,9 @@ class ReportsService(
}
}
.map { it.toCreateMetricRequest(principal.resourceKey) }
val metrics: List<Metric> =
val externalIdToMetricMap: Map<Long, Metric> =
batchCreateMetrics(request.parent, principal.config.apiKey, createMetricRequests)
.associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) }

// Once all metrics are created, get the updated internal report with the metric IDs filled.
val updatedInternalReport =
Expand All @@ -226,13 +308,13 @@ class ReportsService(
}

// Convert the internal report to public and return.
return convertInternalReportToPublic(updatedInternalReport, metrics)
return convertInternalReportToPublic(updatedInternalReport, externalIdToMetricMap)
}

/** Converts an internal [InternalReport] to a public [Report]. */
private fun convertInternalReportToPublic(
internalReport: InternalReport,
metrics: List<Metric>,
externalIdToMetricMap: Map<Long, Metric>,
): Report {
return report {
name =
Expand All @@ -259,6 +341,11 @@ class ReportsService(
error("The time in the internal report should've been set.")
}

val metrics: List<Metric> =
internalReport.externalMetricIds.map { externalMetricId ->
externalIdToMetricMap.getValue(externalMetricId)
}

state = inferReportState(metrics)
createTime = internalReport.createTime

Expand All @@ -267,7 +354,7 @@ class ReportsService(
buildMetricCalculationResults(
internalReport.cmmsMeasurementConsumerId,
internalReport.reportingMetricEntriesMap,
metrics
externalIdToMetricMap
)
}
}
Expand All @@ -277,11 +364,8 @@ class ReportsService(
private fun buildMetricCalculationResults(
cmmsMeasurementConsumerId: String,
internalReportingMetricEntries: InternalReportingMetricEntries,
metrics: List<Metric>,
externalIdToMetricMap: Map<Long, Metric>,
): List<Report.MetricCalculationResult> {
val externalIdToMetricMap: Map<Long, Metric> =
metrics.associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) }

return internalReportingMetricEntries.flatMap {
(externalReportingSetId, reportingMetricCalculationSpec) ->
val reportingSetName =
Expand Down Expand Up @@ -607,3 +691,48 @@ private fun inferReportState(metrics: Collection<Metric>): Report.State {
Report.State.RUNNING
}
}

/** Gets all external metric IDs used in the [InternalReport]. */
private val InternalReport.externalMetricIds: List<Long>
get() =
reportingMetricEntriesMap.flatMap { (_, reportingMetricCalculationSpec) ->
reportingMetricCalculationSpec.metricCalculationSpecsList.flatMap { metricCalculationSpec ->
metricCalculationSpec.reportingMetricsList.map { reportingMetric ->
reportingMetric.externalMetricId
}
}
}

/** Converts a public [ListReportsRequest] to a [ListReportsPageToken]. */
private fun ListReportsRequest.toListReportsPageToken(): ListReportsPageToken {
grpcRequire(pageSize >= 0) { "Page size cannot be less than 0" }

val source = this
val parentKey: MeasurementConsumerKey =
grpcRequireNotNull(MeasurementConsumerKey.fromName(parent)) {
"Parent is either unspecified or invalid."
}
val cmmsMeasurementConsumerId = parentKey.measurementConsumerId

return if (pageToken.isNotBlank()) {
ListReportsPageToken.parseFrom(pageToken.base64UrlDecode()).copy {
grpcRequire(this.cmmsMeasurementConsumerId == cmmsMeasurementConsumerId) {
"Arguments must be kept the same when using a page token"
}

if (source.pageSize in MIN_PAGE_SIZE..MAX_PAGE_SIZE) {
pageSize = source.pageSize
}
}
} else {
listReportsPageToken {
pageSize =
when {
source.pageSize < MIN_PAGE_SIZE -> DEFAULT_PAGE_SIZE
source.pageSize > MAX_PAGE_SIZE -> MAX_PAGE_SIZE
else -> source.pageSize
}
this.cmmsMeasurementConsumerId = cmmsMeasurementConsumerId
}
}
}
Loading

0 comments on commit 01e99ef

Please sign in to comment.