Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Internal Metrics Service Read Methods for Reporting V2 #1009

Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@
package org.wfanet.measurement.reporting.deploy.v2.postgres

import io.grpc.Status
import java.lang.IllegalStateException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import org.wfanet.measurement.common.db.r2dbc.DatabaseClient
import org.wfanet.measurement.common.db.r2dbc.postgres.SerializableErrors.withSerializableErrorRetries
import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.common.grpc.grpcRequire
import org.wfanet.measurement.common.identity.IdGenerator
import org.wfanet.measurement.internal.reporting.v2.BatchCreateMetricsRequest
Expand All @@ -31,11 +38,13 @@ import org.wfanet.measurement.internal.reporting.v2.MetricSpec
import org.wfanet.measurement.internal.reporting.v2.MetricsGrpcKt.MetricsCoroutineImplBase
import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.batchCreateMetricsResponse
import org.wfanet.measurement.internal.reporting.v2.batchGetMetricsResponse
import org.wfanet.measurement.reporting.deploy.v2.postgres.readers.MetricReader
import org.wfanet.measurement.reporting.deploy.v2.postgres.writers.CreateMetrics
import org.wfanet.measurement.reporting.service.internal.MeasurementConsumerNotFoundException
import org.wfanet.measurement.reporting.service.internal.ReportingSetNotFoundException

private const val MAX_BATCH_CREATE_SIZE = 200
private const val MAX_BATCH_SIZE = 1000

class PostgresMetricsService(
private val idGenerator: IdGenerator,
Expand Down Expand Up @@ -70,7 +79,7 @@ class PostgresMetricsService(
override suspend fun batchCreateMetrics(
request: BatchCreateMetricsRequest
): BatchCreateMetricsResponse {
grpcRequire(request.requestsList.size <= MAX_BATCH_CREATE_SIZE) { "Too many requests." }
grpcRequire(request.requestsList.size <= MAX_BATCH_SIZE) { "Too many requests." }

request.requestsList.forEach {
grpcRequire(it.metric.hasTimeInterval()) { "Metric missing time interval." }
Expand Down Expand Up @@ -106,10 +115,52 @@ class PostgresMetricsService(
}

override suspend fun batchGetMetrics(request: BatchGetMetricsRequest): BatchGetMetricsResponse {
return super.batchGetMetrics(request)
grpcRequire(request.cmmsMeasurementConsumerId.isNotBlank()) {
"CmmsMeasurementConsumerId is missing."
}

grpcRequire(request.externalMetricIdsList.size <= MAX_BATCH_SIZE) { "Too many requests." }

val readContext = client.readTransaction()
val metrics =
try {
MetricReader(readContext)
.batchGetMetrics(request)
.map { it.metric }
.withSerializableErrorRetries()
.toList()
} catch (e: IllegalStateException) {
failGrpc(Status.DATA_LOSS) { "Metric is missing required fields" }
} finally {
readContext.close()
}

if (metrics.size < request.externalMetricIdsList.size) {
failGrpc(Status.NOT_FOUND) { "Metric is not found" }
}

return batchGetMetricsResponse { this.metrics += metrics }
}

override fun streamMetrics(request: StreamMetricsRequest): Flow<Metric> {
return super.streamMetrics(request)
grpcRequire(request.filter.cmmsMeasurementConsumerId.isNotBlank()) {
"Filter is missing CmmsMeasurementConsumerId"
}

return flow {
val readContext = client.readTransaction()
try {
emitAll(
MetricReader(readContext)
.readMetrics(request)
.map { it.metric }
.withSerializableErrorRetries()
)
} catch (e: IllegalStateException) {
failGrpc(Status.DATA_LOSS) { "Metric is missing required fields" }
} finally {
readContext.close()
}
}
}
}
Loading