Skip to content

Commit

Permalink
Fix MetricsService after rebasing.
Browse files Browse the repository at this point in the history
  • Loading branch information
riemanli committed Apr 20, 2023
1 parent 396b986 commit b97e0a0
Showing 1 changed file with 18 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -872,28 +872,36 @@ class MetricsService(
}

val internalMetric: InternalMetric =
getInternalMetric(metricKey.cmmsMeasurementConsumerId, apiIdToExternalId(metricKey.metricId))
batchGetInternalMetrics(
metricKey.cmmsMeasurementConsumerId,
listOf(apiIdToExternalId(metricKey.metricId))
)
.first()

// Early exit when the metric is at a terminal state.
if (internalMetric.state != InternalMetric.State.RUNNING) {
return internalMetric.toMetric()
}

// Only syncs pending measurements which can only be in metrics that are still running.
val toBeSyncedInternalMeasurements: List<InternalMeasurement> =
internalMetric.weightedMeasurementsList.map { weightedMeasurement ->
weightedMeasurement.measurement
}
internalMetric.weightedMeasurementsList
.map { weightedMeasurement -> weightedMeasurement.measurement }
.filter { internalMeasurement ->
internalMeasurement.state == InternalMeasurement.State.PENDING
}

measurementSupplier.syncInternalMeasurements(
toBeSyncedInternalMeasurements,
principal.config.apiKey,
principal,
)

return getInternalMetric(
return batchGetInternalMetrics(
metricKey.cmmsMeasurementConsumerId,
apiIdToExternalId(metricKey.metricId)
listOf(apiIdToExternalId(metricKey.metricId))
)
.first()
.toMetric()
}

Expand Down Expand Up @@ -931,11 +939,15 @@ class MetricsService(
val internalMetrics: List<InternalMetric> =
batchGetInternalMetrics(principal.resourceKey.measurementConsumerId, externalMetricIds)

// Only syncs pending measurements which can only be in metrics that are still running.
val toBeSyncedInternalMeasurements: List<InternalMeasurement> =
internalMetrics
.filter { internalMetric -> internalMetric.state == InternalMetric.State.RUNNING }
.flatMap { internalMetric -> internalMetric.weightedMeasurementsList }
.map { weightedMeasurement -> weightedMeasurement.measurement }
.filter { internalMeasurement ->
internalMeasurement.state == InternalMeasurement.State.PENDING
}

measurementSupplier.syncInternalMeasurements(
toBeSyncedInternalMeasurements,
Expand All @@ -949,23 +961,6 @@ class MetricsService(
.map { it.toMetric() }
}
}

private suspend fun getInternalMetric(
cmmsMeasurementConsumerId: String,
externalMetricId: Long,
): org.wfanet.measurement.internal.reporting.v2alpha.Metric {
return try {
internalMetricsStub.getMetric(
internalGetMetricRequest {
this.cmmsMeasurementConsumerId = cmmsMeasurementConsumerId
this.externalMetricId = externalMetricId
}
)
} catch (e: StatusException) {
throw Exception("Unable to get the metric from the reporting database.", e)
}
}

override suspend fun listMetrics(request: ListMetricsRequest): ListMetricsResponse {
val listMetricsPageToken: ListMetricsPageToken = request.toListMetricsPageToken()

Expand Down

0 comments on commit b97e0a0

Please sign in to comment.