Skip to content

Commit

Permalink
Implement Internal Metrics Service Read Methods for Reporting V2 (#1009)
Browse files Browse the repository at this point in the history
  • Loading branch information
tristanvuong2021 authored May 24, 2023
1 parent d4d03b2 commit aa165a6
Show file tree
Hide file tree
Showing 6 changed files with 1,185 additions and 440 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@
package org.wfanet.measurement.reporting.deploy.v2.postgres

import io.grpc.Status
import java.lang.IllegalStateException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import org.wfanet.measurement.common.db.r2dbc.DatabaseClient
import org.wfanet.measurement.common.db.r2dbc.postgres.SerializableErrors.withSerializableErrorRetries
import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.common.grpc.grpcRequire
import org.wfanet.measurement.common.identity.IdGenerator
import org.wfanet.measurement.internal.reporting.v2.BatchCreateMetricsRequest
Expand All @@ -31,11 +38,13 @@ import org.wfanet.measurement.internal.reporting.v2.MetricSpec
import org.wfanet.measurement.internal.reporting.v2.MetricsGrpcKt.MetricsCoroutineImplBase
import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.batchCreateMetricsResponse
import org.wfanet.measurement.internal.reporting.v2.batchGetMetricsResponse
import org.wfanet.measurement.reporting.deploy.v2.postgres.readers.MetricReader
import org.wfanet.measurement.reporting.deploy.v2.postgres.writers.CreateMetrics
import org.wfanet.measurement.reporting.service.internal.MeasurementConsumerNotFoundException
import org.wfanet.measurement.reporting.service.internal.ReportingSetNotFoundException

private const val MAX_BATCH_CREATE_SIZE = 200
private const val MAX_BATCH_SIZE = 1000

class PostgresMetricsService(
private val idGenerator: IdGenerator,
Expand Down Expand Up @@ -70,7 +79,7 @@ class PostgresMetricsService(
override suspend fun batchCreateMetrics(
request: BatchCreateMetricsRequest
): BatchCreateMetricsResponse {
grpcRequire(request.requestsList.size <= MAX_BATCH_CREATE_SIZE) { "Too many requests." }
grpcRequire(request.requestsList.size <= MAX_BATCH_SIZE) { "Too many requests." }

request.requestsList.forEach {
grpcRequire(it.metric.hasTimeInterval()) { "Metric missing time interval." }
Expand Down Expand Up @@ -106,10 +115,52 @@ class PostgresMetricsService(
}

override suspend fun batchGetMetrics(request: BatchGetMetricsRequest): BatchGetMetricsResponse {
return super.batchGetMetrics(request)
grpcRequire(request.cmmsMeasurementConsumerId.isNotBlank()) {
"CmmsMeasurementConsumerId is missing."
}

grpcRequire(request.externalMetricIdsList.size <= MAX_BATCH_SIZE) { "Too many requests." }

val readContext = client.readTransaction()
val metrics =
try {
MetricReader(readContext)
.batchGetMetrics(request)
.map { it.metric }
.withSerializableErrorRetries()
.toList()
} catch (e: IllegalStateException) {
failGrpc(Status.NOT_FOUND) { "Metric is not found" }
} finally {
readContext.close()
}

if (metrics.size < request.externalMetricIdsList.size) {
failGrpc(Status.NOT_FOUND) { "Metric is not found" }
}

return batchGetMetricsResponse { this.metrics += metrics }
}

override fun streamMetrics(request: StreamMetricsRequest): Flow<Metric> {
return super.streamMetrics(request)
grpcRequire(request.filter.cmmsMeasurementConsumerId.isNotBlank()) {
"Filter is missing CmmsMeasurementConsumerId"
}

return flow {
val readContext = client.readTransaction()
try {
emitAll(
MetricReader(readContext)
.readMetrics(request)
.map { it.metric }
.withSerializableErrorRetries()
)
} catch (e: IllegalStateException) {
failGrpc(Status.NOT_FOUND) { "Metric is not found" }
} finally {
readContext.close()
}
}
}
}
Loading

0 comments on commit aa165a6

Please sign in to comment.