Skip to content

Commit

Permalink
Implement internal Metrics Service Create Methods for Reporting V2. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tristanvuong2021 authored May 15, 2023
1 parent 48e7363 commit 5c44ead
Show file tree
Hide file tree
Showing 12 changed files with 2,736 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ kt_jvm_library(
"//src/test/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres:__pkg__",
],
deps = [
"//src/main/kotlin/org/wfanet/measurement/reporting/deploy/postgres/writers",
"//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/readers",
"//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/writers",
"//src/main/proto/wfa/measurement/internal/reporting/v2:measurement_consumers_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/internal/reporting/v2:metrics_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/internal/reporting/v2:reporting_sets_service_kt_jvm_grpc_proto",
"@wfa_common_jvm//imports/java/io/grpc:api",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2023 The Cross-Media Measurement Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wfanet.measurement.reporting.deploy.v2.postgres

import io.grpc.Status
import kotlinx.coroutines.flow.Flow
import org.wfanet.measurement.common.db.r2dbc.DatabaseClient
import org.wfanet.measurement.common.grpc.grpcRequire
import org.wfanet.measurement.common.identity.IdGenerator
import org.wfanet.measurement.internal.reporting.v2.BatchCreateMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.BatchCreateMetricsResponse
import org.wfanet.measurement.internal.reporting.v2.BatchGetMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.BatchGetMetricsResponse
import org.wfanet.measurement.internal.reporting.v2.CreateMetricRequest
import org.wfanet.measurement.internal.reporting.v2.Metric
import org.wfanet.measurement.internal.reporting.v2.MetricSpec
import org.wfanet.measurement.internal.reporting.v2.MetricsGrpcKt.MetricsCoroutineImplBase
import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.batchCreateMetricsResponse
import org.wfanet.measurement.reporting.deploy.v2.postgres.writers.CreateMetrics
import org.wfanet.measurement.reporting.service.internal.MeasurementConsumerNotFoundException
import org.wfanet.measurement.reporting.service.internal.ReportingSetNotFoundException

private const val MAX_BATCH_CREATE_SIZE = 200

class PostgresMetricsService(
private val idGenerator: IdGenerator,
private val client: DatabaseClient,
) : MetricsCoroutineImplBase() {
override suspend fun createMetric(request: CreateMetricRequest): Metric {
grpcRequire(request.metric.hasTimeInterval()) { "Metric missing time interval." }

grpcRequire(!request.metric.metricSpec.typeCase.equals(MetricSpec.TypeCase.TYPE_NOT_SET)) {
"Metric Spec missing type."
}

grpcRequire(request.metric.metricSpec.hasVidSamplingInterval()) {
"Metric Spec missing vid sampling interval."
}

grpcRequire(request.metric.weightedMeasurementsCount > 0) {
"Metric missing weighted measurements."
}

return try {
CreateMetrics(listOf(request)).execute(client, idGenerator).first()
} catch (e: ReportingSetNotFoundException) {
e.throwStatusRuntimeException(Status.NOT_FOUND) { "Reporting Set not found." }
} catch (e: MeasurementConsumerNotFoundException) {
e.throwStatusRuntimeException(Status.FAILED_PRECONDITION) {
"Measurement Consumer not found."
}
}
}

override suspend fun batchCreateMetrics(
request: BatchCreateMetricsRequest
): BatchCreateMetricsResponse {
grpcRequire(request.requestsList.size <= MAX_BATCH_CREATE_SIZE) { "Too many requests." }

request.requestsList.forEach {
grpcRequire(it.metric.hasTimeInterval()) { "Metric missing time interval." }

grpcRequire(!it.metric.metricSpec.typeCase.equals(MetricSpec.TypeCase.TYPE_NOT_SET)) {
"Metric Spec missing type."
}

grpcRequire(it.metric.metricSpec.hasVidSamplingInterval()) {
"Metric Spec missing vid sampling interval."
}

grpcRequire(it.metric.weightedMeasurementsCount > 0) {
"Metric missing weighted measurements."
}

grpcRequire(it.metric.cmmsMeasurementConsumerId.equals(request.cmmsMeasurementConsumerId)) {
"CmmsMeasurementConsumerId in request doesn't match create metric request"
}
}

return try {
batchCreateMetricsResponse {
metrics += CreateMetrics(request.requestsList).execute(client, idGenerator)
}
} catch (e: ReportingSetNotFoundException) {
e.throwStatusRuntimeException(Status.NOT_FOUND) { "Reporting Set not found." }
} catch (e: MeasurementConsumerNotFoundException) {
e.throwStatusRuntimeException(Status.FAILED_PRECONDITION) {
"Measurement Consumer not found."
}
}
}

override suspend fun batchGetMetrics(request: BatchGetMetricsRequest): BatchGetMetricsResponse {
return super.batchGetMetrics(request)
}

override fun streamMetrics(request: StreamMetricsRequest): Flow<Metric> {
return super.streamMetrics(request)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ kt_jvm_library(
name = "readers",
srcs = glob(["*.kt"]),
deps = [
"//src/main/proto/wfa/measurement/internal/reporting/v2:metrics_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/internal/reporting/v2:reporting_sets_service_kt_jvm_grpc_proto",
"@wfa_common_jvm//imports/java/io/r2dbc",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2023 The Cross-Media Measurement Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wfanet.measurement.reporting.deploy.v2.postgres.readers

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import org.wfanet.measurement.common.db.r2dbc.ReadContext
import org.wfanet.measurement.common.identity.InternalId
import org.wfanet.measurement.internal.reporting.v2.Metric

class MetricReader(private val readContext: ReadContext) {
data class Result(
val measurementConsumerId: InternalId,
val metricId: InternalId,
val createMetricRequestId: String,
val metric: Metric
)

suspend fun readMetricsByRequestId(
measurementConsumerId: InternalId,
createMetricRequestIds: Collection<String>
): Flow<Result> {
if (createMetricRequestIds.isEmpty()) {
return emptyFlow()
}

// TODO(tristanvuong2021): implement read metric
return emptyFlow()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ kt_jvm_library(
name = "writers",
srcs = glob(["*.kt"]),
deps = [
"//src/main/kotlin/org/wfanet/measurement/reporting/deploy/postgres/writers",
"//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/readers",
"//src/main/kotlin/org/wfanet/measurement/reporting/service/internal:internal_exception",
"//src/main/proto/wfa/measurement/internal/reporting/v2:measurement_consumer_kt_jvm_proto",
"//src/main/proto/wfa/measurement/internal/reporting/v2:metrics_service_kt_jvm_proto",
"//src/main/proto/wfa/measurement/internal/reporting/v2:reporting_set_kt_jvm_proto",
"@wfa_common_jvm//imports/java/com/google/protobuf",
"@wfa_common_jvm//imports/java/io/r2dbc",
"@wfa_common_jvm//imports/java/org/postgresql:r2dbc",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/db/r2dbc",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/db/r2dbc/postgres",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/identity",
],
)
Loading

0 comments on commit 5c44ead

Please sign in to comment.