Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add listReport #1053

Merged
merged 1 commit into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1389,16 +1389,13 @@ fun ListMetricsRequest.toListMetricsPageToken(): ListMetricsPageToken {
}
val cmmsMeasurementConsumerId = parentKey.measurementConsumerId

val isValidPageSize =
source.pageSize != 0 && source.pageSize >= MIN_PAGE_SIZE && source.pageSize <= MAX_PAGE_SIZE

return if (pageToken.isNotBlank()) {
ListMetricsPageToken.parseFrom(source.pageToken.base64UrlDecode()).copy {
grpcRequire(this.cmmsMeasurementConsumerId == cmmsMeasurementConsumerId) {
"Arguments must be kept the same when using a page token."
}

if (isValidPageSize) {
if (source.pageSize in MIN_PAGE_SIZE..MAX_PAGE_SIZE) {
pageSize = source.pageSize
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequestKt
import org.wfanet.measurement.internal.reporting.v2.StreamReportingSetsRequest
import org.wfanet.measurement.internal.reporting.v2.StreamReportingSetsRequestKt
import org.wfanet.measurement.internal.reporting.v2.StreamReportsRequest
import org.wfanet.measurement.internal.reporting.v2.StreamReportsRequestKt
import org.wfanet.measurement.internal.reporting.v2.TimeInterval as InternalTimeInterval
import org.wfanet.measurement.internal.reporting.v2.TimeIntervals as InternalTimeIntervals
import org.wfanet.measurement.internal.reporting.v2.metricSpec as internalMetricSpec
import org.wfanet.measurement.internal.reporting.v2.periodicTimeInterval as internalPeriodicTimeInterval
import org.wfanet.measurement.internal.reporting.v2.streamMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.streamReportingSetsRequest
import org.wfanet.measurement.internal.reporting.v2.streamReportsRequest
import org.wfanet.measurement.internal.reporting.v2.timeInterval as internalTimeInterval
import org.wfanet.measurement.internal.reporting.v2.timeIntervals as internalTimeIntervals
import org.wfanet.measurement.reporting.v2alpha.CreateMetricRequest
import org.wfanet.measurement.reporting.v2alpha.ListMetricsPageToken
import org.wfanet.measurement.reporting.v2alpha.ListReportingSetsPageToken
import org.wfanet.measurement.reporting.v2alpha.ListReportsPageToken
import org.wfanet.measurement.reporting.v2alpha.MetricSpec
import org.wfanet.measurement.reporting.v2alpha.MetricSpecKt
import org.wfanet.measurement.reporting.v2alpha.PeriodicTimeInterval
Expand Down Expand Up @@ -658,3 +662,17 @@ fun Map.Entry<Long, InternalReport.ReportingMetricCalculationSpec>.toReportingMe
}
}
}

/** Converts a [ListReportsPageToken] to an internal [StreamReportsRequest]. */
fun ListReportsPageToken.toStreamReportsRequest(): StreamReportsRequest {
val source = this
return streamReportsRequest {
// get one more than the actual page size for deciding whether to set page token
limit = pageSize + 1
filter =
StreamReportsRequestKt.filter {
cmmsMeasurementConsumerId = source.cmmsMeasurementConsumerId
externalReportIdAfter = source.lastReport.externalReportId
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package org.wfanet.measurement.reporting.service.api.v2alpha
import io.grpc.Status
import io.grpc.StatusException
import kotlin.math.min
import kotlinx.coroutines.flow.toList
import org.wfanet.measurement.api.v2alpha.MeasurementConsumerKey
import org.wfanet.measurement.api.withAuthenticationKey
import org.wfanet.measurement.common.base64UrlDecode
import org.wfanet.measurement.common.base64UrlEncode
import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.common.grpc.grpcRequire
import org.wfanet.measurement.common.grpc.grpcRequireNotNull
Expand All @@ -32,6 +35,7 @@ import org.wfanet.measurement.internal.reporting.v2.MetricSpec as InternalMetric
import org.wfanet.measurement.internal.reporting.v2.Report as InternalReport
import org.wfanet.measurement.internal.reporting.v2.ReportKt as InternalReportKt
import org.wfanet.measurement.internal.reporting.v2.ReportsGrpcKt.ReportsCoroutineStub
import org.wfanet.measurement.internal.reporting.v2.StreamReportsRequest
import org.wfanet.measurement.internal.reporting.v2.TimeInterval as InternalTimeInterval
import org.wfanet.measurement.internal.reporting.v2.copy
import org.wfanet.measurement.internal.reporting.v2.createReportRequest as internalCreateReportRequest
Expand All @@ -42,15 +46,26 @@ import org.wfanet.measurement.reporting.v2alpha.BatchGetMetricsRequest
import org.wfanet.measurement.reporting.v2alpha.CreateMetricRequest
import org.wfanet.measurement.reporting.v2alpha.CreateReportRequest
import org.wfanet.measurement.reporting.v2alpha.GetReportRequest
import org.wfanet.measurement.reporting.v2alpha.ListReportsPageToken
import org.wfanet.measurement.reporting.v2alpha.ListReportsPageTokenKt
import org.wfanet.measurement.reporting.v2alpha.ListReportsRequest
import org.wfanet.measurement.reporting.v2alpha.ListReportsResponse
import org.wfanet.measurement.reporting.v2alpha.Metric
import org.wfanet.measurement.reporting.v2alpha.MetricsGrpcKt.MetricsCoroutineStub
import org.wfanet.measurement.reporting.v2alpha.Report
import org.wfanet.measurement.reporting.v2alpha.ReportKt
import org.wfanet.measurement.reporting.v2alpha.ReportsGrpcKt.ReportsCoroutineImplBase
import org.wfanet.measurement.reporting.v2alpha.batchCreateMetricsRequest
import org.wfanet.measurement.reporting.v2alpha.batchGetMetricsRequest
import org.wfanet.measurement.reporting.v2alpha.copy
import org.wfanet.measurement.reporting.v2alpha.listReportsPageToken
import org.wfanet.measurement.reporting.v2alpha.listReportsResponse
import org.wfanet.measurement.reporting.v2alpha.report

private const val MIN_PAGE_SIZE = 1
private const val DEFAULT_PAGE_SIZE = 50
private const val MAX_PAGE_SIZE = 1000

private const val MAX_BATCH_SIZE_FOR_BATCH_CREATE_METRICS = 1000
private const val MAX_BATCH_SIZE_FOR_BATCH_GET_METRICS = 100

Expand All @@ -75,6 +90,78 @@ class ReportsService(
val internalTimeRange: InternalTimeRange,
)

override suspend fun listReports(request: ListReportsRequest): ListReportsResponse {
val listReportsPageToken = request.toListReportsPageToken()

val principal: ReportingPrincipal = principalFromCurrentContext
when (principal) {
is MeasurementConsumerPrincipal -> {
if (request.parent != principal.resourceKey.toName()) {
failGrpc(Status.PERMISSION_DENIED) {
"Cannot list Reports belonging to other MeasurementConsumers."
}
}
}
}

val streamInternalReportsRequest: StreamReportsRequest =
listReportsPageToken.toStreamReportsRequest()

val results: List<InternalReport> =
try {
internalReportsStub.streamReports(streamInternalReportsRequest).toList()
} catch (e: StatusException) {
throw Exception("Unable to list reports from the reporting database.", e)
}

if (results.isEmpty()) {
return ListReportsResponse.getDefaultInstance()
}

val nextPageToken: ListReportsPageToken? =
if (results.size > listReportsPageToken.pageSize) {
listReportsPageToken.copy {
lastReport =
ListReportsPageTokenKt.previousPageEnd {
cmmsMeasurementConsumerId = results[results.lastIndex - 1].cmmsMeasurementConsumerId
externalReportId = results[results.lastIndex - 1].externalReportId
}
}
} else {
null
}

val subResults: List<InternalReport> =
results.subList(0, min(results.size, listReportsPageToken.pageSize))

// Get metrics.
val metricNames: List<String> =
subResults.flatMap { internalReport ->
internalReport.externalMetricIds.map { externalMetricId ->
MetricKey(
principal.resourceKey.measurementConsumerId,
externalIdToApiId(externalMetricId)
)
.toName()
}
}

val externalIdToMetricMap: Map<Long, Metric> =
batchGetMetrics(principal.resourceKey.toName(), principal.config.apiKey, metricNames)
.associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) }

return listReportsResponse {
reports +=
subResults.map { internalReport ->
convertInternalReportToPublic(internalReport, externalIdToMetricMap)
}

if (nextPageToken != null) {
this.nextPageToken = nextPageToken.toByteString().base64UrlEncode()
}
}
}

override suspend fun getReport(request: GetReportRequest): Report {
val reportKey =
grpcRequireNotNull(ReportKey.fromName(request.name)) {
Expand Down Expand Up @@ -104,24 +191,18 @@ class ReportsService(
throw Exception("Unable to get the report from the reporting database.", e)
}

// Create metrics.
// Get metrics.
val metricNames: List<String> =
internalReport.reportingMetricEntriesMap.flatMap { (_, reportingMetricCalculationSpec) ->
reportingMetricCalculationSpec.metricCalculationSpecsList.flatMap { metricCalculationSpec ->
metricCalculationSpec.reportingMetricsList.map { reportingMetric ->
MetricKey(
principal.resourceKey.measurementConsumerId,
externalIdToApiId(reportingMetric.externalMetricId)
)
.toName()
}
}
internalReport.externalMetricIds.map { externalMetricId ->
MetricKey(principal.resourceKey.measurementConsumerId, externalIdToApiId(externalMetricId))
.toName()
}
val metrics: List<Metric> =
val externalIdToMetricMap: Map<Long, Metric> =
batchGetMetrics(principal.resourceKey.toName(), principal.config.apiKey, metricNames)
.associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) }

// Convert the internal report to public and return.
return convertInternalReportToPublic(internalReport, metrics)
return convertInternalReportToPublic(internalReport, externalIdToMetricMap)
}

private suspend fun batchGetMetrics(
Expand Down Expand Up @@ -209,8 +290,9 @@ class ReportsService(
}
}
.map { it.toCreateMetricRequest(principal.resourceKey) }
val metrics: List<Metric> =
val externalIdToMetricMap: Map<Long, Metric> =
batchCreateMetrics(request.parent, principal.config.apiKey, createMetricRequests)
.associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) }

// Once all metrics are created, get the updated internal report with the metric IDs filled.
val updatedInternalReport =
Expand All @@ -226,13 +308,13 @@ class ReportsService(
}

// Convert the internal report to public and return.
return convertInternalReportToPublic(updatedInternalReport, metrics)
return convertInternalReportToPublic(updatedInternalReport, externalIdToMetricMap)
}

/** Converts an internal [InternalReport] to a public [Report]. */
private fun convertInternalReportToPublic(
internalReport: InternalReport,
metrics: List<Metric>,
externalIdToMetricMap: Map<Long, Metric>,
): Report {
return report {
name =
Expand All @@ -259,6 +341,11 @@ class ReportsService(
error("The time in the internal report should've been set.")
}

val metrics: List<Metric> =
internalReport.externalMetricIds.map { externalMetricId ->
externalIdToMetricMap.getValue(externalMetricId)
}

state = inferReportState(metrics)
createTime = internalReport.createTime

Expand All @@ -267,7 +354,7 @@ class ReportsService(
buildMetricCalculationResults(
internalReport.cmmsMeasurementConsumerId,
internalReport.reportingMetricEntriesMap,
metrics
externalIdToMetricMap
)
}
}
Expand All @@ -277,11 +364,8 @@ class ReportsService(
private fun buildMetricCalculationResults(
cmmsMeasurementConsumerId: String,
internalReportingMetricEntries: InternalReportingMetricEntries,
metrics: List<Metric>,
externalIdToMetricMap: Map<Long, Metric>,
): List<Report.MetricCalculationResult> {
val externalIdToMetricMap: Map<Long, Metric> =
metrics.associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) }

return internalReportingMetricEntries.flatMap {
(externalReportingSetId, reportingMetricCalculationSpec) ->
val reportingSetName =
Expand Down Expand Up @@ -607,3 +691,48 @@ private fun inferReportState(metrics: Collection<Metric>): Report.State {
Report.State.RUNNING
}
}

/** Gets all external metric IDs used in the [InternalReport]. */
private val InternalReport.externalMetricIds: List<Long>
get() =
reportingMetricEntriesMap.flatMap { (_, reportingMetricCalculationSpec) ->
reportingMetricCalculationSpec.metricCalculationSpecsList.flatMap { metricCalculationSpec ->
metricCalculationSpec.reportingMetricsList.map { reportingMetric ->
reportingMetric.externalMetricId
}
}
}

/** Converts a public [ListReportsRequest] to a [ListReportsPageToken]. */
private fun ListReportsRequest.toListReportsPageToken(): ListReportsPageToken {
grpcRequire(pageSize >= 0) { "Page size cannot be less than 0" }

val source = this
val parentKey: MeasurementConsumerKey =
grpcRequireNotNull(MeasurementConsumerKey.fromName(parent)) {
"Parent is either unspecified or invalid."
}
val cmmsMeasurementConsumerId = parentKey.measurementConsumerId

return if (pageToken.isNotBlank()) {
ListReportsPageToken.parseFrom(pageToken.base64UrlDecode()).copy {
grpcRequire(this.cmmsMeasurementConsumerId == cmmsMeasurementConsumerId) {
"Arguments must be kept the same when using a page token"
}

if (source.pageSize in MIN_PAGE_SIZE..MAX_PAGE_SIZE) {
pageSize = source.pageSize
}
}
} else {
listReportsPageToken {
pageSize =
when {
source.pageSize < MIN_PAGE_SIZE -> DEFAULT_PAGE_SIZE
source.pageSize > MAX_PAGE_SIZE -> MAX_PAGE_SIZE
else -> source.pageSize
}
this.cmmsMeasurementConsumerId = cmmsMeasurementConsumerId
}
}
}
Loading