Skip to content

Commit

Permalink
Refactor the process of updating metric IDs and metric specs.
Browse files Browse the repository at this point in the history
  • Loading branch information
riemanli committed May 26, 2023
1 parent f9316b7 commit d14df7a
Show file tree
Hide file tree
Showing 4 changed files with 452 additions and 341 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import org.wfanet.measurement.internal.reporting.v2.timeIntervals as internalTim
import org.wfanet.measurement.reporting.v2alpha.CreateMetricRequest
import org.wfanet.measurement.reporting.v2alpha.ListMetricsPageToken
import org.wfanet.measurement.reporting.v2alpha.ListReportingSetsPageToken
import org.wfanet.measurement.reporting.v2alpha.Metric
import org.wfanet.measurement.reporting.v2alpha.MetricSpec
import org.wfanet.measurement.reporting.v2alpha.MetricSpecKt
import org.wfanet.measurement.reporting.v2alpha.PeriodicTimeInterval
Expand Down Expand Up @@ -443,18 +442,13 @@ fun ListReportingSetsPageToken.toStreamReportingSetsRequest(): StreamReportingSe
}

/** Converts an [InternalReport.MetricCalculationSpec] to a [Report.MetricCalculationSpec]. */
fun InternalReport.MetricCalculationSpec.toMetricCalculationSpec(
requestIdToMetricMap: Map<String, Metric>,
): Report.MetricCalculationSpec {
fun InternalReport.MetricCalculationSpec.toMetricCalculationSpec(): Report.MetricCalculationSpec {
val source = this
val metricSpecs: List<MetricSpec> =
source.createMetricRequestsList
.map { request -> requestIdToMetricMap.getValue(request.requestId).metricSpec }
.distinct()

return ReportKt.metricCalculationSpec {
displayName = source.details.displayName
this.metricSpecs += metricSpecs
this.metricSpecs +=
source.createMetricRequestsList.map { it.metricSpec.toMetricSpec() }.distinct()
groupings +=
source.details.groupingsList.map { grouping ->
ReportKt.grouping { predicates += grouping.predicatesList }
Expand Down Expand Up @@ -530,10 +524,30 @@ fun InternalReport.CreateMetricRequest.toCreateMetricRequest(
externalIdToApiId(source.details.externalReportingSetId)
)
.toName()
this.timeInterval = source.details.timeInterval.toTimeInterval()
this.metricSpec = source.details.metricSpec.toMetricSpec()
this.filters += source.details.filtersList
timeInterval = source.details.timeInterval.toTimeInterval()
metricSpec = source.metricSpec.toMetricSpec()
filters += source.details.filtersList
}
requestId = source.requestId
}
}

/**
* Converts an internal ReportingMetricEntry Map.Entry<Long,
* [InternalReport.ReportingMetricCalculationSpec]> to an [Report.ReportingMetricEntry].
*/
fun Map.Entry<Long, InternalReport.ReportingMetricCalculationSpec>.toReportingMetricEntry(
cmmsMeasurementConsumerId: String
): Report.ReportingMetricEntry {
val source = this

return ReportKt.reportingMetricEntry {
key = ReportingSetKey(cmmsMeasurementConsumerId, externalIdToApiId(source.key)).toName()

value =
ReportKt.reportingMetricCalculationSpec {
metricCalculationSpecs +=
source.value.metricCalculationSpecsList.map { it.toMetricCalculationSpec() }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.wfanet.measurement.internal.reporting.v2.ReportsGrpcKt.ReportsCorouti
import org.wfanet.measurement.internal.reporting.v2.TimeInterval as InternalTimeInterval
import org.wfanet.measurement.internal.reporting.v2.copy
import org.wfanet.measurement.internal.reporting.v2.createReportRequest as internalCreateReportRequest
import org.wfanet.measurement.internal.reporting.v2.getReportRequest as internalGetReportRequest
import org.wfanet.measurement.internal.reporting.v2.metricSpec as internalMetricSpec
import org.wfanet.measurement.internal.reporting.v2.report as internalReport
import org.wfanet.measurement.reporting.v2alpha.BatchCreateMetricsRequest
Expand Down Expand Up @@ -124,18 +125,28 @@ class ReportsService(
val metrics: List<Metric> =
batchCreateMetrics(request.parent, principal.config.apiKey, createMetricRequests)

// Once all metrics are created, get the updated internal report with the metric IDs and the
// updated metric specs filled.
val updatedInternalReport =
try {
internalReportsStub.getReport(
internalGetReportRequest {
cmmsMeasurementConsumerId = internalReport.cmmsMeasurementConsumerId
externalReportId = internalReport.externalReportId
}
)
} catch (e: StatusException) {
throw Exception("Unable to create a report in the reporting database.", e)
}

// Convert the internal report to public and return.
val requestIdToMetricMap: Map<String, Metric> =
createMetricRequests
.zip(metrics) { createMetricRequest, metric -> createMetricRequest.requestId to metric }
.toMap()
return convertInternalReportToPublic(internalReport, requestIdToMetricMap)
return convertInternalReportToPublic(updatedInternalReport, metrics)
}

/** Converts an internal [InternalReport] to a public [Report]. */
private fun convertInternalReportToPublic(
internalReport: InternalReport,
requestIdToMetricMap: Map<String, Metric>,
metrics: List<Metric>,
): Report {
return report {
name =
Expand All @@ -146,24 +157,10 @@ class ReportsService(
.toName()

reportingMetricEntries +=
internalReport.reportingMetricEntriesMap.map {
(externalReportingSetId, internalReportingMetricCalculationSpec) ->
ReportKt.reportingMetricEntry {
key =
ReportingSetKey(
internalReport.cmmsMeasurementConsumerId,
externalIdToApiId(externalReportingSetId)
)
.toName()

value =
ReportKt.reportingMetricCalculationSpec {
metricCalculationSpecs +=
internalReportingMetricCalculationSpec.metricCalculationSpecsList.map {
it.toMetricCalculationSpec(requestIdToMetricMap)
}
}
}
internalReport.reportingMetricEntriesMap.map { internalReportingMetricEntry ->
internalReportingMetricEntry.toReportingMetricEntry(
internalReport.cmmsMeasurementConsumerId
)
}

@Suppress("WHEN_ENUM_CAN_BE_NULL_IN_JAVA") // Proto enum fields are never null.
Expand All @@ -176,15 +173,15 @@ class ReportsService(
error("The time in the internal report should've been set.")
}

state = inferReportState(requestIdToMetricMap.values)
state = inferReportState(metrics)
createTime = internalReport.createTime

if (state == Report.State.SUCCEEDED) {
this.metricCalculationResults +=
buildMetricCalculationResults(
internalReport.cmmsMeasurementConsumerId,
internalReport.reportingMetricEntriesMap,
requestIdToMetricMap
metrics
)
}
}
Expand All @@ -194,8 +191,11 @@ class ReportsService(
private fun buildMetricCalculationResults(
cmmsMeasurementConsumerId: String,
internalReportingMetricEntries: InternalReportingMetricEntries,
requestIdToMetricMap: Map<String, Metric>,
metrics: List<Metric>,
): List<Report.MetricCalculationResult> {
val externalIdToMetricMap: Map<Long, Metric> =
metrics.associateBy { apiIdToExternalId(checkNotNull(MetricKey.fromName(it.name)).metricId) }

return internalReportingMetricEntries.flatMap {
(externalReportingSetId, reportingMetricCalculationSpec) ->
val reportingSetName =
Expand All @@ -210,7 +210,7 @@ class ReportsService(
resultAttributes +=
metricCalculationSpec.createMetricRequestsList.map { createMetricRequest ->
val metric =
requestIdToMetricMap[createMetricRequest.requestId]
externalIdToMetricMap[createMetricRequest.externalMetricId]
?: error("Got a metric not associated with the report.")
ReportKt.MetricCalculationResultKt.resultAttribute {
groupingPredicates += metric.filtersList
Expand Down Expand Up @@ -428,7 +428,7 @@ class ReportsService(
it.predicatesList
}
val allGroupingPredicates = groupings.flatten()
grpcRequire(allGroupingPredicates.size == allGroupingPredicates.toSet().size) {
grpcRequire(allGroupingPredicates.size == allGroupingPredicates.distinct().size) {
"Cannot have duplicate predicates in different groupings."
}
val groupingsCartesianProduct: List<List<String>> = cartesianProduct(groupings)
Expand All @@ -441,12 +441,12 @@ class ReportsService(
metricCalculationSpec.metricSpecsList.flatMap { metricSpec ->
groupingsCartesianProduct.map { predicateGroup ->
InternalReportKt.createMetricRequest {
this.metricSpec = metricSpec.toInternal()
details =
InternalReportKt.CreateMetricRequestKt.details {
this.externalReportingSetId = externalReportingSetId
this.timeInterval = timeInterval
this.metricSpec = metricSpec.toInternal()
this.filters += predicateGroup
filters += predicateGroup
}
}
}
Expand All @@ -456,7 +456,6 @@ class ReportsService(
details =
InternalReportKt.MetricCalculationSpecKt.details {
displayName = metricCalculationSpec.displayName
metricSpecs += metricCalculationSpec.metricSpecsList.map { it.toInternal() }
this.groupings +=
metricCalculationSpec.groupingsList.map { grouping ->
InternalReportKt.MetricCalculationSpecKt.grouping {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ message Report {
// Will be filled by the internal reports service. Use the request ID above
// to find the metric.
fixed64 external_metric_id = 2;
// Will be filled by the internal reports service. Use the request ID or the
// external metric ID above to find the metric spec in the metric.
MetricSpec metric_spec = 3;

message Details {
fixed64 external_reporting_set_id = 1;
TimeInterval time_interval = 2;
// The original metric spec provided by the user. It won't be updated with
// default values if some values are not set.
MetricSpec metric_spec = 3;
repeated string filters = 4;
repeated string filters = 3;
}

Details details = 3;
Details details = 4;
}

message MetricCalculationSpec {
Expand All @@ -63,7 +63,6 @@ message Report {

message Details {
string display_name = 1;
repeated MetricSpec metric_specs = 2;
repeated Grouping groupings = 3;
bool cumulative = 4;
}
Expand Down
Loading

0 comments on commit d14df7a

Please sign in to comment.