diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt index ead8544d2e9..687f8d3a891 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt @@ -872,17 +872,24 @@ class MetricsService( } val internalMetric: InternalMetric = - getInternalMetric(metricKey.cmmsMeasurementConsumerId, apiIdToExternalId(metricKey.metricId)) + batchGetInternalMetrics( + metricKey.cmmsMeasurementConsumerId, + listOf(apiIdToExternalId(metricKey.metricId)) + ) + .first() // Early exit when the metric is at a terminal state. if (internalMetric.state != InternalMetric.State.RUNNING) { return internalMetric.toMetric() } + // Only syncs pending measurements which can only be in metrics that are still running. val toBeSyncedInternalMeasurements: List = - internalMetric.weightedMeasurementsList.map { weightedMeasurement -> - weightedMeasurement.measurement - } + internalMetric.weightedMeasurementsList + .map { weightedMeasurement -> weightedMeasurement.measurement } + .filter { internalMeasurement -> + internalMeasurement.state == InternalMeasurement.State.PENDING + } measurementSupplier.syncInternalMeasurements( toBeSyncedInternalMeasurements, @@ -890,10 +897,11 @@ class MetricsService( principal, ) - return getInternalMetric( + return batchGetInternalMetrics( metricKey.cmmsMeasurementConsumerId, - apiIdToExternalId(metricKey.metricId) + listOf(apiIdToExternalId(metricKey.metricId)) ) + .first() .toMetric() } @@ -931,11 +939,15 @@ class MetricsService( val internalMetrics: List = batchGetInternalMetrics(principal.resourceKey.measurementConsumerId, externalMetricIds) + // Only syncs pending measurements which can only be in metrics that are still running. val toBeSyncedInternalMeasurements: List = internalMetrics .filter { internalMetric -> internalMetric.state == InternalMetric.State.RUNNING } .flatMap { internalMetric -> internalMetric.weightedMeasurementsList } .map { weightedMeasurement -> weightedMeasurement.measurement } + .filter { internalMeasurement -> + internalMeasurement.state == InternalMeasurement.State.PENDING + } measurementSupplier.syncInternalMeasurements( toBeSyncedInternalMeasurements, @@ -949,23 +961,6 @@ class MetricsService( .map { it.toMetric() } } } - - private suspend fun getInternalMetric( - cmmsMeasurementConsumerId: String, - externalMetricId: Long, - ): org.wfanet.measurement.internal.reporting.v2alpha.Metric { - return try { - internalMetricsStub.getMetric( - internalGetMetricRequest { - this.cmmsMeasurementConsumerId = cmmsMeasurementConsumerId - this.externalMetricId = externalMetricId - } - ) - } catch (e: StatusException) { - throw Exception("Unable to get the metric from the reporting database.", e) - } - } - override suspend fun listMetrics(request: ListMetricsRequest): ListMetricsResponse { val listMetricsPageToken: ListMetricsPageToken = request.toListMetricsPageToken()