diff --git a/MODULE.bazel b/MODULE.bazel index 30203c6c081..c54b0c5c375 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -211,6 +211,7 @@ maven.install( "org.slf4j:slf4j-simple:1.7.32", # Google Cloud + "com.google.cloud:google-cloud-bigquerystorage", "com.google.cloud:google-cloud-security-private-ca", # TODO(googleapis/java-cloud-bom#5279): Remove when managed by BOM. "com.google.apis:google-api-services-storage:v1-rev20240706-2.0.0", diff --git a/imports/java/com/google/cloud/bigquery/storage/BUILD.bazel b/imports/java/com/google/cloud/bigquery/storage/BUILD.bazel new file mode 100644 index 00000000000..7e2be351329 --- /dev/null +++ b/imports/java/com/google/cloud/bigquery/storage/BUILD.bazel @@ -0,0 +1,6 @@ +package(default_visibility = ["//visibility:public"]) + +alias( + name = "storage", + actual = "@maven//:com_google_cloud_google_cloud_bigquerystorage", +) diff --git a/maven_install.json b/maven_install.json index 40d08dcda53..908ada88c8f 100755 --- a/maven_install.json +++ b/maven_install.json @@ -1,7 +1,7 @@ { "__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": "THERE_IS_NO_DATA_ONLY_ZUUL", - "__INPUT_ARTIFACTS_HASH": 1120915116, - "__RESOLVED_ARTIFACTS_HASH": -2081716671, + "__INPUT_ARTIFACTS_HASH": 606715834, + "__RESOLVED_ARTIFACTS_HASH": -1010776065, "artifacts": { "args4j:args4j": { "shasums": { @@ -3426,6 +3426,44 @@ "com.google.http-client:google-http-client", "com.google.http-client:google-http-client-gson" ], + "com.google.api.grpc:grpc-google-cloud-bigquerystorage-v1": [ + "com.google.api.grpc:proto-google-common-protos", + "com.google.api:api-common", + "com.google.auto.value:auto-value-annotations", + "com.google.code.findbugs:jsr305", + "com.google.errorprone:error_prone_annotations", + "com.google.guava:failureaccess", + "com.google.guava:listenablefuture", + "com.google.j2objc:j2objc-annotations", + "io.grpc:grpc-protobuf-lite", + "javax.annotation:javax.annotation-api", + "org.checkerframework:checker-qual" + ], + 
"com.google.api.grpc:grpc-google-cloud-bigquerystorage-v1beta1": [ + "com.google.api.grpc:proto-google-common-protos", + "com.google.api:api-common", + "com.google.auto.value:auto-value-annotations", + "com.google.code.findbugs:jsr305", + "com.google.errorprone:error_prone_annotations", + "com.google.guava:failureaccess", + "com.google.guava:listenablefuture", + "com.google.j2objc:j2objc-annotations", + "io.grpc:grpc-protobuf-lite", + "org.checkerframework:checker-qual" + ], + "com.google.api.grpc:grpc-google-cloud-bigquerystorage-v1beta2": [ + "com.google.api.grpc:proto-google-common-protos", + "com.google.api:api-common", + "com.google.auto.value:auto-value-annotations", + "com.google.code.findbugs:jsr305", + "com.google.errorprone:error_prone_annotations", + "com.google.guava:failureaccess", + "com.google.guava:listenablefuture", + "com.google.j2objc:j2objc-annotations", + "io.grpc:grpc-protobuf-lite", + "javax.annotation:javax.annotation-api", + "org.checkerframework:checker-qual" + ], "com.google.api.grpc:grpc-google-cloud-bigtable-admin-v2": [ "com.google.api.grpc:proto-google-cloud-bigtable-admin-v2", "com.google.api.grpc:proto-google-common-protos", @@ -3548,6 +3586,36 @@ "javax.annotation:javax.annotation-api", "org.checkerframework:checker-qual" ], + "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1alpha": [ + "com.google.auto.value:auto-value-annotations", + "com.google.code.findbugs:jsr305", + "com.google.errorprone:error_prone_annotations", + "com.google.guava:failureaccess", + "com.google.guava:listenablefuture", + "com.google.j2objc:j2objc-annotations", + "javax.annotation:javax.annotation-api", + "org.checkerframework:checker-qual" + ], + "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1": [ + "com.google.auto.value:auto-value-annotations", + "com.google.code.findbugs:jsr305", + "com.google.errorprone:error_prone_annotations", + "com.google.guava:failureaccess", + "com.google.guava:listenablefuture", + 
"com.google.j2objc:j2objc-annotations", + "javax.annotation:javax.annotation-api", + "org.checkerframework:checker-qual" + ], + "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2": [ + "com.google.auto.value:auto-value-annotations", + "com.google.code.findbugs:jsr305", + "com.google.errorprone:error_prone_annotations", + "com.google.guava:failureaccess", + "com.google.guava:listenablefuture", + "com.google.j2objc:j2objc-annotations", + "javax.annotation:javax.annotation-api", + "org.checkerframework:checker-qual" + ], "com.google.api.grpc:proto-google-cloud-bigtable-admin-v2": [ "com.google.api.grpc:proto-google-common-protos", "com.google.api.grpc:proto-google-iam-v1", diff --git a/src/main/docker/images.bzl b/src/main/docker/images.bzl index dff05b2b7b0..d434f13569e 100644 --- a/src/main/docker/images.bzl +++ b/src/main/docker/images.bzl @@ -151,6 +151,11 @@ GKE_IMAGES = [ image = "//src/main/kotlin/org/wfanet/measurement/duchy/deploy/gcloud/postgres/tools:update_schema_image", repository = _PREFIX + "/duchy/gcloud-postgres-update-schema", ), + struct( + name = "kingdom_operational_metrics_export_image", + image = "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job:operational_metrics_export_job_image", + repository = _PREFIX + "/kingdom/bigquery-operational-metrics", + ), ] # List of specs for all Docker containers to push to a container registry. diff --git a/src/main/k8s/dev/kingdom_gke.cue b/src/main/k8s/dev/kingdom_gke.cue index 8c210103303..404d81f1d13 100644 --- a/src/main/k8s/dev/kingdom_gke.cue +++ b/src/main/k8s/dev/kingdom_gke.cue @@ -21,6 +21,9 @@ _systemApiAddressName: string @tag("system_api_address_name") // Name of K8s service account for the internal API server. #InternalServerServiceAccount: "internal-server" +// Name of K8s service account for the operational metrics job. +#OperationalMetricsServiceAccount: "operational-metrics" + // Number of gRPC threads for the internal API server. 
#InternalServerGrpcThreads: 7 @@ -40,6 +43,16 @@ _systemApiAddressName: string @tag("system_api_address_name") } } +#OperationalMetricsJobResourceRequirements: ResourceRequirements=#ResourceRequirements & { + requests: { + cpu: "10m" + memory: "256Mi" + } + limits: { + memory: ResourceRequirements.requests.memory + } +} + objectSets: [defaultNetworkPolicies] + [ for objectSet in kingdom {objectSet}] kingdom: #Kingdom & { @@ -48,10 +61,30 @@ kingdom: #Kingdom & { _verboseGrpcServerLogging: true + _imageSuffixes: [string]: string + _imageSuffixes: { + "operational-metrics": string | *"kingdom/bigquery-operational-metrics" + } + _imageConfigs: [string]: #ImageConfig + _imageConfigs: { + for name, suffix in _imageSuffixes { + "\(name)": {repoSuffix: suffix} + } + } + _images: [string]: string + _images: { + for name, config in _imageConfigs { + "\(name)": config.image + } + } + serviceAccounts: { "\(#InternalServerServiceAccount)": #WorkloadIdentityServiceAccount & { _iamServiceAccountName: "kingdom-internal" } + "\(#OperationalMetricsServiceAccount)": #WorkloadIdentityServiceAccount & { + _iamServiceAccountName: "operational-metrics" + } } configMaps: "java": #JavaConfigMap @@ -73,6 +106,45 @@ kingdom: #Kingdom & { } } + cronJobs: { + "operational-metrics": { + _container: { + _javaOptions: maxHeapSize: "48M" + image: _images["operational-metrics"] + resources: #OperationalMetricsJobResourceRequirements + args: [ + "--bigquery-project=\(#GCloudProject)", + "--bigquery-dataset=operational_metrics", + "--measurements-table=measurements", + "--requisitions-table=requisitions", + "--latest-measurement-read-table=latest_measurement_read", + "--tls-cert-file=/var/run/secrets/files/kingdom_tls.pem", + "--tls-key-file=/var/run/secrets/files/kingdom_tls.key", + "--cert-collection-file=/var/run/secrets/files/kingdom_root.pem", + "--internal-api-target=" + (#Target & {name: "gcp-kingdom-data-server"}).target, + "--internal-api-cert-host=localhost", + ] + } + spec: { + 
concurrencyPolicy: "Forbid" + schedule: "30 * * * *" // Hourly, 30 minutes past the hour + jobTemplate: spec: template: spec: #ServiceAccountPodSpec & { + serviceAccountName: #OperationalMetricsServiceAccount + } + } + } + } + + networkPolicies: { + "operational-metrics": { + _app_label: "operational-metrics-app" + _egresses: { + // Need to send external traffic to BigQuery. + any: {} + } + } + } + services: { "system-api-server": _ipAddressName: _systemApiAddressName "v2alpha-public-api-server": _ipAddressName: _publicApiAddressName diff --git a/src/main/k8s/kingdom.cue b/src/main/k8s/kingdom.cue index f6088f5ca26..f70861d74e2 100644 --- a/src/main/k8s/kingdom.cue +++ b/src/main/k8s/kingdom.cue @@ -258,6 +258,7 @@ import ("strings") "completed-measurements-deletion-app", "pending-measurements-cancellation-app", "exchanges-deletion-app", + "operational-metrics-app", ] _egresses: { // Need to send external traffic to Spanner. diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server/BUILD.bazel index 2a8a25c90f6..7cde5ae53c2 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server/BUILD.bazel +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server/BUILD.bazel @@ -23,7 +23,10 @@ kt_jvm_library( kt_jvm_library( name = "utils", srcs = ["Utils.kt"], - visibility = ["//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job:__subpackages__"], + visibility = [ + "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job:__subpackages__", + "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job:__subpackages__", + ], deps = [ "//src/main/kotlin/org/wfanet/measurement/common/identity", "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common:flags", diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/BUILD.bazel 
b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/BUILD.bazel new file mode 100644 index 00000000000..742dc170a6d --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/BUILD.bazel @@ -0,0 +1,63 @@ +load("@rules_java//java:defs.bzl", "java_binary") +load("@wfa_rules_kotlin_jvm//kotlin:defs.bzl", "kt_jvm_library") +load("//src/main/docker:macros.bzl", "java_image") + +kt_jvm_library( + name = "operational_metrics_export_job", + srcs = ["OperationalMetricsExportJob.kt"], + runtime_deps = ["@wfa_common_jvm//imports/java/io/grpc/netty"], + deps = [ + ":operational_metrics_export", + "//imports/java/com/google/cloud/bigquery/storage", + "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server:utils", + "//src/main/proto/wfa/measurement/internal/kingdom:measurements_service_kt_jvm_grpc_proto", + "@wfa_common_jvm//imports/java/com/google/cloud/bigquery", + "@wfa_common_jvm//imports/java/picocli", + "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto:signing_certs", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc", + ], +) + +java_binary( + name = "OperationalMetricsExportJob", + main_class = "org.wfanet.measurement.kingdom.deploy.gcloud.job.OperationalMetricsExportJobKt", + runtime_deps = [":operational_metrics_export_job"], +) + +java_image( + name = "operational_metrics_export_job_image", + binary = ":OperationalMetricsExportJob", + main_class = "org.wfanet.measurement.kingdom.deploy.gcloud.job.OperationalMetricsExportJobKt", + visibility = ["//src:docker_image_deployment"], +) + +kt_jvm_library( + name = "operational_metrics_export", + srcs = ["OperationalMetricsExport.kt"], + visibility = ["//src/test/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job:__pkg__"], + deps = [ + ":stream_writer_factory", + 
"//imports/java/com/google/cloud/bigquery/storage", + "//src/main/kotlin/org/wfanet/measurement/api:public_api_version", + "//src/main/kotlin/org/wfanet/measurement/api/v2alpha:packed_messages", + "//src/main/proto/wfa/measurement/api/v2alpha:measurement_spec_kt_jvm_proto", + "//src/main/proto/wfa/measurement/internal/kingdom:measurements_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/internal/kingdom/bigquerytables:operational_metrics_dataset_kt_jvm_proto", + "@wfa_common_jvm//imports/java/com/google/cloud/bigquery", + "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/identity", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/gcloud/common", + ], +) + +kt_jvm_library( + name = "stream_writer_factory", + srcs = ["StreamWriterFactory.kt"], + visibility = ["//src/test/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job:__pkg__"], + deps = [ + "//imports/java/com/google/cloud/bigquery/storage", + ], +) diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/OperationalMetricsExport.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/OperationalMetricsExport.kt new file mode 100644 index 00000000000..4327df9702e --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/OperationalMetricsExport.kt @@ -0,0 +1,408 @@ +/* + * Copyright 2024 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.kingdom.deploy.gcloud.job + +import com.google.cloud.bigquery.BigQuery +import com.google.cloud.bigquery.FieldValueList +import com.google.cloud.bigquery.QueryJobConfiguration +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError +import com.google.cloud.bigquery.storage.v1.ProtoRows +import com.google.cloud.bigquery.storage.v1.ProtoSchema +import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter +import com.google.cloud.bigquery.storage.v1.StreamWriter +import com.google.protobuf.Timestamp +import com.google.protobuf.any +import com.google.protobuf.util.Durations +import com.google.protobuf.util.Timestamps +import com.google.rpc.Code +import java.util.logging.Logger +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import org.jetbrains.annotations.Blocking +import org.wfanet.measurement.api.Version +import org.wfanet.measurement.api.v2alpha.MeasurementSpec +import org.wfanet.measurement.api.v2alpha.setMessage +import org.wfanet.measurement.api.v2alpha.signedMessage +import org.wfanet.measurement.api.v2alpha.unpack +import org.wfanet.measurement.common.ProtoReflection +import org.wfanet.measurement.common.identity.apiIdToExternalId +import org.wfanet.measurement.common.identity.externalIdToApiId +import org.wfanet.measurement.internal.kingdom.Measurement +import org.wfanet.measurement.internal.kingdom.MeasurementsGrpcKt +import org.wfanet.measurement.internal.kingdom.Requisition +import org.wfanet.measurement.internal.kingdom.StreamMeasurementsRequestKt +import org.wfanet.measurement.internal.kingdom.bigquerytables.LatestMeasurementReadTableRow +import org.wfanet.measurement.internal.kingdom.bigquerytables.MeasurementType +import 
org.wfanet.measurement.internal.kingdom.bigquerytables.MeasurementsTableRow +import org.wfanet.measurement.internal.kingdom.bigquerytables.RequisitionsTableRow +import org.wfanet.measurement.internal.kingdom.bigquerytables.latestMeasurementReadTableRow +import org.wfanet.measurement.internal.kingdom.bigquerytables.measurementsTableRow +import org.wfanet.measurement.internal.kingdom.bigquerytables.requisitionsTableRow +import org.wfanet.measurement.internal.kingdom.copy +import org.wfanet.measurement.internal.kingdom.measurementKey +import org.wfanet.measurement.internal.kingdom.streamMeasurementsRequest + +class OperationalMetricsExport( + private val measurementsClient: MeasurementsGrpcKt.MeasurementsCoroutineStub, + private val bigQuery: BigQuery, + private val bigQueryWriteClient: BigQueryWriteClient, + private val projectId: String, + private val datasetId: String, + private val latestMeasurementReadTableId: String, + private val measurementsTableId: String, + private val requisitionsTableId: String, + private val streamWriterFactory: StreamWriterFactory = StreamWriterFactoryImpl(), +) { + suspend fun execute() { + var measurementsQueryResponseSize: Int + + val query = + """ + SELECT update_time, external_measurement_consumer_id, external_measurement_id + FROM `$datasetId.$latestMeasurementReadTableId` + ORDER BY update_time DESC, external_measurement_consumer_id DESC, external_measurement_id DESC + LIMIT 1 + """ + .trimIndent() + + val queryJobConfiguration: QueryJobConfiguration = + QueryJobConfiguration.newBuilder(query).build() + + val results = bigQuery.query(queryJobConfiguration).iterateAll() + logger.info("Retrieved latest measurement read info from BigQuery") + + val latestMeasurementReadFromPreviousJob: FieldValueList? 
= results.firstOrNull() + + var streamMeasurementsRequest = streamMeasurementsRequest { + measurementView = Measurement.View.FULL + limit = BATCH_SIZE + filter = + StreamMeasurementsRequestKt.filter { + states += Measurement.State.SUCCEEDED + states += Measurement.State.FAILED + if (latestMeasurementReadFromPreviousJob != null) { + after = + StreamMeasurementsRequestKt.FilterKt.after { + updateTime = + Timestamps.fromNanos( + latestMeasurementReadFromPreviousJob.get("update_time").longValue + ) + measurement = measurementKey { + externalMeasurementConsumerId = + latestMeasurementReadFromPreviousJob + .get("external_measurement_consumer_id") + .longValue + externalMeasurementId = + latestMeasurementReadFromPreviousJob.get("external_measurement_id").longValue + } + } + } + } + } + + DataWriter( + projectId = projectId, + datasetId = datasetId, + tableId = measurementsTableId, + client = bigQueryWriteClient, + protoSchema = ProtoSchemaConverter.convert(MeasurementsTableRow.getDescriptor()), + streamWriterFactory = streamWriterFactory, + ) + .use { measurementsDataWriter -> + DataWriter( + projectId = projectId, + datasetId = datasetId, + tableId = requisitionsTableId, + client = bigQueryWriteClient, + protoSchema = ProtoSchemaConverter.convert(RequisitionsTableRow.getDescriptor()), + streamWriterFactory = streamWriterFactory, + ) + .use { requisitionsDataWriter -> + DataWriter( + projectId = projectId, + datasetId = datasetId, + tableId = latestMeasurementReadTableId, + client = bigQueryWriteClient, + protoSchema = + ProtoSchema.newBuilder() + .setProtoDescriptor(LatestMeasurementReadTableRow.getDescriptor().toProto()) + .build(), + streamWriterFactory = streamWriterFactory, + ) + .use { latestMeasurementReadDataWriter -> + do { + measurementsQueryResponseSize = 0 + + val measurementsProtoRowsBuilder: ProtoRows.Builder = ProtoRows.newBuilder() + val requisitionsProtoRowsBuilder: ProtoRows.Builder = ProtoRows.newBuilder() + ProtoRows.newBuilder() + var 
latestUpdateTime: Timestamp = Timestamp.getDefaultInstance() + + measurementsClient.streamMeasurements(streamMeasurementsRequest).collect { + measurement -> + measurementsQueryResponseSize++ + latestUpdateTime = measurement.updateTime + + val measurementSpec = signedMessage { + setMessage( + any { + value = measurement.details.measurementSpec + typeUrl = + when (measurement.details.apiVersion) { + Version.V2_ALPHA.toString() -> + ProtoReflection.getTypeUrl(MeasurementSpec.getDescriptor()) + else -> ProtoReflection.getTypeUrl(MeasurementSpec.getDescriptor()) + } + } + ) + signature = measurement.details.measurementSpecSignature + signatureAlgorithmOid = + measurement.details.measurementSpecSignatureAlgorithmOid + } + val measurementTypeCase = + measurementSpec.unpack<MeasurementSpec>().measurementTypeCase + val measurementType = + @Suppress( + "WHEN_ENUM_CAN_BE_NULL_IN_JAVA" + ) // Proto enum fields are never null. + when (measurementTypeCase) { + MeasurementSpec.MeasurementTypeCase.REACH_AND_FREQUENCY -> + MeasurementType.REACH_AND_FREQUENCY + MeasurementSpec.MeasurementTypeCase.IMPRESSION -> MeasurementType.IMPRESSION + MeasurementSpec.MeasurementTypeCase.DURATION -> MeasurementType.DURATION + MeasurementSpec.MeasurementTypeCase.REACH -> MeasurementType.REACH + MeasurementSpec.MeasurementTypeCase.POPULATION -> MeasurementType.POPULATION + MeasurementSpec.MeasurementTypeCase.MEASUREMENTTYPE_NOT_SET -> + MeasurementType.MEASUREMENT_TYPE_UNSPECIFIED + } + + val measurementConsumerId = + externalIdToApiId(measurement.externalMeasurementConsumerId) + val measurementId = externalIdToApiId(measurement.externalMeasurementId) + + val measurementCompletionDurationSeconds = + Durations.toSeconds( + Timestamps.between(measurement.createTime, measurement.updateTime) + ) + + val measurementState = + @Suppress( + "WHEN_ENUM_CAN_BE_NULL_IN_JAVA" + ) // Proto enum fields are never null. + when (measurement.state) { + // StreamMeasurements filter only returns SUCCEEDED and FAILED + // Measurements. 
+ Measurement.State.PENDING_REQUISITION_PARAMS, + Measurement.State.PENDING_REQUISITION_FULFILLMENT, + Measurement.State.PENDING_PARTICIPANT_CONFIRMATION, + Measurement.State.PENDING_COMPUTATION, + Measurement.State.STATE_UNSPECIFIED, + Measurement.State.CANCELLED, + Measurement.State.UNRECOGNIZED -> MeasurementsTableRow.State.UNRECOGNIZED + Measurement.State.SUCCEEDED -> MeasurementsTableRow.State.SUCCEEDED + Measurement.State.FAILED -> MeasurementsTableRow.State.FAILED + } + + measurementsProtoRowsBuilder.addSerializedRows( + measurementsTableRow { + this.measurementConsumerId = measurementConsumerId + this.measurementId = measurementId + isDirect = measurement.details.protocolConfig.hasDirect() + this.measurementType = measurementType + state = measurementState + createTime = measurement.createTime + updateTime = measurement.updateTime + completionDurationSeconds = measurementCompletionDurationSeconds + completionDurationSecondsSquared = + measurementCompletionDurationSeconds * + measurementCompletionDurationSeconds + } + .toByteString() + ) + + for (requisition in measurement.requisitionsList) { + val requisitionState = + @Suppress( + "WHEN_ENUM_CAN_BE_NULL_IN_JAVA" + ) // Proto enum fields are never null. 
+ when (requisition.state) { + Requisition.State.STATE_UNSPECIFIED, + Requisition.State.UNRECOGNIZED, + Requisition.State.PENDING_PARAMS, + Requisition.State.WITHDRAWN, + Requisition.State.UNFULFILLED -> continue + Requisition.State.FULFILLED -> RequisitionsTableRow.State.FULFILLED + Requisition.State.REFUSED -> RequisitionsTableRow.State.REFUSED + } + + val requisitionCompletionDurationSeconds = + Durations.toSeconds( + Timestamps.between(measurement.createTime, requisition.updateTime) + ) + + requisitionsProtoRowsBuilder.addSerializedRows( + requisitionsTableRow { + this.measurementConsumerId = measurementConsumerId + this.measurementId = measurementId + requisitionId = externalIdToApiId(requisition.externalRequisitionId) + dataProviderId = externalIdToApiId(requisition.externalDataProviderId) + isDirect = measurement.details.protocolConfig.hasDirect() + this.measurementType = measurementType + state = requisitionState + createTime = measurement.createTime + updateTime = requisition.updateTime + completionDurationSeconds = requisitionCompletionDurationSeconds + completionDurationSecondsSquared = + requisitionCompletionDurationSeconds * + requisitionCompletionDurationSeconds + } + .toByteString() + ) + } + } + + logger.info("Measurements read from the Kingdom Internal Server") + + if (measurementsProtoRowsBuilder.serializedRowsCount > 0) { + coroutineScope { + launch { + measurementsDataWriter.appendRows(measurementsProtoRowsBuilder.build()) + } + if (requisitionsProtoRowsBuilder.serializedRowsCount > 0) { + launch { + requisitionsDataWriter.appendRows(requisitionsProtoRowsBuilder.build()) + } + } + } + } else { + logger.info("No more Measurements to process") + break + } + + logger.info("Metrics written to BigQuery") + + val lastMeasurement = + MeasurementsTableRow.parseFrom( + measurementsProtoRowsBuilder.serializedRowsList.last() + ) + val latestMeasurementReadTableRow = latestMeasurementReadTableRow { + updateTime = Timestamps.toNanos(latestUpdateTime) + 
externalMeasurementConsumerId = + apiIdToExternalId(lastMeasurement.measurementConsumerId) + externalMeasurementId = apiIdToExternalId(lastMeasurement.measurementId) + } + + latestMeasurementReadDataWriter.appendRows( + ProtoRows.newBuilder() + .addSerializedRows(latestMeasurementReadTableRow.toByteString()) + .build() + ) + + streamMeasurementsRequest = + streamMeasurementsRequest.copy { + filter = + filter.copy { + after = + StreamMeasurementsRequestKt.FilterKt.after { + updateTime = latestUpdateTime + measurement = measurementKey { + externalMeasurementConsumerId = + latestMeasurementReadTableRow.externalMeasurementConsumerId + externalMeasurementId = + latestMeasurementReadTableRow.externalMeasurementId + } + } + } + } + } while (measurementsQueryResponseSize == BATCH_SIZE) + } + } + } + } + + companion object { + private val logger: Logger = Logger.getLogger(this::class.java.name) + private const val BATCH_SIZE = 3000 + } + + private class DataWriter( + private val projectId: String, + private val datasetId: String, + private val tableId: String, + private val client: BigQueryWriteClient, + private val protoSchema: ProtoSchema, + private val streamWriterFactory: StreamWriterFactory, + ) : AutoCloseable { + private var streamWriter: StreamWriter = + streamWriterFactory.create(projectId, datasetId, tableId, client, protoSchema) + private var recreateCount: Int = 0 + + override fun close() { + streamWriter.close() + } + + /** + * Writes data to the stream. + * + * @param protoRows protos representing the rows to write. 
+ * @throws IllegalStateException if append fails and error is not retriable or too many retry + * attempts have been made + */ + @Blocking + fun appendRows(protoRows: ProtoRows) { + logger.info("Begin writing to stream ${streamWriter.streamName}") + for (i in 1..RETRY_COUNT) { + if (streamWriter.isClosed) { + if (!streamWriter.isUserClosed && recreateCount < MAX_RECREATE_COUNT) { + logger.info("Recreating stream writer") + streamWriter = + streamWriterFactory.create(projectId, datasetId, tableId, client, protoSchema) + recreateCount++ + } else { + throw IllegalStateException("Unable to recreate stream writer") + } + } + + try { + val response = streamWriter.append(protoRows).get() + if (response.hasError()) { + logger.warning("Write response error: ${response.error}") + if (response.error.code != Code.INTERNAL.number) { + throw IllegalStateException("Cannot retry failed append.") + } else if (i == RETRY_COUNT) { + throw IllegalStateException("Too many retries.") + } + } else { + logger.info("End writing to stream ${streamWriter.streamName}") + break + } + } catch (e: AppendSerializationError) { + for (value in e.rowIndexToErrorMessage.values) { + logger.warning(value) + } + throw e + } + } + } + + companion object { + private const val MAX_RECREATE_COUNT = 3 + private const val RETRY_COUNT = 3 + } + } +} diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/OperationalMetricsExportJob.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/OperationalMetricsExportJob.kt new file mode 100644 index 00000000000..4e8e84bfb02 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/OperationalMetricsExportJob.kt @@ -0,0 +1,138 @@ +/* + * Copyright 2024 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.kingdom.deploy.gcloud.job + +import com.google.cloud.bigquery.BigQuery +import com.google.cloud.bigquery.BigQueryOptions +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient +import kotlinx.coroutines.runBlocking +import org.wfanet.measurement.common.commandLineMain +import org.wfanet.measurement.common.crypto.SigningCerts +import org.wfanet.measurement.common.grpc.CommonServer +import org.wfanet.measurement.common.grpc.buildMutualTlsChannel +import org.wfanet.measurement.common.grpc.withDefaultDeadline +import org.wfanet.measurement.common.grpc.withVerboseLogging +import org.wfanet.measurement.internal.kingdom.MeasurementsGrpcKt +import org.wfanet.measurement.kingdom.deploy.common.server.KingdomApiServerFlags +import picocli.CommandLine + +@CommandLine.Command( + name = "OperationalMetricsJobExecutor", + description = + [ + "Process for reading data for Metrics from the Kingdom Internal Server and writing it to BigQuery." 
+ ], + mixinStandardHelpOptions = true, + showDefaultValues = true, +) +private fun run( + @CommandLine.Mixin kingdomApiServerFlags: KingdomApiServerFlags, + @CommandLine.Mixin commonServerFlags: CommonServer.Flags, + @CommandLine.Mixin operationalMetricsFlags: OperationalMetricsFlags, +) { + val clientCerts = + SigningCerts.fromPemFiles( + certificateFile = commonServerFlags.tlsFlags.certFile, + privateKeyFile = commonServerFlags.tlsFlags.privateKeyFile, + trustedCertCollectionFile = commonServerFlags.tlsFlags.certCollectionFile, + ) + + val channel = + buildMutualTlsChannel( + kingdomApiServerFlags.internalApiFlags.target, + clientCerts, + kingdomApiServerFlags.internalApiFlags.certHost, + ) + .withVerboseLogging(kingdomApiServerFlags.debugVerboseGrpcClientLogging) + .withDefaultDeadline(kingdomApiServerFlags.internalApiFlags.defaultDeadlineDuration) + + val measurementsClient = MeasurementsGrpcKt.MeasurementsCoroutineStub(channel) + + val bigQuery: BigQuery = + BigQueryOptions.newBuilder() + .apply { setProjectId(operationalMetricsFlags.bigQueryProject) } + .build() + .service + + runBlocking { + val projectId = operationalMetricsFlags.bigQueryProject + val datasetId = operationalMetricsFlags.bigQueryDataSet + val measurementsTableId = operationalMetricsFlags.measurementsTable + val requisitionsTableId = operationalMetricsFlags.requisitionsTable + val latestMeasurementReadTableId = operationalMetricsFlags.latestMeasurementReadTable + + BigQueryWriteClient.create().use { bigQueryWriteClient -> + val operationalMetricsExport = + OperationalMetricsExport( + measurementsClient = measurementsClient, + bigQuery = bigQuery, + bigQueryWriteClient = bigQueryWriteClient, + projectId = projectId, + datasetId = datasetId, + latestMeasurementReadTableId = latestMeasurementReadTableId, + measurementsTableId = measurementsTableId, + requisitionsTableId = requisitionsTableId, + ) + + operationalMetricsExport.execute() + } + } +} + +fun main(args: Array<String>) = commandLineMain(::run, 
args) + +class OperationalMetricsFlags { + @CommandLine.Option( + names = ["--bigquery-project"], + description = ["BigQuery Project ID"], + required = true, + ) + lateinit var bigQueryProject: String + private set + + @CommandLine.Option( + names = ["--bigquery-dataset"], + description = ["BigQuery Dataset ID"], + required = true, + ) + lateinit var bigQueryDataSet: String + private set + + @CommandLine.Option( + names = ["--measurements-table"], + description = ["Measurements table ID"], + required = true, + ) + lateinit var measurementsTable: String + private set + + @CommandLine.Option( + names = ["--requisitions-table"], + description = ["Requisitions table ID"], + required = true, + ) + lateinit var requisitionsTable: String + private set + + @CommandLine.Option( + names = ["--latest-measurement-read-table"], + description = ["Latest Measurement Read table ID"], + required = true, + ) + lateinit var latestMeasurementReadTable: String + private set +} diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/StreamWriterFactory.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/StreamWriterFactory.kt new file mode 100644 index 00000000000..412aa3d64bb --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/job/StreamWriterFactory.kt @@ -0,0 +1,59 @@ +/* + * Copyright 2024 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.wfanet.measurement.kingdom.deploy.gcloud.job + +import com.google.api.gax.core.FixedExecutorProvider +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient +import com.google.cloud.bigquery.storage.v1.ProtoSchema +import com.google.cloud.bigquery.storage.v1.StreamWriter +import com.google.cloud.bigquery.storage.v1.TableName +import java.util.concurrent.Executors + +fun interface StreamWriterFactory { + fun create( + projectId: String, + datasetId: String, + tableId: String, + client: BigQueryWriteClient, + protoSchema: ProtoSchema, + ): StreamWriter +} + +class StreamWriterFactoryImpl : StreamWriterFactory { + override fun create( + projectId: String, + datasetId: String, + tableId: String, + client: BigQueryWriteClient, + protoSchema: ProtoSchema, + ): StreamWriter { + val tableName = TableName.of(projectId, datasetId, tableId) + return StreamWriter.newBuilder(tableName.toString() + DEFAULT_STREAM_PATH, client) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(1))) + .setEnableConnectionPool(true) + .setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE + ) + .setWriterSchema(protoSchema) + .build() + } + + companion object { + private const val DEFAULT_STREAM_PATH = "/streams/_default" + } +} diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/queries/StreamMeasurements.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/queries/StreamMeasurements.kt index a9dee680bd8..4038faf72fd 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/queries/StreamMeasurements.kt +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/queries/StreamMeasurements.kt @@ -75,7 +75,8 @@ class StreamMeasurements( return when (view) { Measurement.View.COMPUTATION -> "ORDER BY Measurements.UpdateTime ASC, 
ExternalComputationId ASC" - Measurement.View.DEFAULT -> + Measurement.View.DEFAULT, + Measurement.View.FULL -> "ORDER BY Measurements.UpdateTime ASC, ExternalMeasurementConsumerId ASC, " + "ExternalMeasurementId ASC" Measurement.View.UNRECOGNIZED -> error("Unrecognized View") @@ -152,22 +153,18 @@ class StreamMeasurements( @Suppress("WHEN_ENUM_CAN_BE_NULL_IN_JAVA") // Protobuf case fields cannot be null. when (filter.after.keyCase) { StreamMeasurementsRequest.Filter.After.KeyCase.MEASUREMENT -> { + // CASE implements short-circuiting, which fixes performance issues with this filter. conjuncts.add( """ - ( - Measurements.UpdateTime > @${AfterParams.UPDATE_TIME} - OR ( - Measurements.UpdateTime = @${AfterParams.UPDATE_TIME} - AND ExternalMeasurementConsumerId > - @${AfterParams.EXTERNAL_MEASUREMENT_CONSUMER_ID} - ) - OR ( - Measurements.UpdateTime = @${AfterParams.UPDATE_TIME} - AND ExternalMeasurementConsumerId = - @${AfterParams.EXTERNAL_MEASUREMENT_CONSUMER_ID} - AND ExternalMeasurementId > @${AfterParams.EXTERNAL_MEASUREMENT_ID} - ) - ) + CASE + WHEN Measurements.UpdateTime > @${AfterParams.UPDATE_TIME} THEN TRUE + WHEN Measurements.UpdateTime = @${AfterParams.UPDATE_TIME} + AND ExternalMeasurementConsumerId > @${AfterParams.EXTERNAL_MEASUREMENT_CONSUMER_ID} THEN TRUE + WHEN Measurements.UpdateTime = @${AfterParams.UPDATE_TIME} + AND ExternalMeasurementConsumerId = @${AfterParams.EXTERNAL_MEASUREMENT_CONSUMER_ID} + AND ExternalMeasurementId > @${AfterParams.EXTERNAL_MEASUREMENT_ID} THEN TRUE + ELSE FALSE + END """ .trimIndent() ) diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/readers/MeasurementReader.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/readers/MeasurementReader.kt index a7e1020aa46..b9e68b51c23 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/readers/MeasurementReader.kt +++ 
b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/readers/MeasurementReader.kt @@ -95,6 +95,7 @@ class MeasurementReader(private val view: Measurement.View, measurementsIndex: I when (view) { Measurement.View.DEFAULT -> DEFAULT_VIEW_SQL Measurement.View.COMPUTATION -> COMPUTATION_VIEW_SQL + Measurement.View.FULL -> FULL_VIEW_SQL Measurement.View.UNRECOGNIZED -> error("Invalid view $view") } appendClause(sql) @@ -172,6 +173,7 @@ class MeasurementReader(private val view: Measurement.View, measurementsIndex: I when (view) { Measurement.View.DEFAULT -> fillDefaultView(struct) Measurement.View.COMPUTATION -> fillComputationView(struct) + Measurement.View.FULL -> fillFullView(struct) Measurement.View.UNRECOGNIZED -> throw IllegalArgumentException("View field of GetMeasurementRequest is not set") } @@ -397,6 +399,101 @@ class MeasurementReader(private val view: Measurement.View, measurementsIndex: I JOIN MeasurementConsumerCertificates USING (MeasurementConsumerId, CertificateId) """ .trimIndent() + + private val FULL_VIEW_SQL = + """ + SELECT + ExternalMeasurementConsumerId, + ExternalMeasurementConsumerCertificateId, + Measurements.MeasurementId, + Measurements.MeasurementConsumerId, + Measurements.ExternalMeasurementId, + Measurements.ExternalComputationId, + Measurements.ProvidedMeasurementId, + Measurements.CreateRequestId, + Measurements.MeasurementDetails, + Measurements.CreateTime, + Measurements.UpdateTime, + Measurements.State AS MeasurementState, + ARRAY( + SELECT AS STRUCT + ExternalDataProviderId, + Requisitions.UpdateTime, + Requisitions.ExternalRequisitionId, + Requisitions.State AS RequisitionState, + Requisitions.FulfillingDuchyId, + Requisitions.RequisitionDetails, + ExternalDataProviderCertificateId, + SubjectKeyIdentifier, + NotValidBefore, + NotValidAfter, + RevocationState, + CertificateDetails, + FROM + Requisitions + JOIN DataProviders USING (DataProviderId) + JOIN DataProviderCertificates ON ( + 
DataProviderCertificates.DataProviderId = Requisitions.DataProviderId + AND DataProviderCertificates.CertificateId = Requisitions.DataProviderCertificateId + ) + JOIN Certificates USING (CertificateId) + WHERE + Requisitions.MeasurementConsumerId = Measurements.MeasurementConsumerId + AND Requisitions.MeasurementId = Measurements.MeasurementId + ) AS Requisitions, + ARRAY( + SELECT AS STRUCT + ExternalDuchyCertificateId, + ComputationParticipants.DuchyId, + ComputationParticipants.UpdateTime, + ComputationParticipants.State, + ComputationParticipants.ParticipantDetails, + Certificates.SubjectKeyIdentifier, + Certificates.NotValidBefore, + Certificates.NotValidAfter, + Certificates.RevocationState, + Certificates.CertificateDetails, + ARRAY( + SELECT AS STRUCT + DuchyMeasurementLogEntries.CreateTime, + DuchyMeasurementLogEntries.ExternalComputationLogEntryId, + DuchyMeasurementLogEntries.DuchyMeasurementLogDetails, + MeasurementLogEntries.MeasurementLogDetails + FROM + DuchyMeasurementLogEntries + JOIN MeasurementLogEntries USING (MeasurementConsumerId, MeasurementId, CreateTime) + WHERE + DuchyMeasurementLogEntries.DuchyId = ComputationParticipants.DuchyId + AND DuchyMeasurementLogEntries.MeasurementConsumerId = ComputationParticipants.MeasurementConsumerId + AND DuchyMeasurementLogEntries.MeasurementId = ComputationParticipants.MeasurementId + ORDER BY MeasurementLogEntries.CreateTime DESC + ) AS DuchyMeasurementLogEntries + FROM + ComputationParticipants + LEFT JOIN (DuchyCertificates JOIN Certificates USING (CertificateId)) + USING (DuchyId, CertificateId) + WHERE + ComputationParticipants.MeasurementConsumerId = Measurements.MeasurementConsumerId + AND ComputationParticipants.MeasurementId = Measurements.MeasurementId + ) AS ComputationParticipants, + ARRAY( + SELECT AS STRUCT + DuchyMeasurementResults.DuchyId, + ExternalDuchyCertificateId, + EncryptedResult, + PublicApiVersion + FROM + DuchyMeasurementResults + JOIN DuchyCertificates USING (CertificateId) + 
WHERE + Measurements.MeasurementConsumerId = DuchyMeasurementResults.MeasurementConsumerId + AND Measurements.MeasurementId = DuchyMeasurementResults.MeasurementId + ) AS DuchyResults + FROM + FilteredMeasurements AS Measurements + JOIN MeasurementConsumerCertificates USING (MeasurementConsumerId, CertificateId) + """ + .trimIndent() } } @@ -517,3 +614,80 @@ private fun MeasurementKt.Dsl.fillComputationView(struct: Struct) { ) } } + +private fun MeasurementKt.Dsl.fillFullView(struct: Struct) { + fillMeasurementCommon(struct) + val requisitionsStructs = struct.getStructList("Requisitions") + val dataProvidersCount = requisitionsStructs.size + val measurementSucceeded = state == Measurement.State.SUCCEEDED + + // Map of external Duchy ID to ComputationParticipant struct. + var participantStructs: Map = mapOf() + if (!struct.isNull("ExternalComputationId")) { + val externalMeasurementId = ExternalId(struct.getLong("ExternalMeasurementId")) + val externalMeasurementConsumerId = ExternalId(struct.getLong("ExternalMeasurementConsumerId")) + val externalComputationId = ExternalId(struct.getLong("ExternalComputationId")) + + participantStructs = + struct.getStructList("ComputationParticipants").associateBy { + val duchyId = it.getLong("DuchyId") + checkNotNull(DuchyIds.getExternalId(duchyId)) { + "Duchy with internal ID $duchyId not found" + } + } + + for ((externalDuchyId, participantStruct) in participantStructs) { + computationParticipants += + ComputationParticipantReader.buildComputationParticipant( + externalMeasurementConsumerId = externalMeasurementConsumerId, + externalMeasurementId = externalMeasurementId, + externalDuchyId = externalDuchyId, + externalComputationId = externalComputationId, + measurementDetails = details, + struct = participantStruct, + ) + } + } + + for (requisitionStruct in struct.getStructList("Requisitions")) { + requisitions += + RequisitionReader.buildRequisition( + struct, + requisitionStruct, + participantStructs, + dataProvidersCount, + 
) + + val requisitionDetails = + requisitionStruct.getProtoMessage("RequisitionDetails", RequisitionDetails.parser()) + val externalDataProviderId = requisitionStruct.getLong("ExternalDataProviderId") + val externalDataProviderCertificateId = + requisitionStruct.getLong("ExternalDataProviderCertificateId") + dataProviders[externalDataProviderId] = dataProviderValue { + this.externalDataProviderCertificateId = externalDataProviderCertificateId + dataProviderPublicKey = requisitionDetails.dataProviderPublicKey + encryptedRequisitionSpec = requisitionDetails.encryptedRequisitionSpec + nonceHash = requisitionDetails.nonceHash + + // TODO(world-federation-of-advertisers/cross-media-measurement#1301): Stop setting these + // fields. + dataProviderPublicKeySignature = requisitionDetails.dataProviderPublicKeySignature + dataProviderPublicKeySignatureAlgorithmOid = + requisitionDetails.dataProviderPublicKeySignatureAlgorithmOid + } + + if (measurementSucceeded && !requisitionDetails.encryptedData.isEmpty) { + results += resultInfo { + this.externalDataProviderId = externalDataProviderId + externalCertificateId = + if (requisitionDetails.externalCertificateId != 0L) { + requisitionDetails.externalCertificateId + } else { + externalDataProviderCertificateId + } + encryptedResult = requisitionDetails.encryptedData + apiVersion = requisitionDetails.encryptedDataApiVersion.ifEmpty { Version.V2_ALPHA.string } + } + } + } +} diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/MeasurementsServiceTest.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/MeasurementsServiceTest.kt index 93ede76e86a..228cb517a78 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/MeasurementsServiceTest.kt +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/MeasurementsServiceTest.kt @@ -1493,6 +1493,86 @@ abstract class MeasurementsServiceTest { .inOrder() } + @Test + fun 
`streamMeasurements with full view returns measurements with all fields set`(): Unit = + runBlocking { + val measurementConsumer = + population.createMeasurementConsumer(measurementConsumersService, accountsService) + + val measurement1 = + measurementsService.createMeasurement( + createMeasurementRequest { + measurement = + MEASUREMENT.copy { + externalMeasurementConsumerId = measurementConsumer.externalMeasurementConsumerId + externalMeasurementConsumerCertificateId = + measurementConsumer.certificate.externalCertificateId + } + } + ) + + val dataProvider = population.createDataProvider(dataProvidersService) + val dataProviderValue = dataProvider.toDataProviderValue() + val measurement2 = + measurementsService.createMeasurement( + createMeasurementRequest { + measurement = + MEASUREMENT.copy { + externalMeasurementConsumerId = measurementConsumer.externalMeasurementConsumerId + externalMeasurementConsumerCertificateId = + measurementConsumer.certificate.externalCertificateId + dataProviders.putAll( + mapOf(dataProvider.externalDataProviderId to dataProviderValue) + ) + details = + details.copy { + protocolConfig = protocolConfig { + direct = ProtocolConfig.Direct.getDefaultInstance() + } + clearDuchyProtocolConfig() + } + } + } + ) + + val computationMeasurement1 = + measurementsService.getMeasurementByComputationId( + getMeasurementByComputationIdRequest { + externalComputationId = measurement1.externalComputationId + } + ) + + val requisition2 = + requisitionsService + .streamRequisitions( + streamRequisitionsRequest { + limit = 1 + filter = + StreamRequisitionsRequestKt.filter { + externalMeasurementConsumerId = measurement2.externalMeasurementConsumerId + externalMeasurementId = measurement2.externalMeasurementId + } + } + ) + .toList() + .first() + + val streamMeasurementsRequest = streamMeasurementsRequest { + limit = 10 + measurementView = Measurement.View.FULL + } + + val responses: List = + 
measurementsService.streamMeasurements(streamMeasurementsRequest).toList() + + assertThat(responses) + .containsExactly( + computationMeasurement1, + measurement2.copy { requisitions += requisition2 }, + ) + .inOrder() + } + @Test fun `streamMeasurements respects limit`(): Unit = runBlocking { val measurementConsumer = diff --git a/src/main/proto/wfa/measurement/internal/kingdom/bigquerytables/BUILD.bazel b/src/main/proto/wfa/measurement/internal/kingdom/bigquerytables/BUILD.bazel new file mode 100644 index 00000000000..3ce60b7b218 --- /dev/null +++ b/src/main/proto/wfa/measurement/internal/kingdom/bigquerytables/BUILD.bazel @@ -0,0 +1,8 @@ +load("//src/main/proto/wfa/measurement/internal/common:macros.bzl", "proto_and_kt_jvm_proto_library") + +package(default_visibility = ["//:__subpackages__"]) + +proto_and_kt_jvm_proto_library( + name = "operational_metrics_dataset", + deps = ["@com_google_protobuf//:timestamp_proto"], +) diff --git a/src/main/proto/wfa/measurement/internal/kingdom/bigquerytables/operational_metrics_dataset.proto b/src/main/proto/wfa/measurement/internal/kingdom/bigquerytables/operational_metrics_dataset.proto new file mode 100644 index 00000000000..688772b91b9 --- /dev/null +++ b/src/main/proto/wfa/measurement/internal/kingdom/bigquerytables/operational_metrics_dataset.proto @@ -0,0 +1,83 @@ +// Copyright 2024 The Cross-Media Measurement Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +syntax = "proto3"; + +package wfa.measurement.internal.kingdom.bigquerytables; + +import "google/protobuf/timestamp.proto"; + +option java_package = "org.wfanet.measurement.internal.kingdom.bigquerytables"; +option java_multiple_files = true; + +enum MeasurementType { + MEASUREMENT_TYPE_UNSPECIFIED = 0; + REACH_AND_FREQUENCY = 1; + REACH = 2; + IMPRESSION = 3; + DURATION = 4; + POPULATION = 5; +} + +message MeasurementsTableRow { + string measurement_consumer_id = 1; + string measurement_id = 2; + + bool is_direct = 3; + MeasurementType measurement_type = 4; + + enum State { + STATE_UNSPECIFIED = 0; + SUCCEEDED = 1; + FAILED = 2; + } + State state = 5; + + google.protobuf.Timestamp create_time = 6; + google.protobuf.Timestamp update_time = 7; + + int64 completion_duration_seconds = 8; + int64 completion_duration_seconds_squared = 9; +} + +message RequisitionsTableRow { + string measurement_consumer_id = 1; + string measurement_id = 2; + string requisition_id = 3; + string data_provider_id = 4; + + bool is_direct = 5; + MeasurementType measurement_type = 6; + + enum State { + STATE_UNSPECIFIED = 0; + FULFILLED = 1; + REFUSED = 2; + } + State state = 7; + + google.protobuf.Timestamp create_time = 8; + google.protobuf.Timestamp update_time = 9; + + int64 completion_duration_seconds = 10; + int64 completion_duration_seconds_squared = 11; +} + +message LatestMeasurementReadTableRow { + // Since BigQuery timestamp is microsecond precision, this is in nanoseconds + // and is stored as an integer. 
+ int64 update_time = 1; + int64 external_measurement_consumer_id = 2; + int64 external_measurement_id = 3; +} diff --git a/src/main/proto/wfa/measurement/internal/kingdom/measurement.proto b/src/main/proto/wfa/measurement/internal/kingdom/measurement.proto index 3af09fcb241..6b8609f0f15 100644 --- a/src/main/proto/wfa/measurement/internal/kingdom/measurement.proto +++ b/src/main/proto/wfa/measurement/internal/kingdom/measurement.proto @@ -36,6 +36,9 @@ message Measurement { // View for computation, which includes child Requisitions and // ComputationParticipants. COMPUTATION = 1; + + // Denormalized view, which includes all fields. + FULL = 2; } fixed64 external_measurement_consumer_id = 1; diff --git a/src/main/proto/wfa/measurement/internal/kingdom/measurements_service.proto b/src/main/proto/wfa/measurement/internal/kingdom/measurements_service.proto index 2aec9bf4605..08b7b232867 100644 --- a/src/main/proto/wfa/measurement/internal/kingdom/measurements_service.proto +++ b/src/main/proto/wfa/measurement/internal/kingdom/measurements_service.proto @@ -95,7 +95,7 @@ message StreamMeasurementsRequest { message After { google.protobuf.Timestamp update_time = 1; oneof key { - // Key of the `Measurement`, used when view is DEFAULT. + // Key of the `Measurement`, used when view is DEFAULT or FULL. MeasurementKey measurement = 2; // Computation key of the `Measurement`, used when view is COMPUTATION. 
ComputationKey computation = 3; diff --git a/src/main/terraform/gcloud/modules/kingdom/main.tf b/src/main/terraform/gcloud/modules/kingdom/main.tf index 58591e882b3..da525c927d8 100644 --- a/src/main/terraform/gcloud/modules/kingdom/main.tf +++ b/src/main/terraform/gcloud/modules/kingdom/main.tf @@ -46,3 +46,209 @@ resource "google_compute_address" "system_v1alpha" { name = "kingdom-system-v1alpha" address = var.system_v1alpha_ip_address } + +resource "google_bigquery_dataset" "operational_metrics" { + dataset_id = "operational_metrics" + friendly_name = "Operational Metrics" + description = "Contains tables for operational metrics" + default_partition_expiration_ms = 31622400000 // 366 days +} + +resource "google_bigquery_table" "measurements" { + dataset_id = google_bigquery_dataset.operational_metrics.dataset_id + table_id = "measurements" + + deletion_protection = true + + time_partitioning { + field = "update_time" + type = "MONTH" + } + + schema = < + val writeStreamMock: WriteStream = mock { writeStream -> + whenever(writeStream.tableSchema).thenReturn(TableSchema.getDefaultInstance()) + whenever(writeStream.location).thenReturn("LOCATION") + } + whenever(bigQueryWriteClient.settings).thenReturn(BigQueryWriteSettings.newBuilder().build()) + whenever( + bigQueryWriteClient.getWriteStream(ArgumentMatchers.isA(GetWriteStreamRequest::class.java)) + ) + .thenReturn(writeStreamMock) + } + + private lateinit var measurementsStreamWriterMock: StreamWriter + private lateinit var requisitionsStreamWriterMock: StreamWriter + private lateinit var latestMeasurementReadStreamWriterMock: StreamWriter + + private lateinit var streamWriterFactoryTestImpl: StreamWriterFactory + + @Before + fun init() { + measurementsClient = MeasurementsGrpcKt.MeasurementsCoroutineStub(grpcTestServerRule.channel) + + measurementsStreamWriterMock = mock { + whenever(it.append(any())) + .thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.getDefaultInstance())) + 
whenever(it.isClosed).thenReturn(false) + } + + requisitionsStreamWriterMock = mock { + whenever(it.append(any())) + .thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.getDefaultInstance())) + whenever(it.isClosed).thenReturn(false) + } + + latestMeasurementReadStreamWriterMock = mock { + whenever(it.append(any())) + .thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.getDefaultInstance())) + whenever(it.isClosed).thenReturn(false) + } + + streamWriterFactoryTestImpl = + StreamWriterFactory { + _: String, + _: String, + tableId: String, + _: BigQueryWriteClient, + _: ProtoSchema -> + when (tableId) { + MEASUREMENTS_TABLE_ID -> measurementsStreamWriterMock + REQUISITIONS_TABLE_ID -> requisitionsStreamWriterMock + LATEST_MEASUREMENT_READ_TABLE_ID -> latestMeasurementReadStreamWriterMock + else -> mock {} + } + } + } + + @Test + fun `job successfully creates protos for appending to streams`() = runBlocking { + val computationMeasurement = COMPUTATION_MEASUREMENT + val directMeasurement = DIRECT_MEASUREMENT + + val tableResultMock: TableResult = mock { tableResult -> + whenever(tableResult.iterateAll()).thenReturn(emptyList()) + } + + val bigQueryMock: BigQuery = mock { bigQuery -> + whenever(bigQuery.query(any())).thenReturn(tableResultMock) + } + + val operationalMetricsExport = + OperationalMetricsExport( + measurementsClient = measurementsClient, + bigQuery = bigQueryMock, + bigQueryWriteClient = bigQueryWriteClientMock, + projectId = PROJECT_ID, + datasetId = DATASET_ID, + latestMeasurementReadTableId = LATEST_MEASUREMENT_READ_TABLE_ID, + measurementsTableId = MEASUREMENTS_TABLE_ID, + requisitionsTableId = REQUISITIONS_TABLE_ID, + streamWriterFactory = streamWriterFactoryTestImpl, + ) + + operationalMetricsExport.execute() + + with(argumentCaptor()) { + verify(measurementsStreamWriterMock).append(capture()) + + val protoRows: ProtoRows = allValues.first() + assertThat(protoRows.serializedRowsList).hasSize(2) + + val computationMeasurementTableRow = + 
MeasurementsTableRow.parseFrom(protoRows.serializedRowsList[1]) + assertThat(computationMeasurementTableRow) + .isEqualTo( + measurementsTableRow { + measurementConsumerId = + externalIdToApiId(computationMeasurement.externalMeasurementConsumerId) + measurementId = externalIdToApiId(computationMeasurement.externalMeasurementId) + isDirect = false + measurementType = MeasurementType.REACH_AND_FREQUENCY + state = MeasurementsTableRow.State.SUCCEEDED + createTime = computationMeasurement.createTime + updateTime = computationMeasurement.updateTime + completionDurationSeconds = + Durations.toSeconds( + Timestamps.between( + computationMeasurement.createTime, + computationMeasurement.updateTime, + ) + ) + completionDurationSecondsSquared = completionDurationSeconds * completionDurationSeconds + } + ) + + val directMeasurementTableRow = + MeasurementsTableRow.parseFrom(protoRows.serializedRowsList[0]) + assertThat(directMeasurementTableRow) + .isEqualTo( + measurementsTableRow { + measurementConsumerId = + externalIdToApiId(directMeasurement.externalMeasurementConsumerId) + measurementId = externalIdToApiId(directMeasurement.externalMeasurementId) + isDirect = true + measurementType = MeasurementType.REACH_AND_FREQUENCY + state = MeasurementsTableRow.State.SUCCEEDED + createTime = directMeasurement.createTime + updateTime = directMeasurement.updateTime + completionDurationSeconds = + Durations.toSeconds( + Timestamps.between(directMeasurement.createTime, directMeasurement.updateTime) + ) + completionDurationSecondsSquared = completionDurationSeconds * completionDurationSeconds + } + ) + } + + with(argumentCaptor()) { + verify(requisitionsStreamWriterMock).append(capture()) + + val protoRows: ProtoRows = allValues.first() + assertThat(protoRows.serializedRowsList).hasSize(2) + + val computationRequisitionTableRow = + RequisitionsTableRow.parseFrom(protoRows.serializedRowsList[1]) + assertThat(computationRequisitionTableRow) + .isEqualTo( + requisitionsTableRow { + 
measurementConsumerId = + externalIdToApiId(computationMeasurement.externalMeasurementConsumerId) + measurementId = externalIdToApiId(computationMeasurement.externalMeasurementId) + requisitionId = + externalIdToApiId(computationMeasurement.requisitionsList[0].externalRequisitionId) + dataProviderId = + externalIdToApiId(computationMeasurement.requisitionsList[0].externalDataProviderId) + isDirect = false + measurementType = MeasurementType.REACH_AND_FREQUENCY + state = RequisitionsTableRow.State.FULFILLED + createTime = computationMeasurement.createTime + updateTime = computationMeasurement.requisitionsList[0].updateTime + completionDurationSeconds = + Durations.toSeconds( + Timestamps.between( + computationMeasurement.createTime, + computationMeasurement.requisitionsList[0].updateTime, + ) + ) + completionDurationSecondsSquared = completionDurationSeconds * completionDurationSeconds + } + ) + + val directRequisitionTableRow = + RequisitionsTableRow.parseFrom(protoRows.serializedRowsList[0]) + assertThat(directRequisitionTableRow) + .isEqualTo( + requisitionsTableRow { + measurementConsumerId = + externalIdToApiId(directMeasurement.externalMeasurementConsumerId) + measurementId = externalIdToApiId(directMeasurement.externalMeasurementId) + requisitionId = + externalIdToApiId(directMeasurement.requisitionsList[0].externalRequisitionId) + dataProviderId = + externalIdToApiId(directMeasurement.requisitionsList[0].externalDataProviderId) + isDirect = true + measurementType = MeasurementType.REACH_AND_FREQUENCY + state = RequisitionsTableRow.State.FULFILLED + createTime = directMeasurement.createTime + updateTime = directMeasurement.requisitionsList[0].updateTime + completionDurationSeconds = + Durations.toSeconds( + Timestamps.between( + directMeasurement.createTime, + directMeasurement.requisitionsList[0].updateTime, + ) + ) + completionDurationSecondsSquared = completionDurationSeconds * completionDurationSeconds + } + ) + } + + with(argumentCaptor()) { + 
verify(latestMeasurementReadStreamWriterMock).append(capture()) + + val protoRows: ProtoRows = allValues.first() + assertThat(protoRows.serializedRowsList).hasSize(1) + + val latestMeasurementReadTableRow = + LatestMeasurementReadTableRow.parseFrom(protoRows.serializedRowsList.first()) + assertThat(latestMeasurementReadTableRow) + .isEqualTo( + latestMeasurementReadTableRow { + updateTime = Timestamps.toNanos(computationMeasurement.updateTime) + externalMeasurementConsumerId = computationMeasurement.externalMeasurementConsumerId + externalMeasurementId = computationMeasurement.externalMeasurementId + } + ) + } + } + + @Test + fun `job does not create protos for requisitions that are incomplete`() = runBlocking { + whenever(measurementsMock.streamMeasurements(any())) + .thenReturn( + flowOf( + DIRECT_MEASUREMENT, + COMPUTATION_MEASUREMENT.copy { + requisitions.clear() + requisitions += + COMPUTATION_MEASUREMENT.requisitionsList[0].copy { + state = Requisition.State.UNFULFILLED + } + }, + ) + ) + + val directMeasurement = DIRECT_MEASUREMENT + + val tableResultMock: TableResult = mock { tableResult -> + whenever(tableResult.iterateAll()).thenReturn(emptyList()) + } + + val bigQueryMock: BigQuery = mock { bigQuery -> + whenever(bigQuery.query(any())).thenReturn(tableResultMock) + } + + val operationalMetricsExport = + OperationalMetricsExport( + measurementsClient = measurementsClient, + bigQuery = bigQueryMock, + bigQueryWriteClient = bigQueryWriteClientMock, + projectId = PROJECT_ID, + datasetId = DATASET_ID, + latestMeasurementReadTableId = LATEST_MEASUREMENT_READ_TABLE_ID, + measurementsTableId = MEASUREMENTS_TABLE_ID, + requisitionsTableId = REQUISITIONS_TABLE_ID, + streamWriterFactory = streamWriterFactoryTestImpl, + ) + + operationalMetricsExport.execute() + + with(argumentCaptor()) { + verify(requisitionsStreamWriterMock).append(capture()) + + val protoRows: ProtoRows = allValues.first() + assertThat(protoRows.serializedRowsList).hasSize(1) + + val 
directRequisitionTableRow = + RequisitionsTableRow.parseFrom(protoRows.serializedRowsList[0]) + assertThat(directRequisitionTableRow) + .isEqualTo( + requisitionsTableRow { + measurementConsumerId = + externalIdToApiId(directMeasurement.externalMeasurementConsumerId) + measurementId = externalIdToApiId(directMeasurement.externalMeasurementId) + requisitionId = + externalIdToApiId(directMeasurement.requisitionsList[0].externalRequisitionId) + dataProviderId = + externalIdToApiId(directMeasurement.requisitionsList[0].externalDataProviderId) + isDirect = true + measurementType = MeasurementType.REACH_AND_FREQUENCY + state = RequisitionsTableRow.State.FULFILLED + createTime = directMeasurement.createTime + updateTime = directMeasurement.requisitionsList[0].updateTime + completionDurationSeconds = + Durations.toSeconds( + Timestamps.between( + directMeasurement.createTime, + directMeasurement.requisitionsList[0].updateTime, + ) + ) + completionDurationSecondsSquared = completionDurationSeconds * completionDurationSeconds + } + ) + } + } + + @Test + fun `job can process the next batch of measurements without starting at the beginning`() = + runBlocking { + val directMeasurement = DIRECT_MEASUREMENT + + val updateTimeFieldValue: FieldValue = + FieldValue.of( + FieldValue.Attribute.PRIMITIVE, + "${Timestamps.toNanos(directMeasurement.updateTime)}", + ) + val externalMeasurementConsumerIdFieldValue: FieldValue = + FieldValue.of( + FieldValue.Attribute.PRIMITIVE, + "${directMeasurement.externalMeasurementConsumerId}", + ) + val externalMeasurementIdFieldValue: FieldValue = + FieldValue.of(FieldValue.Attribute.PRIMITIVE, "${directMeasurement.externalMeasurementId}") + + val tableResultMock: TableResult = mock { tableResult -> + whenever(tableResult.iterateAll()) + .thenReturn( + listOf( + FieldValueList.of( + mutableListOf( + updateTimeFieldValue, + externalMeasurementConsumerIdFieldValue, + externalMeasurementIdFieldValue, + ), + LATEST_MEASUREMENT_FIELD_LIST, + ) + ) + ) + 
} + + whenever(measurementsMock.streamMeasurements(any())).thenAnswer { + val streamMeasurementsRequest: StreamMeasurementsRequest = it.getArgument(0) + assertThat(streamMeasurementsRequest) + .ignoringRepeatedFieldOrder() + .isEqualTo( + streamMeasurementsRequest { + measurementView = Measurement.View.FULL + filter = + StreamMeasurementsRequestKt.filter { + states += Measurement.State.SUCCEEDED + states += Measurement.State.FAILED + after = + StreamMeasurementsRequestKt.FilterKt.after { + updateTime = directMeasurement.updateTime + measurement = measurementKey { + externalMeasurementConsumerId = + directMeasurement.externalMeasurementConsumerId + externalMeasurementId = directMeasurement.externalMeasurementId + } + } + } + limit = 3000 + } + ) + flowOf(COMPUTATION_MEASUREMENT) + } + + val bigQueryMock: BigQuery = mock { bigQuery -> + whenever(bigQuery.query(any())).thenReturn(tableResultMock) + } + + val operationalMetricsExport = + OperationalMetricsExport( + measurementsClient = measurementsClient, + bigQuery = bigQueryMock, + bigQueryWriteClient = bigQueryWriteClientMock, + projectId = PROJECT_ID, + datasetId = DATASET_ID, + latestMeasurementReadTableId = LATEST_MEASUREMENT_READ_TABLE_ID, + measurementsTableId = MEASUREMENTS_TABLE_ID, + requisitionsTableId = REQUISITIONS_TABLE_ID, + streamWriterFactory = streamWriterFactoryTestImpl, + ) + + operationalMetricsExport.execute() + } + + @Test + fun `job recreates streamwriter if it is closed`() = runBlocking { + val tableResultMock: TableResult = mock { tableResult -> + whenever(tableResult.iterateAll()).thenReturn(emptyList()) + } + + val bigQueryMock: BigQuery = mock { bigQuery -> + whenever(bigQuery.query(any())).thenReturn(tableResultMock) + } + + whenever(measurementsStreamWriterMock.isClosed).thenReturn(true) + whenever(measurementsStreamWriterMock.isUserClosed).thenReturn(false) + + val operationalMetricsExport = + OperationalMetricsExport( + measurementsClient = measurementsClient, + bigQuery = 
bigQueryMock, + bigQueryWriteClient = bigQueryWriteClientMock, + projectId = PROJECT_ID, + datasetId = DATASET_ID, + latestMeasurementReadTableId = LATEST_MEASUREMENT_READ_TABLE_ID, + measurementsTableId = MEASUREMENTS_TABLE_ID, + requisitionsTableId = REQUISITIONS_TABLE_ID, + streamWriterFactory = streamWriterFactoryTestImpl, + ) + + operationalMetricsExport.execute() + } + + @Test + fun `job succeeds when bigquery append fails with internal only once`() = runBlocking { + val tableResultMock: TableResult = mock { tableResult -> + whenever(tableResult.iterateAll()).thenReturn(emptyList()) + } + + val bigQueryMock: BigQuery = mock { bigQuery -> + whenever(bigQuery.query(any())).thenReturn(tableResultMock) + } + + whenever(measurementsStreamWriterMock.append(any())) + .thenReturn( + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setError(Status.newBuilder().setCode(Code.INTERNAL_VALUE).build()) + .build() + ) + ) + .thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.getDefaultInstance())) + + val operationalMetricsExport = + OperationalMetricsExport( + measurementsClient = measurementsClient, + bigQuery = bigQueryMock, + bigQueryWriteClient = bigQueryWriteClientMock, + projectId = PROJECT_ID, + datasetId = DATASET_ID, + latestMeasurementReadTableId = LATEST_MEASUREMENT_READ_TABLE_ID, + measurementsTableId = MEASUREMENTS_TABLE_ID, + requisitionsTableId = REQUISITIONS_TABLE_ID, + streamWriterFactory = streamWriterFactoryTestImpl, + ) + + operationalMetricsExport.execute() + } + + @Test + fun `job fails when bigquery append fails with internal too many times`() { + runBlocking { + val tableResultMock: TableResult = mock { tableResult -> + whenever(tableResult.iterateAll()).thenReturn(emptyList()) + } + + val bigQueryMock: BigQuery = mock { bigQuery -> + whenever(bigQuery.query(any())).thenReturn(tableResultMock) + } + + whenever(measurementsStreamWriterMock.append(any())) + .thenReturn( + ApiFutures.immediateFuture( + 
AppendRowsResponse.newBuilder() + .setError(Status.newBuilder().setCode(Code.INTERNAL_VALUE).build()) + .build() + ) + ) + + val operationalMetricsExport = + OperationalMetricsExport( + measurementsClient = measurementsClient, + bigQuery = bigQueryMock, + bigQueryWriteClient = bigQueryWriteClientMock, + projectId = PROJECT_ID, + datasetId = DATASET_ID, + latestMeasurementReadTableId = LATEST_MEASUREMENT_READ_TABLE_ID, + measurementsTableId = MEASUREMENTS_TABLE_ID, + requisitionsTableId = REQUISITIONS_TABLE_ID, + streamWriterFactory = streamWriterFactoryTestImpl, + ) + + assertFailsWith { operationalMetricsExport.execute() } + } + } + + @Test + fun `job fails when bigquery append fails with invalid argument`() { + runBlocking { + val tableResultMock: TableResult = mock { tableResult -> + whenever(tableResult.iterateAll()).thenReturn(emptyList()) + } + + val bigQueryMock: BigQuery = mock { bigQuery -> + whenever(bigQuery.query(any())).thenReturn(tableResultMock) + } + + whenever(measurementsStreamWriterMock.append(any())) + .thenReturn( + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setError(Status.newBuilder().setCode(Code.INVALID_ARGUMENT_VALUE).build()) + .build() + ) + ) + + val operationalMetricsExport = + OperationalMetricsExport( + measurementsClient = measurementsClient, + bigQuery = bigQueryMock, + bigQueryWriteClient = bigQueryWriteClientMock, + projectId = PROJECT_ID, + datasetId = DATASET_ID, + latestMeasurementReadTableId = LATEST_MEASUREMENT_READ_TABLE_ID, + measurementsTableId = MEASUREMENTS_TABLE_ID, + requisitionsTableId = REQUISITIONS_TABLE_ID, + streamWriterFactory = streamWriterFactoryTestImpl, + ) + + assertFailsWith { operationalMetricsExport.execute() } + } + } + + @Test + fun `job fails when bigquery append fails with append serialization error`() { + runBlocking { + val tableResultMock: TableResult = mock { tableResult -> + whenever(tableResult.iterateAll()).thenReturn(emptyList()) + } + + val bigQueryMock: BigQuery = mock 
{ bigQuery -> + whenever(bigQuery.query(any())).thenReturn(tableResultMock) + } + + whenever(measurementsStreamWriterMock.append(any())) + .thenThrow(AppendSerializationError(0, "", "", mapOf())) + + val operationalMetricsExport = + OperationalMetricsExport( + measurementsClient = measurementsClient, + bigQuery = bigQueryMock, + bigQueryWriteClient = bigQueryWriteClientMock, + projectId = PROJECT_ID, + datasetId = DATASET_ID, + latestMeasurementReadTableId = LATEST_MEASUREMENT_READ_TABLE_ID, + measurementsTableId = MEASUREMENTS_TABLE_ID, + requisitionsTableId = REQUISITIONS_TABLE_ID, + streamWriterFactory = streamWriterFactoryTestImpl, + ) + + assertFailsWith { operationalMetricsExport.execute() } + } + } + + companion object { + private const val PROJECT_ID = "project" + private const val DATASET_ID = "dataset" + private const val MEASUREMENTS_TABLE_ID = "measurements" + private const val REQUISITIONS_TABLE_ID = "requisitions" + private const val LATEST_MEASUREMENT_READ_TABLE_ID = "latest_measurement_read" + + private const val API_VERSION = "v2alpha" + + private val PUBLIC_API_ENCRYPTION_PUBLIC_KEY = encryptionPublicKey { + format = EncryptionPublicKey.Format.TINK_KEYSET + data = ByteString.copyFromUtf8("key") + } + + private val PUBLIC_API_MEASUREMENT_SPEC = measurementSpec { + measurementPublicKey = PUBLIC_API_ENCRYPTION_PUBLIC_KEY.pack() + reachAndFrequency = + MeasurementSpecKt.reachAndFrequency { + reachPrivacyParams = differentialPrivacyParams { + epsilon = 1.1 + delta = 1.2 + } + frequencyPrivacyParams = differentialPrivacyParams { + epsilon = 2.1 + delta = 2.2 + } + maximumFrequency = 10 + } + } + + private val MEASUREMENT = measurement { + externalMeasurementConsumerId = 1234 + externalMeasurementConsumerCertificateId = 1234 + details = measurementDetails { + apiVersion = API_VERSION + measurementSpec = PUBLIC_API_MEASUREMENT_SPEC.toByteString() + measurementSpecSignature = ByteString.copyFromUtf8("MeasurementSpec signature") + 
measurementSpecSignatureAlgorithmOid = "2.9999" + } + } + + private val COMPUTATION_MEASUREMENT = + MEASUREMENT.copy { + externalMeasurementId = 123 + externalComputationId = 124 + providedMeasurementId = "computation-participant" + state = Measurement.State.SUCCEEDED + createTime = timestamp { seconds = 200 } + updateTime = timestamp { + seconds = 300 + nanos = 100 + } + details = + details.copy { + protocolConfig = protocolConfig { + liquidLegionsV2 = ProtocolConfig.LiquidLegionsV2.getDefaultInstance() + } + } + + requisitions += requisition { + externalDataProviderId = 432 + externalRequisitionId = 433 + state = Requisition.State.FULFILLED + updateTime = timestamp { + seconds = 500 + nanos = 100 + } + } + + computationParticipants += computationParticipant { + externalDuchyId = "0" + state = ComputationParticipant.State.READY + updateTime = timestamp { seconds = 300 } + } + computationParticipants += computationParticipant { + externalDuchyId = "1" + state = ComputationParticipant.State.READY + updateTime = timestamp { seconds = 400 } + } + computationParticipants += computationParticipant { + externalDuchyId = "2" + state = ComputationParticipant.State.READY + updateTime = timestamp { seconds = 500 } + } + } + + private val DIRECT_MEASUREMENT = + MEASUREMENT.copy { + externalMeasurementId = 321 + externalComputationId = 0 + providedMeasurementId = "direct" + state = Measurement.State.SUCCEEDED + createTime = timestamp { seconds = 200 } + updateTime = timestamp { + seconds = 220 + nanos = 200 + } + details = + details.copy { + protocolConfig = protocolConfig { direct = ProtocolConfig.Direct.getDefaultInstance() } + } + + requisitions += requisition { + externalDataProviderId = 432 + externalRequisitionId = 437 + state = Requisition.State.FULFILLED + updateTime = timestamp { + seconds = 600 + nanos = 100 + } + } + } + + private val LATEST_MEASUREMENT_FIELD_LIST: FieldList = + FieldList.of( + listOf( + Field.of("update_time", LegacySQLTypeName.INTEGER), + 
Field.of("external_measurement_consumer_id", LegacySQLTypeName.INTEGER), + Field.of("external_measurement_id", LegacySQLTypeName.INTEGER), + ) + ) + } +}